Confluent Schema Registry

February 20, 2026

⚠️ Experimental API: ConfluentSchemaRegistry and the related registry, beforeSerialization, and beforeDeserialization hooks are experimental. They do not follow semver and may change in minor or patch releases.

The ConfluentSchemaRegistry class provides integration with Confluent Schema Registry for automatic message serialization and deserialization with schema management.

Features

  • Multi-format support: AVRO, Protocol Buffers, and JSON Schema
  • Automatic serialization/deserialization: Seamlessly integrates with Producer and Consumer
  • Schema caching: Fetched schemas are cached for performance
  • Authentication support: Basic and Bearer token authentication
  • Type safety: Full TypeScript generics support
  • Validation: Optional JSON schema validation on send, automatic on receive
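To illustrate what schema caching means in practice, here is a minimal sketch of a cache keyed by schema ID. It is not the library's implementation, which this page does not describe. The names createSchemaCache, getSchema, and fetchFromRegistry are hypothetical, and the fetcher assumes the registry's standard REST endpoint GET /schemas/ids/{id} on a local instance.

```javascript
// Hypothetical helper (not part of @platformatic/kafka): caches schemas by ID
// so that each ID is requested from the registry at most once.
// `fetchSchema` is injected so the sketch can run without a live registry.
function createSchemaCache (fetchSchema) {
  const cache = new Map()

  return function getSchema (id) {
    if (!cache.has(id)) {
      // Cache the promise itself so concurrent lookups share a single request
      const pending = Promise.resolve()
        .then(() => fetchSchema(id))
        .catch(error => {
          cache.delete(id) // Failed lookups are not cached and can be retried
          throw error
        })
      cache.set(id, pending)
    }
    return cache.get(id)
  }
}

// Example fetcher, assuming a registry reachable at localhost:8081
async function fetchFromRegistry (id) {
  const response = await fetch(`http://localhost:8081/schemas/ids/${id}`)
  if (!response.ok) {
    throw new Error(`Schema ${id} lookup failed with status ${response.status}`)
  }
  return response.json()
}

const getSchema = createSchemaCache(fetchFromRegistry)
```

Storing the pending promise rather than the resolved schema means that two messages using the same schema ID at the same time trigger only one HTTP request.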

Installation

The Confluent Schema Registry support is included in the main package:

npm install @platformatic/kafka

For Protocol Buffers support, also install:

npm install protobufjs

Constructor

Creates a new schema registry instance of type ConfluentSchemaRegistry<Key, Value, HeaderKey, HeaderValue>.

Options:

Property            Type                         Required  Description
url                 string                       Yes       URL of the Confluent Schema Registry
auth                object                       No        Authentication configuration
auth.username       string | CredentialProvider  No        Username for Basic authentication
auth.password       string | CredentialProvider  No        Password for Basic authentication
auth.token          string | CredentialProvider  No        Token for Bearer authentication
protobufTypeMapper  function                     No        Custom type mapper for Protocol Buffers
jsonValidateSend    boolean                      No        Enable JSON schema validation on send (default: false)

Basic Usage

AVRO Schema

import { Producer, Consumer } from '@platformatic/kafka'
import { ConfluentSchemaRegistry } from '@platformatic/kafka/registries'

// Create registry instance
const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081'
})

// Producer
const producer = new Producer({
  clientId: 'avro-producer',
  bootstrapBrokers: ['localhost:9092'],
  registry
})

await producer.send({
  messages: [
    {
      topic: 'users',
      key: { id: 123 },
      value: { name: 'John Doe', age: 30 },
      metadata: {
        schemas: {
          key: 1, // AVRO schema ID for key
          value: 2 // AVRO schema ID for value
        }
      }
    }
  ]
})

// Consumer
const consumer = new Consumer({
  groupId: 'avro-consumers',
  clientId: 'avro-consumer',
  bootstrapBrokers: ['localhost:9092'],
  registry
})

const stream = await consumer.consume({
  topics: ['users']
})

for await (const message of stream) {
  // Automatically deserialized from AVRO
  console.log('User:', message.value)
}

JSON Schema

const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081',
  jsonValidateSend: true // Enable validation on send
})

const producer = new Producer({
  clientId: 'json-producer',
  bootstrapBrokers: ['localhost:9092'],
  registry
})

// Will validate against schema before sending
await producer.send({
  messages: [
    {
      topic: 'events',
      value: {
        eventType: 'user_login',
        timestamp: Date.now(),
        userId: 'user-123'
      },
      metadata: {
        schemas: {
          value: 3 // JSON schema ID
        }
      }
    }
  ]
})

Protocol Buffers

// Custom type mapper for complex protobuf schemas
function customTypeMapper (id, type, context) {
  // Map schema IDs to protobuf message types
  const typeMap = {
    4: 'com.example.UserKey',
    5: 'com.example.UserValue'
  }
  return typeMap[id] || `${context.topic}-${type}`
}

const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081',
  protobufTypeMapper: customTypeMapper
})

const producer = new Producer({
  clientId: 'protobuf-producer',
  bootstrapBrokers: ['localhost:9092'],
  registry
})

await producer.send({
  messages: [
    {
      topic: 'users',
      key: { id: 123 },
      value: { name: 'John', email: 'john@example.com' },
      metadata: {
        schemas: {
          key: 4, // Protobuf schema ID for key
          value: 5 // Protobuf schema ID for value
        }
      }
    }
  ]
})
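To make the mapper's fallback concrete, the sketch below calls a mapper with the same shape directly. The type values 'key' and 'value' and the { topic } shape of context are assumptions for illustration. This page only shows that the mapper receives (id, type, context) and reads context.topic.

```javascript
// Same mapper shape as above, repeated so this snippet runs on its own
function customTypeMapper (id, type, context) {
  const typeMap = {
    4: 'com.example.UserKey',
    5: 'com.example.UserValue'
  }
  return typeMap[id] || `${context.topic}-${type}`
}

// Known schema ID: resolved from the map
console.log(customTypeMapper(4, 'key', { topic: 'users' })) // com.example.UserKey

// Unknown schema ID: falls back to "<topic>-<type>"
console.log(customTypeMapper(9, 'value', { topic: 'users' })) // users-value
```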

Authentication

Basic Authentication

const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081',
  auth: {
    username: 'user',
    password: 'password'
  }
})

Bearer Token Authentication

const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081',
  auth: {
    token: 'your-api-token'
  }
})

Dynamic Credentials

// Using CredentialProvider for dynamic credentials
// getUsername and getPassword stand in for your own credential lookup functions
const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081',
  auth: {
    username: async () => getUsername(),
    password: async () => getPassword()
  }
})
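The three authentication styles above all end up as an HTTP Authorization header. As a rough sketch of that step, the helpers below resolve static or dynamic credentials and build the header value. resolveCredential and buildAuthorizationHeader are hypothetical names, not library APIs. The sketch also assumes a CredentialProvider is a function returning a string or a promise of one, and that a token takes precedence over a username and password.

```javascript
// Hypothetical helpers, not part of @platformatic/kafka.
// A credential is either a plain string or a function returning a string
// (or a promise of one), mirroring the examples above.
async function resolveCredential (credential) {
  return typeof credential === 'function' ? credential() : credential
}

async function buildAuthorizationHeader (auth) {
  // Assumption: a token, when present, takes precedence over username/password
  if (auth.token !== undefined) {
    return `Bearer ${await resolveCredential(auth.token)}`
  }

  const username = await resolveCredential(auth.username)
  const password = await resolveCredential(auth.password)

  // HTTP Basic authentication: base64 of "username:password"
  return `Basic ${Buffer.from(`${username}:${password}`).toString('base64')}`
}
```

Resolving credentials on each call, rather than once at construction time, is what would let a provider function return a rotated secret.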

Message Metadata (Producer)

Schema IDs are passed through message metadata when producing:

// Producer message with schema metadata
const message = {
  topic: 'my-topic',
  key: { id: 123 },
  value: { data: 'example' },
  headers: { source: 'api' },
  metadata: {
    schemas: {
      key: 1, // Schema ID for key
      value: 2, // Schema ID for value
      headerKey: 3, // Schema ID for header keys (optional)
      headerValue: 4 // Schema ID for header values (optional)
    }
  }
}
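For context on why each message needs a schema ID, Confluent-compatible serializers conventionally prefix the serialized payload with a magic byte (0) followed by the schema ID as a 4-byte big-endian integer. The sketch below illustrates that framing. Whether this library uses exactly this layout is an assumption, since this page does not state it. frame and unframe are hypothetical helpers, and the extra message-index bytes used with Protocol Buffers are not covered.

```javascript
const MAGIC_BYTE = 0

// Prefix an already-serialized payload with the magic byte and the schema ID
function frame (schemaId, payload) {
  const header = Buffer.alloc(5)
  header.writeUInt8(MAGIC_BYTE, 0)
  header.writeInt32BE(schemaId, 1)
  return Buffer.concat([header, payload])
}

// Split a framed buffer back into its schema ID and payload
function unframe (buffer) {
  if (buffer.length < 5 || buffer.readUInt8(0) !== MAGIC_BYTE) {
    throw new Error('Not a Confluent-framed message')
  }
  return { schemaId: buffer.readInt32BE(1), payload: buffer.subarray(5) }
}

const framed = frame(2, Buffer.from('{"data":"example"}'))
console.log(framed.subarray(0, 5)) // <Buffer 00 00 00 00 02>
console.log(unframe(framed).schemaId) // 2
```

A consumer can read the schema ID back from the message itself, which is consistent with the metadata only being supplied when producing.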

Schema Types

AVRO Schema

AVRO schemas are parsed using the avsc library:

// Registry response for AVRO schema
{
  "schemaType": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"
}

Protocol Buffers Schema

Protocol Buffers schemas require the protobufjs library:

// Registry response for Protobuf schema
{
  "schemaType": "PROTOBUF",
  "schema": "syntax = \"proto3\";\n\nmessage User {\n  int32 id = 1;\n  string name = 2;\n}"
}

JSON Schema

JSON schemas are validated using AJV:

// Registry response for JSON schema
{
  "schemaType": "JSON",
  "schema": "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"number\"},\"name\":{\"type\":\"string\"}},\"required\":[\"id\",\"name\"]}"
}
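In all three responses above, the schema field is a string: a JSON document encoded as text for AVRO and JSON Schema, and .proto source text for Protocol Buffers. The sketch below shows one way to turn such a response into a usable definition before handing it to avsc, protobufjs, or AJV. parseRegistryResponse is a hypothetical helper, not a library API, and treating a missing schemaType as AVRO is an assumption.

```javascript
// Hypothetical helper: turns a registry response into a usable schema definition
function parseRegistryResponse (response) {
  // Assumption: a response without schemaType is treated as AVRO
  const schemaType = response.schemaType ?? 'AVRO'

  switch (schemaType) {
    case 'AVRO':
    case 'JSON':
      // The "schema" field is a JSON document encoded as a string
      return { schemaType, definition: JSON.parse(response.schema) }
    case 'PROTOBUF':
      // Protobuf schemas are .proto source text, not JSON
      return { schemaType, definition: response.schema }
    default:
      throw new Error(`Unsupported schema type: ${schemaType}`)
  }
}

const avro = parseRegistryResponse({
  schemaType: 'AVRO',
  schema: '{"type":"record","name":"User","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}'
})
console.log(avro.definition.fields.map(field => field.name)) // [ 'id', 'name' ]
```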