Vert.x Kafka
March 21, 2016 ยท View on GitHub
Vert.x Kafka
The Vert.x kafka library allows asynchronous publishing and receiving of messages on Kafka topic through the vert.x event bus.
####To use this library you must have kafka and zookeeper up and running. Follow instructions at Kafka quick start guide
This is a multi-threaded worker library that consumes kafka messages and then re-broadcast them on an address on the vert.x event bus.
Getting Started
Add a dependency to vertx-kafka:
<dependency>
<groupId>com.cyngn.vertx</groupId>
<artifactId>vertx-kafka</artifactId>
<version>3.3.0-SNAPSHOT</version>
</dependency>
| vertx-kafka | vert.x | kafka |
|---|---|---|
| 3.3.0-SNAPSHOT | 3.3.0-SNAPSHOT | 0.9.0 |
| 0.4.1 | 3.1.0 | 0.9.0 |
Consumer
Listening for messages coming from a kafka broker.
Configuration
{
"zookeeper.connect" : "<host1:2181,host:2181...>",
"group.id" : "<kafkaConsumerGroupId>",
"bootstrap.servers" "<host1:9092,host2:9092...>",
"backoff.increment.ms" : "<backTimeInMilli>",
"autooffset.reset" : "<kafkaAutoOffset>",
"topics" : ["<topic1>", "<topic2>"],
"eventbus.address" : "<default kafka.message.consumer>",
"consumer.poll.interval.ms" : <default 100 ms>
}
For example:
{
"zookeeper.host" : "localhost:2181",
"group.id" : "testGroup",
"bootstrap.servers" "localhost:9092",
"backoff.increment.ms" : "100",
"autooffset.reset" : "smallest",
"topics" : ["testTopic"],
"eventbus.address" : "kafka.to.vertx.bridge",
"consumer.poll.interval.ms" : 1000
}
Field breakdown:
zookeeper.connecta zookeeper connection string of form hostname1:port1,hostname2:port2,hostname3:port3/chroot/path used with your kafka clustersgroup.idthe kafka consumer group name that will be consuming related tobootstrap.serversthe list of initial kafka hosts to connect tobackoff.increment.msbackoff interval for contacting broker without messages in millisecondsautooffset.resethow to reset the offsettopicsthe kafka topics to listen foreventbus.addressthe vert.x address to publish messages onto when received form kafkaconsumer.poll.interval.mshow often to try and consume messages
For a deeper look at kafka configuration parameters check this page out.
Usage
You should only need one consumer per application.
Deploy the verticle in your server
vertx = Vertx.vertx();
// sample config
JsonObject consumerConfig = new JsonObject();
consumerConfig.put(ConfigConstants.GROUP_ID, "testGroup");
List<String> topics = new ArrayList<>();
topics.add("testTopic");
consumerConfig.put("topics", new JsonArray(topics));
deployKafka(config);
public void deployKafka(JsonObject config) {
// use your vert.x reference to deploy the consumer verticle
vertx.deployVerticle(MessageConsumer.class.getName(),
new DeploymentOptions().setConfig(config),
deploy -> {
if(deploy.failed()) {
System.err.println(String.format("Failed to start kafka consumer verticle, ex: %s", deploy.cause()));
vertx.close()
return;
}
System.out.println("kafka consumer verticle started");
}
);
}
Listen for messages
vertx.eventBus().consumer(MessageConsumer.EVENTBUS_DEFAULT_ADDRESS,
message -> {
System.out.println(String.format("got message: %s", message.body()))
// message handling code
KafkaEvent event = new KafkaEvent(message.body());
});
Consumer Errors
You can listen on the address kafka.producer.error for errors from the kafka producer.
Producer
Send a message to a kafka cluster on a predefined topic.
Configuration
{
"serializer.class":"<the default encoder>",
"key.serializer":"<the key encoder>",
"value.serializer":"<the value encoder>",
"bootstrap.servers":"<host1:9092,host2:9092>,"
"default_topic":"<default kafka topic to send to>,"
"eventbus.address":"<the event bus topic where you send messages to send to kafka>"
"max.block.ms" : <defaults to 60000>
}
For example:
{
"serializer.class":"org.apache.kafka.common.serialization.StringSerializer",
"bootstrap.servers":"localhost:9092",
"default_topic":"testTopic"
}
serializer.classThe serializer class for messageskey.serializerThe serializer class for keys, defaults to the serializel.class if not setvalue.serializerThe serializer class for values, defaults to the serializel.class if not setbootstrap.serversThe socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.default_topicThe default topic in kafka to send toeventbus.addressThe address to listen to on the event bus, defaults to 'kafka.message.publisher'max.block.msHow long should the sender wait before getting meta data or time out in ms.
For a deeper look at kafka configuration parameters check this page out.
Usage
You should only need one producer per application.
Deploy the verticle in your server
vertx = Vertx.vertx();
// sample config
JsonObject producerConfig = new JsonObject();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("default_topic", "testTopic");
deployKafka(producerConfig);
public void deployKafka(JsonObject config) {
// use your vert.x reference to deploy the consumer verticle
vertx.deployVerticle(MessageProducer.class.getName(),
new DeploymentOptions().setConfig(config),
deploy -> {
if(deploy.failed()) {
System.err.println(String.format("Failed to start kafka producer verticle, ex: %s", deploy.cause()));
vertx.close()
return;
}
System.out.println("kafka producer verticle started");
});
}
Send message to kafka topic
KafkaPublisher publisher = new KafkaPublisher(vertx.eventBus());
// send to the default topic
publisher.send("a test message on a default topic");
// send to a specific topic
publisher.send("SomeSpecialTopic", "a test message on a default topic");
// send to a specific topic with custom key
publisher.send("SomeSpecialTopic", "aUserId", "a test message on a default topic");
// send to a specific topic and partition
publisher.send("SomeSpecialTopic", "", 5, "a test message on a default topic");
Producer Errors
You can listen on the address kafka.producer.error for errors from the kafka producer.
Test setup
- cd [yourKafkaInstallDir]
- bin/zookeeper-server-start.sh config/zookeeper.properties
- bin/kafka-server-start.sh config/server.properties
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 8 --topic [yourTestTopic]
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic [yourTestTopic]
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic [yourTestTopic]