Claim Processing Workflow

March 5, 2026 · View on GitHub

This document describes the claim processing workflow — the v2 orchestration layer that drives the end-to-end user experience. The workflow is implemented in the ContentProcessorWorkflow service using an Agent Framework's Workflow Engine, a DAG-based event-streaming execution model.

For the individual document extraction pipeline (Extract → Map → Evaluate → Save), see Processing Pipeline Approach.


End-to-End Flow

flowchart LR
    UI["🖥️ Web UI<br/>Upload files to claim"]
    API["📡 ContentProcessor API<br/>/claimprocessor/*"]
    QUEUE1[("claim-process-<br/>queue")]
    WF["⚙️ ContentProcessorWorkflow<br/>Agent Framework<br/>Workflow Engine"]

    UI -->|"HTTP — upload files,<br/>create claim"| API
    API -->|"Create manifest in Blob<br/>& enqueue message"| QUEUE1
    QUEUE1 -->|Dequeue| WF

    subgraph STAGE1 ["Stage 1 — Document Processing"]
        direction TB
        LOOP["For each document in the claim"]
        SUBMIT["ContentProcessor API<br/>/contentprocessor/submit"]
        QUEUE2[("content-pipeline-<br/>extract-queue")]
        CP["ContentProcessor — 4-stage pipeline<br/>Extract → Map → Evaluate → Save"]
        POLL["Poll for completion &<br/>collect extraction results"]

        LOOP --> SUBMIT --> QUEUE2 --> CP --> POLL
    end

    subgraph STAGE2 ["Stage 2 — RAI Analysis"]
        RAI["🤖 Azure OpenAI GPT<br/>Responsible AI safety gate<br/>Blocks unsafe content"]
    end

    subgraph STAGE3 ["Stage 3 — Summarizing"]
        SUM["🤖 Azure OpenAI GPT<br/>Consolidated summary<br/>across all documents"]
    end

    subgraph STAGE4 ["Stage 4 — Gap Analysis"]
        GAP["🤖 Azure OpenAI GPT<br/>+ YAML Rules DSL<br/>Missing docs & discrepancies"]
    end

    WF --> STAGE1 --> STAGE2 --> STAGE3 --> STAGE4

    COSMOS[("🗄️ Cosmos DB")]
    RESULT["🖥️ Web UI<br/>View Results"]

    STAGE4 -->|Persist results| COSMOS --> RESULT

Key Interaction Points

StepActorTargetMechanism
1Web UIContentProcessor APIHTTP — upload files, create claim, start processing
2ContentProcessor APIAzure Storage QueueEnqueues message to claim-process-queue
3ContentProcessorWorkflowAzure Storage QueueDequeues from claim-process-queue
4ContentProcessorWorkflow (DocumentProcessExecutor)ContentProcessor APIHTTP — submits each document via /contentprocessor/submit, polls for completion
5ContentProcessor APIAzure Storage QueueEnqueues to content-pipeline-extract-queue
6ContentProcessorAzure Storage QueueDequeues from content-pipeline-extract-queue, runs 4-stage pipeline
7ContentProcessorWorkflow (RAIExecutor)Azure OpenAI GPT-5.1Responsible AI safety check — blocks pipeline if content is unsafe
8ContentProcessorWorkflow (SummarizeExecutor)Azure OpenAI GPT-5.1Generates summary from extraction results
9ContentProcessorWorkflow (GapExecutor)Azure OpenAI GPT-5.1Performs gap analysis from extraction results
10ContentProcessorWorkflowAzure Cosmos DBPersists claim results (status, summary, gaps)

Agent Framework Workflow Engine

The ContentProcessorWorkflow is built on the Agent Framework — a DAG-based workflow engine with an event-streaming execution model. This is distinct from the simple sequential pipeline used in ContentProcessor.

WorkflowBuilder API

The framework provides a fluent builder API for constructing workflow graphs:

from agent_framework import WorkflowBuilder, Workflow

