E4 Core Modules -- API Reference

April 8, 2026 ยท View on GitHub

Copyright 2026 Ryan Gillespie / Optitransfer. All rights reserved. Licensed under the Business Source License 1.1 (BSL-1.1). Patent: UK Application No. 2607132.4, GB2608127.3

E4 Core Modules -- API Reference

Complete reference for the core modules in crdt_merge.e4. For integration bridges, see e4_integration.md. For the resilience subsystem, see e4_resilience.md.


Quick Import

from crdt_merge.e4.typed_trust import TypedTrustScore, TrustHomeostasis, TRUST_DIMENSIONS
from crdt_merge.e4.proof_evidence import TrustEvidence
from crdt_merge.e4.pco import AggregateProofCarryingOperation, SubtreeRef
from crdt_merge.e4.projection_delta import ProjectionDelta, ProjectionDeltaManager, FrozenDict
from crdt_merge.e4.trust_weighted_strategy import (
    TrustWeightedStrategySelector,
    TrustWeightedLWWResolver,
    TrustWeightedAveragingResolver,
    TrustGatedAcceptanceFilter,
)
from crdt_merge.e4.delta_trust_lattice import DeltaTrustLattice, TrustCircuitBreaker
from crdt_merge.e4.trust_bound_merkle import TrustBoundMerkle
from crdt_merge.e4.causal_trust_clock import CausalTrustClock
from crdt_merge.e4.adaptive_verification import AdaptiveVerificationController
from crdt_merge.e4.compatibility import CompatibilityController, CompatibilityMode

1. crdt_merge.e4.typed_trust

Multi-dimensional typed trust scores with homeostatic normalization (ref 820).

Constants

ConstantTypeValueDescription
TRUST_DIMENSIONSFrozenSet[str]{"integrity", "causality", "consistency", "gossip", "model", "context"}Six trust dimensions
PROBATION_TRUSTfloat0.5Default trust for new/unknown peers
QUARANTINE_THRESHOLDfloat0.1Below this: rejected (Level 3)
LOW_TRUST_THRESHOLDfloat0.4Below this: full PCO verification (Level 2)
PARTIAL_THRESHOLDfloat0.8Above this: signature only (Level 0)

class TypedTrustScore

@dataclass
class TypedTrustScore:
    _evidence: Dict[str, Dict[str, float]]  # dimension -> {observer -> amount}

Multi-dimensional trust score backed by GCounter evidence per observer. CRDT merge is element-wise max per dimension per observer.

Class Methods

MethodReturnsDescription
TypedTrustScore.probationary()TypedTrustScoreScore at probation level (empty evidence)
TypedTrustScore.full_trust()TypedTrustScoreScore with zero violations (empty evidence)

Instance Methods

MethodParametersReturnsDescription
trust_for_dimension(dimension)dimension: strfloatTrust in a specific dimension [0.0, 1.0]. Defaults to PROBATION_TRUST if no evidence.
overall_trust()--floatWeighted mean across all six dimensions
verification_level()--intAdaptive immune level: 0 (>0.8), 1 (0.4-0.8), 2 (<0.4), 3 (<0.1)
record_evidence(observer, dimension, amount, proof)observer: str, dimension: str, amount: float, proof: objectTypedTrustScoreRecord verified negative evidence. Returns a new instance. Raises ValueError if proof.verify() fails.
merge(other)other: TypedTrustScoreTypedTrustScoreCRDT merge: element-wise max per dimension per observer
serialize()--bytesCompact deterministic serialization for Merkle binding
hash()--strSHA-256 hex digest of serialized trust vector

class TrustHomeostasis

class TrustHomeostasis:
    def __init__(self, *, target_budget: Optional[float] = None) -> None

Conserved-budget trust normalization (ref 828). Ensures total trust across all peers is bounded, preventing inflation attacks in long-running clusters.

MethodParametersReturnsDescription
normalize(scores)scores: Dict[str, TypedTrustScore]Dict[str, TypedTrustScore]Normalize scores so total budget is conserved
set_target_budget(budget)budget: floatNoneOverride the target budget
current_budget()--floatCurrent total trust budget across all peers

