Pipeline Module

April 13, 2026

Import: from selectools.pipeline import Pipeline, step

Stability: beta

from selectools.pipeline import Pipeline, step

@step
def clean(text: str) -> str:
    """Strip whitespace and normalize."""
    return text.strip().lower()

@step
def word_count(text: str) -> str:
    """Count words in the text."""
    count = len(text.split())
    return f"'{text}' contains {count} words."

# Compose steps with the | operator
pipeline = clean | word_count

result = pipeline.run("  Hello World from Selectools  ")
print(result.output)     # "'hello world from selectools' contains 4 words."
print(result.steps_run)  # 2

# Inspect the per-step trace
for entry in result.trace:
    print(f"  {entry['step']}: {entry['status']} ({entry['duration_ms']:.1f}ms)")

See Also:
  • Orchestration: AgentGraph for multi-agent graph workflows
  • Agent: the Agent class that powers individual steps


Added in: v0.18.0 (type-safe contracts, retry(), and cache_step() added in v0.18.x)

Package: src/selectools/pipeline.py

Classes: Pipeline, Step, StepResult

Functions: step(), parallel(), branch(), retry(), cache_step()

Table of Contents

  1. Overview
  2. Quick Start
  3. @step Decorator
  4. | Operator
  5. parallel()
  6. branch()
  7. retry()
  8. cache_step()
  9. Streaming
  10. Type-Safe Step Contracts
  11. Pipeline as AgentGraph Node
  12. Error Handling
  13. API Reference
  14. Examples

Overview

The pipeline module provides composable data pipelines built from plain Python functions. Steps are connected with the | operator and execute in sequence, with each step receiving the previous step's output.

The Anti-LCEL

This module exists because LangChain's LCEL (LangChain Expression Language) is the wrong abstraction. Pipelines should be plain functions, not magic Runnables with invisible state.

| | selectools Pipeline | LangChain LCEL |
|---|---|---|
| Steps | Plain Python functions | Runnable subclasses |
| Composition | step_a \| step_b (thin sugar) | chain = prompt \| model \| parser (deep magic) |
| Debugging | print() works, breakpoints work | Custom tracing required |
| Type checking | Standard type hints, validated at build time | No static checking |
| Dependencies | Zero | langchain-core, plus per-component packages |
| Tracing | Auto-traced with duration and status | Requires LangSmith (paid) |

Design Philosophy

  • Steps are plain functions. A @step-decorated function is still callable as f(x). The decorator adds |, retry, and tracing -- nothing else.
  • | is thin sugar. It creates a Pipeline that calls each function in order. No Pregel, no compilation, no runtime magic.
  • Every step auto-traces. Each step records its name, duration, and status. No external tracing service required.
  • Type contracts are opt-in. Annotate your functions with type hints and the pipeline validates adjacent step compatibility at build time.

Quick Start

from selectools import step

# Assumes agent is an existing Agent instance you have already configured.

@step
def summarize(text: str) -> str:
    return agent.run(f"Summarize: {text}").content

@step
def translate(text: str) -> str:
    return agent.run(f"Translate to Spanish: {text}").content

# Compose with |
pipeline = summarize | translate
result = pipeline.run("Long article about quantum computing...")

print(result.output)       # Spanish summary
print(result.steps_run)    # 2
print(result.trace)        # [{"step": "summarize", ...}, {"step": "translate", ...}]

@step Decorator

The @step decorator wraps a plain function as a composable Step. The wrapped function remains directly callable -- the decorator only adds composition (|), retry logic, and tracing.

Basic Usage

from selectools import step

@step
def clean(text: str) -> str:
    return text.strip().lower()

# Still callable as a normal function
result = clean("  Hello World  ")  # "hello world"

With Options

@step(name="custom_name", retry=3, on_error="skip")
def flaky_api_call(query: str) -> str:
    return external_api.search(query)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | Optional[str] | Function name | Override the step name in traces. |
| retry | int | 0 | Number of retry attempts on failure. |
| on_error | str | "raise" | Error handling: "raise" or "skip". |
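Because the API reference lists fn as optional for step(), one function can serve both as a bare @step and as @step(...) with options. The following is a minimal sketch of that dual-mode decorator pattern. DecoratedStep and step_sketch are hypothetical names. The real Step class also provides composition and tracing, which the sketch omits.

```python
from typing import Any, Callable, Optional


class DecoratedStep:
    """Hypothetical stand-in for Step: wraps fn and records its options."""

    def __init__(self, fn: Callable[..., Any], name: Optional[str] = None,
                 retry: int = 0, on_error: str = "raise") -> None:
        if on_error not in ("raise", "skip"):
            raise ValueError(f"on_error must be 'raise' or 'skip', got {on_error!r}")
        self.fn = fn
        self.name = name or fn.__name__   # default: the function's own name
        self.retry = retry
        self.on_error = on_error

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # The wrapped function stays directly callable as f(x).
        return self.fn(*args, **kwargs)


def step_sketch(fn: Optional[Callable[..., Any]] = None, *, name: Optional[str] = None,
                retry: int = 0, on_error: str = "raise") -> Any:
    """Usable as @step_sketch or as @step_sketch(name=..., retry=..., on_error=...)."""
    def wrap(f: Callable[..., Any]) -> DecoratedStep:
        return DecoratedStep(f, name=name, retry=retry, on_error=on_error)

    if fn is not None:   # bare @step_sketch: fn is the decorated function
        return wrap(fn)
    return wrap          # @step_sketch(...): return the actual decorator


@step_sketch
def clean(text: str) -> str:
    return text.strip().lower()


@step_sketch(name="custom_name", retry=3, on_error="skip")
def shout(text: str) -> str:
    return text.upper()
```

The keyword-only options (after the bare *) prevent an option value from being mistaken for the function in the bare form.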

Async Steps

Async functions work transparently:

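import httpx

@step
async def fetch_data(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

Async steps are awaited during arun() and run via asyncio.run() during sync run(). Because asyncio.run() cannot start while another event loop is running in the same thread, use arun() when calling a pipeline from async code.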
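The sync/async rule can be expressed with the standard inspect and asyncio modules. This is a sketch of the dispatch only, not the selectools code. call_sync and call_async are hypothetical helpers.

```python
import asyncio
import inspect
from typing import Any, Callable


def call_sync(fn: Callable[[Any], Any], value: Any) -> Any:
    """Run fn from synchronous code (the run() path)."""
    if inspect.iscoroutinefunction(fn):
        # asyncio.run() creates a fresh event loop for this one coroutine.
        return asyncio.run(fn(value))
    return fn(value)


async def call_async(fn: Callable[[Any], Any], value: Any) -> Any:
    """Run fn from asynchronous code (the arun() path)."""
    if inspect.iscoroutinefunction(fn):
        return await fn(value)
    return fn(value)   # plain functions are simply called


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


def inc(x: int) -> int:
    return x + 1
```

For example, call_sync(double, 3) drives the coroutine to completion, and asyncio.run(call_async(inc, 3)) calls the plain function directly.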


| Operator

The pipe operator creates a Pipeline from two or more steps. Each step receives the previous step's output as its first argument.

pipeline = step_a | step_b | step_c

This is equivalent to:

pipeline = Pipeline(steps=[step_a, step_b, step_c])

Composing with Plain Functions

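Undecorated callables are auto-wrapped as Step instances. At least one operand of | must already be a Step or Pipeline, because plain Python functions do not define the | operator: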

@step
def clean(text: str) -> str:
    return text.strip()

# str.upper is auto-wrapped
pipeline = clean | str.upper
result = pipeline.run("  hello  ")
print(result.output)  # "HELLO"

Composing Pipelines

Pipelines can be composed with other pipelines or steps:

preprocess = clean | normalize
postprocess = format_output | validate

full = preprocess | translate | postprocess
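The | operator only requires operator overloading. The following sketch uses hypothetical ToyStep and ToyPipeline classes to show four pieces:

  • __or__ handles step | anything.
  • __ror__ handles a plain function on the left.
  • Plain callables are auto-wrapped.
  • Nested pipelines are flattened.

Whether selectools flattens nested pipelines in exactly this way is an assumption. The observable behavior, in which each step receives the previous output, matches the description above.

```python
from typing import Any, Callable, List


class ToyStep:
    """Hypothetical minimal step: a named callable that supports |."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn
        self.name = getattr(fn, "__name__", repr(fn))

    def __call__(self, value: Any) -> Any:
        return self.fn(value)

    def __or__(self, other: Any) -> "ToyPipeline":
        return ToyPipeline([self, other])

    def __ror__(self, other: Any) -> "ToyPipeline":
        # Reached for plain_fn | step, because plain functions have no __or__.
        return ToyPipeline([other, self])


class ToyPipeline:
    def __init__(self, items: List[Any]) -> None:
        self.steps: List[ToyStep] = []
        for item in items:
            if isinstance(item, ToyPipeline):
                self.steps.extend(item.steps)    # flatten nested pipelines
            elif isinstance(item, ToyStep):
                self.steps.append(item)
            else:
                self.steps.append(ToyStep(item))  # auto-wrap plain callables

    def __or__(self, other: Any) -> "ToyPipeline":
        return ToyPipeline([self, other])

    def __ror__(self, other: Any) -> "ToyPipeline":
        return ToyPipeline([other, self])

    def run(self, value: Any) -> Any:
        for s in self.steps:
            value = s(value)   # each step receives the previous output
        return value
```

In this sketch, ToyStep(strip) | str.upper builds a two-step pipeline, and str.lower | ToyStep(strip) works through __ror__.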

parallel()

Run multiple steps on the same input (concurrently under arun(); see Async Execution). The resulting step outputs a dict mapping each step name to its result.

from selectools import parallel, step

@step
def search_web(query: str) -> str:
    return web_api.search(query)

@step
def search_docs(query: str) -> str:
    return doc_store.search(query)

@step
def search_db(query: str) -> str:
    return database.query(query)

# Fan out to all three; the outputs are collected into one dict
research = parallel(search_web, search_docs, search_db)
result = research("quantum computing")
# result == {"search_web": "...", "search_docs": "...", "search_db": "..."}

In a Pipeline

@step
def merge(results: dict) -> str:
    return "\n".join(results.values())

pipeline = parallel(search_web, search_docs) | merge | summarize
result = pipeline.run("quantum computing")

Async Execution

When any step in the group is async, parallel() uses asyncio.gather for true concurrent execution during arun(). In sync run(), steps execute sequentially.

Branch Isolation (v0.22.0 — BUG-30)

Each parallel branch receives its own deep copy of the input. Mutations in one branch do NOT affect sibling branches — even under asyncio.gather where branches interleave at await points. This prevents non-deterministic state corruption that previously occurred when any branch mutated its input (e.g., data["key"] = value).
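The isolation rule can be sketched as follows, assuming branches are given as a name-to-function dict. Each branch receives copy.deepcopy(input) before asyncio.gather starts it, so a mutation in one branch cannot leak into another. run_parallel is a hypothetical helper, not the selectools function.

```python
import asyncio
import copy
import inspect
from typing import Any, Callable, Dict


async def run_parallel(fns: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Run every branch on its own deep copy of value; collect results by name."""

    async def run_one(fn: Callable[[Any], Any], own_copy: Any) -> Any:
        if inspect.iscoroutinefunction(fn):
            return await fn(own_copy)
        return fn(own_copy)

    names = list(fns)
    # One independent copy per branch, made before any branch starts running.
    coros = [run_one(fns[n], copy.deepcopy(value)) for n in names]
    results = await asyncio.gather(*coros)
    return dict(zip(names, results))


async def tag_a(data: dict) -> dict:
    data["tag"] = "a"         # mutates only this branch's copy
    await asyncio.sleep(0)    # yield so the branches interleave
    return data


async def tag_b(data: dict) -> dict:
    await asyncio.sleep(0)    # runs after tag_a has mutated its copy
    return dict(data)         # still sees the untouched input
```

Without the deepcopy call, tag_b would observe the "tag" key written by tag_a, because both branches would share one dict.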

Parameters

| Parameter | Type | Description |
|---|---|---|
| *steps_or_fns | Union[Step, Callable] | Steps or callables to run in parallel. |

Returns: Step -- a step whose output is Dict[str, Any] keyed by step names.


branch()

Route input to one of several named steps based on a classifier function.

from selectools import branch, step

@step
def classify(text: str) -> dict:
    # Return the branch name under "branch" and carry the text along,
    # because the selected branch receives classify's output.
    label = "technical" if "bug" in text.lower() else "general"
    return {"branch": label, "text": text}

@step
def technical_review(data: dict) -> str:
    return agent.run(f"Technical review: {data['text']}").content

@step
def general_response(data: dict) -> str:
    return agent.run(f"Respond to: {data['text']}").content

pipeline = classify | branch(
    technical=technical_review,
    general=general_response,
)
result = pipeline.run("There's a bug in the login page")

With Custom Router

pipeline = branch(
    router=lambda x: x["category"],
    technical=code_review,
    creative=copyedit,
    default=passthrough,
)

Routing Logic

  1. If router is provided, it is called with the input and must return a branch name (string).
  2. If no router is given, the input itself supplies the key: a str input is used as the key directly, and a dict input must contain a "branch" key whose value is the key.
  3. If the key matches no branch, the default branch is used.
  4. If no default exists, a KeyError is raised.
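The four routing rules can be read as a small resolution function. resolve_branch is hypothetical. The text above does not specify what happens to an input that is neither a str nor a dict with a "branch" key. This sketch assumes such an input is unmatched and falls through to default.

```python
from typing import Any, Callable, Dict, Optional


def resolve_branch(value: Any, branches: Dict[str, Callable[[Any], Any]],
                   router: Optional[Callable[[Any], str]] = None) -> Callable[[Any], Any]:
    """Pick the branch step for value, following the four routing rules."""
    if router is not None:
        key = router(value)                    # rule 1: router decides
    elif isinstance(value, str):
        key = value                            # rule 2: str input is the key
    elif isinstance(value, dict) and "branch" in value:
        key = value["branch"]                  # rule 2: dict carries the key
    else:
        key = None                             # assumption: treated as unmatched
    if key in branches:
        return branches[key]
    if "default" in branches:
        return branches["default"]             # rule 3: fall back to default
    raise KeyError(f"No branch named {key!r} and no default branch")  # rule 4
```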

Parameters

| Parameter | Type | Description |
|---|---|---|
| router | Optional[Callable] | Function that takes input and returns a branch name. |
| **named_steps | Union[Step, Callable] | Named branches. Key = branch name, value = step. |

Returns: Step


retry()

Wrap a step with retry logic. A convenience wrapper that sets the retry count without modifying the original step.

from selectools import retry, step

@step
def flaky_call(text: str) -> str:
    return unreliable_api.process(text)

# Retry up to 3 times on failure (4 total attempts)
pipeline = preprocess | retry(flaky_call, attempts=3) | postprocess

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| step_or_fn | Union[Step, Callable] | (required) | Step or callable to wrap. |
| attempts | int | 3 | Number of retry attempts. |

Returns: Step with retry configured.
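Under the attempt arithmetic described above, attempts=3 allows up to four calls. This can be sketched as a loop that re-raises the last error once the retries are exhausted. call_with_retry and make_flaky are hypothetical. The sketch retries on any Exception and adds no backoff delay, and selectools may behave differently on both points.

```python
from typing import Any, Callable, List, Tuple


def call_with_retry(fn: Callable[[Any], Any], value: Any, retries: int) -> Tuple[Any, List[str]]:
    """Call fn up to 1 + retries times; return (result, messages from failed attempts)."""
    errors: List[str] = []
    failures = 0
    while True:
        try:
            return fn(value), errors
        except Exception as exc:             # sketch: retry on any exception
            failures += 1
            errors.append(f"attempt {failures}: {exc}")
            if failures > retries:
                raise                        # out of retries: re-raise the last error


def make_flaky(fail_times: int) -> Callable[[str], str]:
    """Hypothetical helper: fails fail_times times, then succeeds."""
    state = {"calls": 0}

    def flaky(text: str) -> str:
        state["calls"] += 1
        if state["calls"] <= fail_times:
            raise ConnectionError("transient failure")
        return text.upper()

    return flaky
```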


cache_step()

Wrap a step with LRU + TTL result caching. When the same input arrives again before its entry expires, the cached output is returned without re-executing the function.

from selectools import cache_step, step

@step
def expensive_embedding(text: str) -> list:
    return embedding_model.embed(text)

# Cache results for 10 minutes, max 500 entries
pipeline = preprocess | cache_step(expensive_embedding, ttl=600, max_size=500) | classify

Cache Behavior

  • Key: String representation of the input value.
  • Eviction: LRU (the least recently used entry is evicted when max_size is reached).
  • TTL: Entries expire after ttl seconds.
  • Scope: Cache is per-step instance (not shared across pipeline copies).
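These four rules can be sketched with collections.OrderedDict. The sketch assumes str(input) as the key and uses an injectable clock so that expiry can be tested. LruTtlCache is a hypothetical class. Because of the string keys, distinct inputs with the same string form (for example 1 and "1") would share one entry in this sketch.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Tuple


class LruTtlCache:
    """Hypothetical LRU + TTL cache around a one-argument function."""

    def __init__(self, fn: Callable[[Any], Any], ttl: float = 300, max_size: int = 1000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fn = fn
        self.ttl = ttl
        self.max_size = max_size
        self.clock = clock
        self._entries: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()
        self.calls = 0  # how many times fn actually ran

    def __call__(self, value: Any) -> Any:
        key = str(value)
        now = self.clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            self._entries.move_to_end(key)       # mark as most recently used
            return hit[1]
        self.calls += 1                          # miss or expired: recompute
        result = self.fn(value)
        self._entries[key] = (now, result)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)    # evict least recently used
        return result
```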

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| step_or_fn | Union[Step, Callable] | (required) | Step or callable to wrap. |
| ttl | int | 300 | Cache time-to-live in seconds. |
| max_size | int | 1000 | Maximum cache entries before LRU eviction. |

Returns: Step with caching configured.


Streaming

Stream the output of the pipeline's final step as it is produced. Earlier steps run to completion; only the last step streams.

import asyncio
pipeline = preprocess | summarize | translate

async def main():
    async for chunk in pipeline.astream("Long article..."):
        print(chunk, end="")

asyncio.run(main())

How It Works

  1. All steps except the last execute normally via arun().
  2. The final step's function is inspected:
    • Async generator: Chunks are yielded as produced.
    • Sync generator: Chunks are yielded as produced.
    • Regular function: The complete output is yielded as a single chunk.
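The three cases map onto inspect.isasyncgenfunction and inspect.isgeneratorfunction. stream_last is a hypothetical helper. The extra branch for plain async functions, which awaits the result and yields it once, is an assumption about how a coroutine final step would be handled.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, List


async def stream_last(fn: Callable[[Any], Any], value: Any) -> AsyncIterator[Any]:
    """Yield chunks from the final step according to its kind."""
    if inspect.isasyncgenfunction(fn):
        async for chunk in fn(value):       # async generator: forward each chunk
            yield chunk
    elif inspect.isgeneratorfunction(fn):
        for chunk in fn(value):             # sync generator: forward each chunk
            yield chunk
    elif inspect.iscoroutinefunction(fn):
        yield await fn(value)               # assumption: awaited, one chunk
    else:
        yield fn(value)                     # regular function: one chunk


def words(text: str):
    for w in text.split():
        yield w + " "


async def collect(fn: Callable[[Any], Any], value: Any) -> List[Any]:
    return [chunk async for chunk in stream_last(fn, value)]
```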

Generator Step Example

import asyncio

@step
def stream_translate(text: str):
    """A generator step that yields chunks."""
    for sentence in text.split(". "):
        yield translate_sentence(sentence) + ". "

pipeline = summarize | stream_translate

async def main():
    async for chunk in pipeline.astream("Long article..."):
        print(chunk, end="", flush=True)

asyncio.run(main())

Type-Safe Step Contracts

Annotate functions with type hints and the pipeline validates type compatibility between adjacent steps at build time.

Automatic Inference

Type hints are extracted automatically from function signatures:

import json

from selectools import step

@step
def parse(raw: str) -> dict:
    return json.loads(raw)

@step
def extract(data: dict) -> list:
    return data.get("items", [])

@step
def count(items: list) -> int:
    return len(items)

# Types are validated at pipeline creation:
# parse (str -> dict) | extract (dict -> list) | count (list -> int)
pipeline = parse | extract | count  # No warnings

Type Mismatch Warning

When adjacent steps have incompatible types, a warning is emitted at pipeline creation:

@step
def to_int(text: str) -> int:
    return int(text)

@step
def join_words(words: list) -> str:
    return " ".join(words)

# Warning: Pipeline type mismatch: 'to_int' outputs int but 'join_words' expects list
pipeline = to_int | join_words

Type checking is advisory -- the pipeline still runs. This catches common mistakes without blocking execution.

Explicit Type Contracts

Override inferred types when needed:

from selectools.pipeline import Step

custom_step = Step(
    my_function,
    name="custom",
    input_type=str,
    output_type=dict,
)

Generic Types

Generic types (Dict[str, Any], List[int], etc.) are accepted but not deeply validated -- the system cannot verify generic type parameters at runtime.
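The build-time check can be approximated with typing.get_type_hints and inspect.signature. The approach has three parts:

  • Compare each function's return hint with the next function's first-parameter hint.
  • Reduce generics to their origin, so List[int] becomes list.
  • Emit a warning rather than raising.

check_adjacent is a hypothetical helper. The exact compatibility rule selectools applies is not stated, and this sketch assumes issubclass.

```python
import inspect
import typing
import warnings
from typing import Any, Callable, Dict, List


def check_adjacent(fns: List[Callable[..., Any]]) -> List[str]:
    """Warn (never raise) when adjacent annotated functions look incompatible."""
    problems: List[str] = []
    for left, right in zip(fns, fns[1:]):
        out_t = typing.get_type_hints(left).get("return")
        first = next(iter(inspect.signature(right).parameters), None)
        in_t = typing.get_type_hints(right).get(first) if first else None
        if out_t is None or in_t is None or in_t is Any:
            continue  # missing hints: nothing to check
        # Generic parameters are not validated: List[int] is compared as list.
        out_base = typing.get_origin(out_t) or out_t
        in_base = typing.get_origin(in_t) or in_t
        if not (isinstance(out_base, type) and isinstance(in_base, type)):
            continue  # Union and other special forms are skipped in this sketch
        if not issubclass(out_base, in_base):
            msg = (f"Pipeline type mismatch: {left.__name__!r} outputs "
                   f"{out_base.__name__} but {right.__name__!r} expects {in_base.__name__}")
            warnings.warn(msg)   # advisory only: the pipeline would still run
            problems.append(msg)
    return problems


def to_int(text: str) -> int:
    return int(text)


def join_words(words: list) -> str:
    return " ".join(words)


def parse(raw: str) -> Dict[str, Any]:
    return {"items": raw.split()}


def extract(data: dict) -> List[str]:
    return data.get("items", [])


def count(items: list) -> int:
    return len(items)
```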


Pipeline as AgentGraph Node

Every Pipeline implements __call__(state), making it usable as an AgentGraph callable node. This bridges the composition and orchestration modules.

from selectools import AgentGraph, step

@step
def preprocess(text: str) -> str:
    return text.strip().lower()

@step
def enrich(text: str) -> str:
    return f"[enriched] {text}"

preprocessing = preprocess | enrich

# Use pipeline as a graph node
graph = AgentGraph()
graph.add_node("preprocess", preprocessing)  # Pipeline as callable node
graph.add_node("agent", my_agent)  # my_agent: an Agent you have already configured
graph.add_edge("preprocess", "agent")
graph.add_edge("agent", AgentGraph.END)
graph.set_entry("preprocess")

result = graph.run("  Raw User Input  ")

How the Bridge Works

When a Pipeline receives a GraphState:

  1. Extracts last_output from state.data (or the last message content as fallback).
  2. Runs the pipeline with that input.
  3. Writes the pipeline output back to state.data[STATE_KEY_LAST_OUTPUT].
  4. Returns the modified state.
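The four bridge steps can be sketched with a stand-in state object. Several details are assumptions made for illustration:

  • FakeGraphState and BridgedPipeline are hypothetical names.
  • STATE_KEY_LAST_OUTPUT is given the value "last_output".
  • Messages are plain strings rather than message objects with a content field.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

STATE_KEY_LAST_OUTPUT = "last_output"   # assumed value of the constant


@dataclass
class FakeGraphState:
    """Hypothetical stand-in for GraphState with only the fields used here."""
    data: Dict[str, Any] = field(default_factory=dict)
    messages: List[str] = field(default_factory=list)


class BridgedPipeline:
    def __init__(self, *fns: Callable[[Any], Any]) -> None:
        self.fns = fns

    def run(self, value: Any) -> Any:
        for fn in self.fns:
            value = fn(value)
        return value

    def __call__(self, state: FakeGraphState) -> FakeGraphState:
        # 1. Read last_output, falling back to the last message.
        if STATE_KEY_LAST_OUTPUT in state.data:
            value = state.data[STATE_KEY_LAST_OUTPUT]
        else:
            value = state.messages[-1] if state.messages else ""
        # 2 and 3. Run the steps and write the output back into state.data.
        state.data[STATE_KEY_LAST_OUTPUT] = self.run(value)
        # 4. Hand the modified state back to the graph.
        return state
```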

Error Handling

on_error="raise" (Default)

Exceptions propagate immediately. The pipeline stops and the exception is raised to the caller.

@step(on_error="raise")
def strict_step(x):
    raise ValueError("failed")

pipeline = strict_step | next_step
pipeline.run("input")  # Raises ValueError

on_error="skip"

The failed step is skipped and the pipeline continues with the previous output.

@step(on_error="skip")
def optional_step(x):
    raise ValueError("not critical")

pipeline = optional_step | next_step
result = pipeline.run("input")  # next_step receives "input" unchanged

Retry + Skip

Combine retry with skip for maximum resilience:

@step(retry=3, on_error="skip")
def resilient_step(x):
    return unreliable_api.call(x)

This makes up to four attempts (the first call plus 3 retries), then skips the step if every attempt fails.

Trace Inspection

Every step records its status in the trace, including errors and retries:

result = pipeline.run("input")
for entry in result.trace:
    print(f"{entry['step']}: {entry['status']} ({entry['duration_ms']:.1f}ms)")
    if entry.get("error"):
        print(f"  Error: {entry['error']}")
    if entry.get("retry"):
        print(f"  Retry #{entry['retry']}")
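The following sketch of a sequential runner shows the raise/skip semantics and the trace shape described above. run_steps is hypothetical, and the status strings "ok" and "skipped" are placeholders. The real values are whatever your selectools version records in the trace entries.

```python
import time
from typing import Any, Callable, Dict, List, Tuple

# (name, function, on_error) triples: a simplified, hypothetical step description
StepSpec = Tuple[str, Callable[[Any], Any], str]


def run_steps(steps: List[StepSpec], value: Any) -> Tuple[Any, List[Dict[str, Any]], int]:
    """Run steps in order and return (output, trace, steps_run)."""
    trace: List[Dict[str, Any]] = []
    steps_run = 0
    for name, fn, on_error in steps:
        entry: Dict[str, Any] = {"step": name}
        start = time.perf_counter()
        try:
            value = fn(value)            # success: output feeds the next step
            entry["status"] = "ok"
            steps_run += 1
        except Exception as exc:
            if on_error == "raise":
                raise                    # stop the pipeline, propagate to caller
            entry["status"] = "skipped"  # keep the previous value and continue
            entry["error"] = str(exc)
        entry["duration_ms"] = (time.perf_counter() - start) * 1000
        trace.append(entry)
    return value, trace, steps_run
```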

API Reference

`Step.__init__()`

| Parameter | Type | Default | Description |
|---|---|---|---|
| fn | Callable | (required) | The function to wrap. |
| name | Optional[str] | Function name | Step name for traces. |
| retry | int | 0 | Retry attempts on failure. |
| on_error | str | "raise" | Error handling: "raise" or "skip". |
| input_type | Optional[type] | Auto-inferred | Expected input type (for contract validation). |
| output_type | Optional[type] | Auto-inferred | Declared output type (for contract validation). |

`Pipeline.__init__()`

| Parameter | Type | Default | Description |
|---|---|---|---|
| steps | Optional[Sequence[Union[Step, Pipeline, Callable]]] | None | Ordered list of steps. |
| name | str | "pipeline" | Pipeline name. |

Pipeline Methods

| Method | Description |
|---|---|
| run(input, **kwargs) | Execute synchronously. Returns StepResult. |
| arun(input, **kwargs) | Execute asynchronously. Returns StepResult. |
| astream(input, **kwargs) | Async generator. Yields chunks from the final step. |
| steps | Property. Read-only list of steps in the pipeline. |

StepResult

| Field | Type | Description |
|---|---|---|
| output | Any | The final output of the pipeline. |
| trace | List[Dict[str, Any]] | Per-step trace entries with step, duration_ms, status, and optional error/retry. |
| steps_run | int | Number of steps that executed successfully. |

step()

| Parameter | Type | Default | Description |
|---|---|---|---|
| fn | Optional[Callable] | None | Function to wrap (when used without parens). |
| name | Optional[str] | None | Override step name. |
| retry | int | 0 | Retry attempts. |
| on_error | str | "raise" | Error handling strategy. |

Returns: Step (or decorator Callable[[Callable], Step] when called with arguments).

parallel()

| Parameter | Type | Description |
|---|---|---|
| *steps_or_fns | Union[Step, Callable] | Steps to run concurrently. |

Returns: Step whose output is Dict[str, Any].

branch()

| Parameter | Type | Description |
|---|---|---|
| router | Optional[Callable] | Routing function. |
| **named_steps | Union[Step, Callable] | Named branch targets. |

Returns: Step

retry()

| Parameter | Type | Default | Description |
|---|---|---|---|
| step_or_fn | Union[Step, Callable] | (required) | Step to wrap. |
| attempts | int | 3 | Retry count. |

Returns: Step

cache_step()

| Parameter | Type | Default | Description |
|---|---|---|---|
| step_or_fn | Union[Step, Callable] | (required) | Step to wrap. |
| ttl | int | 300 | Cache TTL in seconds. |
| max_size | int | 1000 | Max cache entries. |

Returns: Step


Examples

| Example | File | Description |
|---|---|---|
| 66 | 66_pipeline_basics.py | Step decorator, pipe operator, run/arun |
| 67 | 67_pipeline_parallel_branch.py | parallel(), branch(), retry(), cache_step() |
| 68 | 68_pipeline_graph_bridge.py | Using Pipeline as an AgentGraph node |

Further Reading


Next Steps: Learn about multi-agent orchestration in the Orchestration Module.


| # | Script | Description |
|---|---|---|
| 66 | 66_streaming_pipeline.py | Streaming pipeline with the @step decorator and \| operator |
| 67 | 67_type_safe_pipeline.py | Type-safe step contracts with parallel() and branch() |