Configuration

August 27, 2023

Listeners

  • kafkaListeners: Comma-separated list of URIs that KoP listens on, along with their listener names, e.g. PLAINTEXT://localhost:9092,SSL://localhost:9093. If kafkaProtocolMap is configured, each URI's scheme is a listener name. Otherwise, the scheme must be a valid protocol in [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]. If the hostname is not set, the listener is bound to the default interface.
  • kafkaProtocolMap: Comma-separated map of listener names to protocols, e.g. PRIVATE:PLAINTEXT,PRIVATE_SSL:SSL,PUBLIC:PLAINTEXT,PUBLIC_SSL:SSL.
  • listeners: Deprecated. Use kafkaListeners instead.
  • kafkaAdvertisedListeners: Listeners to publish to ZooKeeper for clients to use. The format is the same as kafkaListeners.

NOTE

Among all configurations, only kafkaListeners or listeners (deprecated) is required.

To support multiple listeners, specify a different listener name for each one in kafkaListeners and kafkaAdvertisedListeners. Then map each listener name to the proper protocol in kafkaProtocolMap.

For example, suppose you need to listen on ports 9092 and 19092 with the PLAINTEXT protocol, using the listener names kafka_internal and kafka_external. Then add the following configurations:

kafkaListeners=kafka_internal://0.0.0.0:9092,kafka_external://0.0.0.0:19092
kafkaProtocolMap=kafka_internal:PLAINTEXT,kafka_external:PLAINTEXT
kafkaAdvertisedListeners=kafka_internal://localhost:9092,kafka_external://localhost:19092

In the example above:

  • kafkaListeners is split into tokens by commas (,), and each token has the format <listener-name>://<host>:<port>.
  • kafkaProtocolMap is split into tokens by commas (,), and each token has the format <listener-name>:<protocol>.
  • kafkaAdvertisedListeners is split into tokens by commas (,), and each token has the format <listener-name>://<host>:<port>.
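The parsing rules above can be sketched in Java. ListenerConfigSketch is a hypothetical helper, not KoP's code. It assumes listener names are matched case-sensitively and treats a listener name that is missing from kafkaProtocolMap as an error, which may differ from KoP's actual behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of KoP): resolves each kafkaListeners entry to a protocol.
final class ListenerConfigSketch {

    private static final Set<String> PROTOCOLS =
            Set.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    /** Parses "<listener-name>:<protocol>,..." into an ordered map. */
    static Map<String, String> parseProtocolMap(String kafkaProtocolMap) {
        Map<String, String> result = new LinkedHashMap<>();
        if (kafkaProtocolMap == null || kafkaProtocolMap.isBlank()) {
            return result;
        }
        for (String token : kafkaProtocolMap.split(",")) {
            String[] parts = token.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Invalid protocol map entry: " + token);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    /** Maps each "<scheme>://<host>:<port>" listener to its protocol. */
    static Map<String, String> resolveProtocols(String kafkaListeners, String kafkaProtocolMap) {
        Map<String, String> protocolMap = parseProtocolMap(kafkaProtocolMap);
        Map<String, String> result = new LinkedHashMap<>();
        for (String token : kafkaListeners.split(",")) {
            String listener = token.trim();
            int sep = listener.indexOf("://");
            if (sep <= 0) {
                throw new IllegalArgumentException("Invalid listener: " + listener);
            }
            String scheme = listener.substring(0, sep);
            String protocol;
            if (!protocolMap.isEmpty()) {
                // kafkaProtocolMap is configured: the scheme is a listener name.
                protocol = protocolMap.get(scheme);
                if (protocol == null) {
                    throw new IllegalArgumentException("No protocol mapped for " + scheme);
                }
            } else {
                // No kafkaProtocolMap: the scheme itself must be a protocol.
                protocol = scheme;
            }
            if (!PROTOCOLS.contains(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
            result.put(listener, protocol);
        }
        return result;
    }
}
```

Applied to the example configuration, both kafka_internal://0.0.0.0:9092 and kafka_external://0.0.0.0:19092 resolve to PLAINTEXT.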

Logger

KoP shares its configuration files with the Pulsar broker, for example conf/broker.conf or conf/standalone.conf. Logging is configured in the conf/log4j2.yaml file, for example:

Logger:
  - name: io.streamnative.pulsar.handlers.kop
    level: warn
    additivity: false
    AppenderRef:
      - ref: Console

Namespace

Pulsar is a multi-tenant system that requires users to specify a tenant and a namespace, whereas most Kafka users specify only a short topic name. Therefore, KoP provides the following configurations to specify the default tenant and namespace.

  • kafkaTenant: The default tenant of Kafka topics. Default: public.
  • kafkaNamespace: The default namespace of Kafka topics. Default: default.
  • kafkaMetadataTenant: The tenant used for storing Kafka metadata topics. Default: public.
  • kafkaEnableMultiTenantMetadata: Whether to use the SASL username as kafkaMetadataTenant. Default: true.
  • kafkaMetadataNamespace: The namespace used for storing Kafka metadata topics. Default: __kafka.
  • kopAllowedNamespaces: Comma-separated list of namespaces in which topics can be listed, e.g. kopAllowedNamespaces=public/default,public/kafka. If it is not configured or is empty, the allowed namespace is <kafkaTenant>/<kafkaNamespace>.

When you enable kafkaEnableMultiTenantMetadata, KoP uses separate tenants for the system metadata. This lets you fully isolate the tenants in your Pulsar cluster. This is not possible in pure Kafka, where the system metadata is usually shared among all users.
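As an illustration, the following sketch shows how the defaults above could fit together. The class and method names are hypothetical. It assumes that a short topic name is prefixed with persistent://<kafkaTenant>/<kafkaNamespace>/ while a name that already starts with persistent:// is kept unchanged, and that with kafkaEnableMultiTenantMetadata the SASL username is used verbatim as the metadata tenant, falling back to kafkaMetadataTenant when there is no username.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helpers (not KoP's code) illustrating the namespace defaults.
final class NamespaceDefaultsSketch {

    /** Prefixes a short Kafka topic name with the default tenant and namespace. */
    static String toPulsarTopic(String kafkaTopic, String kafkaTenant, String kafkaNamespace) {
        // Assumption: fully qualified names are passed through unchanged.
        if (kafkaTopic.startsWith("persistent://")) {
            return kafkaTopic;
        }
        return "persistent://" + kafkaTenant + "/" + kafkaNamespace + "/" + kafkaTopic;
    }

    /** Falls back to <kafkaTenant>/<kafkaNamespace> when kopAllowedNamespaces is empty. */
    static List<String> allowedNamespaces(String kopAllowedNamespaces,
                                          String kafkaTenant, String kafkaNamespace) {
        if (kopAllowedNamespaces == null || kopAllowedNamespaces.isBlank()) {
            return List.of(kafkaTenant + "/" + kafkaNamespace);
        }
        return Arrays.stream(kopAllowedNamespaces.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** Assumption: the SASL username is used as-is as the metadata tenant. */
    static String metadataTenant(boolean kafkaEnableMultiTenantMetadata,
                                 String saslUsername, String kafkaMetadataTenant) {
        if (kafkaEnableMultiTenantMetadata && saslUsername != null && !saslUsername.isEmpty()) {
            return saslUsername;
        }
        return kafkaMetadataTenant;
    }
}
```

With the defaults, a Kafka topic named my-topic corresponds to persistent://public/default/my-topic.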

Performance

This section lists configurations that may affect performance.

  • entryFormat: The format of an entry. Range: kafka, mixed_kafka, pulsar. Default: pulsar. If it is set to kafka, KoP skips unnecessary encoding and decoding, which improves performance; however, a topic then cannot be used by both Pulsar clients and Kafka clients. If it is set to mixed_kafka, some non-official Kafka client implementations are supported. Note: with kafka, performance is 2 to 3 times that of mixed_kafka.
  • maxReadEntriesNum: The maximum number of entries read from the cursor at a time. Increasing this value lets each FETCH request read more bytes. Default: 5. NOTE: KoP currently does not check the maximum byte limit, so if the value is too large, the response size may exceed the network limit.

Choose the proper entryFormat

The following entryFormat values are supported in KoP:

  • pulsar: The default entryFormat in KoP. KoP converts messages between the Kafka format and the Pulsar format, so its performance is the worst. The benefit is that both Kafka consumers and Pulsar consumers can consume the messages from the Pulsar cluster.
  • kafka: KoP does not encode or decode Kafka messages. The messages are stored directly in the bookie cluster as entries in the Kafka format, so Pulsar clients cannot parse them. Its performance is the best.
  • mixed_kafka: Works similarly to the kafka format. Set this option to support some non-official Kafka clients that need extra encoding or decoding of Kafka messages. Its performance is medium.

You can run the io.streamnative.pulsar.handlers.kop.format.EncodePerformanceTest class to compare the performance of these formats.

Generally, if you don't have Pulsar consumers that consume messages from Kafka producers, the kafka format is preferred because it performs much better when Kafka producers and Kafka consumers interact.

However, some non-official Kafka clients might not work with the kafka format. For example, old versions of the Golang Sarama client didn't assign relative offsets in compressed message sets before Shopify/sarama #1002. In this case, the broker has to assign relative offsets and then recompress the messages. Because this leads to some performance loss, KoP provides the mixed_kafka format to perform the conversion. Choose mixed_kafka when you have such an old Kafka client. As with the kafka format, Pulsar consumers still cannot consume messages from Kafka producers in this case.
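The guidance above reduces to a small decision. The helper below is hypothetical and only restates these rules; it does not cover the payload processor option, and entryFormat itself is still set in the broker configuration.

```java
// Hypothetical decision helper (not part of KoP) restating the entryFormat guidance.
final class EntryFormatChooser {

    /** Returns the suggested value for the entryFormat broker setting. */
    static String choose(boolean pulsarConsumersReadKafkaProducedMessages,
                         boolean hasLegacyKafkaClients) {
        if (pulsarConsumersReadKafkaProducedMessages) {
            // Only the pulsar format lets plain Pulsar consumers parse Kafka-produced messages.
            return "pulsar";
        }
        if (hasLegacyKafkaClients) {
            // e.g. old Sarama clients that don't assign relative offsets in compressed sets.
            return "mixed_kafka";
        }
        // Kafka producers and Kafka consumers only: the fastest option.
        return "kafka";
    }
}
```

For example, a cluster with only up-to-date official Kafka clients would end up with entryFormat=kafka.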

Kafka payload processor

PIP 96 introduced a message payload processor for Pulsar consumers. KoP provides a processor implementation, so even if you configure entryFormat=kafka for better performance among Kafka clients, Pulsar consumers can still consume messages from Kafka producers.

You only need to configure the processor in your consumer application via messagePayloadProcessor. See the following code example:

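import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
// KafkaPayloadProcessor comes from the kafka-payload-processor dependency described below.

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-sub")
        .messagePayloadProcessor(new KafkaPayloadProcessor()) // this extra line is needed
        .subscribe();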

To use KafkaPayloadProcessor, add the following dependency:

    <dependency>
      <groupId>io.streamnative.pulsar.handlers</groupId>
      <artifactId>kafka-payload-processor</artifactId>
      <version>${pulsar.version}</version>
    </dependency>

The pulsar.version should be the same as the version of your pulsar-client dependency.

Network

  • maxQueuedRequests: Limits the size of the request queue, like queued.max.requests in the Kafka server. Default: 500.
  • requestTimeoutMs: The request timeout in milliseconds, like request.timeout.ms in the Kafka client. If a request is not processed within the timeout, KoP returns an error response to the client. Default: 30000.
  • connectionMaxIdleMs: The idle connection timeout in milliseconds, like connections.max.idle.ms in the Kafka server. When a connection stays idle for this long, the server handler closes it. Setting it to -1 disables the idle connection timeout. Default: 600000.
  • failedAuthenticationDelayMs: The time in milliseconds by which closing a connection is delayed after an authentication failure, like connection.failed.authentication.delay.ms in the Kafka server. Default: 300.
  • brokerLookupTimeoutMs: The timeout for broker lookups in milliseconds. Default: 30000.

NOTE

These limits apply to each connection separately.
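To make the per-connection semantics concrete, here is a hypothetical bookkeeping class for a single connection. It is not KoP's implementation. It assumes that any received or completed request counts as activity, that a full queue is simply reported to the caller (how KoP reacts is not shown), and that a request times out once strictly more than requestTimeoutMs has elapsed.

```java
// Hypothetical per-connection bookkeeping (not KoP's implementation).
final class ConnectionLimitsSketch {
    private final int maxQueuedRequests;
    private final long requestTimeoutMs;
    private final long connectionMaxIdleMs; // -1 disables the idle timeout
    private int queuedRequests;
    private long lastActiveMs;

    ConnectionLimitsSketch(int maxQueuedRequests, long requestTimeoutMs,
                           long connectionMaxIdleMs, long nowMs) {
        this.maxQueuedRequests = maxQueuedRequests;
        this.requestTimeoutMs = requestTimeoutMs;
        this.connectionMaxIdleMs = connectionMaxIdleMs;
        this.lastActiveMs = nowMs;
    }

    /** Records a new request; returns false if this connection's queue is already full. */
    boolean tryEnqueue(long nowMs) {
        lastActiveMs = nowMs;
        if (queuedRequests >= maxQueuedRequests) {
            return false;
        }
        queuedRequests++;
        return true;
    }

    /** Records that a queued request has finished. */
    void complete(long nowMs) {
        lastActiveMs = nowMs;
        if (queuedRequests > 0) {
            queuedRequests--;
        }
    }

    /** True if a request received at receivedMs should now get an error response. */
    boolean isTimedOut(long receivedMs, long nowMs) {
        return nowMs - receivedMs > requestTimeoutMs;
    }

    /** True if the connection has been idle longer than connectionMaxIdleMs. */
    boolean isIdle(long nowMs) {
        if (connectionMaxIdleMs == -1) {
            return false; // idle timeout disabled
        }
        return nowMs - lastActiveMs > connectionMaxIdleMs;
    }
}
```

Because each connection owns its own instance, one busy client filling its queue does not consume the quota of another connection.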

Prometheus

  • kopPrometheusStatsLatencyRolloverSeconds: The rollover interval, in seconds, of the latency metrics that KoP exposes to Prometheus. Default: 60.
  • kopEnableGroupLevelConsumerMetrics: Whether to enable group-level consumer metrics. Default: false.

Group Coordinator

This section lists configurations about the group coordinator and the __consumer_offsets topic that is used to store committed offsets.

  • groupMinSessionTimeoutMs: The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection but require more frequent consumer heartbeats, which can overwhelm broker resources. Default: 6000.
  • groupMaxSessionTimeoutMs: The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages between heartbeats but take longer to detect failures. Default: 300000.
  • groupInitialRebalanceDelayMs: The time the group coordinator waits for more consumers to join a new group before performing the first rebalance. A longer delay potentially reduces rebalances but increases the time until processing begins. Default: 3000.
  • offsetsTopicCompressionCodec: The compression codec for the offsets topic. Compression may be used to achieve "atomic" commits.
  • offsetMetadataMaxSize: The maximum size in bytes of a metadata entry associated with an offset commit. Default: 4096.
  • offsetsRetentionMinutes: Offsets older than this retention period are discarded. Default: 4320.
  • offsetsMessageTTL: The offsets message TTL in seconds. Default: 259200.
  • offsetsRetentionCheckIntervalMs: The frequency, in milliseconds, at which to check for stale offsets. Default: 600000.
  • offsetsTopicNumPartitions: The number of partitions for the offsets topic. Default: 50.
  • offsetCommitTimeoutMs: An offset commit is delayed until the offset metadata is persisted or this timeout is reached. Default: 5000.
  • systemTopicRetentionSizeInMB: The retention size of the system topic in MB. Default: -1.
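Two of the settings above can be checked with simple arithmetic. Assuming KoP follows Kafka's group coordinator here, a consumer whose session.timeout.ms falls outside [groupMinSessionTimeoutMs, groupMaxSessionTimeoutMs] is rejected when it joins the group (Kafka returns INVALID_SESSION_TIMEOUT). Also, the default offsetsRetentionMinutes of 4320 equals 259200 seconds, the default offsetsMessageTTL, so both defaults describe three days. The helper class below is hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers (not KoP's code) for the group coordinator settings.
final class GroupCoordinatorSketch {

    /** Assumed to mirror Kafka's JoinGroup range check on the consumer session timeout. */
    static boolean isValidSessionTimeout(int sessionTimeoutMs,
                                         int groupMinSessionTimeoutMs,
                                         int groupMaxSessionTimeoutMs) {
        return sessionTimeoutMs >= groupMinSessionTimeoutMs
                && sessionTimeoutMs <= groupMaxSessionTimeoutMs;
    }

    /** Converts offsetsRetentionMinutes to seconds, the unit of offsetsMessageTTL. */
    static long retentionSeconds(long offsetsRetentionMinutes) {
        return TimeUnit.MINUTES.toSeconds(offsetsRetentionMinutes);
    }
}
```

If you change one of the two retention settings, this conversion shows the matching value for the other.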

Transaction

This section lists configurations about the transaction.

  • kafkaTransactionCoordinatorEnabled: Whether to enable the transaction coordinator. Default: false.
  • kafkaBrokerId: The broker ID that is used to create the producer ID. Default: 1.
  • kafkaTxnLogTopicNumPartitions: The number of partitions for the transaction log topic. Default: 50.
  • kafkaTxnAbortTimedOutTransactionCleanupIntervalMs: The interval in milliseconds at which to roll back transactions that have timed out. Default: 10000.
  • kafkaTransactionalIdExpirationEnable: Whether to enable transactional ID expiration. Default: true.
  • kafkaTransactionalIdExpirationMs: The time in milliseconds that the transaction coordinator waits without receiving any transaction status updates for the current transaction before expiring its transactional ID. Default: 604800.
  • kafkaTransactionsRemoveExpiredTransactionalIdCleanupIntervalMs: The interval in milliseconds at which to remove expired transactional IDs. Default: 3600.
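The following hypothetical sketch expresses the expiration rule described by kafkaTransactionalIdExpirationEnable and kafkaTransactionalIdExpirationMs. It assumes, as in Apache Kafka, that a transactional ID with an ongoing transaction is never expired; a real coordinator would run this check periodically, at the interval given by kafkaTransactionsRemoveExpiredTransactionalIdCleanupIntervalMs.

```java
// Hypothetical expiration check (not KoP's code).
final class TransactionalIdExpirySketch {

    /**
     * @param expirationEnabled  kafkaTransactionalIdExpirationEnable
     * @param transactionOngoing assumption: ongoing transactions are never expired
     * @param lastUpdateMs       time of the last transaction status update
     * @param expirationMs       kafkaTransactionalIdExpirationMs
     */
    static boolean shouldExpire(boolean expirationEnabled, boolean transactionOngoing,
                                long lastUpdateMs, long nowMs, long expirationMs) {
        if (!expirationEnabled || transactionOngoing) {
            return false;
        }
        return nowMs - lastUpdateMs > expirationMs;
    }
}
```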

Authentication

This section lists configurations about the authentication.

  • saslAllowedMechanisms: The set of SASL mechanisms that the broker supports. Range: PLAIN, OAUTHBEARER.
  • kopOauth2AuthenticateCallbackHandler: The fully qualified name of a SASL server callback handler class that implements the AuthenticateCallbackHandler interface, used for OAuth2 authentication. If it is not set, Kafka's default server callback handler for the OAUTHBEARER mechanism, OAuthBearerUnsecuredValidatorCallbackHandler, is used.

Authorization

This section lists configurations about the authorization.

  • kafkaEnableAuthorizationForceGroupIdCheck: Whether to enable the forced group ID check during authorization. Note: it is only supported for OAuth2 authentication. Range: true, false. Default: false.
  • kopAuthorizationCacheRefreshMs: If it is set to a positive value N, each connection caches the authorization results of PRODUCE and FETCH requests for at least N ms. This can improve performance when authorization is enabled, but a revoked permission may also take N ms to take effect. Range: 1 .. 2147483647. Default: 30000.
  • kopAuthorizationCacheMaxCountPerConnection: If it is set to a positive value N, each connection caches at most N entries for PRODUCE or FETCH requests. If it is non-positive, the default value is used. Range: 1 .. 2147483647. Default: 100.
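The two caching options above can be pictured as a per-connection LRU map with a freshness window. This is a hypothetical sketch, not KoP's cache: keys such as "PRODUCE:<topic>" are invented for illustration, an entry is treated as stale once refreshMs has elapsed, refreshMs is assumed to be positive, and a non-positive max count falls back to the documented default of 100.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-connection authorization cache (not KoP's implementation).
final class AuthorizationCacheSketch {

    private static final int DEFAULT_MAX_COUNT = 100; // documented default

    private static final class CachedDecision {
        final boolean allowed;
        final long cachedAtMs;

        CachedDecision(boolean allowed, long cachedAtMs) {
            this.allowed = allowed;
            this.cachedAtMs = cachedAtMs;
        }
    }

    private final long refreshMs;
    private final LinkedHashMap<String, CachedDecision> decisions;

    AuthorizationCacheSketch(long refreshMs, int maxCountPerConnection) {
        this.refreshMs = refreshMs;
        final int maxCount = maxCountPerConnection > 0 ? maxCountPerConnection : DEFAULT_MAX_COUNT;
        // accessOrder = true, so the eldest entry is the least recently used one.
        this.decisions = new LinkedHashMap<String, CachedDecision>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CachedDecision> eldest) {
                return size() > maxCount;
            }
        };
    }

    /** Returns the cached decision, or null if absent or older than refreshMs. */
    Boolean get(String key, long nowMs) {
        CachedDecision d = decisions.get(key);
        if (d == null) {
            return null;
        }
        if (nowMs - d.cachedAtMs >= refreshMs) {
            decisions.remove(key); // stale: force a fresh authorization check
            return null;
        }
        return d.allowed;
    }

    void put(String key, boolean allowed, long nowMs) {
        decisions.put(key, new CachedDecision(allowed, nowMs));
    }

    int size() {
        return decisions.size();
    }
}
```

The freshness window is what makes a revoked permission take up to refreshMs to be noticed: until the cached entry goes stale, the old decision is reused.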

SSL encryption

  • kopSslProtocol: Maps to the Kafka SSL configuration SSL_PROTOCOL_CONFIG = ssl.protocol. Default: TLS.
  • kopSslProvider: Maps to the Kafka SSL configuration SSL_PROVIDER_CONFIG = ssl.provider.
  • kopSslCipherSuites: Maps to the Kafka SSL configuration SSL_CIPHER_SUITES_CONFIG = ssl.cipher.suites.
  • kopSslEnabledProtocols: Maps to the Kafka SSL configuration SSL_ENABLED_PROTOCOLS_CONFIG = ssl.enabled.protocols. Default: TLSv1.2, TLSv1.1, TLSv1.
  • kopSslKeystoreType: Maps to the Kafka SSL configuration SSL_KEYSTORE_TYPE_CONFIG = ssl.keystore.type. Default: JKS.
  • kopSslKeystoreLocation: Maps to the Kafka SSL configuration SSL_KEYSTORE_LOCATION_CONFIG = ssl.keystore.location.
  • kopSslKeystorePassword: Maps to the Kafka SSL configuration SSL_KEYSTORE_PASSWORD_CONFIG = ssl.keystore.password. Default: N/A.
  • kopSslTruststoreType: Maps to the Kafka SSL configuration SSL_TRUSTSTORE_TYPE_CONFIG = ssl.truststore.type. Default: JKS.
  • kopSslTruststoreLocation: Maps to the Kafka SSL configuration SSL_TRUSTSTORE_LOCATION_CONFIG = ssl.truststore.location.
  • kopSslTruststorePassword: Maps to the Kafka SSL configuration SSL_TRUSTSTORE_PASSWORD_CONFIG = ssl.truststore.password.
  • kopSslKeymanagerAlgorithm: Maps to the Kafka SSL configuration SSL_KEYMANAGER_ALGORITHM_CONFIG = ssl.keymanager.algorithm. Default: SunX509.
  • kopSslTrustmanagerAlgorithm: Maps to the Kafka SSL configuration SSL_TRUSTMANAGER_ALGORITHM_CONFIG = ssl.trustmanager.algorithm. Default: SunX509.
  • kopSslSecureRandomImplementation: Maps to the Kafka SSL configuration SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = ssl.secure.random.implementation.
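The kopSsl* settings map one-to-one onto Kafka's ssl.* properties; in particular, kopSslKeystorePassword corresponds to ssl.keystore.password and kopSslTruststoreType to ssl.truststore.type. The hypothetical helper below writes out that mapping. It is not part of KoP and only translates the settings that are present.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not KoP's API) spelling out the kopSsl* -> ssl.* mapping.
final class KopSslConfigMapping {

    static final Map<String, String> KOP_TO_KAFKA;

    static {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("kopSslProtocol", "ssl.protocol");
        m.put("kopSslProvider", "ssl.provider");
        m.put("kopSslCipherSuites", "ssl.cipher.suites");
        m.put("kopSslEnabledProtocols", "ssl.enabled.protocols");
        m.put("kopSslKeystoreType", "ssl.keystore.type");
        m.put("kopSslKeystoreLocation", "ssl.keystore.location");
        m.put("kopSslKeystorePassword", "ssl.keystore.password");
        m.put("kopSslTruststoreType", "ssl.truststore.type");
        m.put("kopSslTruststoreLocation", "ssl.truststore.location");
        m.put("kopSslTruststorePassword", "ssl.truststore.password");
        m.put("kopSslKeymanagerAlgorithm", "ssl.keymanager.algorithm");
        m.put("kopSslTrustmanagerAlgorithm", "ssl.trustmanager.algorithm");
        m.put("kopSslSecureRandomImplementation", "ssl.secure.random.implementation");
        KOP_TO_KAFKA = Collections.unmodifiableMap(m);
    }

    /** Translates the kopSsl* settings that are present into Kafka ssl.* properties. */
    static Map<String, String> toKafkaSslProperties(Map<String, String> kopSettings) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : KOP_TO_KAFKA.entrySet()) {
            String value = kopSettings.get(e.getKey());
            if (value != null) {
                result.put(e.getValue(), value);
            }
        }
        return result;
    }
}
```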

Schema Registry

  • kopSchemaRegistryEnable: Whether to enable the Schema Registry. Default: false.
  • kopSchemaRegistryPort: The Schema Registry port. Default: 8001.
  • kopSchemaRegistryNamespace: The namespace used for storing Kafka Schema Registry data. Default: __kafka_schemaregistry.
  • kopSchemaRegistryTopicName: The name of the topic used by the Schema Registry. Default: __schema-registry.