Confluent Schema Registry
February 20, 2026 · View on GitHub
⚠️ Experimental API
ConfluentSchemaRegistryand the relatedregistry,beforeSerialization, andbeforeDeserializationhooks are experimental. They do not follow semver and may change in minor/patch releases.
The ConfluentSchemaRegistry class provides integration with Confluent Schema Registry for automatic message serialization and deserialization with schema management.
Features
- Multi-format support: AVRO, Protocol Buffers, and JSON Schema
- Automatic serialization/deserialization: Seamlessly integrates with Producer and Consumer
- Schema caching: Fetched schemas are cached for performance
- Authentication support: Basic and Bearer token authentication
- Type safety: Full TypeScript generics support
- Validation: Optional JSON schema validation on send, automatic on receive
Installation
The Confluent Schema Registry support is included in the main package:
npm install @platformatic/kafka
For Protocol Buffers support, also install:
npm install protobufjs
Constructor
Creates a new schema registry instance with type ConfluentSchemaRegistry<Key, Value, HeaderKey, HeaderValue>.
Options:
| Property | Type | Required | Description |
|---|---|---|---|
url | string | Yes | URL of the Confluent Schema Registry |
auth | object | No | Authentication configuration |
auth.username | string | CredentialProvider | No | Username for Basic authentication |
auth.password | string | CredentialProvider | No | Password for Basic authentication |
auth.token | string | CredentialProvider | No | Token for Bearer authentication |
protobufTypeMapper | function | No | Custom type mapper for Protocol Buffers |
jsonValidateSend | boolean | No | Enable JSON schema validation on send (default: false) |
Basic Usage
AVRO Schema
import { Producer, Consumer } from '@platformatic/kafka'
import { ConfluentSchemaRegistry } from '@platformatic/kafka/registries'
// Create registry instance
const registry = new ConfluentSchemaRegistry({
url: 'http://localhost:8081'
})
// Producer
const producer = new Producer({
clientId: 'avro-producer',
bootstrapBrokers: ['localhost:9092'],
registry
})
await producer.send({
messages: [
{
topic: 'users',
key: { id: 123 },
value: { name: 'John Doe', age: 30 },
metadata: {
schemas: {
key: 1, // AVRO schema ID for key
value: 2 // AVRO schema ID for value
}
}
}
]
})
// Consumer
const consumer = new Consumer({
groupId: 'avro-consumers',
clientId: 'avro-consumer',
bootstrapBrokers: ['localhost:9092'],
registry
})
const stream = await consumer.consume({
topics: ['users']
})
for await (const message of stream) {
// Automatically deserialized from AVRO
console.log('User:', message.value)
}
JSON Schema
const registry = new ConfluentSchemaRegistry({
url: 'http://localhost:8081',
jsonValidateSend: true // Enable validation on send
})
const producer = new Producer({
clientId: 'json-producer',
bootstrapBrokers: ['localhost:9092'],
registry
})
// Will validate against schema before sending
await producer.send({
messages: [
{
topic: 'events',
value: {
eventType: 'user_login',
timestamp: Date.now(),
userId: 'user-123'
},
metadata: {
schemas: {
value: 3 // JSON schema ID
}
}
}
]
})
Protocol Buffers
// Custom type mapper for complex protobuf schemas
function customTypeMapper (id, type, context) {
// Map schema IDs to protobuf message types
const typeMap = {
4: 'com.example.UserKey',
5: 'com.example.UserValue'
}
return typeMap[id] || `${context.topic}-${type}`
}
const registry = new ConfluentSchemaRegistry({
url: 'http://localhost:8081',
protobufTypeMapper: customTypeMapper
})
const producer = new Producer({
clientId: 'protobuf-producer',
bootstrapBrokers: ['localhost:9092'],
registry
})
await producer.send({
messages: [
{
topic: 'users',
key: { id: 123 },
value: { name: 'John', email: 'john@example.com' },
metadata: {
schemas: {
key: 4, // Protobuf schema ID for key
value: 5 // Protobuf schema ID for value
}
}
}
]
})
Authentication
Basic Authentication
const registry = new ConfluentSchemaRegistry({
url: 'http://localhost:8081',
auth: {
username: 'user',
password: 'password'
}
})
Bearer Token Authentication
const registry = new ConfluentSchemaRegistry({
url: 'http://localhost:8081',
auth: {
token: 'your-api-token'
}
})
Dynamic Credentials
// Using CredentialProvider for dynamic credentials
const registry = new ConfluentSchemaRegistry({
url: 'http://localhost:8081',
auth: {
username: async () => getUsername(),
password: async () => getPassword()
}
})
Message Metadata (Producer)
Schema IDs are passed through message metadata when producing:
// Producer message with schema metadata
const message = {
topic: 'my-topic',
key: { id: 123 },
value: { data: 'example' },
headers: { source: 'api' },
metadata: {
schemas: {
key: 1, // Schema ID for key
value: 2, // Schema ID for value
headerKey: 3, // Schema ID for header keys (optional)
headerValue: 4 // Schema ID for header values (optional)
}
}
}
Schema Types
AVRO Schema
AVRO schemas are parsed using the avsc library:
// Registry response for AVRO schema
{
"schemaType": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"
}
Protocol Buffers Schema
Protocol Buffers schemas require the protobufjs library:
// Registry response for Protobuf schema
{
"schemaType": "PROTOBUF",
"schema": "syntax = \"proto3\";\n\nmessage User {\n int32 id = 1;\n string name = 2;\n}"
}
JSON Schema
JSON schemas are validated using AJV:
// Registry response for JSON schema
{
"schemaType": "JSON",
"schema": "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"number\"},\"name\":{\"type\":\"string\"}},\"required\":[\"id\",\"name\"]}"
}