Consumer

May 12, 2026

Client for consuming messages from Kafka topics with support for consumer groups and transactional message isolation.

The consumer inherits from the Base client.

The complete TypeScript type of the Consumer is determined by the deserializers option.

Events

| Name | Payload Type | Description |
| --- | --- | --- |
| consumer:group:join | ConsumerGroupJoinPayload | Emitted when joining a group. |
| consumer:group:leave | ConsumerGroupLeavePayload | Emitted when leaving a group. |
| consumer:group:rejoin | (none) | Emitted when re-joining a group after a rebalance. |
| consumer:group:rebalance | ConsumerGroupRebalancePayload | Emitted when group rebalancing occurs. |
| consumer:heartbeat:start | ConsumerHeartbeatPayload? | Emitted when starting new heartbeats. |
| consumer:heartbeat:cancel | ConsumerHeartbeatPayload | Emitted if a scheduled heartbeat has been canceled. |
| consumer:heartbeat:end | ConsumerHeartbeatPayload? | Emitted during successful heartbeats. |
| consumer:heartbeat:error | ConsumerHeartbeatErrorPayload | Emitted during failed heartbeats. |
| consumer:lag | Offsets | Emitted during periodic lag monitoring with calculated lag data for topics and partitions. |
| consumer:lag:error | Error | Emitted when lag calculation fails during monitoring operations. |
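As a minimal sketch, the group events above can be observed with ordinary listeners, assuming the consumer exposes an EventEmitter-style on method. The LifecycleEventSource interface and the attachGroupLogging helper are hypothetical names used only for illustration, and payloads are treated as opaque values.

```typescript
// Minimal structural type: anything with an EventEmitter-style `on` method.
// Assumption: the real Consumer satisfies it.
interface LifecycleEventSource {
  on(event: string, listener: (payload?: unknown) => void): unknown
}

// Event names copied from the table above.
const groupEvents = [
  'consumer:group:join',
  'consumer:group:leave',
  'consumer:group:rejoin',
  'consumer:group:rebalance'
] as const

// Hypothetical helper: reports every group lifecycle event through `log`.
function attachGroupLogging(
  source: LifecycleEventSource,
  log: (event: string, payload?: unknown) => void
): void {
  for (const event of groupEvents) {
    source.on(event, payload => log(event, payload))
  }
}

// Usage with a real consumer (not executed here):
// attachGroupLogging(consumer, (event, payload) => console.log(event, payload))
```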

Constructor

Creates a new consumer with type Consumer<Key, Value, HeaderKey, HeaderValue>.

Options:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| groupId | string | | Consumer group ID. |
| groupInstanceId | string | | Consumer group instance ID. |
| autocommit | boolean \| number | true | Whether to autocommit consumed messages. If it is true, messages are committed immediately. If it is a number, it specifies how often offsets will be committed; only the last offset for a topic-partition is committed. If it is false, each message read from the stream will have a commit method which should be used to manually commit offsets. |
| minBytes | number | 1 | Minimum amount of data the brokers should return. The value might not be respected by Kafka. |
| maxBytes | number | 50MB | Maximum amount of data the brokers should return. The value might not be respected by Kafka. |
| maxBytesPerPartition | number | | Maximum amount of data the broker should return for each partition in a stream fetch request. If not set, it defaults to maxBytes. The value might not be respected by Kafka. |
| maxWaitTime | number | 5 seconds | Maximum amount of time in milliseconds the broker will wait before sending a response to a fetch request. |
| isolationLevel | number | 1 | Kind of isolation applied to fetch requests. It can be used to read only messages committed by producers. The valid values are defined in the FetchIsolationLevels enumeration. |
| deserializers | Deserializers<Key, Value, HeaderKey, HeaderValue> | | Object that specifies which deserializers to use. The object should only contain one or more of the key, value, headerKey and headerValue properties. Note: Should not be provided when using a registry. |
| beforeDeserialization | BeforeDeserializationHook | | Hook function called before deserialization of each message component (key, value, headers). Experimental: Does not follow semver and may change in minor/patch releases. Note: Should not be provided when using a registry. |
| registry | AbstractSchemaRegistry<Key, Value, HeaderKey, HeaderValue> | | Schema registry instance for automatic deserialization with schema management. See the Confluent Schema Registry guide for details. Experimental: Does not follow semver and may change in minor/patch releases. Note: When provided, do not use deserializers or beforeDeserialization. |
| highWaterMark | number | 1024 | The maximum number of messages to store in memory before delaying fetch requests. Note that this value significantly affects both performance and memory use. |
| sessionTimeout | number | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down. This is only relevant when Kafka creates a new group. Not supported for groupProtocol=consumer, where it is set with the broker configuration property group.consumer.session.timeout.ms. |
| rebalanceTimeout | number | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down. This is only relevant when Kafka creates a new group. |
| heartbeatInterval | number | 3 seconds | Interval in milliseconds between heartbeats. Not supported for groupProtocol=consumer, where it is set with the broker configuration property group.consumer.heartbeat.interval.ms. |
| groupProtocol | 'classic' \| 'consumer' | 'classic' | Group protocol to use. Use 'classic' for the original consumer group protocol and 'consumer' for the new protocol introduced in KIP-848. The 'consumer' protocol provides server-side partition assignment and incremental rebalancing behavior. |
| groupRemoteAssignor | string | null | Server-side assignor to use for groupProtocol=consumer. Keep it unset to let the server select a suitable assignor for the group. Available assignors: 'uniform' or 'range'. |
| protocols | GroupProtocolSubscription[] | roundrobin, version 1 | Protocols used by this consumer group. Each protocol must be an object specifying the name, version and optionally metadata properties. Not supported for groupProtocol=consumer. |
| partitionAssigner | GroupPartitionsAssigner | | Client-side partition assignment strategy. Not supported for groupProtocol=consumer, use groupRemoteAssignor instead. |
| streamContext | unknown | | Default opaque user data for MessagesStream instances created by this consumer. It is forwarded to stream-owned ConnectionPool and Connection instances. Kafka never reads, mutates, or interprets this value. |

It also supports all the constructor options of Base.

The readonly streamContext getters expose the same opaque values on the consumer instance.

Basic Methods

isActive

Returns true if the consumer is not closed and it is currently an active member of a consumer group.

This method will return false during consumer group rebalancing.

consume<Key, Value, HeaderKey, HeaderValue>(options[, callback])

Creates a stream to consume messages from topics.

The return value is a stream of type MessagesStream<Key, Value, HeaderKey, HeaderValue>.

Creating new Consumer(...) does not start consuming yet. Group join/rejoin and actual fetch/start happen when consume() is called and a stream is created.

Each MessagesStream maintains its own fetch connection pool. This allows multiple active streams from the same consumer to fetch in parallel without head-of-line blocking each other.

Options:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| topics | string[] | | Topics to consume. |
| mode | string | LATEST | Where to start fetching new messages. The valid values are defined in the MessagesStreamModes enumeration (see below). |
| fallbackMode | string | LATEST | Where to start fetching new messages when offset information is lacking for the consumer group. The valid values are defined in the MessagesStreamFallbackModes enumeration (see below). |
| maxFetches | number | 0 | The maximum number of fetches to perform on each partition before automatically closing the stream. Setting it to zero disables automatic closing. Note that Array.fromAsync can be used to create an array out of a stream. |
| offsets | TopicWithPartitionAndOffset[] | | When manual offset mode is specified, a list of topic-partition-offset triplets. |
| onCorruptedMessage | CorruptedMessageHandler | | A callback that will be invoked if a deserializer throws an error. If the function returns true, the stream will throw an error and will therefore be destroyed. Note that the stream will not wait for the commit function to finish, so make sure you handle callbacks and promises accordingly. |
| context | unknown | | Opaque user data for the created MessagesStream. It is forwarded to the stream-owned ConnectionPool and Connection instances. Kafka never reads, mutates, or interprets this value. When provided, it overrides the consumer constructor streamContext for that stream. |

It also accepts all options of the constructor except deserializers and groupId.

For consume(), maxBytesPerPartition controls the per-partition fetch limit used by the created MessagesStream. If omitted, it uses the consumer constructor maxBytesPerPartition; if that is also omitted, it uses the effective maxBytes value for the consume() call.
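When autocommit is false, each message carries a commit method. The following is a minimal sketch of a process-then-commit loop under that setting. The CommittableMessage interface and the processWithManualCommit helper are hypothetical, and the sketch assumes that commit returns a promise when called without a callback.

```typescript
// Structural view of a consumed message when autocommit is false.
// Assumption: `commit` may return a promise when called without a callback.
interface CommittableMessage<Value> {
  value: Value
  commit(): void | Promise<void>
}

// Hypothetical helper: processes messages one at a time and commits each
// offset only after its handler has completed successfully.
async function processWithManualCommit<Value>(
  stream: AsyncIterable<CommittableMessage<Value>>,
  handle: (value: Value) => Promise<void>
): Promise<number> {
  let committed = 0

  for await (const message of stream) {
    await handle(message.value) // if this throws, the offset is not committed
    await message.commit()
    committed++
  }

  return committed
}

// Usage with a real consumer (not executed here):
// const stream = await consumer.consume({ topics: ['events'], autocommit: false })
// await processWithManualCommit(stream, async value => { /* ... */ })
```

Committing only after the handler resolves means a crash during processing leaves the offset uncommitted, so the message can be read again after a restart.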

MessagesStreamModes modes:

| Name | Description |
| --- | --- |
| LATEST | Consume messages received after the stream has started. |
| EARLIEST | Consume messages from the earliest available on the broker. |
| COMMITTED | Resume the consumer group from the offset after the last committed offset. |
| MANUAL | Consume messages after the offsets provided in the offsets option. |

MessagesStreamFallbackModes modes:

| Name | Description |
| --- | --- |
| LATEST | Consume messages received after the stream has started. |
| EARLIEST | Consume messages from the earliest available on the broker. |
| FAIL | Throw an exception. |
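The interaction between mode and fallbackMode can be summarized with a small decision function. This is a simplified model of the rules described above, not the library's implementation. The resolveStartPosition function and its string literals are hypothetical stand-ins for the MessagesStreamModes and MessagesStreamFallbackModes values, and the sketch assumes that fallbackMode only matters when mode is COMMITTED.

```typescript
type Mode = 'LATEST' | 'EARLIEST' | 'COMMITTED' | 'MANUAL'
type FallbackMode = 'LATEST' | 'EARLIEST' | 'FAIL'
type StartPosition = 'latest' | 'earliest' | 'committed' | 'manual'

// Returns where a stream would start for one partition.
// `hasCommittedOffset` says whether the group already committed an offset for it.
function resolveStartPosition(
  mode: Mode,
  fallbackMode: FallbackMode,
  hasCommittedOffset: boolean
): StartPosition {
  switch (mode) {
    case 'LATEST':
      return 'latest'
    case 'EARLIEST':
      return 'earliest'
    case 'MANUAL':
      return 'manual' // offsets come from the `offsets` option
    case 'COMMITTED':
      if (hasCommittedOffset) return 'committed'
      // No offset information for the group: apply the fallback.
      if (fallbackMode === 'FAIL') {
        throw new Error('No committed offset and fallbackMode is FAIL')
      }
      return fallbackMode === 'EARLIEST' ? 'earliest' : 'latest'
  }
}
```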

close([force][, callback])

Closes the consumer and all its connections.

If force is not true, then the method will throw an error if any MessagesStream created from this consumer is still active.

If force is true, then the method will close all MessagesStream created from this consumer.

The return value is void.
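A possible shutdown routine based on the force semantics above is sketched below. The Closable interface and the closeGracefully helper are hypothetical, and the sketch assumes the promise form of close (no callback). It treats any rejection of the first attempt as a reason to force the close, which may be too broad for production use.

```typescript
// Structural view of the documented close([force]) method, promise form.
interface Closable {
  close(force?: boolean): Promise<void>
}

// Hypothetical helper: try a clean close first; if it is rejected (for example
// because a MessagesStream is still active), retry with force = true.
async function closeGracefully(consumer: Closable): Promise<'clean' | 'forced'> {
  try {
    await consumer.close()
    return 'clean'
  } catch {
    await consumer.close(true)
    return 'forced'
  }
}

// Usage (not executed here), for example from a signal handler:
// process.once('SIGINT', () => {
//   closeGracefully(consumer).finally(() => process.exit(0))
// })
```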

Using Schema Registries

⚠️ Experimental API: Confluent Schema Registry support and the registry/beforeDeserialization integration are experimental. They do not follow semver and may change in minor/patch releases.

The consumer supports automatic deserialization through schema registries like Confluent Schema Registry. When using a schema registry, messages are automatically deserialized according to their schemas, which are fetched based on the schema IDs embedded in each message.

Example with Confluent Schema Registry:

import { Consumer } from '@platformatic/kafka'
import { ConfluentSchemaRegistry } from '@platformatic/kafka/registries'

// Create a schema registry instance
const registry = new ConfluentSchemaRegistry({
  url: 'http://localhost:8081',
  auth: {
    username: 'user',
    password: 'password'
  }
})

// Create a consumer with the registry
const consumer = new Consumer({
  groupId: 'my-consumer-group',
  clientId: 'my-consumer',
  bootstrapBrokers: ['localhost:9092'],
  registry // Registry handles deserialization automatically
})

// Create a consumer stream
const stream = await consumer.consume({
  topics: ['events'],
  autocommit: true
})

// Messages are automatically deserialized according to their schemas
for await (const message of stream) {
  console.log('Key:', message.key) // Deserialized key object
  console.log('Value:', message.value) // Deserialized value object
}

await consumer.close()

When using a schema registry:

  • Do not provide the deserializers option - the registry provides its own deserializers
  • Do not provide the beforeDeserialization hook - the registry provides its own hook
  • Messages are automatically deserialized according to their schemas
  • Schema IDs are extracted from the Confluent wire format (5-byte header)
  • The registry fetches and caches schemas as needed
  • JSON schemas are validated on receive (and optionally on send)
  • Supports AVRO, Protocol Buffers, and JSON Schema formats

For more details, see the Confluent Schema Registry documentation.

Rack-aware fetching

When you set the clientRack option, the consumer sends it as the Fetch request rack_id and honors any preferred_read_replica the broker returns. Subsequent fetches for that partition route to the preferred replica until the lease expires, the metadata changes, or the preferred fetch fails. This applies to both the MessagesStream fetch loop and direct Consumer.fetch calls when every requested partition resolves to the same target broker. Otherwise, the original node is used as a fallback.

The lease length matches the consumer's metadataMaxAge. A response with preferred_read_replica = -1 is treated as "no new hint": an existing cached value is kept until it expires. Cached preferred replicas are also dropped when:

  • the cached node is no longer in the partition replicas, is offline, or is missing from cluster metadata (also triggers a metadata refresh)
  • the partition is no longer assigned to this consumer after a rebalance
  • a fetch request fails (the cache is cleared for every partition in the failed request, mirroring the Apache Kafka Java client)

For closest-replica fetching to actually pick a same-rack follower, the cluster must support KIP-392 (Apache Kafka 2.4+) and have a replica selector configured (for example, replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector). Without this, the broker will not advertise a preferred replica and the consumer will continue to read from partition leaders.
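The lease behavior described in this section can be modeled with a small cache. This is an illustrative sketch only, not the library's internal code: the PreferredReplicaCache class, its method names, and the explicit now parameter are all hypothetical, and metadata-driven invalidation (replica list changes, rebalances) is left out for brevity.

```typescript
interface CachedReplica {
  nodeId: number
  expiresAt: number // epoch milliseconds
}

// Simplified model of the preferred-replica lease rules described above.
class PreferredReplicaCache {
  private readonly entries = new Map<string, CachedReplica>()
  private readonly leaseMs: number

  // `leaseMs` plays the role of the consumer's metadataMaxAge.
  constructor(leaseMs: number) {
    this.leaseMs = leaseMs
  }

  // Records the hint from a fetch response; -1 means "no new hint".
  update(topic: string, partition: number, preferredReadReplica: number, now: number): void {
    if (preferredReadReplica === -1) return // keep any existing entry until it expires
    this.entries.set(`${topic}:${partition}`, {
      nodeId: preferredReadReplica,
      expiresAt: now + this.leaseMs
    })
  }

  // Returns the node to fetch from: the cached replica while the lease is
  // valid, otherwise the partition leader.
  target(topic: string, partition: number, leader: number, now: number): number {
    const key = `${topic}:${partition}`
    const entry = this.entries.get(key)
    if (!entry) return leader
    if (now >= entry.expiresAt) {
      this.entries.delete(key)
      return leader
    }
    return entry.nodeId
  }

  // Called when a fetch request fails: drop every partition it contained.
  invalidate(partitions: Array<{ topic: string; partition: number }>): void {
    for (const { topic, partition } of partitions) {
      this.entries.delete(`${topic}:${partition}`)
    }
  }
}
```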

Advanced Methods

The consumer manages group participation automatically. Some of the APIs are exposed to allow for advanced uses.

fetch<Key, Value, HeaderKey, HeaderValue>(options[, callback])

Fetches messages directly from a specific node without allocating a stream.

The return value is the raw response from Kafka.

Options:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| node | number | | Node to consume data from. |
| topics | FetchRequestTopic[] | | Topic and partitions to consume. |
| minBytes | number | 1 | Minimum amount of data the brokers should return. The value might not be respected by Kafka. |
| maxBytes | number | 50MB | Maximum amount of data the brokers should return. The value might not be respected by Kafka. This is a key throughput and memory tuning parameter: increase it to allow larger fetch responses and reduce fetch round trips, or decrease it to cap response size and memory pressure. |
| maxWaitTime | number | 5 seconds | Maximum amount of time in milliseconds the broker will wait before sending a response to a fetch request. |
| isolationLevel | number | 1 | Kind of isolation applied to fetch requests. It can be used to only read producers-committed messages. The valid values are defined in the FetchIsolationLevels enumeration. |

Note that all the partitions must be hosted on the node otherwise the operation will fail.
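Because every partition in a fetch call must be hosted on the target node, callers typically need to split their partitions by node first. The sketch below shows one way to do that grouping. The PartitionLeader type and the groupByLeader helper are hypothetical, the leader information is assumed to come from cluster metadata obtained elsewhere, and the conversion into FetchRequestTopic objects is omitted because that shape is not described here.

```typescript
interface PartitionLeader {
  topic: string
  partition: number
  leader: number // node ID hosting the partition
}

// Groups partitions by leader node so that each direct fetch targets one node.
// Result: node ID -> topic -> partition indexes.
function groupByLeader(partitions: PartitionLeader[]): Map<number, Map<string, number[]>> {
  const byNode = new Map<number, Map<string, number[]>>()

  for (const { topic, partition, leader } of partitions) {
    let topics = byNode.get(leader)
    if (!topics) {
      topics = new Map<string, number[]>()
      byNode.set(leader, topics)
    }

    const list = topics.get(topic) ?? []
    list.push(partition)
    topics.set(topic, list)
  }

  return byNode
}
```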

commit(options[, callback])

Commits offsets for consumed messages.

The return value is void.

Options:

| Property | Type | Description |
| --- | --- | --- |
| offsets | CommitOptionsPartition[] | Offsets to commit. Each offset is an object containing the topic, partition, offset and leaderEpoch properties. |

listOffsets(options[, callback])

Lists available offsets for topics.

If the topics list is empty, then it will fetch the offsets of all topics currently consumed by the consumer.

The return value is a map where keys are in the form $topic:$partition and values are arrays of offsets (where the position represents the partition).

Options:

| Property | Type | Description |
| --- | --- | --- |
| topics | string[] | Topics to check. |
| partition | Record<string, number[]> | Partitions to get for each topic. By default it fetches all the partitions of the topic. |
| timestamp | bigint | Timestamp of the offsets to retrieve. If it is -1, it will return the last available offset; if it is -2, it will return the first available offset. These special values are also defined in the ListOffsetTimestamps enum. |
| isolationLevel | number | Kind of isolation applied to fetch requests. It can be used to only read producers-committed messages. The valid values are defined in the FetchIsolationLevels enumeration. |

listCommittedOffsets(options[, callback])

Lists offsets committed by the consumer group.

The return value is a map where keys are in the form $topic:$partition and values are arrays of offsets (where the position represents the partition).

Options:

| Property | Type | Description |
| --- | --- | --- |
| topics | string[] | Topics to check. |

getLag(options[, callback])

Calculates the consumer lag for specified topics.

If the topics list is empty, then it will calculate the lag of all topics currently consumed by the consumer.

The return value is a map where keys are topic names and values are arrays of lag values (where the position represents the partition). A value of -1n indicates that the consumer is not assigned to that partition.

If a partition is filtered out via the partitions option, then a -2n will be returned for that partition.

Options:

| Property | Type | Description |
| --- | --- | --- |
| topics | string[] | Topics to check lag for. |
| partitions | Record<string, number[]> | Partitions to get for each topic. By default it fetches all the partitions of the topic. |
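As a small sketch of how the returned map might be post-processed, the helper below sums the lag per topic while skipping the two sentinel values described above. The totalLagByTopic function and the constant names are hypothetical, and the input is assumed to be a Map from topic name to an array of bigint lag values.

```typescript
// Sentinel values documented above.
const NOT_ASSIGNED = -1n // the consumer is not assigned to the partition
const FILTERED_OUT = -2n // the partition was excluded via the partitions option

// Sums the lag of every assigned, non-filtered partition for each topic.
function totalLagByTopic(lag: Map<string, bigint[]>): Map<string, bigint> {
  const totals = new Map<string, bigint>()

  for (const [topic, partitions] of lag) {
    let total = 0n
    for (const value of partitions) {
      if (value === NOT_ASSIGNED || value === FILTERED_OUT) continue
      total += value
    }
    totals.set(topic, total)
  }

  return totals
}

// Usage with a real consumer (not executed here):
// const totals = totalLagByTopic(await consumer.getLag({ topics: ['events'] }))
```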

startLagMonitoring(options, interval)

Initiates periodic consumer lag monitoring at the specified interval in milliseconds.

Consumer lag data is automatically emitted through consumer:lag events at each monitoring cycle.

Monitoring continues until explicitly stopped with stopLagMonitoring or automatically terminates when the consumer closes.

The options parameter accepts the same configuration as getLag.

stopLagMonitoring()

Terminates the periodic consumer lag monitoring that was previously activated with startLagMonitoring.
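Lag monitoring pairs naturally with a threshold alert. The following sketch wires the two methods and the consumer:lag event together. The LagMonitor interface and the alertOnLag helper are hypothetical, and the event payload is assumed to have the same shape as the getLag return value (a Map from topic name to bigint lag values).

```typescript
// Structural view of the documented monitoring API.
interface LagMonitor {
  on(event: 'consumer:lag', listener: (lag: Map<string, bigint[]>) => void): unknown
  startLagMonitoring(options: { topics: string[] }, interval: number): void
  stopLagMonitoring(): void
}

// Hypothetical helper: starts monitoring and calls `onAlert` whenever a
// partition's lag exceeds `threshold`. Returns a function that stops monitoring.
function alertOnLag(
  consumer: LagMonitor,
  topics: string[],
  threshold: bigint,
  intervalMs: number,
  onAlert: (topic: string, partition: number, lag: bigint) => void
): () => void {
  consumer.on('consumer:lag', lag => {
    for (const [topic, partitions] of lag) {
      partitions.forEach((value, partition) => {
        // Negative sentinel values never exceed a non-negative threshold.
        if (value > threshold) onAlert(topic, partition, value)
      })
    }
  })

  consumer.startLagMonitoring({ topics }, intervalMs)
  return () => consumer.stopLagMonitoring()
}
```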

findGroupCoordinator([callback])

Finds the coordinator for the consumer group.

The return value is the number of the node acting as a coordinator for the group.

joinGroup(options[, callback])

Joins (and creates if necessary) a consumer group.

It returns the group member ID for this consumer.

This method is a no-op for groupProtocol=consumer.

Options:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| sessionTimeout | number | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down. This is only relevant when Kafka creates a new group. |
| rebalanceTimeout | number | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down. This is only relevant when Kafka creates a new group. |
| heartbeatInterval | number | 3 seconds | Interval in milliseconds between heartbeats. |
| protocols | GroupProtocolSubscription[] | roundrobin, version 1 | Protocols used by this consumer group. Each protocol must be an object specifying the name, version and optionally metadata properties. |

leaveGroup([force])

Leaves a consumer group.

If force is not true, then the method will throw an error if any MessagesStream created from this consumer is still active.

The return value is void.

This method is a no-op for groupProtocol=consumer.

FAQs

My consumer is not receiving any message when the application restarts

If you use a fixed consumer group ID (or one that is persisted between restarts), you might encounter a rebalance issue if the application is closed without properly shutting down the consumer or issuing a leaveGroup request.

This commonly happens during development (for example, when you terminate the process with Ctrl+C or use a file watcher), or if the application crashes in production.

When the application exits without leaving the group, Kafka still considers the consumer as potentially alive. Upon restarting with the same group ID but a different member ID, Kafka triggers a group rebalance. Since the previous member did not formally leave, Kafka waits for the full session timeout (1 minute by default) to determine whether the old member is still active.

Only after this timeout expires and the member is considered dead will Kafka complete the rebalance and assign partitions to the new instance. During this period, the restarted consumer cannot consume any messages, even if it is the only member in the group.

If you encounter this issue, there are two ways to easily avoid it:

  1. If you don't really use consumer groups, just use a random value (such as one generated by crypto.randomUUID()) as your group ID.
  2. If you use consumer groups, manually call joinGroup rather than relying on consume to handle it automatically.

How do blocking operations impact consuming of messages?

Kafka requires consumers to regularly report their status using a dedicated heartbeat API. If a consumer fails to send heartbeats within the configured session timeout, Kafka considers it dead. When this happens, the consumer must rejoin the group, during which time it cannot consume messages. Moreover, all other group members must also rejoin due to the triggered rebalance, resulting in a temporary halt in message consumption across the group.

We've identified three common scenarios that can lead to missed heartbeats while consuming messages:

  • Long fetch durations – If the consumer fetches messages with a maxWaitTime that exceeds the sessionTimeout, heartbeats may not be processed—even if they are sent—because the fetch API is blocking. @platformatic/kafka solves this by using separate fetch connections per MessagesStream, ensuring that other APIs (such as heartbeats) remain unaffected and allowing parallel fetches across streams.

  • Complex asynchronous message processing – When user code performs long or delayed async tasks while handling messages, it may block the driver from sending timely heartbeats. If the client does not decouple fetching from processing, heartbeats can be lost. Solutions like KafkaJS require users to manually send heartbeats during processing, which is error-prone. In contrast, @platformatic/kafka fully decouples message fetching from processing to prevent this issue entirely.

  • CPU-intensive operations – In Node.js, blocking the event loop with heavy computations prevents any I/O operations, including heartbeats. If such operations are unavoidable, the recommended solution is to offload them to a separate thread using Node.js Worker Threads.
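For the CPU-intensive case, a minimal Worker Threads sketch is shown below. It is not specific to @platformatic/kafka: the sumInWorker function is hypothetical and the summation loop merely stands in for real CPU-bound work. The worker source is kept inline (using the eval option of the Node.js Worker constructor) so that the example fits in a single file.

```typescript
import { Worker } from 'node:worker_threads'

// Worker source kept inline so the sketch is a single file.
// With `eval: true` the string runs as a CommonJS script in the worker.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads')
  let sum = 0
  for (let i = 1; i <= workerData.limit; i++) sum += i
  parentPort.postMessage(sum)
`

// Runs the loop off the main thread, so the event loop (and therefore
// heartbeats) keeps running while the computation is in progress.
function sumInWorker(limit: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: { limit } })
    worker.once('message', resolve)
    worker.once('error', reject)
  })
}

// Usage inside a message handler (not executed here):
// const result = await sumInWorker(1_000_000)
```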

How do I handle reconnections when the connections fail or the brokers crash?

By default, @platformatic/kafka retries an operation three times with a waiting time of 250 milliseconds between attempts (these are the defaults of the retries and retryDelay options). This ensures that connection errors are properly reported to the user. If the operation still fails, the error event is emitted on the stream. After that, the stream is no longer usable, but the consumer still is (thanks to the internal decoupled architecture), so you just have to invoke consume again to obtain a new stream and resume processing messages.
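A rough sketch of the "invoke consume again" approach follows. The consumeWithRestart helper is hypothetical, and it assumes that a failed stream surfaces its error by rejecting the for await iteration. Note that in this simplified form an error thrown by the handler also counts as a failure and triggers a restart.

```typescript
// Hypothetical helper: re-creates the stream whenever iteration fails,
// up to `maxRestarts` times. Returns the number of restarts performed.
async function consumeWithRestart<Message>(
  createStream: () => Promise<AsyncIterable<Message>>,
  handle: (message: Message) => Promise<void>,
  maxRestarts: number
): Promise<number> {
  let restarts = 0

  while (true) {
    try {
      const stream = await createStream()
      for await (const message of stream) {
        await handle(message)
      }
      return restarts // the stream ended normally
    } catch (error) {
      if (restarts >= maxRestarts) throw error
      restarts++
    }
  }
}

// Usage with a real consumer (not executed here):
// await consumeWithRestart(
//   () => consumer.consume({ topics: ['events'] }),
//   async message => { /* ... */ },
//   5
// )
```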

If you instead want it to handle reconnections automatically, set retries to a higher value, or to true for infinite retries. Also, increase retryDelay to at least one second to avoid opening too many TCP connections in a short period of time if the broker is down.

You can change those values in the Consumer constructor (and, similarly, in any other @platformatic/kafka client).
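For instance, the retry settings could be kept in a shared options object and spread into each client's constructor. The reconnectOptions name is hypothetical, and the other constructor values in the commented usage are placeholders.

```typescript
// Retry settings following the recommendations above.
const reconnectOptions = {
  retries: true, // retry indefinitely instead of the default three attempts
  retryDelay: 1000 // wait one second between attempts (milliseconds)
}

// Usage (not executed here):
// const consumer = new Consumer({
//   groupId: 'my-consumer-group',
//   clientId: 'my-consumer',
//   bootstrapBrokers: ['localhost:9092'],
//   ...reconnectOptions
// })
```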

Why does my server reject my request even if SASL authentication succeeded?

Some Kafka servers (such as those using JWT tokens) may return a successful exit code (0) even when invalid credentials are provided, storing authentication information only in auth bytes rather than the exit code.

In such cases, you can provide the sasl.authBytesValidator option to validate authentication information from the auth bytes response rather than relying on the exit code alone.

For JWT tokens we already provide saslOAuthBearer.jwtValidateAuthenticationBytes.