Base

May 12, 2026

This is the base class for all other clients (Producer, Consumer and Admin).

Unless you only care about cluster metadata, it is unlikely that you would ever initialise an instance of this.

Events

| Name | Description |
| --- | --- |
| client:broker:connect | Emitted when connecting to a broker. |
| client:broker:disconnect | Emitted when disconnecting from a broker. |
| client:broker:failed | Emitted when a broker connection fails. |
| client:broker:drain | Emitted when a broker is ready to be triggered by requests. |
| client:broker:sasl:handshake | Emitted when SASL handshake with a broker is completed. |
| client:broker:sasl:authentication | Emitted when SASL authentication to a broker is completed. |
| client:broker:sasl:authentication:extended | Emitted when SASL authentication to a broker is extended by performing a new authentication. |
| client:metadata | Emitted when metadata is retrieved. |
| client:close | Emitted when client is closed. |
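
As a minimal sketch of how these events might be observed, the helper below attaches one listener per event name. It assumes the client exposes the usual Node.js EventEmitter-style on method, which this page does not state explicitly. logClientEvents is a hypothetical helper, not part of the library, and listener payloads are forwarded untouched because their shape is not documented here.

```javascript
// Event names copied from the Events table.
const CLIENT_EVENTS = [
  'client:broker:connect',
  'client:broker:disconnect',
  'client:broker:failed',
  'client:broker:drain',
  'client:broker:sasl:handshake',
  'client:broker:sasl:authentication',
  'client:broker:sasl:authentication:extended',
  'client:metadata',
  'client:close'
]

// Hypothetical helper: registers a logging listener for every event.
// Assumes `client.on(name, listener)` exists (EventEmitter-style API).
function logClientEvents (client, log = console.log) {
  for (const name of CLIENT_EVENTS) {
    // Forward the event name followed by whatever arguments the event carries
    client.on(name, (...args) => log(name, ...args))
  }

  return client
}
```

Calling logClientEvents(producer) right after creating a client would then print each lifecycle event as it happens.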

Constructor

Creates a new base client.

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| clientId | string | | Client ID. |
| clientRack | string | | Client rack identifier sent by consumers as the Fetch and ConsumerGroupHeartbeat rack ID. |
| bootstrapBrokers | (Broker \| string)[] | | Bootstrap brokers. Each broker can be either an object with host and port properties or a string in the format $host:$port. |
| timeout | number | 5 seconds | Timeout in milliseconds for Kafka requests that support the parameter. |
| retries | number \| boolean | 3 | Number of times to retry an operation before failing. true means "infinity", while false means 0. |
| retryDelay | number \| function | 1000 | Amount of time in milliseconds to wait between retries, or a function to calculate custom delays. See section below. |
| metadataMaxAge | number | 5 seconds | Maximum lifetime of cluster metadata. |
| autocreateTopics | boolean | false | Whether to autocreate missing topics during metadata retrieval. |
| strict | boolean | false | Whether to validate all user-provided options on each request. This will impact performance so we recommend disabling it in production. |
| metrics | object | | A Prometheus configuration. See the Metrics section for more information. |
| connectTimeout | number | 5000 | Client connection timeout. |
| requestTimeout | number | 30000 | Local timeout in milliseconds while waiting for a response to an in-flight request. |
| maxInflights | number | 5 | Number of requests to send in parallel to Kafka without waiting for responses, when the protocol allows it. |
| handleBackPressure | boolean | false | If set to true, the client will respect the return value of socket.write and wait for a drain event before resuming sending requests. |
| tls | TLSConnectionOptions | | Configures TLS for broker connections. See section below. |
| ssl | TLSConnectionOptions | | Alias for tls. Configures TLS for broker connections. See section below. If both are provided, tls overrides this. |
| tlsServerName | boolean \| string | | A TLS servername to use when connecting. When set to true it will use the current target host. |
| sasl | SASLOptions | | Configures SASL authentication. See section below. |
| context | unknown | | Opaque user data forwarded to internally created ConnectionPool and Connection instances. Kafka never reads, mutates, or interprets this value. |

The readonly context getter exposes the same opaque value on the client instance.

The readonly currentMetadata getter exposes the currently cached ClusterMetadata, or undefined if metadata has not been loaded yet. It does not fetch or refresh metadata.
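
To make the option shapes concrete, here is a sketch of an options object that uses only properties listed in the constructor table. The host names, ports and values are illustrative placeholders rather than recommendations, and the object would be passed to the constructor of a subclass such as Producer.

```javascript
// Illustrative constructor options; all values are placeholders.
const clientOptions = {
  clientId: 'my-client',
  // Both documented broker forms: a "$host:$port" string and a { host, port } object
  bootstrapBrokers: ['localhost:9092', { host: 'localhost', port: 9093 }],
  // A number sets the retry count; true would mean "infinity" and false would mean 0
  retries: 5,
  // Fixed delay between retries, in milliseconds
  retryDelay: 500,
  // Requests sent in parallel without waiting for responses (documented default)
  maxInflights: 5,
  // Per-request option validation; off by default because it impacts performance
  strict: false
}
```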

Methods

connectToBrokers([nodeIds][, callback])

Establishes a connection to one or more brokers in the cluster.

The return value is a Map<number, Connection> object.

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| nodes | number[] \| null | null | The nodes to connect to. Valid IDs can be obtained via the metadata method and invalid IDs are ignored. |

metadata(options[, callback])

Fetches information about the cluster and the topics.

The return value is a ClusterMetadata object.

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| topics | string[] | | Topics to get. |
| forceUpdate | boolean | false | Whether to retrieve metadata even if the in-memory cache is still valid. |
| autocreateTopics | boolean | false | Whether to autocreate missing topics. |
| metadataMaxAge | number | 5 seconds | Maximum lifetime of cluster metadata. |
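
The sketch below shows one way the options above might be combined to bypass the cache. It assumes that metadata returns a promise when the optional callback is omitted, which the signature suggests but this page does not spell out. refreshTopicMetadata is a hypothetical helper name.

```javascript
// Hypothetical helper: fetches fresh metadata for the given topics.
// Assumes `client.metadata(options)` returns a promise when no callback is passed.
async function refreshTopicMetadata (client, topics) {
  return client.metadata({
    topics,
    // Retrieve metadata even if the in-memory cache is still valid
    forceUpdate: true
  })
}
```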

close([callback])

Closes the client and all its connections.

The return value is void.

isActive

Returns true if the client is not closed.

isConnected

Returns true if all of the client's connections are currently connected and the client is connected to at least one broker.

connections

Returns the client's default broker connection pool.

clearMetadata

Clears the current metadata.

Custom Retry Delay Function

The retryDelay option can accept either a number (for fixed delay) or a function for dynamic retry delays. This allows you to implement custom retry strategies such as exponential backoff, jitter, or any other retry pattern.

When using a function, it receives the following parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| client | object | The client instance performing the retry |
| operationId | string | A unique identifier for the operation being retried |
| attempt | number | The current attempt number (starts from 1) |
| retries | number | The maximum number of retries configured |
| error | Error | The error that caused the retry |

The function must return a number representing the delay in milliseconds before the next retry attempt.
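
As an illustration, a retry delay function implementing capped exponential backoff with jitter could look like the sketch below. The parameter order follows the list above. The baseDelay, maxDelay and 20% jitter values are arbitrary assumptions chosen for the example, not library defaults.

```javascript
// Capped exponential backoff with random jitter.
// baseDelay, maxDelay and the 20% jitter ratio are illustrative assumptions.
function exponentialBackoff (client, operationId, attempt, retries, error) {
  const baseDelay = 250
  const maxDelay = 10000

  // attempt starts from 1, so the first retry waits roughly baseDelay
  const exponential = baseDelay * 2 ** (attempt - 1)

  // Add up to 20% random jitter so that many clients do not retry in lockstep
  const jitter = Math.random() * exponential * 0.2

  // Never wait longer than maxDelay, whatever the attempt number
  return Math.min(maxDelay, Math.round(exponential + jitter))
}
```

The function would then be supplied as retryDelay: exponentialBackoff in the constructor options. The client, operationId, retries and error arguments are unused here, but they could drive per-operation or per-error strategies.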

timeout vs requestTimeout

Both options are valid and control different things:

  • timeout: request-level Kafka timeout that is sent to broker APIs that expose a timeout field.
  • requestTimeout: client-side timeout that limits how long this client waits for a response on an in-flight request.

In practice, requestTimeout is the guard for TimeoutError: Request timed out coming from the connection layer.
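
A short sketch of the two options side by side, using the documented default values. Keeping requestTimeout larger than timeout is an assumption about a sensible arrangement (it gives the broker a chance to answer before the client gives up locally), not a requirement stated by the library.

```javascript
// Illustrative values matching the documented defaults.
const timeoutOptions = {
  // Sent to the broker by APIs that expose a timeout field (milliseconds)
  timeout: 5000,
  // Enforced locally while waiting for a response to an in-flight request (milliseconds)
  requestTimeout: 30000
}
```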

Connecting to Kafka via TLS connection

To connect to Kafka over TLS, pass the relevant options in the tls option when creating any subclass of Base. Example:

import { readFile } from 'node:fs/promises'
import { resolve } from 'node:path'
import { Producer, stringSerializers } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: stringSerializers,
  tls: {
    // Disables server certificate verification; only suitable for local testing
    rejectUnauthorized: false,
    // Client certificate and private key, resolved relative to this module
    cert: await readFile(resolve(import.meta.dirname, './ssl/client.pem')),
    key: await readFile(resolve(import.meta.dirname, './ssl/client.key'))
  }
})

Connecting to Kafka via SASL

To connect to Kafka using SASL authentication, pass the relevant options in the sasl option when creating any subclass of Base. Example:

import { Producer, stringSerializers } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: stringSerializers,
  sasl: {
    mechanism: 'PLAIN', // SCRAM-SHA-256, SCRAM-SHA-512 and OAUTHBEARER are also supported
    // username, password, token and oauthBearerExtensions can also be (async) functions returning a value
    username: 'username', // Used by PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
    password: 'password', // Used by PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
    token: 'token', // Used by OAUTHBEARER
    oauthBearerExtensions: {}, // Used by OAUTHBEARER to add extensions according to RFC 7628
    // Needed if your Kafka server returns an exitCode 0 when invalid credentials are sent and only stores
    // authentication information in the auth bytes.
    //
    // A good example of this is OAuthBearerValidatorCallbackHandler, for which we provide saslOAuthBearer.jwtValidateAuthenticationBytes.
    authBytesValidator: (_authBytes, cb) => cb(null)
  }
})

Connecting to Kafka via SASL using a custom authenticator

For advanced use cases where you need full control over the SASL authentication process, you can provide a custom authenticate function in the sasl options. This allows you to implement custom authentication flows, handle complex credential management, or integrate with external authentication systems.

Example:

import { Producer, stringSerializers } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: stringSerializers,
  sasl: {
    mechanism: 'PLAIN',
    authenticate: async (
      mechanism,
      connection,
      authenticate,
      usernameProvider,
      passwordProvider,
      tokenProvider,
      callback
    ) => {
      try {
        // Custom logic to retrieve or generate credentials
        const username = typeof usernameProvider === 'function' ? await usernameProvider() : usernameProvider
        const password = typeof passwordProvider === 'function' ? await passwordProvider() : passwordProvider

        // Perform the SASL authentication
        const authData = Buffer.from(`\u0000${username}\u0000${password}`)
        const response = await authenticate({
          authBytes: authData
        })

        callback(null, response)
      } catch (err) {
        callback(err)
      }
    }
  }
})

The authenticate function receives the following parameters:

  • mechanism: The SASL mechanism being used (e.g., 'PLAIN', 'SCRAM-SHA-256')
  • connection: The broker connection being authenticated
  • authenticate: The SASL authentication API function to send auth bytes to the server
  • usernameProvider: The username (string or async function) from the sasl options
  • passwordProvider: The password (string or async function) from the sasl options
  • tokenProvider: The token (string or async function) from the sasl options
  • callback: A callback function to call with the authentication result

Important: The authenticate function should never throw exceptions, especially when using async functions. The function is not awaited and exceptions are not handled, which can lead to memory leaks, resource leaks, and unexpected behavior. Always wrap your code in a try-catch block and pass errors to the callback instead.
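
One way to follow this rule without repeating the try-catch in every authenticator is to adapt an async function to the callback signature. The sketch below does that with toCallbackAuthenticator, a hypothetical helper that is not part of the library. The positional parameters follow the list above, and passing them to the wrapped function as a single object is a design choice made only for this example.

```javascript
// Hypothetical helper (not part of the library): wraps an async function so that
// both its return value and any thrown or rejected error reach the callback.
function toCallbackAuthenticator (authenticateAsync) {
  return function authenticator (
    mechanism,
    connection,
    authenticate,
    usernameProvider,
    passwordProvider,
    tokenProvider,
    callback
  ) {
    // Starting from a resolved promise also captures synchronous throws
    Promise.resolve()
      .then(() =>
        authenticateAsync({
          mechanism,
          connection,
          authenticate,
          usernameProvider,
          passwordProvider,
          tokenProvider
        })
      )
      .then(
        response => callback(null, response),
        error => callback(error)
      )
  }
}
```

The returned function could then be used as the authenticate entry of the sasl options, with the wrapped async function free to use await and throw as usual.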