MCP Integration Guide

February 17, 2026

Connect to remote tool servers using the Model Context Protocol.

Overview

CHUK Tool Processor supports three MCP transport mechanisms:

     LLM Output
         │
         ▼
   Tool Processor
         │
 ┌───────┴──────┬────────────────────┐
 │ Local Tools  │ Remote Tools (MCP) │
 └──────────────┴────────────────────┘

Relationship with chuk-mcp:

  • chuk-mcp is a low-level MCP protocol client (handles transports, protocol negotiation)
  • chuk-tool-processor wraps chuk-mcp to integrate external tools into your execution pipeline
  • You can use local tools, remote MCP tools, or both in the same processor

Transport Comparison

Transport         Use Case                   Real Examples
HTTP Streamable   Cloud APIs, SaaS services  Notion (mcp.notion.com)
STDIO             Local tools, databases     SQLite (mcp-server-sqlite), Echo (chuk-mcp-echo)
SSE               Legacy cloud services      Atlassian (mcp.atlassian.com)

When to Use Each

Scenario                 Recommended Transport
Cloud SaaS with OAuth    HTTP Streamable
Local database access    STDIO
File system operations   STDIO
Legacy MCP servers       SSE
Development/testing      STDIO (Echo server)

HTTP Streamable (Recommended for Cloud)

Modern HTTP streaming transport for cloud-based MCP servers like Notion.

Use for: Cloud SaaS services (OAuth, long-running streams, resilient reconnects)

from chuk_tool_processor.mcp import setup_mcp_http_streamable

# Connect to Notion MCP with OAuth
servers = [
    {
        "name": "notion",
        "url": "https://mcp.notion.com/mcp",
        "headers": {"Authorization": f"Bearer {access_token}"}
    }
]

processor, manager = await setup_mcp_http_streamable(
    servers=servers,
    namespace="notion",
    initialization_timeout=120.0,  # Some services need time to initialize
    enable_caching=True,
    enable_retries=True
)

# Use Notion tools through MCP
results = await processor.process(
    '<tool name="notion.search_pages" args=\'{"query": "meeting notes"}\'/>'
)

HTTP Streamable Options

Parameter                Default    Description
servers                  required   List of server configs with name, url, headers
namespace                "mcp"      Prefix for tool names
initialization_timeout   30.0       Timeout for server initialization (seconds)
enable_caching           False      Enable result caching
enable_retries           False      Enable automatic retries
oauth_refresh_callback   None       Callback for token refresh

STDIO (Best for Local Tools)

Run local MCP servers as subprocesses. Great for databases, file systems, and local tools.

Use for: Local/embedded tools and databases (SQLite, file systems, local services)

from chuk_tool_processor.mcp import setup_mcp_stdio
import json

# Configure SQLite MCP server
config = {
    "mcpServers": {
        "sqlite": {
            "command": "uvx",
            "args": ["mcp-server-sqlite", "--db-path", "/path/to/database.db"],
            "env": {"MCP_SERVER_NAME": "sqlite"},
            "transport": "stdio"
        }
    }
}

# Save config to file
with open("mcp_config.json", "w") as f:
    json.dump(config, f)

# Connect to local SQLite server
processor, manager = await setup_mcp_stdio(
    config_file="mcp_config.json",
    servers=["sqlite"],
    namespace="db",
    initialization_timeout=120.0  # First run downloads packages
)

# Query your local database via MCP
results = await processor.process(
    '<tool name="db.query" args=\'{"sql": "SELECT * FROM users LIMIT 10"}\'/>'
)

STDIO Config File Format

{
  "mcpServers": {
    "server_name": {
      "command": "uvx",
      "args": ["package-name", "--flag", "value"],
      "env": {"ENV_VAR": "value"},
      "transport": "stdio"
    }
  }
}

STDIO Options

Parameter                Default    Description
config_file              required   Path to MCP config JSON
servers                  required   List of server names to connect
namespace                "mcp"      Prefix for tool names
initialization_timeout   30.0       Timeout for server initialization (seconds)

SSE (Legacy Support)

For backward compatibility with older MCP servers using Server-Sent Events.

Use for: Legacy compatibility only. Prefer HTTP Streamable for new integrations.

from chuk_tool_processor.mcp import setup_mcp_sse

# Connect to Atlassian with OAuth via SSE
servers = [
    {
        "name": "atlassian",
        "url": "https://mcp.atlassian.com/v1/sse",
        "headers": {"Authorization": f"Bearer {access_token}"}
    }
]

processor, manager = await setup_mcp_sse(
    servers=servers,
    namespace="atlassian",
    initialization_timeout=120.0
)

Middleware Stack

The MiddlewareStack provides production-grade resilience for MCP tool calls. It wraps MCP connections with configurable retry, circuit breaker, and rate limiting layers.

Architecture

Tool Call
    │
    ▼
┌──────────────────────────────────┐
│        Middleware Stack          │
│  (Applied in this order)         │
│   • Rate Limiting (outermost)    │
│   • Circuit Breaker              │
│   • Retry (innermost)            │
│   • Transport                    │
└──────────────────────────────────┘
    │
    ▼
MCP Server

Basic Usage

from chuk_tool_processor.mcp.middleware import (
    MiddlewareConfig,
    MiddlewareStack,
    RetrySettings,
    CircuitBreakerSettings,
    RateLimitSettings,
)
from chuk_tool_processor.mcp import StreamManager

# Configure middleware layers
config = MiddlewareConfig(
    retry=RetrySettings(
        enabled=True,
        max_retries=3,
        base_delay=1.0,
        max_delay=30.0,
        jitter=True,
    ),
    circuit_breaker=CircuitBreakerSettings(
        enabled=True,
        failure_threshold=5,
        reset_timeout=60.0,
    ),
    rate_limiting=RateLimitSettings(
        enabled=True,
        global_limit=100,
        period=60.0,
    ),
)

# Create middleware stack around a StreamManager
stream_manager = StreamManager()
await stream_manager.initialize(servers)

middleware = MiddlewareStack(stream_manager, config=config)

# Execute tools through middleware
result = await middleware.call_tool(
    tool_name="notion.search_pages",
    arguments={"query": "docs"},
    timeout=30.0,
)

if result.success:
    print(f"Result: {result.result}")
else:
    print(f"Error: {result.error}")

Middleware Configuration

RetrySettings

Field             Type        Default           Description
enabled           bool        True              Enable retry middleware
max_retries       int         3                 Maximum retry attempts
base_delay        float       1.0               Initial delay between retries (seconds)
max_delay         float       30.0              Maximum delay between retries (seconds)
jitter            bool        True              Add randomness to delays
retry_on_errors   list[str]   transport errors  Error patterns that trigger a retry
skip_on_errors    list[str]   auth errors       Error patterns that skip retries
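The exact backoff schedule is an implementation detail of the middleware, but the settings above describe standard exponential backoff. A minimal sketch (a hypothetical helper, not the library's internals) of how base_delay, max_delay, and jitter interact:

```python
import random

def backoff_delay(attempt: int, base_delay: float = 1.0,
                  max_delay: float = 30.0, jitter: bool = True) -> float:
    """Delay before retry number `attempt` (0-based):
    exponential growth from base_delay, capped at max_delay."""
    delay = min(base_delay * (2 ** attempt), max_delay)
    if jitter:
        # Randomize so many clients don't retry in lockstep.
        delay *= random.uniform(0.5, 1.0)
    return delay

# With jitter disabled the schedule is deterministic:
print([backoff_delay(a, jitter=False) for a in range(6)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

With jitter enabled, each delay is scaled by a random factor, which spreads retries out when many calls fail at once.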

CircuitBreakerSettings

Field                 Type    Default   Description
enabled               bool    True      Enable circuit breaker
failure_threshold     int     5         Failures before opening the circuit
success_threshold     int     2         Successes needed to close the circuit
reset_timeout         float   60.0      Seconds before the half-open state
half_open_max_calls   int     1         Calls allowed in the half-open state
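These settings drive the classic closed → open → half-open state machine. A self-contained sketch of that machine (illustrative only, not the library's implementation):

```python
import time

class CircuitBreaker:
    """Minimal closed → open → half-open state machine (illustrative only)."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 reset_timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.state = "closed"
        self.failure_count = 0
        self.success_count = 0
        self.opened_at = 0.0

    def allow_call(self) -> bool:
        if self.state == "open":
            if self.clock() - self.opened_at >= self.reset_timeout:
                self.state = "half-open"   # let probe calls through
                self.success_count = 0
                return True
            return False                   # still open: reject immediately
        return True

    def record_success(self) -> None:
        if self.state == "half-open":
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = "closed"      # service has recovered
                self.failure_count = 0
        else:
            self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.state == "half-open" or self.failure_count >= self.failure_threshold:
            self.state = "open"            # stop sending calls to the server
            self.opened_at = self.clock()
```

After failure_threshold consecutive failures the breaker opens and rejects calls; once reset_timeout elapses it admits probe calls, and success_threshold successes close it again.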

RateLimitSettings

Field             Type    Default   Description
enabled           bool    False     Enable rate limiting
global_limit      int     100       Maximum requests per period
period            float   60.0      Time window in seconds
per_tool_limits   dict    {}        Per-tool limits as {tool: (limit, period)}
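The semantics of global_limit and period correspond to a windowed counter. A fixed-window sketch of that idea (the library's actual algorithm may differ):

```python
import time

class FixedWindowLimiter:
    """Allow at most `limit` calls per `period` seconds (illustrative only)."""

    def __init__(self, limit=100, period=60.0, clock=time.monotonic):
        self.limit = limit
        self.period = period
        self.clock = clock
        self.window_start = clock()
        self.count = 0

    def try_acquire(self) -> bool:
        now = self.clock()
        if now - self.window_start >= self.period:
            # A new window has started: reset the counter.
            self.window_start = now
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False   # over the limit for this window

limiter = FixedWindowLimiter(limit=2, period=60.0)
print([limiter.try_acquire() for _ in range(3)])  # [True, True, False]
```

Per-tool limits apply the same idea with one counter per tool name.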

Monitoring Middleware Status

# Get current middleware status
status = middleware.get_status()

# Check retry status
if status.retry:
    print(f"Retry enabled: max_retries={status.retry.max_retries}")

# Check circuit breaker states per tool
if status.circuit_breaker:
    for tool, state in status.circuit_breaker.tool_states.items():
        print(f"{tool}: state={state.state}, failures={state.failure_count}")

# Check rate limiting
if status.rate_limiting:
    print(f"Rate limit: {status.rate_limiting.global_limit}/{status.rate_limiting.period}s")

Error Classification

The middleware automatically classifies errors:

Retryable Errors (will trigger retry):

  • Transport not initialized
  • connection errors
  • timeout errors
  • refused connections
  • reset connections
  • closed connections

Non-Retryable Errors (fail immediately):

  • oauth errors
  • unauthorized errors
  • authentication errors
  • invalid_grant errors
  • No server found errors
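Classification like this typically works by substring matching on the error message, with non-retryable patterns taking precedence. A sketch mirroring the lists above (pattern names taken from this section; the function is illustrative, not the library's API):

```python
RETRYABLE_PATTERNS = (
    "transport not initialized", "connection", "timeout",
    "refused", "reset", "closed",
)
NON_RETRYABLE_PATTERNS = (
    "oauth", "unauthorized", "authentication",
    "invalid_grant", "no server found",
)

def is_retryable(error_message: str) -> bool:
    """Classify an error string; non-retryable patterns win over retryable ones."""
    msg = error_message.lower()
    if any(p in msg for p in NON_RETRYABLE_PATTERNS):
        return False
    return any(p in msg for p in RETRYABLE_PATTERNS)

print(is_retryable("Connection reset by peer"))   # True
print(is_retryable("OAuth validation failed"))    # False
```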

ToolExecutionResult

Each middleware call returns a ToolExecutionResult:

result = await middleware.call_tool("tool_name", {"arg": "value"})

# Fields available:
result.success       # bool: True if execution succeeded
result.result        # Any: The tool result (if success)
result.error         # str | None: Error message (if failed)
result.tool_name     # str: Name of the tool called
result.duration_ms   # float: Execution time in milliseconds
result.attempts      # int: Number of attempts (with retries)
result.from_cache    # bool: True if result was cached

Real-World Examples

Notion Integration with OAuth

Complete OAuth flow connecting to Notion's MCP server:

from chuk_tool_processor.mcp import setup_mcp_http_streamable

# After completing OAuth flow (see examples/04_mcp_integration/notion_oauth.py)
processor, manager = await setup_mcp_http_streamable(
    servers=[{
        "name": "notion",
        "url": "https://mcp.notion.com/mcp",
        "headers": {"Authorization": f"Bearer {access_token}"}
    }],
    namespace="notion",
    initialization_timeout=120.0
)

# Get available Notion tools
tools = manager.get_all_tools()
print(f"Available tools: {[t['name'] for t in tools]}")

# Use Notion tools in your LLM workflow
results = await processor.process(
    '<tool name="notion.search_pages" args=\'{"query": "Q4 planning"}\'/>'
)

Local SQLite Database Access

Run SQLite MCP server locally for database operations:

from chuk_tool_processor.mcp import setup_mcp_stdio
import json

# Configure SQLite server
config = {
    "mcpServers": {
        "sqlite": {
            "command": "uvx",
            "args": ["mcp-server-sqlite", "--db-path", "./data/app.db"],
            "transport": "stdio"
        }
    }
}

with open("mcp_config.json", "w") as f:
    json.dump(config, f)

# Connect to local database
processor, manager = await setup_mcp_stdio(
    config_file="mcp_config.json",
    servers=["sqlite"],
    namespace="db",
    initialization_timeout=120.0  # First run downloads mcp-server-sqlite
)

# Query your database via LLM
results = await processor.process(
    '<tool name="db.query" args=\'{"sql": "SELECT COUNT(*) FROM users"}\'/>'
)

Simple STDIO Echo Server

Minimal example for testing STDIO transport:

from chuk_tool_processor.mcp import setup_mcp_stdio
import json

# Configure echo server (great for testing)
config = {
    "mcpServers": {
        "echo": {
            "command": "uvx",
            "args": ["chuk-mcp-echo", "stdio"],
            "transport": "stdio"
        }
    }
}

with open("echo_config.json", "w") as f:
    json.dump(config, f)

processor, manager = await setup_mcp_stdio(
    config_file="echo_config.json",
    servers=["echo"],
    namespace="echo",
    initialization_timeout=60.0
)

# Test echo functionality
results = await processor.process(
    '<tool name="echo.echo" args=\'{"message": "Hello MCP!"}\'/>'
)

OAuth Token Refresh

For MCP servers that use OAuth authentication, CHUK Tool Processor supports automatic token refresh when access tokens expire.

How It Works

  1. A tool call fails with an OAuth-related error (e.g., "invalid_token", "expired token")
  2. The processor automatically calls your refresh callback
  3. The authentication headers are updated with the new token
  4. The tool call is retried with fresh credentials

Setup with HTTP Streamable

from chuk_tool_processor.mcp import setup_mcp_http_streamable

async def refresh_oauth_token():
    """Called automatically when tokens expire."""
    new_token = await your_refresh_logic()
    return {"Authorization": f"Bearer {new_token}"}

processor, manager = await setup_mcp_http_streamable(
    servers=[{
        "name": "notion",
        "url": "https://mcp.notion.com/mcp",
        "headers": {"Authorization": f"Bearer {initial_access_token}"}
    }],
    namespace="notion",
    oauth_refresh_callback=refresh_oauth_token  # Enable auto-refresh
)

Setup with SSE

from chuk_tool_processor.mcp import setup_mcp_sse

async def refresh_oauth_token():
    """Refresh expired OAuth token."""
    new_access_token = await exchange_refresh_token(refresh_token)
    return {"Authorization": f"Bearer {new_access_token}"}

processor, manager = await setup_mcp_sse(
    servers=[{
        "name": "atlassian",
        "url": "https://mcp.atlassian.com/v1/sse",
        "headers": {"Authorization": f"Bearer {initial_token}"}
    }],
    namespace="atlassian",
    oauth_refresh_callback=refresh_oauth_token
)

OAuth Errors Detected

The following error patterns trigger automatic refresh:

  • invalid_token
  • expired token
  • OAuth validation failed
  • unauthorized
  • token expired
  • authentication failed
  • invalid access token

Important Notes

  • The refresh callback must return a dict with an Authorization key
  • If refresh fails or returns invalid headers, the original error is returned
  • Token refresh is attempted only once per tool call (no infinite retry loops)
  • After successful refresh, the updated headers are used for all subsequent calls
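The notes above amount to a once-only refresh-and-retry wrapper. A self-contained sketch of that pattern, where call_tool and refresh_callback are stand-in functions rather than library APIs:

```python
async def call_with_refresh(call_tool, refresh_callback, headers: dict):
    """Call a tool; on an OAuth error, refresh the token once and retry."""
    result = await call_tool(headers)
    if result.get("error") and "invalid_token" in result["error"]:
        new_headers = await refresh_callback()
        # The callback must return a dict with an Authorization key.
        if not (isinstance(new_headers, dict) and "Authorization" in new_headers):
            return result                  # refresh failed: surface original error
        headers.update(new_headers)        # reused for all subsequent calls
        result = await call_tool(headers)  # exactly one retry, no loops
    return result
```

Because the retry happens at most once, a refresh that yields another bad token fails fast instead of looping.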

MCPConfig (Clean Configuration)

Use Pydantic models for clean, type-safe configuration:

from chuk_tool_processor.mcp import setup_mcp_stdio, MCPConfig, MCPServerConfig

# Clean Pydantic config object instead of dict
processor, manager = await setup_mcp_stdio(
    config=MCPConfig(
        servers=[
            MCPServerConfig(
                name="echo",
                command="uvx",
                args=["mcp-echo"]
            )
        ],
        namespace="tools",
        enable_caching=True,
        cache_ttl=600,
    )
)

MCPConfig Options

Field                    Type                    Description
servers                  list[MCPServerConfig]   Server configurations
namespace                str                     Tool name prefix
enable_caching           bool                    Enable result caching
cache_ttl                int                     Cache TTL in seconds
enable_retries           bool                    Enable automatic retries
max_retries              int                     Maximum retry attempts
initialization_timeout   float                   Server initialization timeout (seconds)

MCPServerConfig Options

Field     Type             Description
name      str              Server identifier
command   str              Command to run (STDIO)
args      list[str]        Command arguments (STDIO)
env       dict[str, str]   Environment variables (STDIO)
url       str              Server URL (HTTP/SSE)
headers   dict[str, str]   HTTP headers (HTTP/SSE)

Mixing Local and Remote Tools

You can use local tools and MCP tools in the same processor:

from chuk_tool_processor import ToolProcessor, register_tool, initialize
from chuk_tool_processor.mcp import setup_mcp_http_streamable

# 1. Register local tools
@register_tool(name="local_calculator")
class Calculator:
    async def execute(self, a: int, b: int) -> int:
        return a + b

await initialize()

# 2. Add remote MCP tools
processor, manager = await setup_mcp_http_streamable(
    servers=[{
        "name": "notion",
        "url": "https://mcp.notion.com/mcp",
        "headers": {"Authorization": f"Bearer {token}"}
    }],
    namespace="notion"
)

# 3. Use both in the same call
async with processor:
    results = await processor.process('''
        <tool name="local_calculator" args='{"a": 5, "b": 3}'/>
        <tool name="notion.search_pages" args='{"query": "docs"}'/>
    ''')