Other APIs and types

April 28, 2026

Connection

Represents a single low-level broker connection.

It is returned by Base.connectToBrokers() and exposed by ConnectionPool operations.

Properties

| Name | Type | Description |
| --- | --- | --- |
| host | string \| undefined | Current target host while connecting or connected. |
| port | number \| undefined | Current target port while connecting or connected. |
| instanceId | number | Numeric identifier for this connection instance. |
| ownerId | number \| undefined | Optional owner instance ID of the component that created the connection. |
| status | ConnectionStatusValue | Current connection state. |
| context | unknown | Opaque user context forwarded from the creating component. |
| socket | Socket | The underlying Node.js socket. |

Methods

isConnected()

Returns true when the connection status is CONNECTED.

connect(host, port[, callback])

Opens the socket connection to a broker and performs SASL authentication if configured.

ready([callback])

Waits until the connection becomes usable or fails while waiting.
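
The waiting behaviour described for ready() can be approximated with plain event listeners. The following is a minimal sketch, not the library's implementation: waitUntilReady is a hypothetical helper, and a bare Node.js EventEmitter stands in for a Connection, assuming only the ready and error events documented for it.

```typescript
import { EventEmitter } from 'node:events'

type ReadyCallback = (error: Error | null) => void

// Hypothetical helper: invokes the callback on the first 'ready' or 'error'
// event and removes the other listener so it cannot fire later.
function waitUntilReady (connection: EventEmitter, callback: ReadyCallback): void {
  const onReady = (): void => {
    connection.removeListener('error', onError)
    callback(null)
  }
  const onError = (error: Error): void => {
    connection.removeListener('ready', onReady)
    callback(error)
  }

  connection.once('ready', onReady)
  connection.once('error', onError)
}

// Stand-in for a real Connection, used only to exercise the helper.
const fakeConnection = new EventEmitter()
const outcomes: string[] = []

waitUntilReady(fakeConnection, error => {
  outcomes.push(error ? `failed: ${error.message}` : 'ready')
})

fakeConnection.emit('ready')
console.log(outcomes) // [ 'ready' ]
```

Removing the losing listener keeps a later error event from invoking the callback a second time.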

close([callback])

Gracefully closes the socket and resolves once the connection is fully closed.

send(apiKey, apiVersion, createPayload, responseParser, hasRequestHeaderTaggedFields, hasResponseHeaderTaggedFields, callback)

Sends a low-level Kafka protocol request over the connection.

  • apiKey: Kafka API key.
  • apiVersion: Kafka API version.
  • createPayload: function returning a Writer with the request payload.
  • responseParser: function that parses the broker response.
  • hasRequestHeaderTaggedFields: whether the request header includes tagged fields.
  • hasResponseHeaderTaggedFields: whether the response header includes tagged fields.
  • callback: completion callback receiving the parsed response.

reauthenticate()

Starts SASL re-authentication when the current connection uses SASL and the broker session requires renewal.

Events

| Name | Payload Type | Description |
| --- | --- | --- |
| connecting | (none) | Emitted when a socket connection attempt starts. |
| timeout | TimeoutError | Emitted when the connection attempt or request handling times out. |
| error | Error | Emitted when the connection encounters an error. |
| connect | (none) | Emitted when the connection is established. It can fire again after re-authentication. |
| ready | (none) | Emitted when the connection is ready to send Kafka requests. |
| close | (none) | Emitted when the underlying socket is closed. |
| closing | (none) | Emitted when the connection starts shutting down. |
| sasl:handshake | string[] | Emitted when SASL handshake completes with the list of supported mechanisms. |
| sasl:authentication | Buffer \| undefined | Emitted when SASL authentication completes. |
| sasl:authentication:extended | Buffer \| undefined | Emitted when SASL authentication is refreshed or extended. |
| drain | (none) | Emitted when the socket becomes writable again after backpressure. |

ConnectionPool

Tracks and reuses Connection instances keyed by broker host and port.

It is exposed as the default pool through the Base client connections getter and is also used internally by other components.

Properties

| Name | Type | Description |
| --- | --- | --- |
| instanceId | number | Numeric identifier for this pool instance. |
| ownerId | number \| undefined | Optional owner instance ID of the component that created the pool. |
| context | unknown | Opaque user context forwarded to created connections. |

Methods

[Symbol.iterator]()

Returns an iterator of [brokerKey, connection] entries for the currently pooled connections.
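
Because the pool is iterable, it can be consumed with for...of and array destructuring. The sketch below illustrates the pattern with local stand-ins only: PoolLike and ConnectionLike are hypothetical, and the "host:port" key format is an assumption rather than something this page specifies.

```typescript
// Hypothetical stand-ins; the real Connection and ConnectionPool come from the library.
interface ConnectionLike {
  host: string
  port: number
  isConnected: () => boolean
}

class PoolLike implements Iterable<[string, ConnectionLike]> {
  // Assumption: connections are keyed by a "host:port" string.
  private readonly connections = new Map<string, ConnectionLike>()

  add (connection: ConnectionLike): void {
    this.connections.set(`${connection.host}:${connection.port}`, connection)
  }

  // Delegates to the Map so that iteration yields [brokerKey, connection] pairs.
  [Symbol.iterator] (): Iterator<[string, ConnectionLike]> {
    return this.connections.entries()
  }
}

const pool = new PoolLike()
pool.add({ host: 'localhost', port: 9092, isConnected: () => true })
pool.add({ host: 'localhost', port: 9093, isConnected: () => false })

const summary: string[] = []
for (const [brokerKey, connection] of pool) {
  summary.push(`${brokerKey} -> ${connection.isConnected() ? 'connected' : 'not connected'}`)
}
console.log(summary)
```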

get(broker[, callback])

Returns an existing connection for the broker or creates a new one.

getFirstAvailable(brokers[, callback])

Tries the provided brokers in order and returns the first successful connection.
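
The "try in order, stop at the first success" behaviour can be sketched in callback style as follows. This is an illustration under stated assumptions, not the library's code: firstAvailable, the injected connect function, and the error message are all hypothetical.

```typescript
interface Broker { host: string, port: number }
type ConnectCallback<C> = (error: Error | null, connection?: C) => void
type Connect<C> = (broker: Broker, callback: ConnectCallback<C>) => void

// Hypothetical helper: attempts each broker in order and reports the first success,
// or an error once every broker has failed.
function firstAvailable<C> (brokers: Broker[], connect: Connect<C>, callback: ConnectCallback<C>): void {
  const attempt = (index: number): void => {
    if (index === brokers.length) {
      callback(new Error('No broker could be reached'))
      return
    }

    connect(brokers[index], (error, connection) => {
      if (error) {
        attempt(index + 1) // Move on to the next broker in the list
        return
      }
      callback(null, connection)
    })
  }

  attempt(0)
}

// Fake connect function: port 9092 always fails, any other port succeeds.
const attempted: string[] = []
const fakeConnect: Connect<string> = (broker, callback) => {
  attempted.push(`${broker.host}:${broker.port}`)
  if (broker.port === 9092) {
    callback(new Error('connection refused'))
  } else {
    callback(null, `connection to ${broker.host}:${broker.port}`)
  }
}

const results: string[] = []
firstAvailable(
  [{ host: 'localhost', port: 9092 }, { host: 'localhost', port: 9093 }, { host: 'localhost', port: 9094 }],
  fakeConnect,
  (error, connection) => { results.push(error ? error.message : String(connection)) }
)
console.log(attempted, results)
```

In this sketch the third broker is never contacted, because the second attempt already succeeded.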

getEstablishedConnection(broker)

Returns the currently pooled Connection for the broker, if present.

has(broker)

Returns true when the pool currently contains a connection for the broker.

isActive()

Returns true when the pool contains at least one connection.

isConnected()

Returns true when the pool contains at least one connection and all pooled connections are connected.
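
The difference between isActive() and isConnected() can be expressed as two predicates over the pooled connections. The sketch below models them with hypothetical free functions (poolIsActive, poolIsConnected) over a minimal ConnectionLike shape; only the semantics are taken from the descriptions above.

```typescript
// Hypothetical minimal shape; only isConnected() is taken from the Connection API.
interface ConnectionLike { isConnected: () => boolean }

// At least one pooled connection, regardless of its state.
function poolIsActive (connections: ConnectionLike[]): boolean {
  return connections.length > 0
}

// At least one pooled connection, and every one of them is connected.
// The length check matters: [].every(...) is true for an empty array.
function poolIsConnected (connections: ConnectionLike[]): boolean {
  return connections.length > 0 && connections.every(connection => connection.isConnected())
}

const up: ConnectionLike = { isConnected: () => true }
const down: ConnectionLike = { isConnected: () => false }

console.log(poolIsActive([]), poolIsConnected([])) // false false
console.log(poolIsActive([up, down]), poolIsConnected([up, down])) // true false
console.log(poolIsActive([up, up]), poolIsConnected([up, up])) // true true
```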

close([callback])

Closes and removes all pooled connections.

Events

| Name | Payload Type | Description |
| --- | --- | --- |
| connecting | ConnectionPoolEventPayload | Emitted when the pool starts opening a connection to a broker. |
| failed | ConnectionPoolEventPayload | Emitted when a broker connection attempt fails. |
| connect | ConnectionPoolEventPayload | Emitted when a broker connection is established. |
| sasl:handshake | ConnectionPoolEventPayload & { mechanisms: string[] } | Emitted when SASL handshake completes and the broker mechanisms are known. |
| sasl:authentication | ConnectionPoolEventPayload & { authentication?: Buffer } | Emitted when SASL authentication completes. |
| sasl:authentication:extended | ConnectionPoolEventPayload & { authentication?: Buffer } | Emitted when SASL authentication is refreshed or extended after the initial authentication. |
| disconnect | ConnectionPoolEventPayload | Emitted when a pooled connection closes and is removed from the pool. |
| drain | ConnectionPoolEventPayload | Emitted when a pooled connection becomes writable again after backpressure. |

ConnectionPoolEventPayload contains:

  • broker: the broker { host, port } associated with the event.
  • connection: the Connection instance associated with the event.

Messages

Message<Key, Value, HeaderKey, HeaderValue>

Represents a message that has been consumed from Kafka by a Consumer.

The types of the key, value and headers fields are determined by the current serialisation settings of the Producer or the Consumer.

| Property | Type | Description |
| --- | --- | --- |
| topic | string | The topic of the message. |
| partition | number | The partition of the message within the topic. |
| key | Key | The key of the message. |
| value | Value | The value of the message. |
| timestamp | bigint | The timestamp of the message. When producing, it defaults to the current timestamp. |
| headers | Map<HeaderKey, HeaderValue> | A map with the message headers. |
| offset | bigint | The message offset. |
| commit | () => Promise | A function to commit the offset. This is a no-op unless the consumer's autocommit option is false. |

Message also provides a .toJSON() method for debugging and logging.
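
One reason a dedicated conversion helps when logging is that JSON.stringify throws a TypeError on bigint values such as offset and timestamp. The sketch below shows the problem and one possible conversion. MessageLike and toLoggable are hypothetical and do not describe what the library's toJSON() actually returns.

```typescript
// Hypothetical subset of a consumed message; field names follow the Message properties.
interface MessageLike {
  topic: string
  partition: number
  offset: bigint
  timestamp: bigint
  headers: Map<string, string>
}

// Hypothetical helper: renders bigint fields as strings and the headers Map as a plain object.
function toLoggable (message: MessageLike): Record<string, unknown> {
  return {
    topic: message.topic,
    partition: message.partition,
    offset: message.offset.toString(),
    timestamp: message.timestamp.toString(),
    headers: Object.fromEntries(message.headers)
  }
}

const message: MessageLike = {
  topic: 'events',
  partition: 0,
  offset: BigInt(42),
  timestamp: BigInt(1700000000000),
  headers: new Map([['source', 'test']])
}

// Serialising the raw object fails because of the bigint fields.
let rawError = ''
try {
  JSON.stringify(message)
} catch (error) {
  rawError = (error as Error).name
}

const serialized = JSON.stringify(toLoggable(message))
console.log(rawError, serialized)
```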

MessageToProduce<Key, Value, HeaderKey, HeaderValue>

Represents a message that is being produced to Kafka by a Producer.

All fields, except topic and value, are optional.

The types of the key, value and headers fields are determined by the current serialisation settings of the Producer or the Consumer.

| Property | Type | Description |
| --- | --- | --- |
| topic | string | The topic of the message. |
| partition | number | The partition of the message within the topic. |
| key | Key | The key of the message. |
| value | Value | The value of the message. |
| timestamp | bigint | The timestamp of the message. When producing, it defaults to the current timestamp. |
| headers | Map<HeaderKey, HeaderValue> \| Record<HeaderKey, HeaderValue> | A map or plain JavaScript object with the message headers. |
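
Since headers may be given either as a Map or as a plain object, code that inspects messages before producing them may want a single representation. The following sketch uses a hypothetical local type (MessageToProduceLike, with string keys, values and headers) and a hypothetical normalizeHeaders helper. It does not show how the library handles headers internally.

```typescript
// Hypothetical local type mirroring the properties above; only topic and value are required.
interface MessageToProduceLike {
  topic: string
  value: string
  key?: string
  partition?: number
  timestamp?: bigint
  headers?: Map<string, string> | Record<string, string>
}

// Hypothetical helper: accepts either header form and always returns a Map.
function normalizeHeaders (headers?: Map<string, string> | Record<string, string>): Map<string, string> {
  if (headers === undefined) {
    return new Map()
  }

  return headers instanceof Map ? headers : new Map(Object.entries(headers))
}

const minimal: MessageToProduceLike = { topic: 'events', value: 'hello' }
const withRecord: MessageToProduceLike = { topic: 'events', value: 'hello', headers: { source: 'test' } }
const withMap: MessageToProduceLike = { topic: 'events', value: 'hello', headers: new Map([['source', 'test']]) }

console.log(normalizeHeaders(minimal.headers).size) // 0
console.log(normalizeHeaders(withRecord.headers).get('source')) // test
console.log(normalizeHeaders(withMap.headers).get('source')) // test
```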

MessagesStream<Key, Value, HeaderKey, HeaderValue>

It is a Node.js Readable stream returned by the Consumer consume method.

Do not try to create this manually.

The readonly context getter exposes the opaque value supplied through Consumer.consume({ context }) or the consumer streamContext default.

Properties

| Name | Type | Description |
| --- | --- | --- |
| consumer | Consumer<Key, Value, HeaderKey, HeaderValue> | The parent consumer that created the stream. |
| connections | ConnectionPool | The fetch-specific connection pool owned by the stream. |
| context | unknown | The opaque stream context value. |
| offsetsToFetch | Map<string, bigint> | Internal next offsets keyed as $topic:$partition. |
| offsetsToCommit | Map<string, CommitOptionsPartition> | Offsets queued for commit. |
| offsetsCommitted | Map<string, bigint> | Last committed offsets tracked by the stream. |
| committedOffsets | Map<string, bigint> | Deprecated alias for offsetsCommitted. |
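
The $topic:$partition key format used by offsetsToFetch can be built and read back as sketched below. offsetKey and parseOffsetKey are hypothetical helpers, and the map is filled by hand here. On a real stream the map is internal state, so treat it as read-only.

```typescript
// Builds a key in the "$topic:$partition" format.
function offsetKey (topic: string, partition: number): string {
  return `${topic}:${partition}`
}

// Hypothetical inverse: splits on the last colon, so the partition is always the final segment.
function parseOffsetKey (key: string): { topic: string, partition: number } {
  const separator = key.lastIndexOf(':')

  return {
    topic: key.slice(0, separator),
    partition: Number(key.slice(separator + 1))
  }
}

// Hand-built map standing in for stream.offsetsToFetch.
const offsetsToFetch = new Map<string, bigint>([
  [offsetKey('events', 0), BigInt(120)],
  [offsetKey('events', 1), BigInt(87)]
])

const nextOffset = offsetsToFetch.get(offsetKey('events', 1))
const parsed = parseOffsetKey('events:1')
console.log(nextOffset, parsed)
```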

Methods

close([callback])

Closes the stream, stops autocommit timers, flushes pending autocommit work when needed, and closes the stream-owned connection pool.

isActive()

Returns true when the stream is open and its parent consumer is currently active.

isConnected()

Returns true when the stream is open and its parent consumer is currently connected.

pause()

Pauses message delivery and marks the stream as explicitly paused.

resume()

Resumes message delivery and restarts the fetch loop when resuming from an explicit pause.

Events

| Name | Payload Type | Description |
| --- | --- | --- |
| fetch | (none) | Emitted each time the stream schedules or completes a fetch cycle. |
| autocommit | (error: Error \| null, offsets?: CommitOptionsPartition[]) | Emitted after an automatic commit attempt, with either the error or the committed offsets. |
| offsets | (none) | Emitted after the stream refreshes its internal fetch offsets. |

ProducerStream<Key, Value, HeaderKey, HeaderValue>

It is a Node.js Writable stream returned by the Producer asStream method.

Do not try to create this manually.

It operates in object mode and batches writes before forwarding them to the parent producer.

Properties

| Name | Type | Description |
| --- | --- | --- |
| producer | Producer<Key, Value, HeaderKey, HeaderValue> | The parent producer that created the stream. |
| instance | number | Numeric identifier for this producer stream. |

Methods

close([callback])

Ends the writable stream, flushes any buffered messages, and resolves when shutdown completes.

Events

| Name | Payload Type | Description |
| --- | --- | --- |
| flush | { batchId: number, count: number, duration: number, result: ProduceResult } | Emitted after a buffered batch has been sent. |
| delivery-report | ProducerStreamReport \| ProducerStreamMessageReport<Key, Value, HeaderKey, HeaderValue> | Emitted when delivery reporting is enabled. In BATCH mode it reports one event per batch; in MESSAGE mode it reports one event per message. |

ClusterMetadata

Metadata about the Kafka cluster. It is returned by the Base client, which is the base class for the Producer, Consumer and Admin clients.

| Property | Type | Description |
| --- | --- | --- |
| id | string | Cluster ID. |
| brokers | Map<number, Broker> | Map of brokers. The keys are node IDs, while the values are objects with host and port properties. |
| topics | Map<string, ClusterTopicMetadata> | Map of topics. The keys are the topics, while the values contain partition information. |
| lastUpdate | number | Timestamp of the metadata. |
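
As an illustration of how the brokers map and lastUpdate might be read, the sketch below uses a hypothetical ClusterMetadataLike type. The topics property is left out because the shape of ClusterTopicMetadata is not described here. The isStale helper additionally assumes that lastUpdate is a millisecond epoch timestamp, which this page does not state.

```typescript
interface Broker { host: string, port: number }

// Hypothetical subset of ClusterMetadata.
interface ClusterMetadataLike {
  id: string
  brokers: Map<number, Broker>
  lastUpdate: number
}

// Lists brokers as "nodeId=host:port", sorted by node ID.
function describeBrokers (metadata: ClusterMetadataLike): string[] {
  return [...metadata.brokers.entries()]
    .sort(([a], [b]) => a - b)
    .map(([nodeId, broker]) => `${nodeId}=${broker.host}:${broker.port}`)
}

// Hypothetical staleness check; assumes lastUpdate is expressed in milliseconds since the epoch.
function isStale (metadata: ClusterMetadataLike, maxAgeMs: number, now: number = Date.now()): boolean {
  return now - metadata.lastUpdate > maxAgeMs
}

// Hand-built metadata used only for the example.
const metadata: ClusterMetadataLike = {
  id: 'cluster-1',
  brokers: new Map([
    [2, { host: 'kafka-2', port: 9092 }],
    [1, { host: 'kafka-1', port: 9092 }]
  ]),
  lastUpdate: 1000
}

const described = describeBrokers(metadata)
console.log(described, isStale(metadata, 500, 2000))
```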

Utilities

debugDump(...values)

Debug/logger utility to inspect any object.

import { debugDump } from '@platformatic/kafka'

debugDump('received-message', message)

Serialisation and Deserialisation

stringSerializer and stringDeserializer

Courtesy string serialisers implementing Serializer<string> and Deserializer<string>.

jsonSerializer and jsonDeserializer

Courtesy JSON serialisers implementing Serializer<T = object> and Deserializer<T = object>.
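
To make the serialiser and deserialiser shapes concrete, here is a minimal round-trip sketch for both strings and JSON. The Serializer and Deserializer types below are simplified assumptions (the library's own types may accept optional input or extra arguments), and the four functions are hypothetical local equivalents, not the exported helpers.

```typescript
import { Buffer } from 'node:buffer'

// Simplified, assumed shapes: a serialiser turns a value into a Buffer, a deserialiser does the reverse.
type Serializer<T> = (data: T) => Buffer
type Deserializer<T> = (data: Buffer) => T

// Hypothetical string pair, using UTF-8.
const toUtf8Buffer: Serializer<string> = data => Buffer.from(data, 'utf-8')
const fromUtf8Buffer: Deserializer<string> = data => data.toString('utf-8')

// Hypothetical JSON pair, layered on JSON.stringify and JSON.parse.
const toJsonBuffer: Serializer<object> = data => Buffer.from(JSON.stringify(data), 'utf-8')
const fromJsonBuffer: Deserializer<object> = data => JSON.parse(data.toString('utf-8'))

const text = fromUtf8Buffer(toUtf8Buffer('héllo'))
const original = { id: 1, tags: ['a', 'b'] }
const roundTripped = fromJsonBuffer(toJsonBuffer(original))

console.log(text, roundTripped)
```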

stringSerializers and stringDeserializers

Courtesy serializers and deserializers objects built on stringSerializer and stringDeserializer, ready to be passed to a Producer or Consumer.

serializersFrom and deserializersFrom

Courtesy methods to create a Serializers<T, T, T, T> out of a single Serializer<T> or a Deserializers<T, T, T, T> out of a single Deserializer<T>.

For instance, the following two snippets are equivalent:

import { Producer } from '@platformatic/kafka'

function serialize (source: YourType): Buffer {
  return Buffer.from(JSON.stringify(source))
}

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: {
    key: serialize,
    value: serialize,
    headerKey: serialize,
    headerValue: serialize
  }
})

import { Producer, serializersFrom } from '@platformatic/kafka'

function serialize (source: YourType): Buffer {
  return Buffer.from(JSON.stringify(source))
}

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: serializersFrom(serialize)
})
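
Conceptually, the equivalence above means that serializersFrom reuses one function for all four slots. A minimal sketch of that idea follows. serializersFromSketch and the simplified Serializer and Serializers types are hypothetical and written only for illustration. They are not the library's implementation.

```typescript
import { Buffer } from 'node:buffer'

// Simplified, assumed shapes for illustration.
type Serializer<T> = (data: T) => Buffer

interface Serializers<Key, Value, HeaderKey, HeaderValue> {
  key: Serializer<Key>
  value: Serializer<Value>
  headerKey: Serializer<HeaderKey>
  headerValue: Serializer<HeaderValue>
}

// Hypothetical re-implementation: the same serialiser is used for key, value and both header parts.
function serializersFromSketch<T> (serializer: Serializer<T>): Serializers<T, T, T, T> {
  return {
    key: serializer,
    value: serializer,
    headerKey: serializer,
    headerValue: serializer
  }
}

function serialize (source: unknown): Buffer {
  return Buffer.from(JSON.stringify(source))
}

const serializers = serializersFromSketch(serialize)
console.log(serializers.value({ id: 1 }).toString()) // {"id":1}
```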