2. crdt_merge.e4.proof_evidence

Cryptographic trust evidence with proof verification.

class TrustEvidence

@dataclass(frozen=True)
class TrustEvidence:
    observer: str
    target: str
    evidence_type: str
    dimension: str
    amount: float
    proof: bytes
    proof_type: str
    timestamp: float

Verifiable evidence of peer misbehaviour.

Class Methods

MethodParametersReturnsDescription
TrustEvidence.create(observer, target, evidence_type, dimension, amount, proof)observer: str, target: str, evidence_type: str, dimension: str, amount: float, proof: bytesTrustEvidenceBuild evidence with auto-detected proof type

Instance Methods

MethodReturnsDescription
verify()boolIndependently verify the proof
pack()bytesCompact wire serialization

Evidence Types

TypeDimensionProof Format
"equivocation"integrityTwo conflicting signed operations
"merkle_divergence"integrityMerkle path mismatch
"invalid_delta"integrityExpected hash + delta bytes
"clock_regression"causalityClock snapshot pair
"gossip_stale"gossipStale gossip entry

Utility Functions

FunctionParametersReturnsDescription
pack_trust_evidence(evidence)evidence: TrustEvidencebytesSerialize evidence for wire transport
unpack_trust_evidence(data)data: bytesTrustEvidenceDeserialize evidence from wire bytes

3. crdt_merge.e4.pco

Aggregate proof-carrying operations (ref 880-886). Wire format: 64 B signature + 32 B aggregate hash + 32 B metadata = 128 bytes.

class SubtreeRef

@dataclass(frozen=True)
class SubtreeRef:
    path: Tuple[int, ...]   # index path from root, 0..B-1 per level
    depth: int              # depth in the Merkle tree
    old_hash: str           # hash before the operation
    new_hash: str           # hash after the operation

Reference to a subtree in the high-arity Merkle tree.

class AggregateProofCarryingOperation

@dataclass(frozen=True)
class AggregateProofCarryingOperation:
    originator_id: str
    signature: bytes          # 64 B
    aggregate_hash: str       # 32 B hex
    merkle_root: str
    clock_snapshot: bytes
    trust_vector_hash: str
    delta_bounds: Tuple[SubtreeRef, ...]
    version: int
    flags: int
    timestamp: float

128-byte aggregate proof covering integrity, causality, trust, and minimality.

Class Methods

MethodKey ParametersReturnsDescription
AggregateProofCarryingOperation.build(originator_id, signing_fn, merkle_root, clock_snapshot, trust_vector_hash, delta_bounds, *, version=1, flags=0)signing_fn: Callable[[str], bytes]AggregateProofCarryingOperationConstruct a PCO with computed hashes and signature
AggregateProofCarryingOperation.from_wire(data, originator_id, merkle_root, clock_snapshot, trust_vector_hash, delta_bounds)data: bytes (128 B)AggregateProofCarryingOperationDeserialize from wire format

Instance Methods

MethodParametersReturnsDescription
verify(state, trust_lattice, *, verification_level=2)state: object, trust_lattice: object, verification_level: intboolVerify at the given adaptive level (0-3)
to_wire()--bytes128-byte wire serialization

4. crdt_merge.e4.projection_delta

Sparse delta encoding via high-arity Merkle tree traversal.

class FrozenDict

class FrozenDict(dict):
    """Immutable dict for use in frozen dataclasses."""

Hashable, immutable dict subclass.

class ProjectionDelta

@dataclass(frozen=True)
class ProjectionDelta:
    source_id: str
    source_version: Optional[object]
    target_version: Optional[object]
    changed_subtrees: Tuple[SubtreeRef, ...]
    insertions: FrozenDict        # key -> bytes
    updates: FrozenDict           # key -> bytes
    deletions: FrozenSet[str]
    pco: AggregateProofCarryingOperation
    encoding: str = "raw"
    compression_ratio: float = 1.0

Sparse state change identified via O(log_B n) Merkle traversal.

Instance Methods