workflow: Workflow = (
    WorkflowBuilder()
    .register_executor(lambda: DocumentProcessExecutor(...), name="document_processing")
    .register_executor(lambda: RAIExecutor(...),             name="rai_analysis")
    .register_executor(lambda: SummarizeExecutor(...),       name="summarizing")
    .register_executor(lambda: GapExecutor(...),             name="gap_analysis")
    .set_start_executor("document_processing")
    .add_edge("document_processing", "rai_analysis")
    .add_edge("rai_analysis", "summarizing")
    .add_edge("summarizing", "gap_analysis")
    .build()
)
MethodPurpose
register_executor(factory, name)Registers a lazy factory that produces an Executor subclass, keyed by a string name
set_start_executor(name)Designates the entry-point node in the DAG
add_edge(source, target)Defines a directed edge — output of source flows to target
build()Validates the graph and returns a frozen Workflow object

Executor Pattern

Each stage is an Executor subclass with a @handler-decorated async method:

from agent_framework import Executor, WorkflowContext, handler

class SummarizeExecutor(Executor):
    @handler
    async def run(self, context: WorkflowContext, input_data: str) -> None:
        # ... process input_data ...
        await context.send("summarizing", result)   # forward to next executor
        # OR
        await context.emit(final_result)            # emit final workflow output
Context MethodPurpose
context.send(name, data)Forwards data to the next executor in the edge graph
context.emit(data)Emits final workflow output (only the last executor should call this)
context.stateShared key-value store visible across all executors in the same run

Event Streaming

The workflow emits events during execution that are used for status tracking:

EventWhen FiredUsed For
WorkflowStartedEventWorkflow beginsInitialize claim status
ExecutorInvokedEventAn executor startsUpdate Cosmos DB status (e.g., Processing, Summarizing, GapAnalysis)
ExecutorCompletedEventAn executor finishesLog completion, timing
ExecutorFailedEventAn executor errorsMark claim as Failed, store error message
WorkflowOutputEventFinal output producedMark claim as Completed
WorkflowFailedEventWorkflow failsMark claim as Failed

Four Workflow Stages

Stage 1: Document Processing (DocumentProcessExecutor)

Purpose: Submit all documents in the claim to ContentProcessor (via the API) and collect extraction results.

Steps:

  1. Downloads the claim manifest (JSON) from Azure Blob Storage (process-batch container)
  2. Creates a claimprocess record in Cosmos DB with status Pending
  3. For each document in the manifest (concurrently, up to 4):
    • Downloads the file from blob storage
    • POSTs it to ContentProcessor API (/contentprocessor/submit) with schema assignment
    • ContentProcessor API enqueues to content-pipeline-extract-queue
    • ContentProcessor picks up the message and runs the 4-stage pipeline (Extract → Map → Evaluate → Save)
    • Polls the API Operation-Location URL until terminal status
    • Fetches final output with extraction score and schema score
    • Upserts per-file result records into Cosmos DB
  4. Aggregates all document results and forwards to the next stage

Stage 2: RAI Analysis (RAIExecutor)

Purpose: Responsible AI safety gate — screen all extracted document content against safety rules before further processing.

Steps:

  1. Reads document processing results from the shared workflow context
  2. For each successfully processed document:
    • PDFs: Fetches extraction steps and extracts markdown content
    • Images (PNG/JPG): Fetches map steps and extracts mapped content
  3. Concatenates all document text and sends to an LLM-based safety classifier
  4. Loads the RAI prompt template (from src/ContentProcessorWorkflow/src/steps/rai/prompt/)
  5. Invokes Azure OpenAI GPT-5.1 (temperature=0.1, top_p=0.1) with structured RAIResponse output
  6. If content is flagged as unsafe (IsNotSafe == true), halts the workflow with an error — the claim is marked Failed
  7. If content passes, forwards result to the next stage

Safety Rules: The classifier evaluates content against 10 rules covering self-harm, violence, illegal activities, discrimination, sexual content, medical information, profanity, prompt injection, embedded system commands, and spam. Rules are defined in the prompt template and can be customized. See Customize System Prompts.

Stage 3: Summarizing (SummarizeExecutor)

Purpose: Generate an AI-powered consolidated summary across all processed documents.

