Dotflow

April 12, 2026

import dotflow. @action. deploy. Done.


Website · Documentation · PyPI



Dotflow is a lightweight Python library for execution pipelines. Define tasks with decorators, chain them together, and deploy to any cloud — with built-in retry, parallel execution, storage, observability, and cloud deployment.

Why Dotflow?

  • Simple — @action decorator + workflow.start(). That's it.
  • Resilient — Retry, backoff, timeout, checkpoints, and error handling out of the box.
  • Observable — OpenTelemetry traces, metrics, and logs. Sentry error tracking.
  • Deployable — dotflow deploy --platform lambda ships your pipeline to AWS in one command.
  • Portable — Same code runs on Lambda, ECS, Cloud Run, Alibaba FC, Kubernetes, Docker, or GitHub Actions.

Install

pip install dotflow

Quick Start

from dotflow import DotFlow, action

@action
def extract():
    return {"users": 150}

@action
def transform(previous_context):
    total = previous_context.storage["users"]
    return {"users": total, "active": int(total * 0.8)}

@action
def load(previous_context):
    print(f"Loaded {previous_context.storage['active']} active users")

workflow = DotFlow()
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)

workflow.start()

Deploy anywhere

Write your pipeline once. Deploy to any cloud with a single command.

dotflow init
dotflow deploy --platform lambda --project my_pipeline

Supported platforms

Platform | Deploy method
Docker | docker compose up
AWS Lambda | dotflow deploy
AWS Lambda + EventBridge | dotflow deploy --schedule
AWS Lambda + S3 Trigger | dotflow deploy
AWS Lambda + SQS Trigger | dotflow deploy
AWS Lambda + API Gateway | dotflow deploy
AWS ECS Fargate | dotflow deploy
AWS ECS + EventBridge | dotflow deploy --schedule
Google Cloud Run | dotflow deploy
Cloud Run + Scheduler | dotflow deploy --schedule
Kubernetes | kubectl apply
Alibaba Cloud FC | dotflow deploy
Alibaba Cloud FC + Timer | dotflow deploy --schedule
GitHub Actions | dotflow deploy

See all 34+ platforms →

Optional extras

pip install dotflow[aws]            # S3 storage
pip install dotflow[gcp]            # Google Cloud Storage
pip install dotflow[scheduler]      # Cron scheduler
pip install dotflow[otel]           # OpenTelemetry
pip install dotflow[sentry]         # Sentry error tracking
pip install dotflow[deploy-aws]      # AWS deploy (Lambda, ECS)
pip install dotflow[deploy-gcp]      # GCP deploy (Cloud Run)
pip install dotflow[deploy-alibaba]  # Alibaba Cloud deploy (FC)
pip install dotflow[deploy-github]   # GitHub Actions deploy

Read the full documentation →

Documentation

Section | Description
Concepts | Workflows, tasks, context, providers, process modes
How-to Guides | Step-by-step tutorials for workflows, tasks, and CLI
Cloud Deployment | Deploy to AWS, GCP, Alibaba, Kubernetes, Docker, GitHub Actions
Integrations | OpenTelemetry, Sentry, Telegram, Discord, S3, GCS, Server
Examples | Real-world pipelines: ETL, health checks, async, scheduler
Reference | API reference for all classes and providers
Custom Providers | Build your own storage, notify, log, tracer, or metrics provider

Features

Observability

OpenTelemetry docs | Sentry docs | Tracer docs | Metrics docs

Built-in support for OpenTelemetry and Sentry:

from dotflow import Config
from dotflow.providers import LogOpenTelemetry, TracerOpenTelemetry, MetricsOpenTelemetry

# OpenTelemetry: traces, metrics, and structured logs
config = Config(
    log=LogOpenTelemetry(service_name="my-pipeline"),
    tracer=TracerOpenTelemetry(service_name="my-pipeline"),
    metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
)

from dotflow.providers import LogSentry, TracerSentry

# Sentry: error tracking + performance monitoring
config = Config(
    log=LogSentry(dsn="https://xxx@sentry.io/123"),
    tracer=TracerSentry(),
)

Execution Modes

Process Mode docs

Dotflow supports 4 execution strategies out of the box:

Sequential (default)

Tasks run one after another. The context from each task flows to the next.

workflow.task.add(step=task_a)
workflow.task.add(step=task_b)

workflow.start()  # or mode="sequential"

flowchart LR
    A[task_a] --> B[task_b] --> C[Finish]

Background

Same as sequential, but runs in a background thread — non-blocking.

workflow.start(mode="background")

Parallel

Every task runs simultaneously in its own process.

workflow.task.add(step=task_a)
workflow.task.add(step=task_b)
workflow.task.add(step=task_c)

workflow.start(mode="parallel")

flowchart TD
    S[Start] --> A[task_a] & B[task_b] & C[task_c]
    A & B & C --> F[Finish]

Parallel Groups

Assign tasks to named groups. Groups run in parallel, but tasks within each group run sequentially.

workflow.task.add(step=fetch_users, group_name="users")
workflow.task.add(step=save_users, group_name="users")
workflow.task.add(step=fetch_orders, group_name="orders")
workflow.task.add(step=save_orders, group_name="orders")

workflow.start()

flowchart TD
    S[Start] --> G1[Group: users] & G2[Group: orders]
    G1 --> A[fetch_users] --> B[save_users]
    G2 --> C[fetch_orders] --> D[save_orders]
    B & D --> F[Finish]

Retry, Timeout & Backoff

Retry docs | Backoff docs | Timeout docs

The @action decorator supports built-in resilience options:

@action(retry=3, timeout=10, retry_delay=2, backoff=True)
def unreliable_api_call():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

Parameter | Type | Default | Description
retry | int | 1 | Number of attempts before failing
timeout | int | 0 | Max seconds per attempt (0 = no limit)
retry_delay | int | 1 | Seconds to wait between retries
backoff | bool | False | Exponential backoff (delay doubles each retry)
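As a back-of-envelope illustration (this is not Dotflow's internal code), a doubling schedule with retry=3, retry_delay=2, backoff=True waits 2 seconds after the first failure and 4 seconds after the second:

```python
# Illustrative only: the delay schedule implied by the parameters above,
# with retry_delay doubling after each failed attempt.
def backoff_delays(retry: int = 3, retry_delay: int = 2) -> list:
    # One delay between each pair of consecutive attempts.
    return [retry_delay * (2 ** i) for i in range(retry - 1)]

print(backoff_delays())  # → [2, 4]
```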

Context System

Context docs | Previous Context | Many Contexts

Tasks communicate through a context chain. Each task receives the previous task's output and can access its own initial context.

@action
def step_one():
    return "Hello"

@action
def step_two(previous_context, initial_context):
    greeting = previous_context.storage   # "Hello"
    name = initial_context.storage        # "World"
    return f"{greeting}, {name}!"

workflow = DotFlow()
workflow.task.add(step=step_one)
workflow.task.add(step=step_two, initial_context="World")
workflow.start()

Each Context object contains:

  • storage — the return value from the task
  • task_id — the task identifier
  • workflow_id — the workflow identifier
  • time — timestamp of execution
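As a rough stand-in (not dotflow's actual class), the shape of a Context can be pictured as:

```python
# Stand-in sketch of a Context's fields (per the list above);
# dotflow's real Context class is what your tasks actually receive.
from dataclasses import dataclass
from datetime import datetime
from typing import Any

@dataclass
class ContextSketch:
    storage: Any          # the return value from the task
    task_id: int          # the task identifier
    workflow_id: str      # the workflow identifier
    time: datetime        # timestamp of execution

ctx = ContextSketch(storage={"users": 150}, task_id=0,
                    workflow_id="demo", time=datetime.now())
print(ctx.storage["users"])  # → 150
```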

Checkpoint & Resume

Checkpoint docs

Resume a workflow from where it left off. Requires a persistent storage provider and a fixed workflow_id.

from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile

config = Config(storage=StorageFile())

workflow = DotFlow(config=config, workflow_id="my-pipeline-v1")
workflow.task.add(step=step_a)
workflow.task.add(step=step_b)
workflow.task.add(step=step_c)

# First run — executes all tasks and saves checkpoints
workflow.start()

# If step_c failed, fix and re-run — skips step_a and step_b
workflow.start(resume=True)

Storage Providers

Storage docs

Choose where task results are persisted:

In-Memory (default)

from dotflow import DotFlow

workflow = DotFlow()  # uses StorageDefault (in-memory)

File System

from dotflow import DotFlow, Config
from dotflow.providers import StorageFile

config = Config(storage=StorageFile(path=".output"))
workflow = DotFlow(config=config)

AWS S3

pip install dotflow[aws]

from dotflow import DotFlow, Config
from dotflow.providers import StorageS3

config = Config(storage=StorageS3(bucket="my-bucket", prefix="pipelines/", region="us-east-1"))
workflow = DotFlow(config=config)

Google Cloud Storage

pip install dotflow[gcp]

from dotflow import DotFlow, Config
from dotflow.providers import StorageGCS

config = Config(storage=StorageGCS(bucket="my-bucket", prefix="pipelines/", project="my-project"))
workflow = DotFlow(config=config)

Notifications

Telegram docs | Discord docs

Get notified about task status changes via Telegram or Discord.

from dotflow import Config
from dotflow.providers import NotifyTelegram

config = Config(notify=NotifyTelegram(
    token="YOUR_BOT_TOKEN",
    chat_id=123456789,
))

from dotflow.providers import NotifyDiscord

config = Config(notify=NotifyDiscord(
    webhook_url="https://discord.com/api/webhooks/...",
))

Class-Based Steps

Return a class instance from a task, and Dotflow will automatically discover and execute all @action-decorated methods in source order.

from dotflow import action

class ETLPipeline:
    @action
    def extract(self):
        return {"raw": [1, 2, 3]}

    @action
    def transform(self, previous_context):
        data = previous_context.storage["raw"]
        return {"processed": [x * 2 for x in data]}

    @action
    def load(self, previous_context):
        print(f"Loaded: {previous_context.storage['processed']}")

@action
def run_pipeline():
    return ETLPipeline()

workflow = DotFlow()
workflow.task.add(step=run_pipeline)
workflow.start()

Task Groups

Groups docs

Organize tasks into named groups for parallel group execution.

workflow.task.add(step=scrape_site_a, group_name="scraping")
workflow.task.add(step=scrape_site_b, group_name="scraping")
workflow.task.add(step=process_data, group_name="processing")
workflow.task.add(step=save_results, group_name="processing")

workflow.start()  # groups run in parallel, tasks within each group run sequentially

Callbacks

Task Callback docs | Workflow Callback docs

Execute a function after each task completes — useful for logging, alerting, or side effects.

def on_task_done(task):
    print(f"Task {task.task_id} finished with status: {task.status}")

workflow.task.add(step=my_step, callback=on_task_done)

Workflow-level callbacks for success and failure:

def on_success(*args, **kwargs):
    print("All tasks completed!")

def on_failure(*args, **kwargs):
    print("Something went wrong.")

workflow.start(on_success=on_success, on_failure=on_failure)

Error Handling

Error Handling docs | Keep Going docs

Control whether the workflow stops or continues when a task fails:

# Stop on first failure (default)
workflow.start(keep_going=False)

# Continue executing remaining tasks even if one fails
workflow.start(keep_going=True)

Each task tracks its errors with full detail:

  • Attempt number
  • Exception type and message
  • Traceback

Access results after execution:

for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status}")
    if task.errors:
        print(f"  Errors: {task.errors}")

Async Support

Async docs

@action automatically detects and handles async functions:

import httpx
from dotflow import DotFlow, action

@action(timeout=30)
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

workflow = DotFlow()
workflow.task.add(step=fetch_data)
workflow.start()

Scheduler / Cron

Cron scheduler docs | Default scheduler | Cron overlap (concepts)

Schedule workflows to run automatically using cron expressions.

pip install dotflow[scheduler]

from dotflow import DotFlow, Config, action
from dotflow.providers import SchedulerCron

@action
def sync_data():
    return {"synced": True}

config = Config(scheduler=SchedulerCron(cron="*/5 * * * *"))

workflow = DotFlow(config=config)
workflow.task.add(step=sync_data)
workflow.schedule()

Overlap Strategies

Control what happens when a new execution triggers while the previous one is still running:

Strategy | Description
skip | Drops the new run if the previous one is still active (default)
queue | Buffers one pending run, executes it when the current run finishes
parallel | Runs up to 10 concurrent executions via semaphore

from dotflow.providers import SchedulerCron

# Queue overlapping executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="queue")

# Allow parallel executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="parallel")

The scheduler handles graceful shutdown via SIGINT/SIGTERM signals automatically.


CLI

CLI docs

Run workflows directly from the command line:

# Simple execution
dotflow start --step my_module.my_task

# With initial context
dotflow start --step my_module.my_task --initial-context '{"key": "value"}'

# With callback
dotflow start --step my_module.my_task --callback my_module.on_done

# With execution mode
dotflow start --step my_module.my_task --mode parallel

# With file storage
dotflow start --step my_module.my_task --storage file --path .output

# With S3 storage
dotflow start --step my_module.my_task --storage s3

# With GCS storage
dotflow start --step my_module.my_task --storage gcs

# Schedule with cron
dotflow schedule --step my_module.my_task --cron "*/5 * * * *"

# Schedule with overlap strategy
dotflow schedule --step my_module.my_task --cron "0 * * * *" --overlap queue

# Schedule with resume
dotflow schedule --step my_module.my_task --cron "0 */6 * * *" --storage file --resume

Available CLI commands:

Command | Description
dotflow init | Scaffold a new project with cloud support
dotflow start | Run a workflow
dotflow schedule | Run a workflow on a cron schedule
dotflow logs | View execution logs
dotflow cloud list | Show available cloud platforms
dotflow cloud generate --platform <name> | Generate deployment files
dotflow deploy --platform <name> --project <name> | Deploy to cloud

Server Provider

Server docs

Send workflow and task execution data to a remote API (e.g. dotflow-api) in real time.

from dotflow import DotFlow, Config, action
from dotflow.providers import ServerDefault

@action
def my_task():
    return {"result": "ok"}

config = Config(
    server=ServerDefault(
        base_url="http://localhost:8000/api/v1",
        user_token="your-api-token",
    )
)

workflow = DotFlow(config=config)
workflow.task.add(step=my_task)
workflow.start()

Parameter | Type | Default | Description
base_url | str | "" | API base URL
user_token | str | "" | API token (X-User-Token header)
timeout | float | 5.0 | HTTP request timeout in seconds

The server provider automatically:

  • Creates the workflow on DotFlow() init
  • Creates each task on task.add()
  • Updates task status on each transition (In progress, Completed, Failed, Retry)
  • Updates workflow status on completion (In progress → Completed)

Dependency Injection via Config

The Config class lets you swap providers for storage, notifications, logging, scheduling, and server:

from dotflow import DotFlow, Config
from dotflow.providers import StorageFile, NotifyTelegram, LogDefault, SchedulerCron, ServerDefault

config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(token="...", chat_id=123),
    log=LogDefault(),
    scheduler=SchedulerCron(cron="0 * * * *"),
    server=ServerDefault(base_url="...", user_token="..."),
)

workflow = DotFlow(config=config)

Extend Dotflow by implementing the abstract base classes:

ABC | Methods | Purpose
Storage | post, get, key | Custom storage backends
Notify | hook_status_task | Custom notification channels
Log | info, error, warning, debug | Custom logging
Scheduler | start, stop | Custom scheduling strategies
Tracer | start_workflow, end_workflow, start_task, end_task | Distributed tracing
Metrics | workflow_started, workflow_completed, workflow_failed, task_completed, task_failed, task_retried | Counters and histograms
Server | create_workflow, update_workflow, create_task, update_task | Remote API communication
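For instance, a custom notification channel only needs to implement hook_status_task. The sketch below is hypothetical and shown standalone so the shape is clear; in a real project you would subclass dotflow's Notify ABC and pass an instance via Config(notify=...):

```python
# Hypothetical custom notify provider (standalone sketch, not dotflow code).
class NotifyStdout:
    """Prints each task status transition instead of messaging a chat."""

    def hook_status_task(self, task):
        # `task` exposes at least task_id and status (see Callbacks above).
        print(f"[notify] task {task.task_id} -> {task.status}")
```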

Results & Inspection

After execution, inspect results directly from the workflow object:

workflow.start()

# List of Task objects
tasks = workflow.result_task()

# List of Context objects (one per task)
contexts = workflow.result_context()

# List of storage values (raw return values)
storages = workflow.result_storage()

# Serialized result (Pydantic model)
result = workflow.result()

Task builder utilities:

workflow.task.count()     # Number of tasks
workflow.task.clear()     # Remove all tasks
workflow.task.reverse()   # Reverse execution order
workflow.task.schema()    # Pydantic schema of the workflow

Dynamic Module Import

Reference tasks and callbacks by their module path string instead of importing them directly:

workflow.task.add(step="my_package.tasks.process_data")
workflow.task.add(step="my_package.tasks.save_results", callback="my_package.callbacks.notify")

More Examples

All examples are available in the docs_src/ directory.

Commit Style

Icon | Type | Description
⚙️ | FEATURE | New feature
📝 | PEP8 | Formatting fixes following PEP8
📌 | ISSUE | Reference to issue
🪲 | BUG | Bug fix
📘 | DOCS | Documentation changes
📦 | PyPI | PyPI releases
❤️️ | TEST | Automated tests
⬆️ | CI/CD | Changes in continuous integration/delivery
⚠️ | SECURITY | Security improvements

License

GitHub License

This project is licensed under the terms of the Apache License 2.0.