MethodParametersReturnsDescription
is_empty()--boolTrue if no insertions, updates, or deletions
content_hash()--strDeterministic SHA-256 over all fields
compose(other)other: ProjectionDeltaProjectionDeltaAssociative composition: delta_ab . delta_bc = delta_ac
compress(encoding)encoding: str ("sparse", "rle", "raw")ProjectionDeltaCompress the delta with the given encoding

class ProjectionDeltaManager

class ProjectionDeltaManager:
    def __init__(self, *, max_history: int = 64) -> None

Delta lifecycle management: recording, composition, and history.

MethodParametersReturnsDescription
record(delta)delta: ProjectionDeltaNoneAppend a delta to the history
latest(source_id)source_id: strOptional[ProjectionDelta]Most recent delta from a peer
compose_range(source_id, start, end)source_id: str, start: int, end: intProjectionDeltaCompose deltas in the given index range
peers()--List[str]All known peer IDs with recorded deltas
history(source_id)source_id: strList[ProjectionDelta]Full delta history for a peer

5. crdt_merge.e4.trust_weighted_strategy

Trust-weighted conflict resolution strategies (ref 870-874).

class ConflictType (Enum)

ValueDescription
NUMERICfloat/int values (routed to averaging)
OPAQUEbinary blobs (routed to LWW)
STRUCTUREDnested dicts/lists (routed to recursive)
COUNTERmonotone counters (routed to max)
SETset union/intersection (trust-weighted)

class ConflictEntry

@dataclass(frozen=True)
class ConflictEntry:
    peer_id: str
    value: Any
    timestamp: float
    trust: TypedTrustScore
    dimension: str = "integrity"

class ResolutionResult

@dataclass(frozen=True)
class ResolutionResult:
    resolved_value: Any
    confidence: float       # [0.0, 1.0]
    method: str
    contributors: Tuple[str, ...]
    rejected_peers: Tuple[str, ...]

class TrustWeightedLWWResolver

class TrustWeightedLWWResolver:
    def __init__(self, *, trust_weight_factor: float = 1.0) -> None

Last-writer-wins where effective timestamp = timestamp * (1 + trust_weight * factor).

MethodParametersReturnsDescription
resolve(entries)entries: Sequence[ConflictEntry]ResolutionResultResolve via trust-weighted LWW

class TrustWeightedAveragingResolver

class TrustWeightedAveragingResolver:
    def __init__(self) -> None

Numeric merge via trust-weighted average. Each peer's contribution is proportional to their typed trust score.

MethodParametersReturnsDescription
resolve(entries)entries: Sequence[ConflictEntry]ResolutionResultResolve via trust-weighted average

class TrustGatedAcceptanceFilter

class TrustGatedAcceptanceFilter:
    def __init__(
        self,
        *,
        thresholds: Optional[Dict[str, float]] = None,
        global_threshold: float = 0.4,
        strict_mode: bool = False,
    ) -> None

Pre-merge gate that rejects operations from peers below a per-dimension or global trust threshold.

MethodParametersReturnsDescription
filter_entries(entries)entries: Sequence[ConflictEntry]Tuple[List[ConflictEntry], List[ConflictEntry]]Returns (accepted, rejected)

class TrustWeightedStrategySelector

class TrustWeightedStrategySelector:
    def __init__(
        self,
        *,
        lww: Optional[TrustWeightedLWWResolver] = None,
        averaging: Optional[TrustWeightedAveragingResolver] = None,
        gate: Optional[TrustGatedAcceptanceFilter] = None,
    ) -> None

Meta-strategy that routes conflicts to the appropriate resolver based on ConflictType and trust topology.

MethodParametersReturnsDescription
resolve(entries, conflict_type)entries: Sequence[ConflictEntry], conflict_type: ConflictTypeResolutionResultSelect and execute the best resolver

6. crdt_merge.e4.delta_trust_lattice

Recursive trust lattice where trust propagates as projection deltas (ref 840-843).

class DeltaTrustLattice