Steps:

  1. Reads document processing results from the shared workflow context
  2. For each successfully processed document:
    • PDFs: Fetches extraction steps and extracts markdown content
    • Images (PNG/JPG): Fetches map steps and extracts mapped content
  3. Builds a chat message with a prompt template (from src/ContentProcessorWorkflow/src/steps/summarize/prompt/)
  4. Invokes Azure OpenAI GPT-5.1 (temperature=0.1, top_p=0.1) for deterministic output
  5. Persists the summary to Cosmos DB
  6. Forwards result to the next stage

Stage 4: Gap Analysis (GapExecutor)

Purpose: Perform gap analysis across all claim documents — identifying missing/insufficient documents and flagging cross-document discrepancies.

Steps:

  1. Reads document processing results from the shared workflow context
  2. Fetches full processed JSON for each completed document
  3. Loads a prompt template and injects YAML rules DSL (gap analysis rules)
  4. Invokes Azure OpenAI GPT-5.1 with the rule-injected prompt
  5. Persists gap analysis results to Cosmos DB
  6. Emits the final workflow output (terminates the pipeline)

DSL-Based Ruleset: Gap analysis rules — both missing document checks and cross-document discrepancy detection — are defined in a reusable YAML-based Domain-Specific Language (DSL), not hard-coded in Python. Domain experts can add, modify, or replace gap rules and cross-document discrepancy checks by editing a single YAML file. The same DSL format is portable across industries (insurance, logistics, legal, finance). For a comprehensive guide on the DSL structure, expression language, and how to adapt it for other domains, see Gap Analysis Ruleset Guide.


Queue-Driven Execution Model

Queue Architecture

QueuePurposeProducerConsumer
claim-process-queueClaim workflow requestsContentProcessor APIContentProcessorWorkflow
claim-process-dead-letter-queueFailed messages after retry exhaustionContentProcessorWorkflowManual / ops review
content-pipeline-extract-queueIndividual document extraction jobsContentProcessor APIContentProcessor

Polling & Concurrency

  • Long-poll loop: Each worker continuously polls the queue with configurable intervals
  • Concurrent workers: CONCURRENT_WORKERS async worker loops (default: 1), each processes one claim at a time
  • Poll interval: POLL_INTERVAL_SECONDS (default: 5s) sleep when no messages
  • Visibility timeout: VISIBILITY_TIMEOUT_MINUTES (default: 30 min) — renewed at ~60% interval to prevent re-delivery during long processing

Retry & Dead-Letter Logic

ScenarioBehavior
Attempt < max retries (default: 3)Visibility timeout shortened to RETRY_VISIBILITY_DELAY_SECONDS (default: 5s), message re-enters queue quickly
Final attempt (dequeue count ≥ max)Payload sent to claim-process-dead-letter-queue, output blobs cleaned up, original message deleted
Dead-letter send failsOriginal message visibility extended (≥60s); message is NOT deleted to prevent silent data loss
Malformed messagesImmediately dead-lettered regardless of attempt count
Process termination (SIGINT/SIGTERM)Queue message deleted, output blobs cleaned up, task cancelled gracefully

Status Tracking

Claim processing status is tracked in Cosmos DB (claimprocesses collection) through these stages:

stateDiagram-v2
    direction LR
    [*] --> Pending
    Pending --> Processing
    Processing --> RAIAnalysis
    Processing --> Failed
    RAIAnalysis --> Summarizing
    RAIAnalysis --> Failed
    Summarizing --> GapAnalysis
    Summarizing --> Failed
    GapAnalysis --> Completed
    GapAnalysis --> Failed
    Completed --> [*]
StatusWhen SetBy
PendingClaim created, message enqueuedContentProcessor API
ProcessingExecutorInvokedEvent for document_processingDocumentProcessExecutor
RAIAnalysisExecutorInvokedEvent for rai_analysisRAIExecutor
SummarizingExecutorInvokedEvent for summarizingSummarizeExecutor
GapAnalysisExecutorInvokedEvent for gap_analysisGapExecutor
CompletedWorkflowOutputEvent receivedWorkflow engine
FailedExecutorFailedEvent or WorkflowFailedEventWorkflow engine (+ error message stored)

Processing duration (elapsed wall-clock time) is always persisted in the finally block.