Producer

May 4, 2026

Client for producing messages to Kafka topics with support for idempotent production and transactions.

The producer inherits from the Base client.

The complete TypeScript type of the Producer is determined by the serializers option.

For detailed information about transactions, see the Transactions guide.

Constructor

Creates a new producer with type Producer<Key, Value, HeaderKey, HeaderValue>.

Options:

| Property | Type | Description |
| --- | --- | --- |
| producerId | bigint | Producer ID. |
| producerEpoch | number | Producer epoch. |
| idempotent | boolean | Idempotency of the producer. Required for transactions. |
| transactionalId | string | Transactional ID for the producer. If not specified, a random UUID is generated. Required when using transactions to ensure the same ID is used across restarts. |
| acks | number | Acknowledgement to wait before returning. Valid values are defined in the ProduceAcks enumeration. |
| compression | string | Compression algorithm to use before sending messages to the broker. Valid values are: snappy, lz4, gzip, zstd. |
| partitioner | (message: MessageToProduce<Key, Value, HeaderKey, HeaderValue>, key: Buffer \| undefined, context?: PartitionerContext) => number | Partitioner to use to assign a partition to messages that lack it. It is a function that receives a message and an optional serialized key and should return the partition number. |
| autocreateTopics | boolean | Whether to ask brokers to auto-create missing topics during produce requests. |
| repeatOnStaleMetadata | boolean | Whether to retry a produce operation when the system detects outdated topic or broker information. Default is true. |
| serializers | Serializers<Key, Value, HeaderKey, HeaderValue> | Object that specifies which serialisers to use. The object should only contain one or more of the key, value, headerKey and headerValue properties. Note: Should not be provided when using a registry. |
| beforeSerialization | BeforeSerializationHook<Key, Value, HeaderKey, HeaderValue> | Hook function called before serialization of each message component (key, value, headers). Experimental: Does not follow semver and may change in minor/patch releases. Note: Should not be provided when using a registry. |
| registry | AbstractSchemaRegistry<Key, Value, HeaderKey, HeaderValue> | Schema registry instance for automatic serialization with schema management. See the Confluent Schema Registry guide for details. Experimental: Does not follow semver and may change in minor/patch releases. Note: When provided, do not use serializers or beforeSerialization. |

It also supports all the constructor options of Base.

When idempotent is enabled, retries defaults to Number.MAX_SAFE_INTEGER if it is not set. Values less than or equal to 1 are also raised to Number.MAX_SAFE_INTEGER to avoid unsafe idempotent retry behavior.
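The retry rule above can be modeled as a small pure function. This is only a sketch of the documented behavior, not the library's source: effectiveRetries is a hypothetical name, and values for non-idempotent producers are passed through untouched because their default is not described in this section.

```typescript
// Hypothetical helper modeling the documented rule; not part of @platformatic/kafka.
function effectiveRetries (idempotent: boolean, retries?: number): number | undefined {
  // Non-idempotent producers: leave the value as provided.
  if (!idempotent) return retries

  // Idempotent producers: an unset value or a value <= 1 becomes Number.MAX_SAFE_INTEGER.
  if (retries === undefined || retries <= 1) return Number.MAX_SAFE_INTEGER

  return retries
}

console.log(effectiveRetries(true)) // 9007199254740991
console.log(effectiveRetries(true, 1)) // 9007199254740991
console.log(effectiveRetries(true, 5)) // 5
```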

The default partitioning behavior for keyed messages uses the library's own murmur2 normalization. If you need compatibility with Java/kafkajs partitioning, use compatibilityPartitioner:

import { Producer, stringSerializers, compatibilityPartitioner } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: stringSerializers,
  partitioner: compatibilityPartitioner
})

The compatibilityPartitioner implementation uses a Java-style toPositive step before partition normalization.
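For readers unfamiliar with the Java client, its toPositive step clears the sign bit of a 32-bit hash rather than taking the absolute value. The sketch below illustrates only that step followed by a modulo. The murmur2 hash itself is omitted, javaStylePartition is a hypothetical helper, and none of this is taken from the compatibilityPartitioner source.

```typescript
// Java-style toPositive: clear the sign bit of a 32-bit signed integer.
function toPositive (hash: number): number {
  return hash & 0x7fffffff
}

// Assumption: `hash` is a 32-bit signed hash of the serialized key and
// `partitions` is the number of partitions of the target topic.
function javaStylePartition (hash: number, partitions: number): number {
  return toPositive(hash) % partitions
}

console.log(toPositive(-1)) // 2147483647
console.log(javaStylePartition(-1, 6)) // 1
```

Note that this differs from Math.abs: toPositive(-1) is 2147483647, while Math.abs(-1) is 1, so the two approaches can select different partitions for the same key.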

Custom partitioners receive a PartitionerContext with the producer's currentMetadata when metadata has already been loaded.
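A custom partitioner can be as small as the following sketch. It assumes the partition count is known up front, because the shape of PartitionerContext is not described here and the context is therefore ignored. MessageLike and createKeyHashPartitioner are hypothetical names, and the FNV-1a hash is chosen only for brevity: it is not compatible with murmur2-based partitioning in other clients.

```typescript
// Minimal stand-in for the message shape; only what this sketch needs.
type MessageLike = { topic: string }

// Hypothetical factory: returns a partitioner that hashes the serialized key
// with 32-bit FNV-1a. A Buffer is a Uint8Array, so Buffer keys are accepted.
function createKeyHashPartitioner (partitions: number) {
  return function partitioner (_message: MessageLike, key: Uint8Array | undefined): number {
    // Keyless messages all go to partition 0 in this sketch.
    if (key === undefined) return 0

    let hash = 0x811c9dc5 // FNV offset basis
    for (let i = 0; i < key.length; i++) {
      hash ^= key[i]
      hash = Math.imul(hash, 0x01000193) // FNV prime, 32-bit multiply
    }

    // >>> 0 converts the hash to an unsigned 32-bit value before the modulo.
    return (hash >>> 0) % partitions
  }
}

const partitioner = createKeyHashPartitioner(6)
const partition = partitioner({ topic: 'events' }, new TextEncoder().encode('user-1'))
console.log(partition) // always the same value in the range 0..5 for this key
```

The returned function could then be passed as the partitioner option of the producer or of an individual send() call.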

Note: zstd is not available in Node.js v20.

Basic Methods

send<Key, Value, HeaderKey, HeaderValue>(options[, callback])

Sends one or more messages to Kafka.

When acks is not ProduceAcks.NO_RESPONSE, then the return value is an object with the property offsets containing a list of written topic-partition-offset triplets.

When acks is ProduceAcks.NO_RESPONSE, then the return value is an object with the property unwritableNodes containing a list of nodes that are currently busy. The caller should wait for a client:broker:drain event before continuing to send to those nodes.

If a send operation fails after producing some messages, the thrown error includes a produced property with the same shape as ProduceResult. Use error.produced.offsets to inspect successfully acknowledged messages, or error.produced.unwritableNodes when acks is ProduceAcks.NO_RESPONSE.

Options:

| Property | Type | Description |
| --- | --- | --- |
| messages | MessageToProduce<Key, Value, HeaderKey, HeaderValue>[] | The messages to send. |

It also accepts all options of the constructor except serializers.
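The partial-failure behavior of send() described above can be handled with a small wrapper. The sketch below uses local stand-in types (OffsetLike and ProducedLike are assumptions based on the description, not the library's exported types) and takes the actual produce call as a function, for example () => producer.send({ messages }).

```typescript
// Local stand-ins for the shapes described above (assumptions).
type OffsetLike = { topic: string, partition: number, offset: bigint }
type ProducedLike = { offsets?: OffsetLike[], unwritableNodes?: number[] }

// Returns the partial result attached to a failed send, if there is one.
function producedFromError (error: unknown): ProducedLike | undefined {
  if (typeof error === 'object' && error !== null && 'produced' in error) {
    return (error as { produced?: ProducedLike }).produced
  }

  return undefined
}

// Runs `send`, logs what was acknowledged before a failure, then rethrows.
async function sendWithPartialReport (send: () => Promise<ProducedLike>): Promise<ProducedLike> {
  try {
    return await send()
  } catch (error) {
    const produced = producedFromError(error)

    for (const { topic, partition, offset } of produced?.offsets ?? []) {
      console.warn(`acknowledged before failure: ${topic}:${partition}@${offset}`)
    }

    throw error
  }
}
```

Rethrowing keeps the original error visible to the caller; the wrapper only adds visibility into which messages do not need to be retried.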

getSendTopicPartitions<Key, Value, HeaderKey, HeaderValue>(options)

Synchronously returns the topic partitions that would be used for the provided messages without sending them.

The return value is a Map<string, Set<number>>, where each key is a topic name and each value is the set of partitions selected for that topic.

This method only computes the topic-partition mapping from the provided messages. It does not fetch broker metadata, open connections, or send messages.

This method serializes message keys and headers as needed to resolve partitions, so serializer errors are reported the same way as send().

Options: accepts the same options as send().

Example:

const topicPartitions = producer.getSendTopicPartitions({
  messages: [{ topic: 'events', key: Buffer.from('user-1'), value: Buffer.from('login') }]
})

console.log(topicPartitions.get('events'))
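The returned Map<string, Set<number>> can be flattened into topic:partition strings, which is the key format used by the records returned from getSendBrokers() and getSendConnections(). A minimal sketch, where toTopicPartitionKeys is a hypothetical helper and the sample map stands in for a real result:

```typescript
// Flattens a topic -> partitions map into 'topic:partition' keys.
function toTopicPartitionKeys (topicPartitions: Map<string, Set<number>>): string[] {
  const keys: string[] = []

  topicPartitions.forEach((partitions, topic) => {
    partitions.forEach(partition => {
      keys.push(`${topic}:${partition}`)
    })
  })

  return keys
}

// Stand-in for the value returned by producer.getSendTopicPartitions(...)
const sample = new Map<string, Set<number>>([['events', new Set([0, 2])]])
console.log(toTopicPartitionKeys(sample)) // [ 'events:0', 'events:2' ]
```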

getSendBrokers<Key, Value, HeaderKey, HeaderValue>(options[, callback])

Returns the brokers that would receive the provided messages without sending them.

The return value is a record keyed by topic:partition, where each value is the broker responsible for that topic partition.

This method uses getSendTopicPartitions() to resolve the topic partitions before looking up their leaders.

Options: accepts the same options as send().

Example:

const brokers = await producer.getSendBrokers({
  messages: [{ topic: 'events', key: Buffer.from('user-1'), value: Buffer.from('login') }]
})

console.log(brokers['events:0'])

getSendConnections<Key, Value, HeaderKey, HeaderValue>(options[, callback])

Prepares the broker connections needed to send the provided messages.

The return value is a record keyed by topic:partition, where each value is the open connection to the broker responsible for that topic partition.

Use this method to warm up the producer connections for a set of messages before calling send().

Options: accepts the same options as send().

Example:

const connections = await producer.getSendConnections({
  messages: [{ topic: 'events', key: Buffer.from('user-1'), value: Buffer.from('login') }]
})

console.log(connections['events:0'])

asStream<Key, Value, HeaderKey, HeaderValue>([options])

Creates a ProducerStream writable stream that batches messages and sends them through this producer.

The stream lifecycle is tracked by the producer. If one or more streams are still active, producer.close() will fail unless force is set to true.

streamsCount

Returns the number of currently active ProducerStream instances created by this producer.

beginTransaction([options, callback])

Begins a new transaction and returns a Transaction object.

The producer must be configured with idempotent: true to use transactions. Only one transaction can be active at a time per producer.

Options: accepts all options from the constructor except serializers.

The return value is a Transaction object. See the Transactions guide for detailed usage.

Example:

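import { Producer, stringSerializers } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  idempotent: true,
  transactionalId: 'my-transaction-id',
  // String serializers so the plain string value below can be sent as-is
  serializers: stringSerializers
})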

const transaction = await producer.beginTransaction()
await transaction.send({ messages: [{ topic: 'my-topic', value: 'message' }] })
await transaction.commit()

close([force][, callback])

Closes the producer and all its connections.

If force is not true, then the method throws an error if any ProducerStream created from this producer is still active.

If force is true, then the method closes all active ProducerStream instances before closing the producer.

The return value is void.

Using Producers as Streams

producer.asStream() returns a ProducerStream, a writable stream in object mode.

This is useful when:

  • you already have a Node.js stream pipeline,
  • you want automatic batching (batchSize / batchTime),
  • you want stream backpressure to naturally regulate producers.

Stream options

asStream([options]) accepts:

  • all send() produce options except messages (for example acks, compression, partitioner, autocreateTopics, repeatOnStaleMetadata),
  • plus stream-specific options:
    • batchSize (max messages per flush),
    • batchTime (max wait before timed flush),
    • reportMode (NONE, BATCH, MESSAGE),
    • highWaterMark (writable stream high-water mark).
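To make the interaction between batchSize and batchTime concrete, here is a minimal size-or-time batcher. It illustrates the general batching idea only and is not the ProducerStream implementation: SizeOrTimeBatcher is a hypothetical class, and treating batchTime as milliseconds is an assumption.

```typescript
// Illustration of size-or-time batching; not the library's code.
class SizeOrTimeBatcher<T> {
  private buffer: T[] = []
  private timer: ReturnType<typeof setTimeout> | undefined
  private readonly batchSize: number
  private readonly batchTime: number
  private readonly onFlush: (batch: T[]) => void

  constructor (batchSize: number, batchTime: number, onFlush: (batch: T[]) => void) {
    this.batchSize = batchSize
    this.batchTime = batchTime // milliseconds (assumed unit)
    this.onFlush = onFlush
  }

  add (item: T): void {
    this.buffer.push(item)

    if (this.buffer.length >= this.batchSize) {
      this.flush() // size limit reached: flush immediately
    } else if (this.timer === undefined) {
      // First item of a new batch: schedule the timed flush
      this.timer = setTimeout(() => this.flush(), this.batchTime)
    }
  }

  flush (): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer)
      this.timer = undefined
    }

    if (this.buffer.length === 0) return

    const batch = this.buffer
    this.buffer = []
    this.onFlush(batch)
  }
}

const flushed: string[][] = []
const batcher = new SizeOrTimeBatcher<string>(2, 50, batch => { flushed.push(batch) })

batcher.add('a')
batcher.add('b') // reaches batchSize, flushes ['a', 'b']
batcher.add('c') // waits for the timer or an explicit flush
batcher.flush()

console.log(flushed) // [ [ 'a', 'b' ], [ 'c' ] ]
```

A larger batchSize favors throughput, while a smaller batchTime bounds how long a message can wait in a partially filled batch.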

Lifecycle and shutdown

The producer tracks every ProducerStream it creates.

You can inspect active stream count through producer.streamsCount.

Shutdown rules:

  1. await producer.close() fails if one or more producer streams are still active.
  2. await producer.close(true) force-closes active producer streams, then closes the producer.

Recommended patterns:

  • Explicit close (preferred): close streams first, then producer.
  • Force close: useful during shutdown paths where you cannot reliably coordinate all stream owners.
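The two patterns can be combined in a single shutdown helper: close the streams you own first, then force the close only if other streams are still active. The sketch below uses structural stand-in interfaces (ClosableStream and ClosableProducer are hypothetical) that mirror the close() and streamsCount members described above, so it does not depend on the library's types.

```typescript
// Structural stand-ins so the sketch is self-contained.
interface ClosableStream {
  close (): Promise<void>
}

interface ClosableProducer {
  readonly streamsCount: number
  close (force?: boolean): Promise<void>
}

async function shutdown (producer: ClosableProducer, ownedStreams: ClosableStream[]): Promise<void> {
  // Preferred path: explicitly close the streams this code owns.
  for (const stream of ownedStreams) {
    await stream.close()
  }

  // If other owners still hold active streams, fall back to a forced close.
  const force = producer.streamsCount > 0
  await producer.close(force)
}
```

With a real producer this would be called as await shutdown(producer, [stream]), typically from a signal handler or an application stop hook.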

Example: piping with pipeline

import { Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'
import { Producer } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092']
})

const sink = producer.asStream({
  batchSize: 500,
  batchTime: 25
})

await pipeline(
  Readable.from(
    [
      { topic: 'events', key: Buffer.from('k1'), value: Buffer.from('v1') },
      { topic: 'events', key: Buffer.from('k2'), value: Buffer.from('v2') }
    ],
    { objectMode: true }
  ),
  sink
)

// pipeline() ends the sink; then close producer
await producer.close()

Example: manual writes + delivery reports

import { Producer, ProducerStreamReportModes, stringSerializers } from '@platformatic/kafka'

const producer = new Producer<string, string, string, string>({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: stringSerializers
})

const stream = producer.asStream({
  batchSize: 100,
  batchTime: 50,
  reportMode: ProducerStreamReportModes.BATCH
})

stream.on('delivery-report', report => {
  // In BATCH mode: { batchId, count, result }
  console.log(report)
})

stream.on('flush', info => {
  // { batchId, count, duration, result }
  console.log('flushed', info.count)
})

stream.write({ topic: 'events', key: 'user-1', value: 'login' })
stream.write({ topic: 'events', key: 'user-2', value: 'logout' })

await stream.close()
await producer.close()

// Alternative shutdown:
// await producer.close(true)

Using Schema Registries

⚠️ Experimental API: Confluent Schema Registry support and the registry/beforeSerialization integration are experimental. They do not follow semver and may change in minor/patch releases.

The producer supports automatic serialization through schema registries like Confluent Schema Registry. When using a schema registry, messages are automatically serialized according to their schemas and schema IDs are included in the message headers.

Example with Confluent Schema Registry:

import { Producer } from '@platformatic/kafka'
import { ConfluentSchemaRegistry } from '@platformatic/kafka/registries'

// Create a schema registry instance
const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081',
  auth: {
    username: 'user',
    password: 'password'
  }
})

// Create a producer with the registry
const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  registry // Registry handles serialization automatically
})

// Send messages with schema IDs in metadata
await producer.send({
  messages: [
    {
      topic: 'events',
      key: { id: 123 },
      value: { name: 'John', action: 'login' },
      metadata: {
        schemas: {
          key: 1, // Schema ID for key
          value: 2 // Schema ID for value
        }
      }
    }
  ]
})

await producer.close()

When using a schema registry:

  • Do not provide the serializers option - the registry provides its own serializers
  • Do not provide the beforeSerialization hook - the registry provides its own hook
  • Messages are automatically serialized according to their schemas
  • Schema IDs must be provided in the message metadata
  • The registry fetches and caches schemas as needed
  • The registry supports Avro, Protocol Buffers, and JSON Schema formats

For more details, see the Confluent Schema Registry documentation.
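Because every message needs its schema IDs under metadata.schemas, a small helper can keep call sites tidy. This is a sketch: withSchemas and SchemaIds are hypothetical names, and the metadata shape is copied from the example in this section.

```typescript
// Schema IDs for the key and value of a message.
type SchemaIds = { key?: number, value?: number }

// Returns a copy of the message with the schema IDs attached as metadata.
function withSchemas<M extends object> (message: M, schemas: SchemaIds): M & { metadata: { schemas: SchemaIds } } {
  return { ...message, metadata: { schemas } }
}

const message = withSchemas(
  { topic: 'events', key: { id: 123 }, value: { name: 'John', action: 'login' } },
  { key: 1, value: 2 }
)

console.log(message.metadata.schemas) // { key: 1, value: 2 }
```

The resulting object could then be passed in the messages array of producer.send().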

Advanced Methods

The producer manages auxiliary operations automatically. Some of the APIs are exposed to allow for advanced uses.

initIdempotentProducer<Key, Value, HeaderKey, HeaderValue>(options[, callback])

Initialises an idempotent producer. It accepts all options of the constructor except serializers.

The return value is an object containing the producerId and producerEpoch values returned from the broker.