Temporal Plasmate

April 12, 2026

Reliable AI agent workflows built on the Plasmate browser engine and Temporal workflow orchestration.

Why Temporal + Plasmate?

  • 10-100x token compression: Plasmate converts HTML to Semantic Object Model (SOM) JSON
  • Reliable execution: Temporal provides retries, timeouts, and resumable workflows
  • Enterprise-ready: Temporal is used by Stripe, Netflix, and Datadog for mission-critical automation
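
To make the retry behaviour concrete, here is a minimal sketch of the exponential backoff schedule that a retry policy of this kind produces. The function `backoff_schedule` and its parameters are illustrative only and are not part of Temporal or temporal-plasmate. The sample values (1 second initial interval, coefficient 2.0, 100 second cap) are assumed to match Temporal's documented default retry policy, so verify them against your SDK version.

```python
def backoff_schedule(initial: float, coefficient: float,
                     max_interval: float, attempts: int) -> list[float]:
    """Return the wait in seconds before each of `attempts` retries."""
    waits = []
    delay = initial
    for _ in range(attempts):
        # Each wait grows geometrically but never exceeds the cap.
        waits.append(min(delay, max_interval))
        delay *= coefficient
    return waits


print(backoff_schedule(1.0, 2.0, 100.0, 8))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 100.0]
```

The cap matters for long-running fetches: without it, a flaky site would push the wait between attempts into hours after a dozen failures.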

Installation

pip install temporal-plasmate

Prerequisites:

Quick Start

1. Start the Worker

# Start Temporal (if not running)
temporal server start-dev

# Start Plasmate worker
python -m temporal_plasmate.worker

2. Execute Workflows

import asyncio
from temporal_plasmate import PlasmateTemporal

async def main():
    async with PlasmateTemporal() as client:
        # Research a topic across multiple sources
        result = await client.research(
            topic="AI agent architectures",
            urls=[
                "https://docs.anthropic.com/agents",
                "https://langchain.com/docs",
            ],
            max_depth=1,
        )
        
        print(f"Researched {result.total_pages} pages")
        for page in result.results:
            print(f"  - {page['url']}")

asyncio.run(main())

Pre-built Workflows

WebResearchWorkflow

Research a topic across multiple URLs with automatic link following.

result = await client.research(
    topic="kubernetes security",
    urls=["https://kubernetes.io/docs/concepts/security/"],
    max_depth=2,              # Follow links 2 levels deep
    extract_text_only=True,   # Just text, not full SOM
)

SiteCrawlWorkflow

Crawl an entire site with depth and page limits.

result = await client.crawl(
    start_url="https://docs.example.com",
    max_depth=3,
    max_pages=100,
    same_domain_only=True,
)
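
The interaction between `max_depth`, `max_pages`, and `same_domain_only` can be sketched as a bounded breadth-first traversal. This is an assumption about how such limits typically combine, not the library's actual implementation. The `LINKS` table and the function `crawl_order` are hypothetical stand-ins for real page fetches.

```python
from collections import deque
from urllib.parse import urlparse

# Hypothetical link graph standing in for real page fetches.
LINKS = {
    "https://docs.example.com": [
        "https://docs.example.com/a",
        "https://other.example.org/x",
    ],
    "https://docs.example.com/a": ["https://docs.example.com/b"],
    "https://docs.example.com/b": ["https://docs.example.com/c"],
}


def crawl_order(start_url, max_depth, max_pages, same_domain_only=True):
    """Breadth-first traversal bounded by depth and page count."""
    start_host = urlparse(start_url).netloc
    seen = {start_url}
    queue = deque([(start_url, 0)])
    visited = []
    while queue and len(visited) < max_pages:
        url, depth = queue.popleft()
        visited.append(url)
        if depth == max_depth:
            continue  # do not follow links beyond the depth limit
        for link in LINKS.get(url, []):
            if same_domain_only and urlparse(link).netloc != start_host:
                continue  # skip off-site links
            if link not in seen:
                seen.add(link)
                queue.append((link, depth + 1))
    return visited


print(crawl_order("https://docs.example.com", max_depth=2, max_pages=100))
```

In this toy graph the page at `/c` sits three links from the start, so a depth limit of 2 excludes it, and the off-site link is dropped by the domain filter.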

MonitorWorkflow

Monitor URLs for changes over time.

result = await client.monitor(
    urls=["https://status.example.com"],
    check_interval_seconds=30,
    duration_seconds=3600,  # Monitor for 1 hour
)

for change in result.changes:
    print(f"Change detected at {change.url}: {change.change_type}")
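
One common way to detect changes between polls is to compare content digests. The sketch below assumes that approach; it is not taken from the MonitorWorkflow source. The helpers `fingerprint` and `detect_changes` and the sample `polls` list are hypothetical.

```python
import hashlib


def fingerprint(content: str) -> str:
    """Stable digest of a page snapshot."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def detect_changes(snapshots: list[str]) -> list[int]:
    """Return indices of snapshots that differ from the one before."""
    changed = []
    previous = None
    for index, content in enumerate(snapshots):
        digest = fingerprint(content)
        if previous is not None and digest != previous:
            changed.append(index)
        previous = digest
    return changed


# Four hypothetical polls of a status page.
polls = [
    "all systems operational",
    "all systems operational",
    "degraded performance",
    "all systems operational",
]
print(detect_changes(polls))  # [2, 3]
```

With `check_interval_seconds=30` and `duration_seconds=3600`, a poller of this shape would take at most 3600 / 30 = 120 snapshots per URL.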

ContentPipelineWorkflow

Fetch, extract, and transform content in a pipeline.

result = await client.pipeline(
    url="https://example.com/article",
    extract_text=True,
    extract_links=True,
)

print(result.text)
print(f"Found {len(result.links)} links")
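
For a sense of what the extract steps involve, here is a minimal standard-library sketch that pulls visible text and link targets out of an HTML string. It is only an illustration under stated assumptions: Plasmate produces SOM rather than using `html.parser`, and the class `TextAndLinks` and the sample markup are hypothetical.

```python
from html.parser import HTMLParser


class TextAndLinks(HTMLParser):
    """Collect visible text and href targets from an HTML string."""

    def __init__(self):
        super().__init__()
        self.text_parts = []
        self.links = []
        self._skip_depth = 0  # > 0 while inside <script> or <style>

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1
        elif tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep only non-empty text outside script/style elements.
        if not self._skip_depth and data.strip():
            self.text_parts.append(data.strip())


html = '<h1>Title</h1><script>x=1</script><p>Read <a href="/more">more</a>.</p>'
parser = TextAndLinks()
parser.feed(html)
parser.close()
print(" ".join(parser.text_parts))
print(f"Found {len(parser.links)} links")
```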

Activities

Use individual activities for custom workflows:

from datetime import timedelta

from temporalio import workflow
from temporal_plasmate.activities import fetch_page, extract_text, batch_fetch

@workflow.defn
class MyCustomWorkflow:
    @workflow.run
    async def run(self, urls: list[str]):
        # Fetch multiple pages in a single activity call
        results = await workflow.execute_activity(
            batch_fetch,
            urls,
            start_to_close_timeout=timedelta(minutes=5),
        )

        # Process results...
        return {"fetched": len(results)}

Available Activities

Activity                                  Description
fetch_page(url)                           Fetch URL and return SOM JSON
extract_text(url)                         Extract readable text only
extract_links(url)                        Extract all links from page
batch_fetch(urls)                         Fetch multiple URLs concurrently
wait_for_content(url, selector, timeout)  Poll until content appears
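
Fetching many URLs concurrently usually needs an upper bound on in-flight requests. The following sketch shows one way to do that with `asyncio.Semaphore`. It is an assumption about the general technique, not the implementation of `batch_fetch`. The names `fake_fetch` and `bounded_batch` are hypothetical, and the fetch itself is simulated with a short sleep.

```python
import asyncio


async def fake_fetch(url: str) -> dict:
    """Hypothetical stand-in for a single-page fetch."""
    await asyncio.sleep(0.01)
    return {"url": url, "ok": True}


async def bounded_batch(urls: list[str], limit: int = 5) -> list[dict]:
    """Fetch all URLs concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def one(url: str) -> dict:
        async with semaphore:
            return await fake_fetch(url)

    # gather returns results in the same order as the input URLs.
    return await asyncio.gather(*(one(u) for u in urls))


urls = [f"https://example.com/page/{i}" for i in range(12)]
results = asyncio.run(bounded_batch(urls, limit=4))
print(len(results))  # 12
```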

Worker Configuration

import asyncio

from temporal_plasmate import create_worker
from temporalio.client import Client

async def run():
    # Connect to the local Temporal dev server
    client = await Client.connect("localhost:7233")

    worker = await create_worker(
        client,
        task_queue="plasmate-queue",
        max_concurrent_activities=20,     # Parallel fetches
        max_concurrent_workflow_tasks=10, # Parallel workflows
    )

    # Blocks until the worker is shut down
    await worker.run()

asyncio.run(run())

Sync Client

For non-async environments:

from temporal_plasmate import PlasmateSyncClient

with PlasmateSyncClient() as client:
    result = client.crawl(
        start_url="https://example.com",
        max_pages=10,
    )

Enterprise Use Cases

Documentation Indexing

# Crawl and index documentation for RAG
result = await client.crawl(
    start_url="https://docs.company.com",
    max_pages=1000,
)

for page in result.pages:
    # Index page['som'] in your vector database
    # (vector_db is a placeholder for your own client)
    vector_db.upsert(page['url'], page['som'])

Competitive Intelligence

# Monitor competitor pricing pages
result = await client.monitor(
    urls=[
        "https://competitor1.com/pricing",
        "https://competitor2.com/pricing",
    ],
    check_interval_seconds=3600,  # Hourly
    duration_seconds=86400 * 7,   # 1 week
)

Compliance Monitoring

# Research regulatory updates
result = await client.research(
    topic="GDPR compliance updates",
    urls=[
        "https://gdpr.eu/news/",
        "https://ec.europa.eu/data-protection/",
    ],
    max_depth=2,
)

Architecture

+-------------------+     +------------------+     +------------+
|  Your Application | --> | Temporal Server  | --> |   Worker   |
+-------------------+     +------------------+     +------------+
                                                         |
                                                         v
                                                  +------------+
                                                  |  Plasmate  |
                                                  +------------+
                                                         |
                                                         v
                                                  +------------+
                                                  |    Web     |
                                                  +------------+
  • Application: Starts workflows via Temporal client
  • Temporal Server: Orchestrates execution, handles retries
  • Worker: Runs activities using Plasmate CLI
  • Plasmate: Fetches and converts HTML to SOM

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Type check
mypy src/

# Format
black src/
ruff check src/ --fix

License

MIT