Vert.x Kafka Client

    • consumer:消费者
    • consumer group:消费组()
    • partition:分区(Kafka中的设计
    • producer: 生产者
    • offset:偏移量

    组件介绍

    对于消费者(consumer),API以异步的方式订阅消费指定的 topic 以及相关的分区(partition),或者将消息以 Vert.x Stream 的方式读取(甚至可以支持暂停(pause)和恢复(resume)操作)。

    对于生产者(producer),API提供发送信息到指定 topic 以及相关的分区(partition)的方法,类似于向 Vert.x Stream 中写入数据。

    使用 Vert.x Kafka Client

    要使用 Vert.x Kafka Client 组件,需要添加以下依赖:

    • Maven(在 文件中):
    • Gradle(在build.gradle文件中):
    1. compile 'io.vertx:vertx-kafka-client:3.4.1'

    创建 Kafka Client

    创建 Consumer 和 Producer 以及使用它们的方法其实与原生的 Kafka Client 库非常相似,Vert.x 只是做了一层异步封装。

    我们需要对 Consumer 与 Producer 进行一些相关的配置,具体可以参考 Apache Kafka 的官方文档:

    我们可以通过一个 Map 来包装这些配置,然后将其传入到 KafkaConsumer 接口或 接口中的 create 静态方法里来创建 KafkaConsumerKafkaProducer

    1. Map<String, String> config = new HashMap<>();
    2. config.put("bootstrap.servers", "localhost:9092");
    3. config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    4. config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    5. config.put("group.id", "my_group");
    6. config.put("auto.offset.reset", "earliest");
    7. config.put("enable.auto.commit", "false");
    8. // 创建一个Kafka Consumer
    9. KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

    在上面的例子中,我们在创建 KafkaConsumer 实例时传入了一个 Map 实例,用于指定要连接的 Kafka 节点列表(只有一个)以及如何对接收到的消息进行解析以得到 key 与 value。

    我们可以用类似的方法来创建 Producer:

    1. Map<String, String> config = new HashMap<>();
    2. config.put("bootstrap.servers", "localhost:9092");
    3. config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    4. config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    5. config.put("acks", "1");
    6. // 创建一个Kafka Producer
    7. KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

    另外也可以使用 来代替 Map:

    1. Properties config = new Properties();
    2. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    3. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    4. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    5. config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
    6. config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    7. config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    8. KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

    消息的 key 和 value 的序列化格式也可以作为 create 方法的参数直接传进去,而不是在相关配置中指定:

    1. Properties config = new Properties();
    2. config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    3. config.put(ProducerConfig.ACKS_CONFIG, "1");
    4. // 注意这里的第三和第四个参数
    5. KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config, String.class, String.class);

    在这里,我们在创建 实例的时候传入了一个 Properties 实例,用于指定要连接的 Kafka 节点列表(只有一个)和消息确认模式。消息 key 和 value 的解析方式作为参数传入 方法中。

    消费感兴趣 Topic 的消息并加入消费组

    我们可以通过 KafkaConsumer 的的 方法来订阅一个或多个 topic 进行消费,同时加入到某个消费组(consumer group)中(在创建消费者实例时通过配置指定)。当然你需要通过 handler 方法注册一个 Handler 来处理接收的消息:

    1. consumer.handler(record -> {
    2. System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    3. ",partition=" + record.partition() + ",offset=" + record.offset());
    4. });
    5. // 订阅多个topic
    6. Set<String> topics = new HashSet<>();
    7. topics.add("topic1");
    8. topics.add("topic2");
    9. topics.add("topic3");
    10. consumer.subscribe(topics);
    11. // 订阅单个主题
    12. consumer.subscribe("a-single-topic");

    另外如果想知道消息是否成功被消费掉,可以在调用 subscribe 方法时绑定一个 Handler

    1. consumer.handler(record -> {
    2. System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    3. ",partition=" + record.partition() + ",offset=" + record.offset());
    4. });
    5. // subscribe to several topics
    6. Set<String> topics = new HashSet<>();
    7. topics.add("topic1");
    8. topics.add("topic2");
    9. topics.add("topic3");
    10. //这里lambda表达式用于接收消息处理结果
    11. consumer.subscribe(topics, ar -> {
    12. if (ar.succeeded()) {
    13. System.out.println("subscribed");
    14. } else {
    15. System.out.println("Could not subscribe " + ar.cause().getMessage());
    16. }
    17. });
    18. //这里lambda表达式用于接收消息处理结果
    19. consumer.subscribe("a-single-topic", ar -> {
    20. if (ar.succeeded()) {
    21. System.out.println("subscribed");
    22. } else {
    23. System.out.println("Could not subscribe " + ar.cause().getMessage());
    24. }
    25. });

    由于Kafka的消费者会组成一个消费组(consumer group),同一个组只有一个消费者可以消费特定的 partition,同时此消费组也可以接纳其他的消费者,这样可以实现 partition 分配给组内其它消费者继续去消费。

    如果组内的一个消费者挂了,kafka 集群会自动把 partition 重新分配给组内其他消费者,或者新加入一个消费者去消费对应的 partition。您可以通过 和 partitionsAssignedHandler 方法在 里注册一个 Handler 用于监听对应的 partition 是否被删除或者分配。

    1. consumer.handler(record -> {
    2. System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    3. ",partition=" + record.partition() + ",offset=" + record.offset());
    4. });
    5. // 注册一个用于侦听新分配partition的Handler
    6. consumer.partitionsAssignedHandler(topicPartitions -> {
    7. System.out.println("Partitions assigned");
    8. for (TopicPartition topicPartition : topicPartitions) {
    9. System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
    10. }
    11. });
    12. // 注册一个用于侦听撤销partition的Handler
    13. consumer.partitionsRevokedHandler(topicPartitions -> {
    14. System.out.println("Partitions revoked");
    15. for (TopicPartition topicPartition : topicPartitions) {
    16. System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
    17. }
    18. });
    19. // subscribes to the topic
    20. consumer.subscribe("test", ar -> {
    21. if (ar.succeeded()) {
    22. System.out.println("Consumer subscribed");
    23. }
    24. });

    加入某个 consumer group 的消费者,可以通过 unsubscribe 方法退出该消费组,从而不再接受到相关消息:

    1. consumer.unsubscribe();

    当然你也可以在 unsubscribe 方法中传入一个 Handler 用于监听执行结果状态:

    1. consumer.unsubscribe(ar -> {
    2. if (ar.succeeded()) {
    3. System.out.println("Consumer unsubscribed");
    4. }
    5. });

    从 Topic 的特定分区里接收消息

    消费组内的消费者可以消费某个 topic 指定的 partition。如果某个消费者并不属于任何消费组,那么整个程序就不能依赖 Kafka 的 re-balancing 机制去消费消息。

    1. consumer.handler(record -> {
    2. System.out.println("key=" + record.key() + ",value=" + record.value() +
    3. ",partition=" + record.partition() + ",offset=" + record.offset());
    4. });
    5. //
    6. Set<TopicPartition> topicPartitions = new HashSet<>();
    7. topicPartitions.add(new TopicPartition()
    8. .setPartition(0));
    9. // 要求分配到特定的topic以及partitions
    10. consumer.assign(topicPartitions, done -> {
    11. if (done.succeeded()) {
    12. System.out.println("Partition assigned");
    13. // 侦听分配结果
    14. consumer.assignment(done1 -> {
    15. if (done1.succeeded()) {
    16. for (TopicPartition topicPartition : done1.result()) {
    17. System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
    18. }
    19. }
    20. });
    21. }
    22. });

    上面的 assignment 方法可以列出当前分配的 topic partition。

    获取 Topic 以及分区信息

    您可以通过 partitionsFor 方法获取指定 topic 的 partition 信息:

    另外, 方法可以列出消费者下的所有 topic 以及对应的 partition 信息:

    1. consumer.listTopics(ar -> {
    2. if (ar.succeeded()) {
    3. Map<String, List<PartitionInfo>> map = ar.result();
    4. map.forEach((topic, partitions) -> {
    5. System.out.println("topic = " + topic);
    6. System.out.println("partitions = " + map.get(topic));
    7. });
    8. }
    9. });

    在 Apache Kafka 中,消费者负责处理最新读取消息的偏移量(offset)。Consumer 会在每次从某个 topic partition 中读取一批消息的时候自动执行提交偏移量的操作。需要在创建 KafkaConsumer 时将 enable.auto.commit 配置项设为 true 来开启自动提交。

    我们可以通过 commit 方法进行手动提交。手动提交偏移量通常用于确保消息分发的 at least once 语义,以确保消息没有被消费前不会执行提交。

    1. consumer.commit(ar -> {
    2. if (ar.succeeded()) {
    3. System.out.println("Last read message offset committed");
    4. }
    5. });

    分区偏移量定位

    Apache Kafka 中的消息是按顺序持久化在磁盘上的,所以消费者可以在某个 partition 内部进行偏移量定位(seek)操作,并从任意指定的 topic 以及 partition 位置开始消费消息。我们可以通过 seek 方法来更改读取位置对应的偏移量:

    1. TopicPartition topicPartition = new TopicPartition()
    2. .setTopic("test")
    3. .setPartition(0);
    4. // 指定offset位置10
    5. consumer.seek(topicPartition, 10, done -> {
    6. if (done.succeeded()) {
    7. System.out.println("Seeking done");
    8. }
    9. });

    当消费者需要从 Stream 的起始位置读取消息时,可以使用 方法将 offset 位置设置到 partition 的起始端:

    1. TopicPartition topicPartition = new TopicPartition()
    2. .setTopic("test")
    3. .setPartition(0);
    4. // 将offset挪到分区起始端
    5. consumer.seekToBeginning(Collections.singleton(topicPartition), done -> {
    6. if (done.succeeded()) {
    7. System.out.println("Seeking done");
    8. }
    9. });

    最后我们也可以通过 seekToEnd 方法将 offset 位置设置到 partition 的末端:

    1. TopicPartition topicPartition = new TopicPartition()
    2. .setTopic("test")
    3. .setPartition(0);
    4. // 将offset挪到分区末端
    5. consumer.seekToEnd(Collections.singleton(topicPartition), done -> {
    6. if (done.succeeded()) {
    7. System.out.println("Seeking done");
    8. }
    9. });

    偏移量查询

    你可以利用 Kafka 0.10.1.1 引入的新的API beginningOffsets 来获取给定分区的起始偏移量。这个跟上面的 seekToBeginning 方法有一个地方不同: 方法不会更改 offset 的值,仅仅是读取(只读模式)。

    1. Set<TopicPartition> topicPartitions = new HashSet<>();
    2. TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
    3. topicPartitions.add(topicPartition);
    4. consumer.beginningOffsets(topicPartitions, done -> {
    5. if(done.succeeded()) {
    6. Map<TopicPartition, Long> results = done.result();
    7. results.forEach((topic, beginningOffset) ->
    8. System.out.println("Beginning offset for topic="+topic.getTopic()+", partition="+
    9. topic.getPartition()+", beginningOffset="+beginningOffset));
    10. }
    11. });
    12. // partition offset 查询辅助方法
    13. consumer.beginningOffsets(topicPartition, done -> {
    14. if(done.succeeded()) {
    15. Long beginningOffset = done.result();
    16. System.out.println("Beginning offset for topic="+topicPartition.getTopic()+", partition="+
    17. topicPartition.getPartition()+", beginningOffset="+beginningOffset);
    18. }
    19. });

    与此对应的API还有 endOffsets 方法,用于获取给定分区末端的偏移量值。与 方法相比,endOffsets 方法不会更改 offset 的值,仅仅是读取(只读模式)。

    1. Set<TopicPartition> topicPartitions = new HashSet<>();
    2. TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
    3. topicPartitions.add(topicPartition);
    4. consumer.endOffsets(topicPartitions, done -> {
    5. if(done.succeeded()) {
    6. Map<TopicPartition, Long> results = done.result();
    7. results.forEach((topic, endOffset) ->
    8. System.out.println("End offset for topic="+topic.getTopic()+", partition="+
    9. topic.getPartition()+", endOffset="+endOffset));
    10. }
    11. });
    12. consumer.endOffsets(topicPartition, done -> {
    13. if(done.succeeded()) {
    14. Long endOffset = done.result();
    15. System.out.println("End offset for topic="+topicPartition.getTopic()+", partition="+
    16. topicPartition.getPartition()+", endOffset="+endOffset);
    17. }
    18. });

    Kafka 0.10.1.1 还提供了一个根据时间戳(timestamp)来定位 offset 的API方法 offsetsForTimes,调用此API可以返回大于等于给定时间戳的 offset。因为 Kafka 的 offset 低位就是时间戳,所以 Kafka 很容易定位此类offset。

    1. Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
    2. TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
    3. // 我们只想要60秒之前的消息的offset
    4. long timestamp = (System.currentTimeMillis() - 60000);
    5. topicPartitionsWithTimestamps.put(topicPartition, timestamp);
    6. consumer.offsetsForTimes(topicPartitionsWithTimestamps, done -> {
    7. if(done.succeeded()) {
    8. Map<TopicPartition, OffsetAndTimestamp> results = done.result();
    9. results.forEach((topic, offset) ->
    10. System.out.println("Offset for topic="+topic.getTopic()+
    11. ", partition="+topic.getPartition()+"\n"+
    12. ", timestamp="+timestamp+", offset="+offset.getOffset()+
    13. ", offsetTimestamp="+offset.getTimestamp()));
    14. }
    15. });
    16. consumer.offsetsForTimes(topicPartition, timestamp, done -> {
    17. if(done.succeeded()) {
    18. OffsetAndTimestamp offsetAndTimestamp = done.result();
    19. System.out.println("Offset for topic="+topicPartition.getTopic()+
    20. ", partition="+topicPartition.getPartition()+"\n"+
    21. ", timestamp="+timestamp+", offset="+offsetAndTimestamp.getOffset()+
    22. ", offsetTimestamp="+offsetAndTimestamp.getTimestamp());
    23. }
    24. });

    流量控制

    Consumer 可以对消息流进行流量控制。如果我们读到一批消息,需要花点时间进行处理则可以暂时暂停(pause)消息的流入(这里实际上是把消息全部缓存到内存里了);等我们处理了差不多了,可以再继续消费缓存起来的消息(resume)。

    我们可以利用 pause 方法和 方法来进行流量控制:

    1. TopicPartition topicPartition = new TopicPartition()
    2. .setTopic("test")
    3. .setPartition(0);
    4. //注册一个handler处理进来的消息
    5. consumer.handler(record -> {
    6. System.out.println("key=" + record.key() + ",value=" + record.value() +
    7. ",partition=" + record.partition() + ",offset=" + record.offset());
    8. // 如果我们读到partition0的第5个offset
    9. if ((record.partition() == 0) && (record.offset() == 5)) {
    10. // 则暂停读取
    11. if (ar.succeeded()) {
    12. System.out.println("Paused");
    13. // 5秒后再恢复,继续读取
    14. vertx.setTimer(5000, timeId -> {
    15. // resumi read operations
    16. consumer.resume(topicPartition);
    17. });
    18. }
    19. });
    20. }
    21. });

    关闭 Consumer

    关闭 Consumer 只需要调用 方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放相关资源。

    由于 close 方法是异步的,你并不知道关闭操作什么时候完成或失败,这时你需要注册一个处理器(Handler)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler 将会被调用。

    1. consumer.close(res -> {
    2. if (res.succeeded()) {
    3. System.out.println("Consumer is now closed");
    4. } else {
    5. System.out.println("close failed");
    6. }
    7. });

    发送消息到某个 Topic

    您可以利用 write 方法来向某个 topic 发送消息(records)。

    最简单的发送消息的方式是仅仅指定目的 topic 以及相应的值而省略消息的 key 以及分区。在这种情况下,消息会以轮询(round robin)的方式发送到对应 topic 的所有分区上。

    1. for (int i = 0; i < 5; i++) {
    2. // 这里指定了topic和 message value,以round robin方式发送的目标partition
    3. KafkaProducerRecord<String, String> record =
    4. KafkaProducerRecord.create("test", "message_" + i);
    5. producer.write(record);
    6. }

    您可以通过绑定 Handler 来接受发送的结果。这个结果其实就是一些元数据(metadata),包含消息的 topic、目的分区 (destination partition) 以及分配的偏移量 (assigned offset)。

    如果希望将消息发送到指定的分区,你可以指定分区的标识(identifier)或者设定消息的 key:

    1. for (int i = 0; i < 10; i++) {
    2. // 这里指定了 partition 为 0
    3. KafkaProducerRecord<String, String> record =
    4. KafkaProducerRecord.create("test", null, "message_" + i, 0);
    5. producer.write(record);
    6. }
    1. for (int i = 0; i < 10; i++) {
    2. // i.e. defining different keys for odd and even messages
    3. int key = i % 2;
    4. //这里指明了key,所有的消息将被发送同一个partition.
    5. KafkaProducerRecord<String, String> record =
    6. KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);
    7. producer.write(record);
    8. }

    共享 Producer

    有时候您希望在多个 Verticle 或者 Vert.x Context 下共用一个 Producer。您可以通过 KafkaProducer.createShared 方法来创建可以在 Verticle 之间安全共享的 KafkaProducer 实例:

    1. KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);
    2. // 关闭
    3. producer1.close();

    返回的 KafkaProducer 实例将复用相关的资源(如线程、连接等)。使用完 KafkaProducer 后,直接调用 close 方法关闭即可,相关的资源会自动释放。

    与关闭 Consumer 类似,关闭 Producer 只需要调用 close 方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放所有相关资源。

    由于 close 方法是异步的,你并不知道关闭操作什么时候完成或失败,这时你需要注册一个处理器(Handler)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler 将会被调用。

    1. producer.close(res -> {
    2. if (res.succeeded()) {
    3. System.out.println("Producer is now closed");
    4. } else {
    5. System.out.println("close failed");
    6. }
    7. });

    获取 Topic Partition 的相关信息

    您可以通过 partitionsFor 方法获取指定 topic 的分区信息。

    1. producer.partitionsFor("test", ar -> {
    2. if (ar.succeeded()) {
    3. for (PartitionInfo partitionInfo : ar.result()) {
    4. System.out.println(partitionInfo);
    5. }
    6. }
    7. });

    错误处理

    您可以利用 KafkaProducer#exceptionHandler 方法和 方法来处理 Kafka 客户端(生产者和消费者)和 Kafka 集群之间的错误(如超时)。比如:

    1. consumer.exceptionHandler(e -> {
    2. System.out.println("Error = " + e.getMessage());
    3. });

    随 Verticle 自动关闭

    如果您是在 Verticle 内部创建的 Consumer 和 Producer,那么当对应 Verticle 被卸载(undeploy)的时候,相关的 Consumer 和 Producer 会自动关闭。

    使用 Vert.x 自带的序列化与反序列化机制

    Vert.x Kafka Client 自带现成的序列化与反序列化机制,可以处理 BufferJsonObjectJsonArray 等类型。

    KafkaConsumer 里您可以使用 Buffer

    1. Map<String, String> config = new HashMap<>();
    2. config.put("bootstrap.servers", "localhost:9092");
    3. config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
    4. config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
    5. config.put("group.id", "my_group");
    6. config.put("auto.offset.reset", "earliest");
    7. config.put("enable.auto.commit", "false");
    8. // 创建一个可以反序列化成jsonObject的consumer.
    9. config = new HashMap<>();
    10. config.put("bootstrap.servers", "localhost:9092");
    11. config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
    12. config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
    13. config.put("group.id", "my_group");
    14. config.put("auto.offset.reset", "earliest");
    15. config.put("enable.auto.commit", "false");
    16. // 创建一个可以反序列化成jsonArray的consumer.
    17. config = new HashMap<>();
    18. config.put("bootstrap.servers", "localhost:9092");
    19. config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
    20. config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
    21. config.put("group.id", "my_group");
    22. config.put("auto.offset.reset", "earliest");
    23. config.put("enable.auto.commit", "false");

    同样在 KafkaProducer 中也可以:

    1. Map<String, String> config = new HashMap<>();
    2. config.put("bootstrap.servers", "localhost:9092");
    3. config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
    4. config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
    5. config.put("acks", "1");
    6. // 创建一个可以序列化成jsonObject的Producer.
    7. config = new HashMap<>();
    8. config.put("bootstrap.servers", "localhost:9092");
    9. config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
    10. config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
    11. config.put("acks", "1");
    12. // 创建一个可以序列化成jsonArray的Producer.
    13. config = new HashMap<>();
    14. config.put("bootstrap.servers", "localhost:9092");
    15. config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
    16. config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
    17. config.put("acks", "1");

    您也可以在 create 方法里指明序列化与反序列化相关的类。

    比如创建 Consumer 时:

    1. Map<String, String> config = new HashMap<>();
    2. config.put("bootstrap.servers", "localhost:9092");
    3. config.put("group.id", "my_group");
    4. config.put("auto.offset.reset", "earliest");
    5. config.put("enable.auto.commit", "false");
    6. // 创建一个可以反序列化成Buffer的Consumer
    7. KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);
    8. // 创建一个可以反序列化成JsonObject的Consumer
    9. KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);
    10. // 创建一个可以反序列化成JsonArray的Consumer
    11. KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);

    创建 Producer 时:

    1. Map<String, String> config = new HashMap<>();
    2. config.put("bootstrap.servers", "localhost:9092");
    3. config.put("acks", "1");
    4. // 创建一个可以序列化成Buffer的Producer.
    5. KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);
    6. // 创建一个可以序列化成jsonObject的Producer.
    7. KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);
    8. // 创建一个可以序列化成jsonArray的Producer.
    9. KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);

    RxJava API

    Vert.x Kafka Client 组件也提供Rx风格的API。

    1. Observable<KafkaConsumerRecord<String, Long>> observable = consumer.toObservable();
    2. observable
    3. .map(record -> record.value())
    4. .buffer(256)
    5. .map(
    6. list -> list.stream().mapToDouble(n -> n).average()
    7. ).subscribe(val -> {
    8. //获取到一个平均值

    流实现与 Kafka 原生对象

    如果您希望直接操作原生的 Kafka record,您可以使用原生的 Kafka 流式对象,它可以处理原生 Kafka 对象。

    KafkaReadStream用于读取 topic partition。它是 对象的可读流对象,读到的是 ConsumerRecord 对象。

    用于向某些 topic 中写入数据。它是 对象的可写流对象。