class DeltaTrustLattice:
    def __init__(
        self,
        peer_id: str,
        *,
        circuit_breaker: Optional[TrustCircuitBreaker] = None,
        homeostasis: Optional[TrustHomeostasis] = None,
    ) -> None

Central trust store for a single node. Trust changes are deltas verified by the same pipeline as data.

MethodParametersReturnsDescription
get_trust(peer_id)peer_id: strTypedTrustScoreCurrent trust for a peer. Auto-initializes unknown peers at probation.
record_evidence(target, evidence)target: str, evidence: TrustEvidenceNoneRecord local evidence, update score, queue delta
observe_and_propagate(target, evidence, state)target: str, evidence: TrustEvidence, state: objectOptional[ProjectionDelta]Record + build a projection delta for gossip
receive_trust_delta(delta, state)delta: ProjectionDelta, state: objectboolReceive and verify an incoming trust delta
merge(other)other: DeltaTrustLatticeDeltaTrustLatticeCRDT merge of two lattices
drain_async_queue()--List[ProjectionDelta]Drain pending trust deltas for outbound gossip

Properties

PropertyTypeDescription
peer_idstrLocal peer identifier
known_peersSet[str]All peers with trust state

class TrustCircuitBreaker

class TrustCircuitBreaker:
    def __init__(
        self,
        *,
        window_size: int = 100,
        sigma_threshold: float = 2.0,
        cooldown_seconds: float = 30.0,
        min_samples: int = 10,
    ) -> None

Monitors trust change velocity. Trips when rate exceeds mean + sigma_threshold * std_dev, forcing Level 2 verification until cooldown expires (ref 829).

MethodParametersReturnsDescription
record_event(magnitude)magnitude: floatNoneRecord a trust mutation event
is_tripped()--boolWhether the breaker is currently tripped
event_count()--intTotal events recorded
current_rate()--floatEvents per second in the current window
reset()--NoneManually reset the breaker

7. crdt_merge.e4.trust_bound_merkle

High-arity Merkle tree where hashes incorporate trust context: H(data || trust_score || originator) (ref 850-855).

class TrustBoundMerkle

class TrustBoundMerkle:
    def __init__(
        self,
        trust_lattice: Optional[DeltaTrustLattice] = None,
        *,
        branching_factor: int = 256,
        compatibility_mode: bool = False,
    ) -> None

Branching factor 256: 1M params -> depth 3, 1B params -> depth 4, 1T params -> depth 5.

MethodParametersReturnsDescription
insert(key, data, originator)key: str, data: bytes, originator: strNoneInsert a leaf
root_hash()--strCurrent root hash
compute_leaf_hash(data, originator)data: bytes, originator: strstrE4 hash: H(data || trust || originator)
compute_leaf_hash_compat(data)data: bytesstrLegacy hash: H(data)
diff(other)other: TrustBoundMerkleList[SubtreeRef]Changed subtrees between two trees
verify_proof(key, proof)key: str, proof: List[str]boolVerify a Merkle inclusion proof

Class Methods

MethodDescription
TrustBoundMerkle.enable_domain_hashing()Enable domain-separated hashing for cross-component hash isolation
TrustBoundMerkle.disable_domain_hashing()Restore original hashing

8. crdt_merge.e4.causal_trust_clock

Vector clock where entries are (logical_time, trust_score) pairs (ref 860-863).

class CausalTrustClock

class CausalTrustClock:
    TRUST_OVERRIDE_FACTOR = 1.5

    def __init__(
        self,
        peer_id: str,
        trust_lattice: Optional[DeltaTrustLattice] = None,
    ) -> None

Low-trust peers cannot causally dominate high-trust peers even with higher logical time.

MethodParametersReturnsDescription
increment()--CausalTrustClockIncrement local clock with current trust. Returns new instance.
trust_weighted_compare(other)other: CausalTrustClockstrReturns "before", "after", "concurrent", or "trust_override"
merge(other)other: CausalTrustClockCausalTrustClockCRDT merge: element-wise max of (time, trust) pairs
serialize_compact()--bytesCompact binary representation for PCO embedding
deserialize_compact(data, peer_id, trust_lattice)data: bytes, peer_id: str, ...CausalTrustClockClass method. Reconstruct from compact bytes.
is_consistent_with(snapshot)snapshot: bytesboolCheck if a serialized snapshot is causally consistent
content_hash()--strSHA-256 of the compact serialization
bind_trust_lattice(lattice)lattice: DeltaTrustLatticeNoneLate-bind the trust lattice

