Asynchronous Processing in OpenContracts
May 5, 2026
OpenContracts uses Celery for distributed task processing and Django signals for event-driven processing. This document covers both systems and how they interact.
Celery Task Queue
OpenContracts makes extensive use of Celery, a Python framework for distributed and asynchronous processing. The docker compose stack includes dedicated Celery workers to handle computationally intensive and long-running tasks.
Common Task Types
| Task Category | Examples |
|---|---|
| Document Processing | Parsing PDFs, extracting text, generating thumbnails |
| Embeddings | Creating vector embeddings for semantic search |
| Analysis | Running analyzers on documents |
| Extraction | Executing fieldset-based data extraction |
| Agent Actions | Running AI agents on documents |
| Export/Import | Creating and importing corpus exports |
Delivery semantics & task idempotency (Issue #1493)
OpenContracts configures Celery for at-least-once delivery via two global
settings (config/settings/base.py):
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_REJECT_ON_WORKER_LOST = True
With these settings, the broker removes a message only after the task has
finished executing, not when a worker first picks it up, and
hard kills (SIGKILL, OOM, host loss, deploy eviction) cause the message to be
requeued rather than silently treated as done. Without these settings,
long-running ingest/parse/embed tasks could die mid-flight and leave documents
stuck with backend_lock=True and no parsed content.
All Celery tasks in this project MUST be idempotent. Running the same task twice on the same input must not corrupt state, double-count, or produce duplicate side effects.
When you write a new task, follow these patterns:
| Concern | Pattern |
|---|---|
| Creating DB rows | Model.objects.get_or_create(...) or update_or_create keyed on a deterministic field |
| External webhooks / non-idempotent APIs | Pass an idempotency key derived from the task arguments, or guard with a "did we already do this?" check |
| Counters / accumulators | Use SQL UPDATE ... SET x = <absolute value> rather than x = x + 1, or use a deduplication key |
| Multi-step state transitions | Re-check the entry-state at the top of the task; bail early if already in the target state (this is how ingest_doc and set_doc_lock_state already behave) |
| Truly non-idempotent work that cannot be guarded | Opt out per-task: @shared_task(acks_late=False, reject_on_worker_lost=False). Document why in a comment. |
If you cannot make a task idempotent, prefer the per-task opt-out over reverting the global default — the global default protects the long-running document processing pipeline that motivated the change.
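Two of the patterns above (creating rows with get_or_create and re-checking the entry state) can be sketched without Celery or Django. Everything in this block is a hypothetical in-memory stand-in: FakeStore, get_or_create_summary, and finish_document are illustrative names, not part of the OpenContracts codebase.

```python
from dataclasses import dataclass, field


@dataclass
class FakeStore:
    """In-memory stand-in for database tables (hypothetical, illustration only)."""
    locked: dict = field(default_factory=dict)     # doc_id -> backend_lock flag
    summaries: dict = field(default_factory=dict)  # doc_id -> summary row

    def get_or_create_summary(self, doc_id):
        """Mimic get_or_create keyed on a deterministic field (doc_id)."""
        if doc_id in self.summaries:
            return self.summaries[doc_id], False
        self.summaries[doc_id] = f"summary-for-{doc_id}"
        return self.summaries[doc_id], True


def finish_document(store, doc_id):
    """Return True if this call did the work, False if it was a no-op."""
    # Re-check the entry state first: a redelivered message finds the
    # document already unlocked and bails out without side effects.
    if not store.locked.get(doc_id, False):
        return False
    store.get_or_create_summary(doc_id)  # safe to repeat: keyed on doc_id
    store.locked[doc_id] = False
    return True


store = FakeStore(locked={1: True})
first = finish_document(store, 1)
second = finish_document(store, 1)  # simulated redelivery of the same message
print(first, second, len(store.summaries))
```

The second call changes nothing, which is the property at-least-once delivery relies on.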
Redis visibility timeout
OpenContracts uses Redis as the Celery broker. Redis tracks unacknowledged
messages with a visibility timeout: once a worker pulls a message, the broker
considers it eligible for redelivery to a different worker after the timeout
elapses, regardless of whether the original worker is still alive. With
task_acks_late=True, a task that runs longer than the visibility timeout will
be redelivered while still executing — a guaranteed double execution even
without any worker crash.
To prevent this, CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 12 * 60 * 60}
sets the timeout to 12 hours, longer than any expected document-processing
task in this codebase. If you add a task that legitimately runs longer than
12 hours, split it into smaller chunks rather than raising the timeout
further — a long timeout directly delays redelivery after a real worker
death.
Reference: Redis broker visibility timeout.
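As a rough illustration of the chunking advice, the sketch below checks an estimated runtime against the 12-hour timeout and splits a job into batches. The per-document cost, the batch size, and the helper names (outlives_visibility_timeout, chunk) are assumptions made for this example only.

```python
VISIBILITY_TIMEOUT_SECONDS = 12 * 60 * 60  # 43,200 seconds, matching the setting above


def outlives_visibility_timeout(estimated_seconds, timeout=VISIBILITY_TIMEOUT_SECONDS):
    """True if a task of this length would still be running when the timeout elapses."""
    return estimated_seconds > timeout


def chunk(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]


doc_ids = list(range(100_000))
seconds_per_doc = 1  # assumed processing cost, purely illustrative

whole_job_seconds = len(doc_ids) * seconds_per_doc  # 100,000 s, longer than the timeout
batches = chunk(doc_ids, 10_000)                    # each batch takes about 10,000 s

print(outlives_visibility_timeout(whole_job_seconds))
print(len(batches), outlives_visibility_timeout(len(batches[0]) * seconds_per_doc))
```

Each batch would then be enqueued as its own task, so no single message stays unacknowledged longer than the timeout.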
Known non-idempotent tasks
The shift to at-least-once delivery exposes pre-existing tasks that create rows unconditionally. Until they are made idempotent, a worker death mid-task on these paths can produce duplicate database rows on retry:
| Task | File | Risk |
|---|---|---|
| process_corpus_action (extract path) | opencontractserver/tasks/corpus_tasks.py | Duplicate Datacell rows |
| process_thread_corpus_action | opencontractserver/tasks/corpus_tasks.py | Duplicate CorpusActionExecution |
| process_message_corpus_action | opencontractserver/tasks/corpus_tasks.py | Duplicate CorpusActionExecution |
| generate_agent_response | opencontractserver/tasks/agent_tasks.py | Duplicate ChatMessage |
| Bulk import paths | opencontractserver/tasks/import_tasks.py (raw .create() calls) | Duplicate imported objects |
In practice, real worker deaths during these tasks are rare and the duplicate
rows are recoverable. Hardening these paths (typically by switching to
get_or_create keyed on a deterministic field, or by claiming a
pre-allocated row at task start) is tracked separately. Treat any new task
as idempotent-by-construction; do not extend this list.
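The "pre-allocated row" approach could look roughly like the following. This is a sketch with an in-memory dict standing in for a table. The names enqueue_response and generate_response_task are hypothetical and do not describe how generate_agent_response is actually implemented.

```python
import uuid

messages = {}  # row_id -> row dict; stands in for a database table


def enqueue_response(prompt):
    """Caller allocates the row before enqueueing, then passes only its ID to the task."""
    row_id = str(uuid.uuid4())
    messages[row_id] = {"prompt": prompt, "content": None}
    return row_id


def generate_response_task(row_id):
    """Fill in the pre-allocated row; never create a new one."""
    row = messages[row_id]
    if row["content"] is not None:
        return False  # an earlier delivery already finished this row
    row["content"] = f"echo: {row['prompt']}"
    return True


row_id = enqueue_response("Summarize clause 4")
did_work_first = generate_response_task(row_id)
did_work_again = generate_response_task(row_id)  # simulated redelivery
print(len(messages), did_work_first, did_work_again)
```

Because the row exists before the task starts, a retry can only update that row, so duplicates cannot appear.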
Queue Management
If your Celery queue gets clogged due to unexpected issues or high volume, you can purge it:
docker compose -f local.yml run django celery -A config.celery_app purge
Warning: Purging the queue discards pending tasks, which can cause issues:
- Documents may lack PAWLs token layers (not annotatable)
- Corpus actions may not trigger

In such cases, delete and re-upload the affected documents.
Django Signals
OpenContracts uses Django signals for event-driven processing. Key signals include:
Document Processing Signals
Location: opencontractserver/documents/signals.py
post_save on Document (Creation)
When a document is created, triggers the processing pipeline:
@receiver(post_save, sender=Document)
def process_doc_on_create_atomic(sender, instance, created, **kwargs):
    if created:
        # Chain: thumbnail → parse → unlock
        transaction.on_commit(lambda: chain(
            extract_thumbnail.si(doc_id=instance.id),
            ingest_doc.si(user_id=instance.creator_id, doc_id=instance.id),
            set_doc_lock_state.si(locked=False, doc_id=instance.id),
        ).apply_async())
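The .si() calls build immutable signatures: each task in the chain runs with only the arguments frozen into it and does not receive the previous task's return value. A rough pure-Python sketch of that ordering follows, using hypothetical stand-in functions rather than the project's real tasks, with functools.partial playing the role of .si().

```python
from functools import partial

calls = []  # records the order in which the stand-in tasks run


def extract_thumbnail(doc_id):
    calls.append(("extract_thumbnail", doc_id))
    return "thumbnail-bytes"  # ignored by the next step, as with .si()


def ingest_doc(user_id, doc_id):
    calls.append(("ingest_doc", user_id, doc_id))


def set_doc_lock_state(locked, doc_id):
    calls.append(("set_doc_lock_state", locked, doc_id))


def run_immutable_chain(steps):
    """Run each step in order, never forwarding one step's result to the next."""
    for step in steps:
        step()


# Arguments are fixed up front, so every step is independent of earlier results.
run_immutable_chain([
    partial(extract_thumbnail, doc_id=7),
    partial(ingest_doc, user_id=1, doc_id=7),
    partial(set_doc_lock_state, locked=False, doc_id=7),
])
print([name for name, *_ in calls])
```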
document_processing_complete (Custom Signal)
Fired when document processing finishes (from set_doc_lock_state):
# Definition
document_processing_complete = Signal()  # provides: document, user_id

# Fired in set_doc_lock_state task
if not locked:
    document_processing_complete.send(
        sender=Document,
        document=document,
        user_id=document.creator_id,
    )
Corpus Action Signals
Location: opencontractserver/corpuses/signals.py
Direct Invocation (No M2M Signals)
Note: The Corpus.documents M2M field has been removed (Issue #835). Corpus action triggering is now handled directly in Corpus.add_document(), import_document(), and set_doc_lock_state(), not via signals.
When a document is added to a corpus via Corpus.add_document(), actions are triggered
directly if the document is ready (backend_lock=False). Locked documents are handled
by set_doc_lock_state() when processing completes.
set_doc_lock_state() — Deferred Action Handler
Triggers deferred corpus actions when document processing completes:
@receiver(document_processing_complete)
def handle_document_processing_complete(sender, document, user_id, **kwargs):
    corpuses = Corpus.objects.filter(documents=document)
    for corpus in corpuses:
        process_corpus_action.si(...).apply_async()
Document Processing Pipeline
When a document is uploaded, it goes through a processing pipeline:
┌─────────────────────────────────────────────────────────────────┐
│ DOCUMENT PROCESSING PIPELINE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Document Created │
│ └─► backend_lock = True │
│ │
│ 2. post_save Signal Fires │
│ └─► Chains processing tasks │
│ │
│ 3. extract_thumbnail Task │
│ └─► Generates preview image │
│ │
│ 4. ingest_doc Task │
│ └─► Parses document (Docling/LlamaParse) │
│ └─► Extracts text layers │
│ └─► Creates PAWLs tokens │
│ │
│ 5. set_doc_lock_state Task │
│ └─► backend_lock = False │
│ └─► processing_finished = now() │
│ └─► Fires document_processing_complete signal │
│ │
│ 6. Corpus Actions Triggered (if doc in corpus) │
│ └─► Fieldset extractions │
│ └─► Analyzer analyses │
│ └─► Agent actions │
│ │
└─────────────────────────────────────────────────────────────────┘
Deferred Action Architecture
Corpus actions wait for document processing to complete before executing. This is critical for agent-based actions that need access to parsed document content.
Why Deferred?
- Agent tools like load_document_text require parsed content
- Embedding-based search requires vector embeddings
- Thumbnail previews should be available
How It Works
| Document State | Corpus.add_document() Behavior | Processing Complete Behavior |
|---|---|---|
| backend_lock=True | Skipped | Triggers actions |
| backend_lock=False | Triggers immediately | N/A |
Timing
- New upload to corpus:
  - Corpus.add_document() runs → document locked → actions skipped
  - Processing completes → document_processing_complete fires → actions trigger
- Existing doc added to corpus:
  - Corpus.add_document() runs → document unlocked → actions trigger immediately
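The two timing cases can be simulated in a few lines. This is a minimal sketch, assuming simplified in-memory Doc and Corpus classes. The functions add_document, finish_processing, and trigger_actions are stand-ins for the behavior described above, not the project's real implementations.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    id: int
    backend_lock: bool = True  # new uploads start locked


@dataclass
class Corpus:
    name: str
    docs: list = field(default_factory=list)


triggered = []  # (corpus name, doc id) pairs, in trigger order


def trigger_actions(corpus, doc):
    triggered.append((corpus.name, doc.id))


def add_document(corpus, doc):
    """Trigger actions right away only if the document is already processed."""
    corpus.docs.append(doc)
    if not doc.backend_lock:
        trigger_actions(corpus, doc)


def finish_processing(doc, corpuses):
    """Unlock the document, then run the actions that were deferred while it was locked."""
    doc.backend_lock = False
    for corpus in corpuses:
        if doc in corpus.docs:
            trigger_actions(corpus, doc)


corpus = Corpus("contracts")

new_doc = Doc(id=1)                    # case 1: new upload, still locked
add_document(corpus, new_doc)
after_add = list(triggered)            # nothing triggered yet
finish_processing(new_doc, [corpus])   # deferred actions run now

existing = Doc(id=2, backend_lock=False)  # case 2: already processed
add_document(corpus, existing)            # triggers immediately

print(after_add, triggered)
```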
Signal Registration
Signals must be imported in the app's ready() method:
# opencontractserver/corpuses/apps.py
class CorpusesConfig(AppConfig):
    def ready(self):
        from opencontractserver.corpuses import signals  # noqa: F401

# opencontractserver/documents/apps.py
class DocumentsConfig(AppConfig):
    def ready(self):
        from opencontractserver.documents import signals  # noqa: F401
Monitoring
Flower Dashboard
Access Celery monitoring at http://localhost:5555 (when running locally).
Logging
Key log patterns for debugging:
| Pattern | Component |
|---|---|
| [set_doc_lock_state] | Document processing completion |
| [CorpusSignal] | Corpus action triggering |
| [AgentCorpusAction] | Agent action execution |
| process_corpus_action() | Action task processing |