LFX V2 Indexer Service
May 4, 2026 ยท View on GitHub
A high-performance, indexer service for the LFX V2 platform that processes resource transactions into OpenSearch with comprehensive NATS message processing and queue group load balancing.
๐ Overview
The LFX V2 Indexer Service is responsible for:
- Message Processing: NATS stream processing with queue group load balancing across multiple instances
- Transaction Enrichment: JWT authentication, data validation, and principal parsing with delegation support
- Search Indexing: OpenSearch document indexing with optimistic concurrency control
- Data Consistency: Event-driven janitor service for conflict resolution
- Dual Format Support: Both LFX v2 (past-tense actions) and legacy v1 (present-tense actions) message formats
- Health Monitoring: Kubernetes-ready health check endpoints
๐ Quick Start
Prerequisites
- Go 1.24
- NATS Server (message streaming)
- OpenSearch/Elasticsearch (document indexing)
- Heimdall JWT Service (authentication)
Environment Setup
# Core Services (Required)
export NATS_URL=nats://nats:4222
export OPENSEARCH_URL=http://localhost:9200
export JWKS_URL=http://localhost:4457/.well-known/jwks
# Message Processing
export NATS_QUEUE=lfx.indexer.queue
export OPENSEARCH_INDEX=resources
# Optional Configuration
export LOG_LEVEL=info
export PORT=8080
Install & Run
# Install dependencies
go mod download
# Development mode
make run
# Build and run
make build-local
./bin/lfx-indexer
# Direct Go commands
go run ./cmd/lfx-indexer
go build -o bin/lfx-indexer ./cmd/lfx-indexer
# Command-line options
./bin/lfx-indexer -help
./bin/lfx-indexer -check-config
CLI Flags
# Service configuration
./bin/lfx-indexer -p 9090 # Custom health check port
./bin/lfx-indexer --bind 0.0.0.0 # Bind to all interfaces
./bin/lfx-indexer -d # Enable debug logging
# Feature toggles
./bin/lfx-indexer --nojanitor # Disable janitor service (overrides JANITOR_ENABLED)
./bin/lfx-indexer --simple-health # Use simple 'OK' health responses
# Utilities
./bin/lfx-indexer --help # Show help message
Note: CLI flags have highest precedence: CLI flags > Environment variables > Defaults
Health Check
# Kubernetes probes
curl http://localhost:8080/livez # Liveness probe
curl http://localhost:8080/readyz # Readiness probe
curl http://localhost:8080/health # General health
๐๏ธ Architecture Overview
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ LFX Indexer Service โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Entry Points (cmd/ - Standard Go Layout) โ
โ โโ cmd/lfx-indexer/main.go - Pure dependency injection โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Presentation Layer (NATS Protocol + Health Checks) โ
โ โโ IndexingMessageHandler - Unified V2/V1 message processing โ
โ โโ BaseMessageHandler - Shared NATS response logic โ
โ โโ HealthHandler - Kubernetes health probes โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Application Layer (Orchestration) โ
โ โโ MessageProcessor - Complete workflow coordination โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Domain Layer (Business Logic) โ
โ โโ IndexerService - Consolidated transaction + health logic โ
โ โโ Contracts Package - Pure domain interfaces & entities โ
โ โโ Repository Interfaces - Clean abstractions โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Infrastructure Layer (External Services) โ
โ โโ MessagingRepository - NATS client with queue groups โ
โ โโ StorageRepository - OpenSearch client โ
โ โโ AuthRepository - JWT validation (Heimdall integration) โ
โ โโ CleanupRepository - Background cleanup operations โ
โ โโ Container - Pure dependency injection โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโ
โ External Dependencies โ
โ โ
โ NATS โโ OpenSearch โโ JWT โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
๐ Data Flow Architecture
graph TB
%% External message sources
V2_APPS["V2 Applications<br/>(Projects API, Orgs API, etc.)"]
V1_APPS["V1 Legacy Applications<br/>(Platform DB Events)"]
%% NATS Infrastructure
NATS_SERVER["NATS Server<br/>nats://nats:4222"]
V2_SUBJECT["V2 Subject: lfx.index.*<br/>(lfx.index.project, lfx.index.organization)"]
V1_SUBJECT["V1 Subject: lfx.v1.index.*<br/>(lfx.v1.index.project)"]
QUEUE_GROUP["Queue Group: lfx.indexer.queue<br/>(Load Balancing)"]
%% Service Instances
INSTANCE1["LFX Indexer Instance 1"]
INSTANCE2["LFX Indexer Instance 2"]
INSTANCE3["LFX Indexer Instance N..."]
%% Clean Architecture Layers
MAIN_GO["cmd/lfx-indexer/main.go<br/>(Entry Point & DI)"]
CONTAINER["Container<br/>(Dependency Injection)"]
MSG_REPO["MessagingRepository<br/>(NATS Wrapper)"]
%% Presentation Layer
UNIFIED_HANDLER["IndexingMessageHandler<br/>(Unified V2 + V1 Messages)"]
HEALTH_HANDLER["HealthHandler<br/>(Kubernetes Probes)"]
%% Application Layer
MESSAGE_PROCESSOR["MessageProcessor<br/>(Workflow Coordination)"]
%% Domain Layer
INDEXER_SVC["IndexerService<br/>(Business Logic + Health + Action Validation)"]
CONTRACTS_PKG["Contracts Package<br/>(Pure Domain Interfaces & Entities)"]
TRANSACTION_ENTITY["LFXTransaction Entity<br/>(Pure Data Structure)"]
%% Infrastructure Layer
STORAGE_REPO["StorageRepository<br/>(OpenSearch Client)"]
AUTH_REPO["AuthRepository<br/>(JWT Validation)"]
OPENSEARCH["OpenSearch Cluster<br/>resources index"]
%% Cleanup System
CLEANUP_REPO["CleanupRepository<br/>(Background Cleanup)"]
%% External Services
JWT_SERVICE["Heimdall JWT Service<br/>(Token Validation)"]
%% Message Flow
V2_APPS -->|"Publish V2 Messages"| V2_SUBJECT
V1_APPS -->|"Publish V1 Messages"| V1_SUBJECT
V2_SUBJECT --> NATS_SERVER
V1_SUBJECT --> NATS_SERVER
NATS_SERVER -->|"Queue Group Distribution"| QUEUE_GROUP
QUEUE_GROUP --> INSTANCE1
QUEUE_GROUP --> INSTANCE2
QUEUE_GROUP --> INSTANCE3
%% Service Architecture Flow
INSTANCE1 --> MAIN_GO
MAIN_GO --> CONTAINER
CONTAINER --> MSG_REPO
CONTAINER --> HEALTH_HANDLER
MSG_REPO -->|"Both V2 + V1"| UNIFIED_HANDLER
%% Processing Flow
UNIFIED_HANDLER --> MESSAGE_PROCESSOR
MESSAGE_PROCESSOR --> INDEXER_SVC
MESSAGE_PROCESSOR --> CLEANUP_REPO
INDEXER_SVC --> CONTRACTS_PKG
INDEXER_SVC --> TRANSACTION_ENTITY
INDEXER_SVC --> AUTH_REPO
INDEXER_SVC --> STORAGE_REPO
CONTRACTS_PKG --> TRANSACTION_ENTITY
STORAGE_REPO --> OPENSEARCH
AUTH_REPO --> JWT_SERVICE
%% Styling
classDef application fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef domain fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef infrastructure fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
classDef external fill:#fff3e0,stroke:#f57c00,stroke-width:2px
class MESSAGE_PROCESSOR,UNIFIED_HANDLER application
class INDEXER_SVC,CONTRACTS_PKG,TRANSACTION_ENTITY domain
class CONTAINER,MSG_REPO,STORAGE_REPO,AUTH_REPO,CLEANUP_REPO infrastructure
class V2_APPS,V1_APPS,JWT_SERVICE,OPENSEARCH,NATS_SERVER external
๐ Message Processing Sequence
sequenceDiagram
participant NATS as NATS Server
participant MR as MessagingRepository
participant UH as IndexingMessageHandler
participant MP as MessageProcessor
participant IS as IndexerService
participant AR as AuthRepository
participant JWT as JWT Service
participant TE as LFXTransaction
participant SR as StorageRepository
participant OS as OpenSearch
participant CR as CleanupRepository
Note over NATS,CR: Message Processing Flow
%% Message Arrival & Presentation Layer
NATS->>MR: NATS Message arrives<br/>Subject: lfx.index.project OR lfx.v1.index.project<br/>Queue: lfx.indexer.queue
MR->>UH: HandleWithReply(ctx, data, subject, reply)
Note over UH: Route based on subject prefix<br/>(V2 vs V1 format)
%% Application Layer Coordination
UH->>MP: ProcessIndexingMessage(ctx, data, subject)
Note over MP: Generate messageID<br/>Log reception metrics
%% Transaction Creation & Validation
MP->>TE: createTransaction(data, subject)
TE->>TE: Parse message format<br/>Extract object type from subject<br/>Validate required fields (pure data)
TE-->>MP: LFXTransaction entity (clean data structure)
%% Domain Layer Business Logic
MP->>IS: ProcessTransaction(ctx, transaction, index)
Note over IS: Consolidated processing:<br/>โข EnrichTransaction() with action validation<br/>โข GenerateTransactionBody()<br/>โข Index document<br/>(Action helpers moved to service layer)
%% Authentication & Authorization
IS->>AR: ParsePrincipals(ctx, headers)
AR->>JWT: ValidateToken(ctx, token)
JWT-->>AR: Principal{Principal, Email}
AR-->>IS: []Principal with delegation support
%% Validation & Body Generation
IS->>IS: EnrichTransaction()<br/>ValidateObjectType()<br/>GenerateTransactionBody()
%% Document Indexing
IS->>SR: Index(ctx, index, docID, body)
SR->>OS: POST /resources/_doc/{docID}<br/>with optimistic concurrency
alt Successful Index
OS-->>SR: 201 Created
SR-->>IS: Success with DocumentID
%% Background Conflict Resolution
IS-->>MP: ProcessingResult{Success: true, DocumentID}
MP->>CR: CheckItem(documentID)
Note over CR: Background cleanup<br/>Event-driven processing
else Index Conflict/Error
OS-->>SR: 409 Conflict / 4xx Error
SR-->>IS: Error with details
IS-->>MP: ProcessingResult{Success: false, Error}
end
%% Response Handling
MP-->>UH: Processing result
alt Success
UH->>UH: reply([]byte("OK"))
Note over UH: Log success metrics
else Error
UH->>UH: reply([]byte("ERROR: details"))
Note over UH: Log error with context
end
UH-->>MR: Acknowledge message
MR-->>NATS: Message processed
๐ง Configuration & Environment
Core Environment Variables
# Required Services
NATS_URL=nats://nats:4222 # NATS server URL
OPENSEARCH_URL=http://localhost:9200 # OpenSearch endpoint
JWKS_URL=http://localhost:4457/.well-known/jwks # JWT validation endpoint
# Message Processing
NATS_INDEXING_SUBJECT=lfx.index.> # V2 subject pattern
NATS_V1_INDEXING_SUBJECT=lfx.v1.index.> # V1 subject pattern
NATS_QUEUE=lfx.indexer.queue # Queue group name
OPENSEARCH_INDEX=resources # OpenSearch index name
# NATS Connection Settings
NATS_MAX_RECONNECTS=10 # Max reconnection attempts
NATS_RECONNECT_WAIT=2s # Wait time between reconnects
NATS_CONNECTION_TIMEOUT=10s # Initial connection timeout
# JWT Configuration
JWT_ISSUER=heimdall # JWT issuer validation
JWT_AUDIENCES=["audience1","audience2"] # Allowed audiences (JSON array)
JWT_CLOCK_SKEW=6h # Clock skew tolerance
# Server Configuration
PORT=8080 # Health check server port
LOG_LEVEL=info # Logging level (debug,info,warn,error)
LOG_FORMAT=text # Log format (text,json)
# Cleanup Service (Background Cleanup)
JANITOR_ENABLED=true # Enable cleanup service (default: true)
NATS Subject Patterns & Load Balancing
| Subject Pattern | Purpose | Example | Load Balancing |
|---|---|---|---|
lfx.index.* | V2 resource indexing | lfx.index.project, lfx.index.organization | Queue group distribution |
lfx.v1.index.* | V1 legacy support | lfx.v1.index.project | Queue group distribution |
Queue Group: lfx.indexer.queue
- Load Balancing: Automatic distribution across service instances
- Durability: Messages processed exactly once per queue group
- Fault Tolerance: Failed instances don't lose messages
๐ Project Structure
โโโ cmd/ # Application entry points (Standard Go Layout)
โ โโโ lfx-indexer/ # Main indexer service
โ โโโ main.go # Service entry point & dependency injection
โ โโโ cli.go # CLI command handling
โ โโโ server.go # HTTP server setup
โโโ internal/ # Private application code
โ โโโ application/ # Application layer (use cases)
โ โ โโโ message_processor.go # Message processing coordination
โ โ โโโ message_processor_test.go # Application layer tests
โ โโโ domain/ # Domain layer (business logic)
โ โ โโโ contracts/ # Domain contracts/interfaces & entities
โ โ โ โโโ auth.go # Auth repository interface
โ โ โ โโโ cleanup.go # Cleanup repository interface
โ โ โ โโโ messaging.go # Messaging repository interface
โ โ โ โโโ storage.go # Storage repository interface
โ โ โ โโโ transaction.go # Transaction interfaces & entity types (pure data)
โ โ โโโ services/ # Domain services
โ โ โโโ indexer_service.go # Core indexer business logic + action validation
โ โ โโโ indexer_service_test.go # Domain service tests
โ โโโ infrastructure/ # Infrastructure layer
โ โ โโโ auth/ # Authentication
โ โ โ โโโ auth_repository.go # JWT validation implementation
โ โ โ โโโ auth_repository_test.go # Auth tests
โ โ โโโ config/ # Configuration management
โ โ โ โโโ app_config.go # Application configuration
โ โ โ โโโ cli_config.go # CLI configuration
โ โ โโโ cleanup/ # Background cleanup operations
โ โ โ โโโ cleanup_repository.go # Background cleanup repository
โ โ โ โโโ cleanup_repository_test.go # Cleanup tests
โ โ โโโ messaging/ # NATS messaging
โ โ โ โโโ messaging_repository.go # NATS client wrapper
โ โ โ โโโ messaging_repository_test.go # Messaging tests
โ โ โโโ storage/ # OpenSearch storage
โ โ โโโ storage_repository.go # OpenSearch client wrapper
โ โ โโโ storage_repository_test.go # Storage tests
โ โโโ presentation/ # Presentation layer
โ โ โโโ handlers/ # Message and HTTP handlers
โ โ โโโ health_handler.go # Kubernetes health check endpoints
โ โ โโโ health_handler_test.go # Health handler tests
โ โ โโโ indexing_message_handler.go # NATS message handler
โ โ โโโ indexing_message_handler_test.go # Handler tests
โ โโโ container/ # Dependency injection
โ โ โโโ container.go # DI container implementation
โ โ โโโ container_test.go # Container tests
โ โโโ mocks/ # Mock implementations
โ โโโ repositories.go # Repository mocks for testing
โโโ pkg/ # Public packages (reusable)
โ โโโ constants/ # Shared constants
โ โ โโโ app.go # Application constants
โ โ โโโ auth.go # Authentication constants
โ โ โโโ errors.go # Error constants
โ โ โโโ health.go # Health check constants
โ โ โโโ messaging.go # Messaging constants
โ โโโ logging/ # Logging utilities
โ โโโ logger.go # Logger implementation
โ โโโ logger_test.go # Logger tests
โ โโโ testing.go # Test logging utilities
โโโ deployment/ # Deployment configurations
โ โโโ deployment.yaml # Kubernetes deployment
โโโ Dockerfile # Container definition
โโโ Makefile # Build automation
โโโ go.mod # Go module definition
โโโ go.sum # Go module checksums
โโโ run.sh # Development run script
โโโ README.md # This file
Layer Responsibilities
| Layer | Components | Responsibilities |
|---|---|---|
| Entry Point | cmd/lfx-indexer/main.go | Pure application startup and dependency injection |
| Presentation | IndexingMessageHandler, HealthHandler | NATS protocol concerns, message parsing, response handling, health checks |
| Application | MessageProcessor | Workflow coordination, use case orchestration |
| Domain | IndexerService, Contracts Package, LFXTransaction Entity | Business logic, action validation, pure domain data structures, repository interfaces |
| Infrastructure | Container, MessagingRepository, StorageRepository, AuthRepository, CleanupRepository | External service integration, data persistence, event-driven processing |
๐ Client Integration
Indexing Your Resources
The LFX V2 Indexer Service is data-agnostic โ it accepts any object_type the publisher sends and uses the indexing_config block to shape the indexed document. No server-side resource knowledge is needed.
For detailed guidance on integrating with the indexer service, see the Client Guide:
- Message Format: Learn the
IndexerMessageEnvelopestructure - Field Reference: Complete reference of all available fields
- Examples: Real-world examples for create, update, and delete operations
- Best Practices: Tips for optimal search performance and access control
- Go Client Integration: Using the public
pkg/typespackage
Quick example:
{
"action": "created",
"headers": {
"authorization": "Bearer <token>"
},
"data": {
"id": "proj-123",
"name": "My Project"
},
"indexing_config": {
"object_id": "proj-123",
"public": true,
"access_check_object": "project:proj-123",
"access_check_relation": "viewer",
"history_check_object": "project:proj-123",
"history_check_relation": "historian",
"sort_name": "my project",
"name_and_aliases": ["My Project"],
"tags": ["featured"]
}
}
Key Benefits of indexing_config:
- โ Full control over indexing behavior and access control
- โ
No server-side resource knowledge required โ any
object_typeis accepted - โ Server automatically sets timestamps and principal fields
See the Client Guide for complete documentation.
๐งโ๐ป Development
Build Targets
# Local development
make build-local # Build for current OS
make run # Run with environment setup
make fmt # Format code
make lint # Run linters
make test # Run tests
# Production build
make build # Build for Linux (deployment)
make docker-build # Build container image
make docker-push # Push to registry
# Testing by layer
make test-domain # Domain layer tests
make test-application # Application layer tests
make test-infrastructure # Infrastructure layer tests
make test-presentation # Presentation layer tests
make coverage # Generate coverage report
Testing Message Processing
# Test V2 message format
nats pub lfx.index.project '{
"action": "created",
"headers": {"Authorization": "Bearer token"},
"data": {
"id": "test-project-123",
"uid": "test-project-123",
"name": "Test Project",
"slug": "test-project",
"public": true
}
}' --server nats://your-nats-server:4222
# Test V1 message format
nats pub lfx.v1.index.project '{
"action": "create",
"data": {
"id": "test-project-456",
"name": "Legacy Project"
},
"v1_data": {
"legacy_field": "legacy_value"
}
}' --server nats://your-nats-server:4222
# Monitor processing
nats sub "lfx.index.>" --server nats://your-nats-server:4222
๐จ Troubleshooting
Common Issues
1. NATS Connection Failures
# Check NATS connectivity
nats server check --server $NATS_URL
# Verify queue configuration
nats consumer info lfx.indexer.queue --server $NATS_URL
# Test basic pub/sub
nats pub test.subject "hello" --server $NATS_URL
nats sub test.subject --server $NATS_URL
2. OpenSearch Issues
# Check OpenSearch health
curl $OPENSEARCH_URL/_cluster/health
# Verify index exists
curl $OPENSEARCH_URL/resources/_mapping
# Check recent documents
curl $OPENSEARCH_URL/resources/_search?size=5&sort=@timestamp:desc
3. JWT Validation Failures
# Check Heimdall connectivity
curl $JWKS_URL
# Enable debug logging for auth details
LOG_LEVEL=debug ./bin/lfx-indexer
Performance Monitoring
Health Endpoints:
curl http://localhost:8080/livez # Kubernetes liveness
curl http://localhost:8080/readyz # Kubernetes readiness
curl http://localhost:8080/health # General health status
Debug Configuration:
# Enable comprehensive debugging
export LOG_LEVEL=debug
export LOG_FORMAT=json
# Monitor message processing
LOG_LEVEL=debug make run
๐ Security
JWT Authentication:
- Heimdall Integration: All messages validated against JWT service
- Multiple Audiences: Configurable audience validation
- Principal Parsing: Authorization header and X-On-Behalf-Of delegation support
- Machine User Detection:
clients@prefix identification
Input Validation:
- Domain Validation: LFXTransaction entity validates all inputs
- Subject Format Validation: String prefix validation with enhanced error messages
- Data Structure Validation: Comprehensive JSON schema validation
Error Handling:
- Safe Error Messages: No sensitive data leakage
- Structured Logging: Detailed error context for debugging
- Graceful Degradation: Continue processing on non-critical failures
๐ณ Deployment
Helm Chart
# Install with Make
make helm-install
# Uninstall with Make
make helm-uninstall
# Manual installation with custom namespace/values
helm install lfx-indexer ./charts/lfx-v2-indexer-service \
--namespace lfx \
--create-namespace \
--values custom-values.yaml
# Manual upgrade
helm upgrade lfx-indexer ./charts/lfx-v2-indexer-service \
--namespace lfx
Docker
# Build container
make docker-build
# Run container
docker run -p 8080:8080 \
-e NATS_URL=nats://nats:4222 \
-e OPENSEARCH_URL=http://opensearch:9200 \
-e JWKS_URL=http://heimdall:4457/.well-known/jwks \
lfx-v2-indexer-service:latest
๐ License
This project uses dual licensing to cover different types of content:
Software License (MIT)
The source code in this project is licensed under the MIT License. See the LICENSE file for the complete license text.
Copyright: The Linux Foundation and each contributor to LFX.
Documentation License (Creative Commons)
The documentation in this project is licensed under the Creative Commons Attribution 4.0 International License. See the LICENSE-docs file for the complete license text.
Security Policy
For information about reporting security vulnerabilities, please see our SECURITY.md file.
Code Ownership
Code review and maintenance responsibilities are defined in the CODEOWNERS file.