Properties

PropertyTypeDescription
peer_idstrLocal peer identifier
logical_timeintLocal peer's logical time
entriesDict[str, Tuple[int, float]]Copy of all entries

9. crdt_merge.e4.adaptive_verification

Trust-tiered verification routing (ref 895).

Verification Levels

LevelTrust RangeCostAction
0> 0.8 (partial)O(1)Signature only
10.4 - 0.8O(1)Signature + Merkle root
2< 0.4 (low)O(k)Full aggregate PCO
3< 0.1 (quarantine)O(1)Unconditional reject

class VerificationResult

@dataclass(frozen=True)
class VerificationResult:
    accepted: bool
    level: int
    reason: str = ""
    async_pending: bool = False

class AdaptiveVerificationController

class AdaptiveVerificationController:
    def __init__(
        self,
        trust_lattice: Optional[DeltaTrustLattice] = None,
        circuit_breaker: Optional[TrustCircuitBreaker] = None,
        *,
        async_queue_limit: int = 1024,
    ) -> None

Routes incoming deltas to the appropriate verification level based on sender trust.

MethodParametersReturnsDescription
verify(delta, state, trust_lattice)delta: ProjectionDelta, state: object, trust_lattice: objectVerificationResultVerify at the trust-determined level
run_async_followup(state, *, batch_size=32)state: object, batch_size: intList[Tuple[ProjectionDelta, VerificationResult]]Re-verify optimistic Level 0/1 accepts at Level 2
queue_depth()--intPending async verifications
bind_trust_lattice(lattice)lattice: DeltaTrustLatticeNoneLate-bind the trust lattice
bind_circuit_breaker(breaker)breaker: TrustCircuitBreakerNoneLate-bind the circuit breaker

10. crdt_merge.e4.compatibility

Dual-hash compatibility for E4 migration (ref 855, 945).

class CompatibilityMode (Enum)

ValueDescription
E4_ONLYOnly H(data || trust) hashes
DUAL_HASHBoth H(data) and H(data || trust) in parallel
LEGACY_ONLYPre-E4 mode, only H(data)

class PeerCapability (Enum)

ValueCodeDescription
PRE_E40No E4 support
E4_DUAL1Supports dual-hash
E4_FULL2Full E4 support

class CompatHandshake

@dataclass(frozen=True)
class CompatHandshake:
    peer_id: str
    capability: PeerCapability
    version: int = 1

class CompatibilityController

class CompatibilityController:
    def __init__(
        self,
        *,
        default_mode: CompatibilityMode = CompatibilityMode.E4_ONLY,
        merkle: Optional[TrustBoundMerkle] = None,
    ) -> None

Manages hash compatibility across mixed E4 / pre-E4 clusters. Migration path: LEGACY_ONLY -> DUAL_HASH -> E4_ONLY.

MethodParametersReturnsDescription
process_handshake(hs)hs: CompatHandshakeCompatibilityModeRecord peer capability, return negotiated mode
build_handshake(local_peer_id)local_peer_id: strCompatHandshakeBuild handshake payload advertising local capability
mode_for_peer(peer_id)peer_id: strCompatibilityModeEffective mode for a peer (falls back to default)
compute_hashes(data, originator, peer_id)data: bytes, originator: str, peer_id: strDict[str, str]Compute {"e4": ..., "legacy": ...} as required
peers_ready_for_e4_only()--Set[str]Peers that can upgrade from DUAL_HASH
upgrade_peer(peer_id)peer_id: strCompatibilityModeAttempt to upgrade a peer to the next level
bind_merkle(merkle)merkle: TrustBoundMerkleNoneLate-bind the Merkle tree

crdt-merge v0.9.5 -- April 2026