CLAUDE.md

May 8, 2026

This file teaches you how to build agents and multi-agent systems with the Swarms framework. Read it before writing any code in this repo.


Installation & Setup

pip install swarms

Set your LLM API key as an environment variable before running:

export OPENAI_API_KEY="sk-..."        # OpenAI / GPT models
export ANTHROPIC_API_KEY="sk-ant-..." # Claude models
export GROQ_API_KEY="..."             # Groq
# Any provider supported by LiteLLM works
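
A missing key usually only shows up when the first model call fails, so a small pre-flight check can save a confusing traceback. The sketch below is not part of Swarms: `missing_keys` is a hypothetical helper, and the variables you require depend on the providers you actually use.

```python
import os


def missing_keys(required, environ=None):
    """Return the names in `required` that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


# Example: check only the provider you plan to call.
absent = missing_keys(["OPENAI_API_KEY"])
if absent:
    print("Missing API keys:", ", ".join(absent))
```

Passing `environ` explicitly keeps the helper easy to test without touching the real environment.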

All imports come from the top-level swarms package:

from swarms import (
    Agent,
    SequentialWorkflow,
    ConcurrentWorkflow,
    AgentRearrange,
    GraphWorkflow,
    SwarmRouter,
    MixtureOfAgents,
    HierarchicalSwarm,
    GroupChat,
    MajorityVoting,
    # ...
)

Project Layout

swarms/
├── swarms/
│   ├── structs/         # All agent + multi-agent structures (61 files)
│   │   ├── agent.py             # Core Agent class
│   │   ├── conversation.py      # Conversation / memory management
│   │   ├── sequential_workflow.py
│   │   ├── concurrent_workflow.py
│   │   ├── agent_rearrange.py
│   │   ├── graph_workflow.py
│   │   ├── swarm_router.py      # Single-entry-point router
│   │   ├── mixture_of_agents.py
│   │   ├── hiearchical_swarm.py
│   │   ├── groupchat.py
│   │   ├── majority_voting.py
│   │   ├── council_as_judge.py
│   │   ├── debate_with_judge.py
│   │   ├── heavy_swarm.py
│   │   ├── round_robin.py
│   │   ├── planner_worker_swarm.py
│   │   ├── auto_swarm_builder.py
│   │   └── multi_agent_exec.py  # run_agents_concurrently + friends
│   ├── tools/           # Tool utilities, MCP, schema conversion
│   └── utils/           # Logging, formatting helpers
├── examples/            # 586 runnable examples
│   ├── single_agent/
│   ├── multi_agent/
│   ├── tools/
│   └── guides/
└── v12_examples/        # New v12 feature examples

Check examples/ before writing new code; an existing example is probably close to what you need.


Core Primitive: Agent

Agent is the building block from which everything else is composed. Every multi-agent structure wraps one or more Agent instances.

Minimal agent

from swarms import Agent

agent = Agent(
    agent_name="Analyst",
    model_name="gpt-4.1",
    max_loops=1,
)

result = agent.run("Summarise the current state of LLM research.")
print(result)

Key constructor parameters

| Parameter | Type | Default | Purpose |
|---|---|---|---|
| agent_name | str | "swarm-worker-01" | Unique name; used for memory file paths |
| agent_description | str | generic | Shown to orchestrators for routing |
| system_prompt | str | built-in | The agent's persona / instructions |
| model_name | str | "gpt-4.1" | Any LiteLLM model string |
| max_loops | int or "auto" | 1 | Loops before returning; "auto" = autonomous until done |
| tools | list[Callable] | None | Python functions the agent can call |
| streaming_on | bool | False | Stream tokens to stdout |
| interactive | bool | False | REPL mode; prompts the user for input each loop |
| context_length | int | None | Token budget; triggers compression at 90 % |
| context_compression | bool | True | Auto-summarise when near context limit (v12) |
| persistent_memory | bool | True | Read/write MEMORY.md across restarts (v12) |
| temperature | float | 0.5 | Sampling temperature |
| max_tokens | int | 4096 | Max tokens per LLM call |
| reasoning_effort | str | None | "low", "medium", "high" for reasoning models |
| thinking_tokens | int | None | Extended thinking budget (Claude) |
| output_type | str | "str-all-except-first" | How to format returned output |
| mcp_url | str | None | MCP server URL to load tools from |
| handoffs | list | None | Agents this agent can hand off to |
| plan_enabled | bool | False | Generate a plan before execution |
| autosave | bool | False | Save agent state to disk after each run |

Autonomous loop (max_loops="auto")

When max_loops="auto" the agent runs a plan→execute→reflect loop until it decides it is done. It automatically gets access to:

  • A think tool (disabled when thinking_tokens is set)
  • A grep tool for searching files (v12)
  • Bash / file tools if configured

agent = Agent(
    agent_name="Researcher",
    model_name="gpt-4.1",
    max_loops="auto",
    interactive=False,
)
result = agent.run("Research the top 5 vector databases and compare them.")

Model names

Use any LiteLLM-compatible string:

# OpenAI
model_name="gpt-4.1"
model_name="gpt-5.4-mini"
model_name="o3"

# Anthropic
model_name="claude-opus-4-7-20251001"
model_name="claude-sonnet-4-6"
model_name="claude-haiku-4-5-20251001"

# Groq
model_name="groq/llama-3.3-70b-versatile"

# Google
model_name="gemini/gemini-2.5-pro"

Running with images

result = agent.run(
    task="Describe what you see in this chart.",
    img="path/to/chart.png",   # or base64 string or URL
)

Memory & Persistence (v12)

persistent_memory=True (default)

On startup the agent reads {workspace}/agents/{agent_name}/MEMORY.md and injects it as a system preamble. On each response it appends to that file. State survives process restarts automatically.

agent = Agent(
    agent_name="ProjectAssistant",
    model_name="gpt-4.1",
    persistent_memory=True,   # default
)
# First run: agent has no prior context
agent.run("My project is called Helios. Remember that.")

# New process, same agent_name → agent remembers "Helios"
agent2 = Agent(agent_name="ProjectAssistant", model_name="gpt-4.1")
agent2.run("What is my project called?")
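
Because the memory file is keyed on agent_name, two agents with the same name resolve to the same file. The sketch below only illustrates the path layout quoted above; `memory_path` is a hypothetical helper, and how Swarms resolves the workspace directory is not shown here.

```python
from pathlib import Path


def memory_path(workspace, agent_name):
    """Build the {workspace}/agents/{agent_name}/MEMORY.md location (illustrative)."""
    return Path(workspace) / "agents" / agent_name / "MEMORY.md"


a = memory_path("workspace", "ProjectAssistant")
b = memory_path("workspace", "ProjectAssistant")
c = memory_path("workspace", "OneShot")

print(a.as_posix())
print(a == b)  # same name, same file
print(a == c)  # different name, different file
```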

persistent_memory=False

Fully stateless — no disk reads or writes. Use for short, isolated tasks where carry-over would be harmful.

agent = Agent(
    agent_name="OneShot",
    model_name="gpt-4.1",
    persistent_memory=False,
)

context_compression=True (default)

ContextCompressor fires automatically when token usage crosses 90 % of context_length. It summarises and rewrites MEMORY.md in place so long sessions never hit the context wall.

agent = Agent(
    agent_name="LongSession",
    model_name="gpt-4.1",
    context_length=32000,
    context_compression=True,   # default
)
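
The 90 % trigger is simple arithmetic: with context_length=32000, compression starts at 28,800 tokens. The sketch below works this out. It is an illustration, not the ContextCompressor implementation: the function names are hypothetical, and whether the real check is inclusive (>=) is an assumption.

```python
def compression_threshold(context_length, percent=90):
    """Token count at which compression would start (integer arithmetic)."""
    return context_length * percent // 100


def should_compress(tokens_used, context_length, percent=90):
    """True once usage reaches the threshold (assumed inclusive)."""
    return tokens_used >= compression_threshold(context_length, percent)


print(compression_threshold(32000))
print(should_compress(28799, 32000))
print(should_compress(28800, 32000))
```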

Conversation.compact()

Manually collapse history to a single summary; creates a timestamped archive before rewriting:

from swarms.structs.conversation import Conversation

conv = Conversation(agent_name="MyAgent", system_prompt="You are helpful.")
conv.add("user", "Tell me about X")
conv.add("assistant", "X is ...")

# Collapse history, archive the full log
conv.compact(summary="User asked about X. Assistant explained X.")

Tools

Python functions as tools

Write any Python function with type hints and a docstring. No decorator is needed; the framework converts it to an OpenAI function-calling schema automatically:

import yfinance as yf
from swarms import Agent

def get_stock_price(ticker: str) -> str:
    """Fetch the current stock price for a given ticker symbol.

    Args:
        ticker: Stock ticker symbol, e.g. 'AAPL'.

    Returns:
        Current price as a formatted string.
    """
    data = yf.Ticker(ticker)
    price = data.fast_info["last_price"]
    return f"{ticker}: ${price:.2f}"

agent = Agent(
    agent_name="StockAnalyst",
    model_name="gpt-4.1",
    tools=[get_stock_price],
    max_loops=3,
)
result = agent.run("What is the current price of Apple and Microsoft?")
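
To see why type hints and a docstring matter, it helps to look at what a signature-to-schema conversion involves. The following is a minimal sketch using only the standard library. It is not the Swarms converter: `function_to_schema` and `get_weather` are hypothetical, and the real schema generation likely handles more types and parses per-argument descriptions.

```python
import inspect

# Minimal mapping from Python annotations to JSON Schema types.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def function_to_schema(func):
    """Build an OpenAI-style function schema from a signature and docstring."""
    signature = inspect.signature(func)
    doc = inspect.getdoc(func) or ""
    properties = {}
    required = []
    for name, param in signature.parameters.items():
        # Unannotated or unknown types fall back to "string".
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": doc.split("\n\n")[0],  # first docstring paragraph
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def get_weather(city: str, units: str = "celsius") -> str:
    """Return a short weather summary for a city."""
    return f"{city}: 20 degrees {units}"


schema = function_to_schema(get_weather)
print(schema["function"]["name"])
print(schema["function"]["parameters"]["required"])
```

Parameters without defaults end up in `required`, which is why a tool function with a clear signature gives the model less room to call it incorrectly.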

Multiple tools

agent = Agent(
    agent_name="ResearchAgent",
    model_name="gpt-4.1",
    tools=[search_web, get_stock_price, read_file, write_file],
    max_loops="auto",
)

Tool schema from Pydantic

from swarms.tools.pydantic_to_json import base_model_to_openai_function
from pydantic import BaseModel

class WeatherQuery(BaseModel):
    city: str
    units: str = "celsius"

schema = base_model_to_openai_function(WeatherQuery)

Streaming

Stream to stdout

agent = Agent(
    agent_name="Writer",
    model_name="gpt-4.1",
    streaming_on=True,
)
agent.run("Write a short poem about distributed systems.")

Stream tokens to a callback

def handle_token(token: str) -> None:
    print(token, end="", flush=True)

agent = Agent(
    agent_name="Writer",
    model_name="gpt-4.1",
    streaming_callback=handle_token,
)
agent.run("Write a haiku.")
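
A callback often needs to keep the tokens as well as display them. One way to do that is a small callable object, sketched below. `TokenCollector` is hypothetical, and passing a callable object rather than a plain function as streaming_callback is an assumption; here it is driven by a hard-coded token list so the example runs on its own.

```python
class TokenCollector:
    """Callable that records each token and optionally echoes it."""

    def __init__(self, echo=True):
        self.echo = echo
        self.tokens = []

    def __call__(self, token: str) -> None:
        self.tokens.append(token)
        if self.echo:
            print(token, end="", flush=True)

    @property
    def text(self) -> str:
        """Everything received so far, joined into one string."""
        return "".join(self.tokens)


collector = TokenCollector(echo=False)
for piece in ["Dis", "tributed ", "systems"]:  # stand-in for streamed tokens
    collector(piece)
print(collector.text)
```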

Async streaming (arun_stream)

import asyncio
from swarms import Agent

agent = Agent(agent_name="AsyncWriter", model_name="gpt-4.1", streaming_on=True)

async def main():
    async for token in agent.arun_stream("Explain async/await in Python."):
        print(token, end="", flush=True)

asyncio.run(main())

Multi-Agent Structures

Sequential Workflow

Agents execute one after another. The output of each agent is passed as context to the next.

from swarms import Agent, SequentialWorkflow

researcher = Agent(agent_name="Researcher", model_name="gpt-4.1", max_loops=1)
analyst   = Agent(agent_name="Analyst",    model_name="gpt-4.1", max_loops=1)
writer    = Agent(agent_name="Writer",     model_name="gpt-4.1", max_loops=1)

pipeline = SequentialWorkflow(
    agents=[researcher, analyst, writer],
    max_loops=1,
)
result = pipeline.run("Analyse the impact of interest rate hikes on tech stocks.")

When to use: Linear pipelines where each step depends on the prior step's output. Research → Analysis → Report. Extraction → Transformation → Load.
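
The chaining behaviour can be pictured with plain functions standing in for agents. This is a deliberately simplified sketch, not the SequentialWorkflow implementation: `run_sequential` and the three step functions are hypothetical, and the real workflow passes richer conversation context than a single string.

```python
def run_sequential(steps, task):
    """Feed the task to the first step, then each output to the next step."""
    output = task
    for step in steps:
        output = step(output)
    return output


def research(text):
    return f"notes({text})"


def analyse(text):
    return f"analysis({text})"


def write(text):
    return f"report({text})"


result = run_sequential([research, analyse, write], "rates")
print(result)
```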


Concurrent Workflow

All agents run in parallel on the same task. Results are collected and returned together.

from swarms import Agent, ConcurrentWorkflow

agents = [
    Agent(agent_name=f"Worker-{i}", model_name="gpt-4.1", max_loops=1)
    for i in range(5)
]

workflow = ConcurrentWorkflow(agents=agents)
results = workflow.run("List 10 use cases for multi-agent AI systems.")

When to use: Independent subtasks that can run simultaneously. Analysing multiple documents. Querying multiple data sources. Generating multiple creative variants.
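
The fan-out idea can be sketched with a thread pool and plain callables standing in for agents. This is an illustration only: `run_concurrently` is hypothetical and says nothing about how ConcurrentWorkflow schedules work internally or what shape its return value takes.

```python
from concurrent.futures import ThreadPoolExecutor


def run_concurrently(workers, task):
    """Run every named worker on the same task; return {name: output}."""
    with ThreadPoolExecutor(max_workers=max(1, len(workers))) as pool:
        futures = {name: pool.submit(fn, task) for name, fn in workers.items()}
        # Collect in submission order so the result is deterministic.
        return {name: future.result() for name, future in futures.items()}


workers = {
    "upper": str.upper,
    "title": str.title,
    "length": lambda text: str(len(text)),
}
results = run_concurrently(workers, "multi-agent systems")
for name, output in results.items():
    print(name, "->", output)
```

Threads suit this pattern because each worker spends most of its time waiting on a network call rather than computing.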


AgentRearrange — Flow DSL

Define execution flow as a string using a simple DSL. Mix sequential (->) and parallel (,) execution.

from swarms import Agent, AgentRearrange

planner  = Agent(agent_name="Planner",  model_name="gpt-4.1", max_loops=1)
coder    = Agent(agent_name="Coder",    model_name="gpt-4.1", max_loops=1)
reviewer = Agent(agent_name="Reviewer", model_name="gpt-4.1", max_loops=1)
tester   = Agent(agent_name="Tester",   model_name="gpt-4.1", max_loops=1)

pipeline = AgentRearrange(
    agents=[planner, coder, reviewer, tester],
    flow="Planner -> Coder -> Reviewer, Tester",
    # "->" runs steps in sequence; "," runs Reviewer and Tester in parallel
    max_loops=1,
)
result = pipeline.run("Build a Python function that validates email addresses.")

Flow DSL rules:

  • A -> B — A runs, then B receives A's output
  • A, B — A and B run concurrently with the same input
  • A -> B, C -> D — A runs first, then B and C run concurrently, then D receives their combined output
  • A -> H -> B — Insert a human-in-the-loop step (requires human_in_the_loop=True)
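
The rules above amount to splitting the string on `->` into stages, then splitting each stage on `,` into agents that run together. A minimal parser sketch follows. `parse_flow` is hypothetical and is not the AgentRearrange parser, which also has to validate names against the agent list and handle the `H` step.

```python
def parse_flow(flow):
    """Split a flow string into stages; each stage lists concurrent names."""
    stages = []
    for stage in flow.split("->"):
        names = [name.strip() for name in stage.split(",") if name.strip()]
        if not names:
            raise ValueError(f"empty stage in flow: {flow!r}")
        stages.append(names)
    return stages


print(parse_flow("A -> B, C -> D"))
print(parse_flow("Planner -> Coder -> Reviewer, Tester"))
```

Reading a flow as a list of stages makes it easy to check by eye which agents wait on which.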

When to use: Any workflow where you need explicit, readable control over agent execution order and parallelism.


GraphWorkflow — DAG Execution

Full directed-acyclic-graph (DAG) execution. Nodes are agents; edges are dependencies. Topological sort ensures correct order. Supports per-node callbacks and token streaming.

from swarms import Agent, GraphWorkflow, Node, Edge, NodeType

# Build agents
analyst  = Agent(agent_name="Analyst",  model_name="gpt-5.4-mini", max_loops=1)
writer   = Agent(agent_name="Writer",   model_name="gpt-5.4-mini", max_loops=1)
reviewer = Agent(agent_name="Reviewer", model_name="gpt-5.4-mini", max_loops=1)
publisher = Agent(agent_name="Publisher", model_name="gpt-5.4-mini", max_loops=1)

# Build graph
wf = GraphWorkflow()
wf.add_node(Node(id="analyst",   type=NodeType.AGENT, agent=analyst))
wf.add_node(Node(id="writer",    type=NodeType.AGENT, agent=writer))
wf.add_node(Node(id="reviewer",  type=NodeType.AGENT, agent=reviewer))
wf.add_node(Node(id="publisher", type=NodeType.AGENT, agent=publisher))

wf.add_edge(Edge(source="analyst",  target="writer"))
wf.add_edge(Edge(source="writer",   target="reviewer"))
wf.add_edge(Edge(source="reviewer", target="publisher"))

wf.set_entry_points(["analyst"])
wf.set_end_points(["publisher"])

# Run with callbacks
def on_done(node_name: str, result: str) -> None:
    print(f"[{node_name}] finished — {len(result)} chars")

results = wf.run(
    task="Produce a market report on AI chips.",
    on_node_complete=on_done,          # fires after each node
    streaming_callback=lambda tok: print(tok, end="", flush=True),
)

Diamond / fan-out fan-in pattern:

# analyst feeds both writer AND researcher concurrently,
# then editor combines both outputs
wf.add_edge(Edge(source="analyst",    target="writer"))
wf.add_edge(Edge(source="analyst",    target="researcher"))
wf.add_edge(Edge(source="writer",     target="editor"))
wf.add_edge(Edge(source="researcher", target="editor"))

When to use: Complex dependency graphs, fan-out/fan-in patterns, when you need precise control over which agents depend on which.
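
To check which nodes of a graph can run at the same time, you can compute the execution layers yourself. The sketch below uses the standard-library `graphlib` module (Python 3.9+) on the diamond edges shown above. `execution_layers` is a hypothetical helper and does not reflect how GraphWorkflow schedules nodes internally.

```python
from graphlib import TopologicalSorter


def execution_layers(edges):
    """Group nodes into layers; nodes in one layer do not depend on each other."""
    sorter = TopologicalSorter()
    for source, target in edges:
        sorter.add(target, source)  # target depends on source
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        layers.append(ready)
        sorter.done(*ready)
    return layers


edges = [
    ("analyst", "writer"),
    ("analyst", "researcher"),
    ("writer", "editor"),
    ("researcher", "editor"),
]
layers = execution_layers(edges)
print(layers)
```

Running this before building the workflow is a cheap way to catch accidental cycles or a node that never becomes reachable.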


SwarmRouter — Single Entry Point

SwarmRouter is the highest-level abstraction. Pass it a list of agents and a swarm_type — it handles the rest. Use this when you want to switch architectures without rewriting orchestration code.

from swarms import Agent, SwarmRouter

agents = [
    Agent(agent_name="Analyst",  model_name="gpt-4.1", max_loops=1),
    Agent(agent_name="Writer",   model_name="gpt-4.1", max_loops=1),
    Agent(agent_name="Reviewer", model_name="gpt-4.1", max_loops=1),
]

router = SwarmRouter(
    agents=agents,
    swarm_type="SequentialWorkflow",   # swap to any SwarmType below
    max_loops=1,
)
result = router.run("Write a blog post about transformer architectures.")

All swarm_type options:

| SwarmType | Behaviour |
|---|---|
| "SequentialWorkflow" | Agents run one after another |
| "ConcurrentWorkflow" | Agents run in parallel |
| "AgentRearrange" | Flow-DSL based execution |
| "MixtureOfAgents" | Workers + aggregator layer |
| "HierarchicalSwarm" | Boss delegates to workers |
| "GroupChat" | Multi-agent round-table discussion |
| "MultiAgentRouter" | Task routed to best-fit agent |
| "MajorityVoting" | Agents vote; majority wins |
| "CouncilAsAJudge" | Council deliberates; judge decides |
| "DebateWithJudge" | Agents debate; judge rules |
| "HeavySwarm" | Intensive multi-loop deep analysis |
| "RoundRobin" | Round-robin task distribution |
| "PlannerWorkerSwarm" | Planner + worker delegation |
| "BatchedGridWorkflow" | Grid-based batch execution |
| "LLMCouncil" | LLM-based council decisions |
| "AutoSwarmBuilder" | Auto-configures everything |
| "auto" | Router selects swarm_type automatically |

MixtureOfAgents

Multiple worker agents each respond to the task independently, then an aggregator agent synthesises all responses into a final answer. Repeat for multiple layers.

from swarms import Agent, MixtureOfAgents

workers = [
    Agent(agent_name="Worker-GPT",    model_name="gpt-4.1",       max_loops=1),
    Agent(agent_name="Worker-Claude", model_name="claude-sonnet-4-6", max_loops=1),
    Agent(agent_name="Worker-Llama",  model_name="groq/llama-3.3-70b-versatile", max_loops=1),
]

aggregator = Agent(
    agent_name="Aggregator",
    model_name="gpt-4.1",
    system_prompt="Synthesise the following expert responses into one coherent answer.",
    max_loops=1,
)

moa = MixtureOfAgents(
    agents=workers,
    aggregator_agent=aggregator,
    layers=2,        # run worker→aggregate cycle this many times
    max_loops=1,
)
result = moa.run("What are the best practices for securing a Kubernetes cluster?")

When to use: High-stakes tasks where you want multiple independent perspectives merged into a consensus. Works especially well with diverse model providers.


HierarchicalSwarm

A director agent breaks the task into subtasks and delegates them to worker agents. Workers report back; director synthesises.

from swarms import Agent, HierarchicalSwarm

director = Agent(
    agent_name="Director",
    agent_description="Breaks complex tasks into subtasks and delegates them.",
    model_name="gpt-4.1",
    max_loops=1,
)

workers = [
    Agent(agent_name="DataWorker",    model_name="gpt-5.4-mini", max_loops=1),
    Agent(agent_name="WritingWorker", model_name="gpt-5.4-mini", max_loops=1),
    Agent(agent_name="ReviewWorker",  model_name="gpt-5.4-mini", max_loops=1),
]

swarm = HierarchicalSwarm(
    director=director,
    agents=workers,
    max_loops=2,
)
result = swarm.run("Produce a comprehensive competitive analysis of the AI chip market.")

When to use: Tasks naturally decomposed into subtasks where a coordinator must manage work allocation and synthesis.


GroupChat

Agents engage in a round-table discussion. A speaker selection function decides who speaks next. Use for brainstorming, debate, or collaborative problem-solving.

from swarms import Agent, GroupChat
from swarms.structs.groupchat import expertise_based, round_robin_speaker

optimist  = Agent(agent_name="Optimist",  system_prompt="You argue for the benefits.", model_name="gpt-4.1", max_loops=1)
pessimist = Agent(agent_name="Pessimist", system_prompt="You argue for the risks.",    model_name="gpt-4.1", max_loops=1)
realist   = Agent(agent_name="Realist",   system_prompt="You seek balanced analysis.", model_name="gpt-4.1", max_loops=1)

chat = GroupChat(
    agents=[optimist, pessimist, realist],
    speaker_fn=round_robin_speaker,   # or expertise_based, random_speaker, priority_speaker
    max_loops=3,                      # 3 rounds of discussion
)
result = chat.run("Should we adopt AI for medical diagnosis?")

Speaker functions: round_robin_speaker, expertise_based, random_speaker, priority_speaker, random_dynamic_speaker


MajorityVoting

All agents independently answer the task. The answer that appears in the majority of responses wins.

from swarms import Agent, MajorityVoting

voters = [
    Agent(agent_name=f"Voter-{i}", model_name="gpt-5.4-mini", max_loops=1)
    for i in range(5)
]

mv = MajorityVoting(agents=voters, max_loops=1)
result = mv.run("Is Python or Rust better for building a high-performance web server?")

When to use: Classification, yes/no decisions, or any task with a discrete answer set where you want noise reduction through consensus.
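
The tallying step can be sketched with a counter over normalised answers. This is an illustration only: `majority_vote` is hypothetical, it picks the most common answer (a plurality, which may be less than half the votes), and it assumes answers are short enough to compare as strings. How MajorityVoting itself compares free-text responses is not shown here.

```python
from collections import Counter


def majority_vote(answers):
    """Return (answer, votes) for the most common normalised answer."""
    if not answers:
        raise ValueError("no answers to count")
    counts = Counter(answer.strip().lower() for answer in answers)
    return counts.most_common(1)[0]


winner, votes = majority_vote(["Rust", "python", "Rust ", "rust", "Python"])
print(winner, votes)
```

Asking agents to reply with a single label from a fixed set makes this kind of comparison far more reliable than matching free-form prose.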


CouncilAsAJudge

A council of agents each deliberate, then a judge agent makes the final ruling based on the council's reasoning.

from swarms import Agent, CouncilAsAJudge

council = [
    Agent(agent_name="Expert-Security", model_name="gpt-4.1", max_loops=1),
    Agent(agent_name="Expert-Privacy",  model_name="gpt-4.1", max_loops=1),
    Agent(agent_name="Expert-Legal",    model_name="gpt-4.1", max_loops=1),
]

judge = Agent(
    agent_name="Judge",
    system_prompt="Given the council's analysis, deliver a final verdict.",
    model_name="gpt-4.1",
    max_loops=1,
)

council_swarm = CouncilAsAJudge(
    agents=council,
    judge=judge,
    max_loops=1,
)
result = council_swarm.run("Should we store user biometric data on-device only?")

DebateWithJudge

Two or more agents argue opposing positions for multiple rounds. A judge delivers a verdict at the end.

from swarms import Agent, DebateWithJudge

pro  = Agent(agent_name="Pro",  system_prompt="Argue strongly in favour.",  model_name="gpt-4.1", max_loops=1)
con  = Agent(agent_name="Con",  system_prompt="Argue strongly against.",    model_name="gpt-4.1", max_loops=1)

judge = Agent(
    agent_name="Judge",
    system_prompt="Evaluate the debate and deliver an objective verdict.",
    model_name="gpt-4.1",
    max_loops=1,
)

debate = DebateWithJudge(
    agents=[pro, con],
    judge=judge,
    max_loops=3,   # 3 rounds of argument
)
result = debate.run("Motion: Open-source LLMs will surpass closed-source models by 2027.")

HeavySwarm

Intensive multi-loop analysis. Each agent runs for many loops on the problem, producing deep reasoning. Best for research-grade analysis.

from swarms import HeavySwarm

swarm = HeavySwarm(
    num_agents=4,
    model_name="gpt-4.1",
    loops_per_agent=5,       # each agent reasons for 5 loops
    show_output=True,
)
result = swarm.run("Derive a novel approach to solving the alignment problem in AI.")

Or via SwarmRouter:

from swarms import Agent, SwarmRouter

agents = [Agent(agent_name=f"Deep-{i}", model_name="gpt-4.1", max_loops=5) for i in range(4)]
router = SwarmRouter(agents=agents, swarm_type="HeavySwarm")
result = router.run("Deep analysis: implications of AGI on global labour markets.")

RoundRobinSwarm

Distributes tasks to agents in a fixed rotation. Each agent handles every Nth task.

from swarms import Agent, RoundRobinSwarm

agents = [
    Agent(agent_name=f"Handler-{i}", model_name="gpt-5.4-mini", max_loops=1)
    for i in range(3)
]

rr = RoundRobinSwarm(agents=agents, max_loops=1)

tasks = ["Task A", "Task B", "Task C", "Task D", "Task E", "Task F"]
for task in tasks:
    result = rr.run(task)
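
The "every Nth task" rotation can be made concrete with `itertools.cycle`. The sketch below only shows the assignment order described above, using names in place of agents. `assign_round_robin` is hypothetical and is not the RoundRobinSwarm implementation.

```python
from itertools import cycle


def assign_round_robin(handlers, tasks):
    """Pair each task with the next handler in a fixed rotation."""
    if not handlers:
        raise ValueError("at least one handler is required")
    rotation = cycle(handlers)
    return [(task, next(rotation)) for task in tasks]


handlers = ["Handler-0", "Handler-1", "Handler-2"]
tasks = ["Task A", "Task B", "Task C", "Task D", "Task E", "Task F"]
assignments = assign_round_robin(handlers, tasks)
for task, handler in assignments:
    print(task, "->", handler)
```

With three handlers and six tasks, each handler receives exactly two tasks: the first and fourth, second and fifth, or third and sixth.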

PlannerWorkerSwarm

A planner agent generates a structured plan; worker agents execute each step.

from swarms import Agent, PlannerWorkerSwarm

planner = Agent(
    agent_name="Planner",
    system_prompt="You create detailed, step-by-step execution plans.",
    model_name="gpt-4.1",
    max_loops=1,
)

workers = [
    Agent(agent_name=f"Worker-{i}", model_name="gpt-5.4-mini", max_loops=2)
    for i in range(4)
]

swarm = PlannerWorkerSwarm(
    planner_agent=planner,
    worker_agents=workers,
    max_loops=1,
)
result = swarm.run("Build a complete go-to-market strategy for a B2B SaaS product.")

AutoSwarmBuilder

Pass a high-level description of the task — the framework automatically creates the agents, assigns roles, and runs the appropriate swarm architecture.

from swarms import AutoSwarmBuilder

builder = AutoSwarmBuilder(
    name="MarketResearchSwarm",
    description="A swarm that produces comprehensive market research reports",
    max_loops=2,
)
result = builder.run("Research the electric vehicle market and identify growth opportunities.")

When to use: Rapid prototyping, when you don't know yet which structure fits, or when you want the LLM to decide.


Utility Execution Helpers

from swarms.structs.multi_agent_exec import (
    run_agents_concurrently,
    run_agents_concurrently_async,
    run_agents_with_different_tasks,
    run_single_agent,
)

# Same task, all agents in parallel
results = run_agents_concurrently(agents=agents, task="Summarise the news today.")

# Different task per agent
task_map = {agent: task for agent, task in zip(agents, tasks)}
results = run_agents_with_different_tasks(task_map)

# Async version
import asyncio
results = asyncio.run(run_agents_concurrently_async(agents=agents, task="..."))

Async Support

import asyncio
from swarms import Agent

agent = Agent(agent_name="AsyncAgent", model_name="gpt-4.1")

async def main():
    # Standard async run
    result = await agent.arun("What is the capital of France?")
    print(result)

    # Streaming async run
    async for token in agent.arun_stream("Explain quantum entanglement."):
        print(token, end="", flush=True)

asyncio.run(main())

MCP Tool Integration

Load tools from any MCP server. The agent auto-discovers available tools on startup.

from swarms import Agent

# Single MCP server
agent = Agent(
    agent_name="MCPAgent",
    model_name="gpt-4.1",
    mcp_url="http://localhost:8000/sse",   # SSE endpoint
    max_loops="auto",
)

# Multiple MCP servers
agent = Agent(
    agent_name="MultiMCPAgent",
    model_name="gpt-4.1",
    mcp_urls=[
        "http://localhost:8000/sse",
        "http://localhost:8001/sse",
    ],
    max_loops="auto",
)

result = agent.run("Use the available tools to complete the task.")

Fetch tools manually:

from swarms.tools.mcp_client_tools import get_mcp_tools_sync, aget_mcp_tools

tools = get_mcp_tools_sync(server_url="http://localhost:8000/sse")

import asyncio
tools = asyncio.run(aget_mcp_tools(server_url="http://localhost:8000/sse"))

Conversation Management

Conversation manages message history with optional disk persistence.

from swarms import Agent
from swarms.structs.conversation import Conversation

conv = Conversation(
    system_prompt="You are a helpful assistant.",
    agent_name="MyAgent",      # keys MEMORY.md to this name
    time_enabled=True,         # include ISO timestamps in history
)

conv.add("user", "What is 2+2?")
conv.add("assistant", "4.")

# Get history as string (includes timestamps in v12)
history_str = conv.return_history_as_string()

# Compact + archive
conv.compact(summary="User asked basic arithmetic. Answer: 4.")

# An agent created with the same agent_name picks up the same MEMORY.md
agent = Agent(
    agent_name="MyAgent",
    model_name="gpt-4.1",
    # agent reads MEMORY.md automatically when persistent_memory=True
)

Choosing the Right Structure

| Situation | Use |
|---|---|
| Simple single task | Agent |
| Linear A→B→C pipeline | SequentialWorkflow |
| Same task, many agents at once | ConcurrentWorkflow |
| Custom mix of sequential + parallel | AgentRearrange |
| Complex dependency graph / DAG | GraphWorkflow |
| Need per-node callbacks or streaming | GraphWorkflow |
| Multiple models, one synthesised answer | MixtureOfAgents |
| Manager delegates to specialists | HierarchicalSwarm |
| Open discussion / brainstorming | GroupChat |
| Discrete decision via consensus | MajorityVoting |
| High-stakes ruling with deliberation | CouncilAsAJudge |
| Structured adversarial debate | DebateWithJudge |
| Deep research, many loops | HeavySwarm |
| Don't know yet / rapid prototyping | AutoSwarmBuilder or SwarmRouter(swarm_type="auto") |
| Need to switch architectures easily | SwarmRouter |

Common Patterns & Recipes

Pattern: Research → Write → Review pipeline

from swarms import Agent, SequentialWorkflow

pipeline = SequentialWorkflow(agents=[
    Agent(agent_name="Researcher", system_prompt="You research topics thoroughly.", model_name="gpt-4.1"),
    Agent(agent_name="Writer",     system_prompt="You write clear, engaging content.", model_name="gpt-4.1"),
    Agent(agent_name="Editor",     system_prompt="You improve clarity and fix errors.", model_name="gpt-4.1"),
], max_loops=1)

result = pipeline.run("Write an article about the history of neural networks.")

Pattern: Fan-out to specialists, fan-in to synthesiser

from swarms import Agent, MixtureOfAgents

specialists = [
    Agent(agent_name="TechExpert",    system_prompt="Analyse the technical aspects.", model_name="gpt-4.1"),
    Agent(agent_name="BusinessExpert",system_prompt="Analyse the business aspects.", model_name="gpt-4.1"),
    Agent(agent_name="LegalExpert",   system_prompt="Analyse the legal aspects.",   model_name="gpt-4.1"),
]
synthesiser = Agent(agent_name="Synthesiser", model_name="gpt-4.1",
                    system_prompt="Combine expert analyses into one coherent report.")

moa = MixtureOfAgents(agents=specialists, aggregator_agent=synthesiser)
result = moa.run("Evaluate the risks of launching a fintech product in the EU.")

Pattern: Autonomous agent with tools and memory

from swarms import Agent

def search_web(query: str) -> str:
    """Search the web for a query and return results."""
    # your implementation
    ...

def write_file(filename: str, content: str) -> str:
    """Write content to a file."""
    with open(filename, "w") as f:
        f.write(content)
    return f"Written to {filename}"

agent = Agent(
    agent_name="AutonomousResearcher",
    model_name="gpt-4.1",
    max_loops="auto",
    tools=[search_web, write_file],
    persistent_memory=True,
    context_compression=True,
    context_length=32000,
)
agent.run("Research the top 10 open-source LLMs and write a comparison report to report.md")

Pattern: Multi-model ensemble with streaming

from swarms import Agent, ConcurrentWorkflow

agents = [
    Agent(agent_name="GPT",    model_name="gpt-4.1",          max_loops=1),
    Agent(agent_name="Claude", model_name="claude-sonnet-4-6", max_loops=1),
    Agent(agent_name="Gemini", model_name="gemini/gemini-2.5-pro", max_loops=1),
]

workflow = ConcurrentWorkflow(agents=agents)
results = workflow.run("What is the most important unsolved problem in mathematics?")

for agent_name, answer in results.items():
    print(f"\n=== {agent_name} ===\n{answer}")

Pattern: Human-in-the-loop with AgentRearrange

from swarms import Agent, AgentRearrange

def human_input(response: str) -> str:
    print(f"\nAgent says:\n{response}\n")
    return input("Your feedback: ")

pipeline = AgentRearrange(
    agents=[
        Agent(agent_name="Drafter",  model_name="gpt-4.1"),
        Agent(agent_name="Finisher", model_name="gpt-4.1"),
    ],
    flow="Drafter -> H -> Finisher",
    human_in_the_loop=True,
    custom_human_in_the_loop=human_input,
)
result = pipeline.run("Draft a press release about our product launch.")

Pattern: GraphWorkflow with fan-out / fan-in

from swarms import Agent, GraphWorkflow, Node, Edge, NodeType

ingestion = Agent(agent_name="Ingestion", model_name="gpt-5.4-mini", max_loops=1)
branch_a  = Agent(agent_name="BranchA",   model_name="gpt-5.4-mini", max_loops=1)
branch_b  = Agent(agent_name="BranchB",   model_name="gpt-5.4-mini", max_loops=1)
merger    = Agent(agent_name="Merger",    model_name="gpt-4.1",      max_loops=1)

wf = GraphWorkflow()
for a in [ingestion, branch_a, branch_b, merger]:
    wf.add_node(Node(id=a.agent_name, type=NodeType.AGENT, agent=a))

wf.add_edge(Edge(source="Ingestion", target="BranchA"))
wf.add_edge(Edge(source="Ingestion", target="BranchB"))
wf.add_edge(Edge(source="BranchA",   target="Merger"))
wf.add_edge(Edge(source="BranchB",   target="Merger"))

wf.set_entry_points(["Ingestion"])
wf.set_end_points(["Merger"])

results = wf.run(task="Process this dataset from two angles and merge the findings.")

What to Avoid

Don't import a class from a submodule when it is exported at the top level; import it from swarms instead:

# Wrong
from swarms.structs.agent import Agent

# Right
from swarms import Agent

Don't set max_loops="auto" without a clear stopping condition — the agent will loop until it decides it is done or hits a resource limit. Prefer explicit max_loops=N for production tasks.

Don't give all agents the same agent_name. persistent_memory and MEMORY.md are keyed on agent_name, so duplicate names cause agents to share and corrupt each other's memory.

Don't instantiate heavyweight structures inside tight loops — create agents and workflows once, reuse them across calls.

Don't pass tools=[] (empty list) — pass tools=None instead. An empty list can confuse schema generation.
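
When the tool list is built dynamically, it may end up empty. A small guard can convert that case to None before the agent is constructed. `normalise_tools` below is a hypothetical helper, not a Swarms function.

```python
def normalise_tools(tools):
    """Return None for a missing or empty tool collection, else a plain list."""
    return list(tools) if tools else None


print(normalise_tools([]))
print(normalise_tools(None))
print(normalise_tools((len,)))
```

You would then pass `tools=normalise_tools(candidate_tools)` when creating the agent.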

Don't use streaming_on=True and streaming_callback together on the same agent. streaming_on streams to stdout; streaming_callback streams to your function. Pick one.

Don't set context_compression=False on very long autonomous sessions — without compression the agent will eventually hit the context limit and raise an error.

For long-running autonomous agents in production, always set:

agent = Agent(
    # ... your other arguments
    persistent_memory=True,
    context_compression=True,
    context_length=32000,
    autosave=True,
)