E4 Core Modules -- API Reference
April 8, 2026
Copyright 2026 Ryan Gillespie / Optitransfer. All rights reserved. Licensed under the Business Source License 1.1 (BSL-1.1). Patent: UK Application No. 2607132.4, GB2608127.3
Complete reference for the core modules in crdt_merge.e4. For integration
bridges, see e4_integration.md. For the resilience
subsystem, see e4_resilience.md.
Quick Import
from crdt_merge.e4.typed_trust import TypedTrustScore, TrustHomeostasis, TRUST_DIMENSIONS
from crdt_merge.e4.proof_evidence import TrustEvidence
from crdt_merge.e4.pco import AggregateProofCarryingOperation, SubtreeRef
from crdt_merge.e4.projection_delta import ProjectionDelta, ProjectionDeltaManager, FrozenDict
from crdt_merge.e4.trust_weighted_strategy import (
    TrustWeightedStrategySelector,
    TrustWeightedLWWResolver,
    TrustWeightedAveragingResolver,
    TrustGatedAcceptanceFilter,
)
from crdt_merge.e4.delta_trust_lattice import DeltaTrustLattice, TrustCircuitBreaker
from crdt_merge.e4.trust_bound_merkle import TrustBoundMerkle
from crdt_merge.e4.causal_trust_clock import CausalTrustClock
from crdt_merge.e4.adaptive_verification import AdaptiveVerificationController
from crdt_merge.e4.compatibility import CompatibilityController, CompatibilityMode
1. crdt_merge.e4.typed_trust
Multi-dimensional typed trust scores with homeostatic normalization (ref 820).
Constants
| Constant | Type | Value | Description |
|---|---|---|---|
| TRUST_DIMENSIONS | FrozenSet[str] | {"integrity", "causality", "consistency", "gossip", "model", "context"} | Six trust dimensions |
| PROBATION_TRUST | float | 0.5 | Default trust for new/unknown peers |
| QUARANTINE_THRESHOLD | float | 0.1 | Below this: rejected (Level 3) |
| LOW_TRUST_THRESHOLD | float | 0.4 | Below this: full PCO verification (Level 2) |
| PARTIAL_THRESHOLD | float | 0.8 | Above this: signature only (Level 0) |
class TypedTrustScore
@dataclass
class TypedTrustScore:
    _evidence: Dict[str, Dict[str, float]]  # dimension -> {observer -> amount}
Multi-dimensional trust score backed by GCounter evidence per observer. CRDT merge is element-wise max per dimension per observer.
Class Methods
| Method | Returns | Description |
|---|---|---|
| TypedTrustScore.probationary() | TypedTrustScore | Score at probation level (empty evidence) |
| TypedTrustScore.full_trust() | TypedTrustScore | Score with zero violations (empty evidence) |
Instance Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| trust_for_dimension(dimension) | dimension: str | float | Trust in a specific dimension [0.0, 1.0]. Defaults to PROBATION_TRUST if no evidence. |
| overall_trust() | -- | float | Weighted mean across all six dimensions |
| verification_level() | -- | int | Adaptive immune level: 0 (>0.8), 1 (0.4-0.8), 2 (0.1-0.4), 3 (<0.1) |
| record_evidence(observer, dimension, amount, proof) | observer: str, dimension: str, amount: float, proof: object | TypedTrustScore | Record verified negative evidence. Returns a new instance. Raises ValueError if proof.verify() fails. |
| merge(other) | other: TypedTrustScore | TypedTrustScore | CRDT merge: element-wise max per dimension per observer |
| serialize() | -- | bytes | Compact deterministic serialization for Merkle binding |
| hash() | -- | str | SHA-256 hex digest of serialized trust vector |
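The element-wise max merge described above can be sketched on plain dictionaries. This is only an illustration: `Evidence` and `merge_evidence` are hypothetical names that model the `dimension -> {observer -> amount}` map, not the library's implementation.

```python
from typing import Dict

# Hypothetical stand-in for the internal evidence map:
# dimension -> {observer -> accumulated negative evidence}
Evidence = Dict[str, Dict[str, float]]


def merge_evidence(a: Evidence, b: Evidence) -> Evidence:
    """Element-wise max per dimension per observer (G-Counter style join)."""
    merged: Evidence = {}
    for dimension in a.keys() | b.keys():
        left = a.get(dimension, {})
        right = b.get(dimension, {})
        merged[dimension] = {
            observer: max(left.get(observer, 0.0), right.get(observer, 0.0))
            for observer in left.keys() | right.keys()
        }
    return merged


replica_a = {"integrity": {"peer-1": 0.2, "peer-2": 0.1}}
replica_b = {"integrity": {"peer-1": 0.3}, "gossip": {"peer-3": 0.05}}

merged = merge_evidence(replica_a, replica_b)
```

Because max is commutative, associative, and idempotent, replicas that merge the same evidence in any order converge to the same map.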
class TrustHomeostasis
class TrustHomeostasis:
    def __init__(self, *, target_budget: Optional[float] = None) -> None
Conserved-budget trust normalization (ref 828). Ensures total trust across all peers is bounded, preventing inflation attacks in long-running clusters.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| normalize(scores) | scores: Dict[str, TypedTrustScore] | Dict[str, TypedTrustScore] | Normalize scores so total budget is conserved |
| set_target_budget(budget) | budget: float | None | Override the target budget |
| current_budget() | -- | float | Current total trust budget across all peers |
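One way to picture a conserved budget is proportional rescaling of scalar trust values. The sketch below is an assumption, not the documented algorithm: `normalize_to_budget` is a hypothetical helper, and it works on floats rather than `TypedTrustScore` objects.

```python
from typing import Dict


def normalize_to_budget(trust: Dict[str, float], target_budget: float) -> Dict[str, float]:
    """Scale per-peer trust down proportionally when the total exceeds the budget."""
    total = sum(trust.values())
    if total <= target_budget or total == 0.0:
        return dict(trust)  # already within budget; return an unchanged copy
    scale = target_budget / total
    return {peer: value * scale for peer, value in trust.items()}


scores = {"peer-1": 0.9, "peer-2": 0.9, "peer-3": 0.6}
bounded = normalize_to_budget(scores, target_budget=1.2)
```

Proportional scaling keeps the relative ordering of peers while capping the total, which is the property that limits inflation.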
2. crdt_merge.e4.proof_evidence
Cryptographic trust evidence with proof verification.
class TrustEvidence
@dataclass(frozen=True)
class TrustEvidence:
    observer: str
    target: str
    evidence_type: str
    dimension: str
    amount: float
    proof: bytes
    proof_type: str
    timestamp: float
Verifiable evidence of peer misbehaviour.
Class Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| TrustEvidence.create(observer, target, evidence_type, dimension, amount, proof) | observer: str, target: str, evidence_type: str, dimension: str, amount: float, proof: bytes | TrustEvidence | Build evidence with auto-detected proof type |
Instance Methods
| Method | Returns | Description |
|---|---|---|
| verify() | bool | Independently verify the proof |
| pack() | bytes | Compact wire serialization |
Evidence Types
| Type | Dimension | Proof Format |
|---|---|---|
| "equivocation" | integrity | Two conflicting signed operations |
| "merkle_divergence" | integrity | Merkle path mismatch |
| "invalid_delta" | integrity | Expected hash + delta bytes |
| "clock_regression" | causality | Clock snapshot pair |
| "gossip_stale" | gossip | Stale gossip entry |
Utility Functions
| Function | Parameters | Returns | Description |
|---|---|---|---|
| pack_trust_evidence(evidence) | evidence: TrustEvidence | bytes | Serialize evidence for wire transport |
| unpack_trust_evidence(data) | data: bytes | TrustEvidence | Deserialize evidence from wire bytes |
3. crdt_merge.e4.pco
Aggregate proof-carrying operations (ref 880-886). Wire format: 64 B signature + 32 B aggregate hash + 32 B metadata = 128 bytes.
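The 128-byte layout can be illustrated with a minimal pack/unpack pair. The field order (signature, then aggregate hash, then metadata) and the helper names `pack_wire` and `unpack_wire` are assumptions for this sketch; the actual encoding is whatever `to_wire()` and `from_wire()` implement.

```python
from typing import Tuple

SIGNATURE_LEN = 64       # bytes
AGGREGATE_HASH_LEN = 32  # bytes
METADATA_LEN = 32        # bytes
WIRE_LEN = SIGNATURE_LEN + AGGREGATE_HASH_LEN + METADATA_LEN  # 128


def pack_wire(signature: bytes, aggregate_hash: bytes, metadata: bytes) -> bytes:
    """Concatenate the three fixed-width fields (field order is an assumption)."""
    for name, field, expected in (
        ("signature", signature, SIGNATURE_LEN),
        ("aggregate_hash", aggregate_hash, AGGREGATE_HASH_LEN),
        ("metadata", metadata, METADATA_LEN),
    ):
        if len(field) != expected:
            raise ValueError(f"{name} must be {expected} bytes, got {len(field)}")
    return signature + aggregate_hash + metadata


def unpack_wire(data: bytes) -> Tuple[bytes, bytes, bytes]:
    """Split a 128-byte buffer back into (signature, aggregate_hash, metadata)."""
    if len(data) != WIRE_LEN:
        raise ValueError(f"expected {WIRE_LEN} bytes, got {len(data)}")
    sig_end = SIGNATURE_LEN
    hash_end = sig_end + AGGREGATE_HASH_LEN
    return data[:sig_end], data[sig_end:hash_end], data[hash_end:]


wire = pack_wire(b"\x01" * 64, b"\x02" * 32, b"\x03" * 32)
```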
class SubtreeRef
@dataclass(frozen=True)
class SubtreeRef:
    path: Tuple[int, ...]  # index path from root, 0..B-1 per level
    depth: int             # depth in the Merkle tree
    old_hash: str          # hash before the operation
    new_hash: str          # hash after the operation
Reference to a subtree in the high-arity Merkle tree.
class AggregateProofCarryingOperation
@dataclass(frozen=True)
class AggregateProofCarryingOperation:
    originator_id: str
    signature: bytes      # 64 B
    aggregate_hash: str   # 32 B hex
    merkle_root: str
    clock_snapshot: bytes
    trust_vector_hash: str
    delta_bounds: Tuple[SubtreeRef, ...]
    version: int
    flags: int
    timestamp: float
128-byte aggregate proof covering integrity, causality, trust, and minimality.
Class Methods
| Method | Key Parameters | Returns | Description |
|---|---|---|---|
| AggregateProofCarryingOperation.build(originator_id, signing_fn, merkle_root, clock_snapshot, trust_vector_hash, delta_bounds, *, version=1, flags=0) | signing_fn: Callable[[str], bytes] | AggregateProofCarryingOperation | Construct a PCO with computed hashes and signature |
| AggregateProofCarryingOperation.from_wire(data, originator_id, merkle_root, clock_snapshot, trust_vector_hash, delta_bounds) | data: bytes (128 B) | AggregateProofCarryingOperation | Deserialize from wire format |
Instance Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| verify(state, trust_lattice, *, verification_level=2) | state: object, trust_lattice: object, verification_level: int | bool | Verify at the given adaptive level (0-3) |
| to_wire() | -- | bytes | 128-byte wire serialization |
4. crdt_merge.e4.projection_delta
Sparse delta encoding via high-arity Merkle tree traversal.
class FrozenDict
class FrozenDict(dict):
    """Immutable dict for use in frozen dataclasses."""
Hashable, immutable dict subclass.
class ProjectionDelta
@dataclass(frozen=True)
class ProjectionDelta:
    source_id: str
    source_version: Optional[object]
    target_version: Optional[object]
    changed_subtrees: Tuple[SubtreeRef, ...]
    insertions: FrozenDict  # key -> bytes
    updates: FrozenDict     # key -> bytes
    deletions: FrozenSet[str]
    pco: AggregateProofCarryingOperation
    encoding: str = "raw"
    compression_ratio: float = 1.0
Sparse state change identified via O(log_B n) Merkle traversal.
Instance Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| is_empty() | -- | bool | True if no insertions, updates, or deletions |
| content_hash() | -- | str | Deterministic SHA-256 over all fields |
| compose(other) | other: ProjectionDelta | ProjectionDelta | Associative composition: delta_ab . delta_bc = delta_ac |
| compress(encoding) | encoding: str ("sparse", "rle", "raw") | ProjectionDelta | Compress the delta with the given encoding |
class ProjectionDeltaManager
class ProjectionDeltaManager:
    def __init__(self, *, max_history: int = 64) -> None
Delta lifecycle management: recording, composition, and history.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| record(delta) | delta: ProjectionDelta | None | Append a delta to the history |
| latest(source_id) | source_id: str | Optional[ProjectionDelta] | Most recent delta from a peer |
| compose_range(source_id, start, end) | source_id: str, start: int, end: int | ProjectionDelta | Compose deltas in the given index range |
| peers() | -- | List[str] | All known peer IDs with recorded deltas |
| history(source_id) | source_id: str | List[ProjectionDelta] | Full delta history for a peer |
5. crdt_merge.e4.trust_weighted_strategy
Trust-weighted conflict resolution strategies (ref 870-874).
class ConflictType (Enum)
| Value | Description |
|---|---|
| NUMERIC | float/int values (routed to averaging) |
| OPAQUE | binary blobs (routed to LWW) |
| STRUCTURED | nested dicts/lists (routed to recursive) |
| COUNTER | monotone counters (routed to max) |
| SET | set union/intersection (trust-weighted) |
class ConflictEntry
@dataclass(frozen=True)
class ConflictEntry:
    peer_id: str
    value: Any
    timestamp: float
    trust: TypedTrustScore
    dimension: str = "integrity"
class ResolutionResult
@dataclass(frozen=True)
class ResolutionResult:
    resolved_value: Any
    confidence: float  # [0.0, 1.0]
    method: str
    contributors: Tuple[str, ...]
    rejected_peers: Tuple[str, ...]
class TrustWeightedLWWResolver
class TrustWeightedLWWResolver:
    def __init__(self, *, trust_weight_factor: float = 1.0) -> None
Last-writer-wins where effective timestamp = timestamp * (1 + trust_weight * factor).
| Method | Parameters | Returns | Description |
|---|---|---|---|
| resolve(entries) | entries: Sequence[ConflictEntry] | ResolutionResult | Resolve via trust-weighted LWW |
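The effective-timestamp rule can be sketched with a simplified entry type. `Entry`, `effective_timestamp`, and `resolve_lww` are hypothetical names, and trust is reduced to a single scalar here, whereas the library works with `ConflictEntry` and `TypedTrustScore`.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class Entry:  # hypothetical simplification of ConflictEntry
    peer_id: str
    value: object
    timestamp: float
    trust: float  # scalar trust in [0.0, 1.0]


def effective_timestamp(entry: Entry, factor: float = 1.0) -> float:
    """timestamp * (1 + trust * factor), as in the formula above."""
    return entry.timestamp * (1.0 + entry.trust * factor)


def resolve_lww(entries: Sequence[Entry], factor: float = 1.0) -> Entry:
    """Pick the entry with the largest effective timestamp."""
    if not entries:
        raise ValueError("no entries to resolve")
    return max(entries, key=lambda e: effective_timestamp(e, factor))


entries = [
    Entry("peer-low", "newer-but-low-trust", timestamp=105.0, trust=0.2),
    Entry("peer-high", "older-but-trusted", timestamp=100.0, trust=0.9),
]
winner = resolve_lww(entries)               # trust-weighted
plain_winner = resolve_lww(entries, 0.0)    # factor 0.0 reduces to plain LWW
```

In this sketch the multiplier scales the whole timestamp, so the larger the raw timestamps are, the more a trust difference outweighs a recency difference.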
class TrustWeightedAveragingResolver
class TrustWeightedAveragingResolver:
    def __init__(self) -> None
Numeric merge via trust-weighted average. Each peer's contribution is proportional to their typed trust score.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| resolve(entries) | entries: Sequence[ConflictEntry] | ResolutionResult | Resolve via trust-weighted average |
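A trust-weighted average is the usual weighted mean with trust as the weight. The helper below is a hypothetical sketch over `(value, trust)` pairs; how the library reduces a typed trust score to a single weight is not specified here.

```python
from typing import Sequence, Tuple


def trust_weighted_average(pairs: Sequence[Tuple[float, float]]) -> float:
    """Weighted mean of values, where each weight is the peer's trust."""
    total_weight = sum(trust for _, trust in pairs)
    if total_weight == 0.0:
        raise ValueError("total trust is zero; average is undefined")
    return sum(value * trust for value, trust in pairs) / total_weight


# (value, trust) per peer: the high-trust peer pulls the result toward 10.0
pairs = [(10.0, 0.75), (20.0, 0.25)]
result = trust_weighted_average(pairs)
```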
class TrustGatedAcceptanceFilter
class TrustGatedAcceptanceFilter:
    def __init__(
        self,
        *,
        thresholds: Optional[Dict[str, float]] = None,
        global_threshold: float = 0.4,
        strict_mode: bool = False,
    ) -> None
Pre-merge gate that rejects operations from peers below a per-dimension or global trust threshold.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| filter_entries(entries) | entries: Sequence[ConflictEntry] | Tuple[List[ConflictEntry], List[ConflictEntry]] | Returns (accepted, rejected) |
class TrustWeightedStrategySelector
class TrustWeightedStrategySelector:
    def __init__(
        self,
        *,
        lww: Optional[TrustWeightedLWWResolver] = None,
        averaging: Optional[TrustWeightedAveragingResolver] = None,
        gate: Optional[TrustGatedAcceptanceFilter] = None,
    ) -> None
Meta-strategy that routes conflicts to the appropriate resolver based on ConflictType and trust topology.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| resolve(entries, conflict_type) | entries: Sequence[ConflictEntry], conflict_type: ConflictType | ResolutionResult | Select and execute the best resolver |
6. crdt_merge.e4.delta_trust_lattice
Recursive trust lattice where trust propagates as projection deltas (ref 840-843).
class DeltaTrustLattice
class DeltaTrustLattice:
    def __init__(
        self,
        peer_id: str,
        *,
        circuit_breaker: Optional[TrustCircuitBreaker] = None,
        homeostasis: Optional[TrustHomeostasis] = None,
    ) -> None
Central trust store for a single node. Trust changes are deltas verified by the same pipeline as data.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| get_trust(peer_id) | peer_id: str | TypedTrustScore | Current trust for a peer. Auto-initializes unknown peers at probation. |
| record_evidence(target, evidence) | target: str, evidence: TrustEvidence | None | Record local evidence, update score, queue delta |
| observe_and_propagate(target, evidence, state) | target: str, evidence: TrustEvidence, state: object | Optional[ProjectionDelta] | Record + build a projection delta for gossip |
| receive_trust_delta(delta, state) | delta: ProjectionDelta, state: object | bool | Receive and verify an incoming trust delta |
| merge(other) | other: DeltaTrustLattice | DeltaTrustLattice | CRDT merge of two lattices |
| drain_async_queue() | -- | List[ProjectionDelta] | Drain pending trust deltas for outbound gossip |
Properties
| Property | Type | Description |
|---|---|---|
| peer_id | str | Local peer identifier |
| known_peers | Set[str] | All peers with trust state |
class TrustCircuitBreaker
class TrustCircuitBreaker:
    def __init__(
        self,
        *,
        window_size: int = 100,
        sigma_threshold: float = 2.0,
        cooldown_seconds: float = 30.0,
        min_samples: int = 10,
    ) -> None
Monitors trust change velocity. Trips when rate exceeds mean + sigma_threshold * std_dev, forcing Level 2 verification until cooldown expires (ref 829).
| Method | Parameters | Returns | Description |
|---|---|---|---|
| record_event(magnitude) | magnitude: float | None | Record a trust mutation event |
| is_tripped() | -- | bool | Whether the breaker is currently tripped |
| event_count() | -- | int | Total events recorded |
| current_rate() | -- | float | Events per second in the current window |
| reset() | -- | None | Manually reset the breaker |
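The trip condition (value above mean + sigma_threshold * std_dev over a sliding window) can be sketched as follows. `VelocityBreaker` is a hypothetical class for illustration: it compares event magnitudes directly and omits the cooldown timer, so it is not a drop-in model of `TrustCircuitBreaker`.

```python
import statistics
from collections import deque
from typing import Deque


class VelocityBreaker:  # hypothetical, simplified; no cooldown handling
    def __init__(self, *, window_size: int = 100, sigma_threshold: float = 2.0,
                 min_samples: int = 10) -> None:
        self._window: Deque[float] = deque(maxlen=window_size)
        self._sigma_threshold = sigma_threshold
        self._min_samples = min_samples
        self.tripped = False

    def record(self, magnitude: float) -> bool:
        """Record a sample; trip if it exceeds mean + sigma * std_dev of the window."""
        if len(self._window) >= self._min_samples:
            mean = statistics.fmean(self._window)
            std_dev = statistics.pstdev(self._window)
            if magnitude > mean + self._sigma_threshold * std_dev:
                self.tripped = True
        self._window.append(magnitude)
        return self.tripped

    def reset(self) -> None:
        self.tripped = False


breaker = VelocityBreaker()
steady = [0.9 if i % 2 == 0 else 1.1 for i in range(20)]
tripped_during_steady = [breaker.record(m) for m in steady]
tripped_on_spike = breaker.record(5.0)
```

The `min_samples` guard keeps the breaker from tripping on the first few events, when the mean and deviation are not yet meaningful.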
7. crdt_merge.e4.trust_bound_merkle
High-arity Merkle tree where hashes incorporate trust context: H(data || trust_score || originator) (ref 850-855).
class TrustBoundMerkle
class TrustBoundMerkle:
    def __init__(
        self,
        trust_lattice: Optional[DeltaTrustLattice] = None,
        *,
        branching_factor: int = 256,
        compatibility_mode: bool = False,
    ) -> None
Branching factor 256: 1M params -> depth 3, 1B params -> depth 4, 1T params -> depth 5.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| insert(key, data, originator) | key: str, data: bytes, originator: str | None | Insert a leaf |
| root_hash() | -- | str | Current root hash |
| compute_leaf_hash(data, originator) | data: bytes, originator: str | str | E4 hash: H(data \|\| trust \|\| originator) |
| compute_leaf_hash_compat(data) | data: bytes | str | Legacy hash: H(data) |
| diff(other) | other: TrustBoundMerkle | List[SubtreeRef] | Changed subtrees between two trees |
| verify_proof(key, proof) | key: str, proof: List[str] | bool | Verify a Merkle inclusion proof |
Class Methods
| Method | Description |
|---|---|
| TrustBoundMerkle.enable_domain_hashing() | Enable domain-separated hashing for cross-component hash isolation |
| TrustBoundMerkle.disable_domain_hashing() | Restore original hashing |
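The depth figures quoted for branching factor 256 follow from finding the smallest depth d with 256^d >= n. The helper below is a hypothetical check of that arithmetic, using integer math to avoid floating-point logarithm rounding.

```python
def merkle_depth(leaf_count: int, branching_factor: int = 256) -> int:
    """Smallest depth d such that branching_factor ** d >= leaf_count."""
    if leaf_count < 1 or branching_factor < 2:
        raise ValueError("leaf_count must be >= 1 and branching_factor >= 2")
    depth = 0
    capacity = 1
    while capacity < leaf_count:
        capacity *= branching_factor
        depth += 1
    return depth


depth_1m = merkle_depth(10**6)   # 256**3 = 16,777,216 covers 1M leaves
depth_1b = merkle_depth(10**9)   # 256**4 = 4,294,967,296 covers 1B leaves
depth_1t = merkle_depth(10**12)  # 256**5 = 1,099,511,627,776 covers 1T leaves
```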
8. crdt_merge.e4.causal_trust_clock
Vector clock where entries are (logical_time, trust_score) pairs (ref 860-863).
class CausalTrustClock
class CausalTrustClock:
    TRUST_OVERRIDE_FACTOR = 1.5
    def __init__(
        self,
        peer_id: str,
        trust_lattice: Optional[DeltaTrustLattice] = None,
    ) -> None
Low-trust peers cannot causally dominate high-trust peers even with higher logical time.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| increment() | -- | CausalTrustClock | Increment local clock with current trust. Returns new instance. |
| trust_weighted_compare(other) | other: CausalTrustClock | str | Returns "before", "after", "concurrent", or "trust_override" |
| merge(other) | other: CausalTrustClock | CausalTrustClock | CRDT merge: element-wise max of (time, trust) pairs |
| serialize_compact() | -- | bytes | Compact binary representation for PCO embedding |
| deserialize_compact(data, peer_id, trust_lattice) | data: bytes, peer_id: str, ... | CausalTrustClock | Class method. Reconstruct from compact bytes. |
| is_consistent_with(snapshot) | snapshot: bytes | bool | Check if a serialized snapshot is causally consistent |
| content_hash() | -- | str | SHA-256 of the compact serialization |
| bind_trust_lattice(lattice) | lattice: DeltaTrustLattice | None | Late-bind the trust lattice |
Properties
| Property | Type | Description |
|---|---|---|
| peer_id | str | Local peer identifier |
| logical_time | int | Local peer's logical time |
| entries | Dict[str, Tuple[int, float]] | Copy of all entries |
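The merge over `(logical_time, trust_score)` entries can be sketched on the `entries` mapping shape shown above. This sketch assumes the max is taken component-wise (time and trust independently); a lexicographic max over the pair is another plausible reading, and `merge_entries` is a hypothetical helper rather than the library method.

```python
from typing import Dict, Tuple

# peer_id -> (logical_time, trust_score), matching the `entries` property shape
ClockEntries = Dict[str, Tuple[int, float]]


def merge_entries(a: ClockEntries, b: ClockEntries) -> ClockEntries:
    """Component-wise max of (time, trust) per peer (assumed interpretation)."""
    merged: ClockEntries = {}
    for peer in a.keys() | b.keys():
        time_a, trust_a = a.get(peer, (0, 0.0))
        time_b, trust_b = b.get(peer, (0, 0.0))
        merged[peer] = (max(time_a, time_b), max(trust_a, trust_b))
    return merged


local = {"a": (3, 0.9), "b": (1, 0.5)}
remote = {"a": (2, 0.95), "c": (4, 0.3)}
merged_clock = merge_entries(local, remote)
```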
9. crdt_merge.e4.adaptive_verification
Trust-tiered verification routing (ref 895).
Verification Levels
| Level | Trust Range | Cost | Action |
|---|---|---|---|
| 0 | > 0.8 (partial) | O(1) | Signature only |
| 1 | 0.4 - 0.8 | O(1) | Signature + Merkle root |
| 2 | 0.1 - 0.4 (low) | O(k) | Full aggregate PCO |
| 3 | < 0.1 (quarantine) | O(1) | Unconditional reject |
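The table can be read as a simple threshold mapping from a scalar trust value to a level. The function below is a sketch using the constants from `typed_trust`; how the exact boundary values (0.1, 0.4, 0.8) are classified is an assumption inferred from the strict inequalities in the table.

```python
QUARANTINE_THRESHOLD = 0.1
LOW_TRUST_THRESHOLD = 0.4
PARTIAL_THRESHOLD = 0.8


def verification_level(trust: float) -> int:
    """Map overall trust to a verification level (boundary handling assumed)."""
    if trust < QUARANTINE_THRESHOLD:
        return 3  # quarantine: unconditional reject
    if trust < LOW_TRUST_THRESHOLD:
        return 2  # low trust: full aggregate PCO verification
    if trust <= PARTIAL_THRESHOLD:
        return 1  # signature + Merkle root
    return 0      # above 0.8: signature only


levels = [verification_level(t) for t in (0.95, 0.5, 0.2, 0.05)]
```

Under this mapping a new peer at `PROBATION_TRUST` (0.5) starts at Level 1.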
class VerificationResult
@dataclass(frozen=True)
class VerificationResult:
    accepted: bool
    level: int
    reason: str = ""
    async_pending: bool = False
class AdaptiveVerificationController
class AdaptiveVerificationController:
    def __init__(
        self,
        trust_lattice: Optional[DeltaTrustLattice] = None,
        circuit_breaker: Optional[TrustCircuitBreaker] = None,
        *,
        async_queue_limit: int = 1024,
    ) -> None
Routes incoming deltas to the appropriate verification level based on sender trust.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| verify(delta, state, trust_lattice) | delta: ProjectionDelta, state: object, trust_lattice: object | VerificationResult | Verify at the trust-determined level |
| run_async_followup(state, *, batch_size=32) | state: object, batch_size: int | List[Tuple[ProjectionDelta, VerificationResult]] | Re-verify optimistic Level 0/1 accepts at Level 2 |
| queue_depth() | -- | int | Pending async verifications |
| bind_trust_lattice(lattice) | lattice: DeltaTrustLattice | None | Late-bind the trust lattice |
| bind_circuit_breaker(breaker) | breaker: TrustCircuitBreaker | None | Late-bind the circuit breaker |
10. crdt_merge.e4.compatibility
Dual-hash compatibility for E4 migration (ref 855, 945).
class CompatibilityMode (Enum)
| Value | Description |
|---|---|
| E4_ONLY | Only H(data \|\| trust) hashes |
| DUAL_HASH | Both H(data) and H(data \|\| trust) in parallel |
| LEGACY_ONLY | Pre-E4 mode, only H(data) |
class PeerCapability (Enum)
| Value | Code | Description |
|---|---|---|
| PRE_E4 | 0 | No E4 support |
| E4_DUAL | 1 | Supports dual-hash |
| E4_FULL | 2 | Full E4 support |
class CompatHandshake
@dataclass(frozen=True)
class CompatHandshake:
    peer_id: str
    capability: PeerCapability
    version: int = 1
class CompatibilityController
class CompatibilityController:
    def __init__(
        self,
        *,
        default_mode: CompatibilityMode = CompatibilityMode.E4_ONLY,
        merkle: Optional[TrustBoundMerkle] = None,
    ) -> None
Manages hash compatibility across mixed E4 / pre-E4 clusters. Migration path: LEGACY_ONLY -> DUAL_HASH -> E4_ONLY.
| Method | Parameters | Returns | Description |
|---|---|---|---|
| process_handshake(hs) | hs: CompatHandshake | CompatibilityMode | Record peer capability, return negotiated mode |
| build_handshake(local_peer_id) | local_peer_id: str | CompatHandshake | Build handshake payload advertising local capability |
| mode_for_peer(peer_id) | peer_id: str | CompatibilityMode | Effective mode for a peer (falls back to default) |
| compute_hashes(data, originator, peer_id) | data: bytes, originator: str, peer_id: str | Dict[str, str] | Compute {"e4": ..., "legacy": ...} as required |
| peers_ready_for_e4_only() | -- | Set[str] | Peers that can upgrade from DUAL_HASH |
| upgrade_peer(peer_id) | peer_id: str | CompatibilityMode | Attempt to upgrade a peer to the next level |
| bind_merkle(merkle) | merkle: TrustBoundMerkle | None | Late-bind the Merkle tree |
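A rough picture of dual hashing: depending on the mode, produce the legacy hash, the trust-bound hash, or both. Everything below is illustrative. `Mode` and `compute_hashes_sketch` are hypothetical names, SHA-256 is assumed as H, and the byte encoding of the trust score and the field separator are assumptions, not the library's wire format.

```python
import hashlib
from enum import Enum
from typing import Dict


class Mode(Enum):  # hypothetical mirror of CompatibilityMode
    E4_ONLY = "e4_only"
    DUAL_HASH = "dual_hash"
    LEGACY_ONLY = "legacy_only"


def compute_hashes_sketch(data: bytes, trust: float, originator: str,
                          mode: Mode) -> Dict[str, str]:
    """Return the hashes a peer in the given mode would need."""
    hashes: Dict[str, str] = {}
    if mode in (Mode.DUAL_HASH, Mode.LEGACY_ONLY):
        hashes["legacy"] = hashlib.sha256(data).hexdigest()  # H(data)
    if mode in (Mode.DUAL_HASH, Mode.E4_ONLY):
        # Assumption: trust encoded as fixed-precision text, fields joined with "|"
        trust_bytes = f"{trust:.6f}".encode("utf-8")
        material = b"|".join((data, trust_bytes, originator.encode("utf-8")))
        hashes["e4"] = hashlib.sha256(material).hexdigest()
    return hashes


dual = compute_hashes_sketch(b"payload", 0.5, "peer-1", Mode.DUAL_HASH)
legacy_only = compute_hashes_sketch(b"payload", 0.5, "peer-1", Mode.LEGACY_ONLY)
e4_only = compute_hashes_sketch(b"payload", 0.5, "peer-1", Mode.E4_ONLY)
```

During migration, a cluster in DUAL_HASH can compare on the legacy hash with pre-E4 peers and on the E4 hash with upgraded ones, which is what makes the LEGACY_ONLY -> DUAL_HASH -> E4_ONLY path incremental.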
crdt-merge v0.9.5 -- April 2026