E4 Integration Bridges -- API Reference

April 8, 2026 ยท View on GitHub

Copyright 2026 Ryan Gillespie / Optitransfer. All rights reserved. Licensed under the Business Source License 1.1 (BSL-1.1). Patent: UK Application No. 2607132.4, GB2608127.3

E4 Integration Bridges -- API Reference

Integration bridges wire the E4 subsystem into the broader crdt-merge framework. For core E4 modules, see e4.md. For the resilience subsystem, see e4_resilience.md.


Quick Import

from crdt_merge.e4.integration import (
    initialize_defaults,
    get_trust_lattice,
    get_circuit_breaker,
    get_verifier,
    get_gossip_engine,
    get_stream_merge,
    get_agent_state,
    get_compat_controller,
    is_initialized,
    reset,
)
from crdt_merge.e4.integration.config import E4Config, get_config, set_config
from crdt_merge.e4.integration.gossip_bridge import TrustGossipEngine, TrustGossipPayload
from crdt_merge.e4.integration.stream_bridge import TrustStreamMerge, StreamChunk, ChunkResult
from crdt_merge.e4.integration.agent_bridge import TrustAgentState, TrustAnnotatedEntry

1. crdt_merge.e4.integration

System-wide bootstrap and component registry. The initialize_defaults() function is the single entry point called by crdt_merge/__init__.py on import.

initialize_defaults(config=None)

def initialize_defaults(config: Optional[E4Config] = None) -> None

Bootstrap the E4 subsystem with default component instances. Idempotent -- repeated calls are no-ops unless config differs from the current active configuration. Constructs and registers:

  • DeltaTrustLattice (peer_id "local")
  • TrustCircuitBreaker
  • TrustHomeostasis (if config.homeostasis_enabled)
  • AdaptiveVerificationController
  • CompatibilityController
  • TrustGossipEngine
  • TrustStreamMerge
  • TrustAgentState

Accessors

FunctionReturnsDescription
get_trust_lattice()DeltaTrustLatticeDefault trust lattice instance
get_circuit_breaker()TrustCircuitBreakerDefault circuit breaker
get_verifier()AdaptiveVerificationControllerDefault verification controller
get_gossip_engine()TrustGossipEngineDefault gossip engine
get_stream_merge()TrustStreamMergeDefault stream merge
get_agent_state()TrustAgentStateDefault agent state bridge
get_compat_controller()CompatibilityControllerDefault compatibility controller
is_initialized()boolWhether initialize_defaults has run
reset()NoneTear down default instances (for tests)

All accessors auto-call initialize_defaults() if not yet initialized.


2. crdt_merge.e4.integration.config

class E4Config

@dataclass
class E4Config:
    # Trust thresholds (ref 896-899)
    probation_trust: float = 0.5
    quarantine_threshold: float = 0.1
    low_trust_threshold: float = 0.4
    partial_trust_threshold: float = 0.8

    # Circuit breaker (ref 829)
    cb_window_size: int = 100
    cb_sigma_threshold: float = 2.0
    cb_cooldown_seconds: float = 30.0
    cb_min_samples: int = 10

    # Merkle tree (ref 850)
    merkle_branching_factor: int = 256

    # Compatibility (ref 855)
    compatibility_mode: str = "e4_only"

    # Adaptive verification (ref 895)
    verification_level_override: Optional[int] = None
    async_queue_limit: int = 1024

    # Homeostasis (ref 828)
    homeostasis_enabled: bool = True
    homeostasis_target_budget: Optional[float] = None

    # Delta management
    delta_max_history: int = 64

    # Gossip bridge
    gossip_include_trust_deltas: bool = True

    # Stream bridge
    stream_per_chunk_validation: bool = True
    stream_min_trust: float = 0.1

    # Agent bridge
    agent_trust_weight_context: bool = True

Unified runtime configuration for the E4 subsystem. Every field has a sensible default matching the specification.

MethodReturnsDescription
trust_thresholds()dict{"probation": ..., "quarantine": ..., "low": ..., "partial": ...}

Module Functions

FunctionParametersReturnsDescription
get_config()--E4ConfigCurrent global config (creates default if needed)
set_config(config)config: E4ConfigNoneReplace the global config
reset_config()--NoneReset to factory defaults

3. crdt_merge.e4.integration.gossip_bridge

Unified data + trust gossip engine (ref 1010-1020). Piggybacks trust deltas alongside data deltas in sync payloads.

class TrustGossipPayload

@dataclass
class TrustGossipPayload:
    data_deltas: List[ProjectionDelta]
    trust_deltas: List[ProjectionDelta]
    peer_id: str = ""

Wire payload bundling data and trust deltas.

Class Methods

MethodParametersDescription
TrustGossipPayload.enable_convergence_monitoring(peer_count=100, **kwargs)peer_count: intEnable convergence time monitoring via resilience.convergence_monitor
TrustGossipPayload.disable_convergence_monitoring()--Disable convergence monitoring

class TrustGossipEngine

class TrustGossipEngine:
    def __init__(
        self,
        trust_lattice: Optional[DeltaTrustLattice] = None,
        verifier: Optional[AdaptiveVerificationController] = None,
        state: Optional[object] = None,
    ) -> None

Gossip engine that propagates both data and trust deltas.

MethodParametersReturnsDescription
prepare_sync(data_deltas, *, include_trust=True)data_deltas: Sequence[ProjectionDelta]TrustGossipPayloadBuild a sync payload. When include_trust=True, drains pending trust deltas from the lattice.
receive_sync(payload)payload: TrustGossipPayloadTuple[List[ProjectionDelta], List[ProjectionDelta]]Process incoming payload. Returns (accepted_data, accepted_trust).
drain_outbound()--List[TrustGossipPayload]Drain prepared outbound payloads
bind_trust_lattice(lattice)lattice: DeltaTrustLatticeNoneLate-bind trust lattice
bind_verifier(verifier)verifier: AdaptiveVerificationControllerNoneLate-bind verifier
bind_state(state)state: objectNoneLate-bind application state

Properties

PropertyTypeDescription
pending_outboundintNumber of prepared but un-drained payloads

4. crdt_merge.e4.integration.stream_bridge

Trust-validated streaming merge (ref 1155-1156). Per-chunk PCO validation with trust-gated stream acceptance.

class StreamChunk

@dataclass
class StreamChunk:
    delta: ProjectionDelta
    sequence: int              # monotonic within stream
    stream_id: str = ""

class ChunkResult

@dataclass(frozen=True)
class ChunkResult:
    accepted: bool
    sequence: int
    reason: str = ""

class TrustStreamMerge

class TrustStreamMerge:
    def __init__(
        self,
        verifier: Optional[AdaptiveVerificationController] = None,
        state: Optional[object] = None,
        *,
        min_trust: float = 0.1,
    ) -> None

Streaming merge with per-chunk trust validation. Streams from quarantined peers (trust < min_trust) are rejected outright.

MethodParametersReturnsDescription
accept_stream(peer_id, stream_id, trust_lattice)peer_id: str, stream_id: str, trust_lattice: objectboolGate: accept or reject a stream based on peer trust
validate_chunk(chunk, trust_lattice=None)chunk: StreamChunk, trust_lattice: objectChunkResultValidate a single chunk via adaptive verification
validate_stream(chunks, trust_lattice=None)chunks: Sequence[StreamChunk], trust_lattice: objectList[ChunkResult]Validate all chunks in order. Stops early on failure.
stream_results(stream_id)stream_id: strList[ChunkResult]Validation history for a stream
active_stream_ids()--List[str]Currently tracked streams
close_stream(stream_id)stream_id: strNoneRemove stream from tracking
bind_verifier(verifier)verifier: AdaptiveVerificationControllerNoneLate-bind verifier
bind_state(state)state: objectNoneLate-bind application state

5. crdt_merge.e4.integration.agent_bridge

Trust-aware agent state bridge (ref 1157). Agent memory entries are CRDTs with trust annotations.

class TrustAnnotatedEntry

@dataclass
class TrustAnnotatedEntry:
    key: str
    value: Any
    peer_id: str
    trust_at_write: float
    timestamp: float = 0.0

Single memory entry with trust provenance.

class TrustAgentState

class TrustAgentState:
    def __init__(
        self,
        trust_lattice: Optional[DeltaTrustLattice] = None,
        *,
        trust_weight_context: bool = True,
    ) -> None

Agent memory as a CRDT with trust provenance. When trust_weight_context is enabled, conflicts resolve in favour of the higher-trust peer; on equal trust, the later timestamp wins (LWW fallback).

MethodParametersReturnsDescription
get(key)key: strOptional[TrustAnnotatedEntry]Retrieve an entry
put(key, value, peer_id, *, timestamp=0.0)key: str, value: Any, peer_id: strTrustAnnotatedEntryWrite an entry. Trust is looked up from the lattice. Conflicts resolved automatically.
delete(key)key: strOptional[TrustAnnotatedEntry]Remove an entry
merge_context(other)other: TrustAgentStateTrustAgentStateCRDT merge of two agent states
snapshot()--Dict[str, TrustAnnotatedEntry]Export all entries
load_snapshot(entries)entries: Dict[str, TrustAnnotatedEntry]NoneRestore from a snapshot
ranked_entries()--List[TrustAnnotatedEntry]Entries sorted by trust at write time (descending)
peer_contributions()--Dict[str, int]Count of entries per peer
bind_trust_lattice(lattice)lattice: DeltaTrustLatticeNoneLate-bind trust lattice

Properties

PropertyTypeDescription
sizeintNumber of entries

crdt-merge v0.9.5 -- April 2026