Vert.x Kafka 客户端

该组件提供了 Kafka 客户端, 可以用与给 Apache Kafka 集群发送信息,或从中读取信息。

作为消费者,接口提供了订阅主题分区,并异步地 接收消息,或将消息作为流进行处理(甚至可以做到中止或重启数据流)的方法。

作为生产者,接口提供了流式向主题分区发送消息的方法。

使用 Vert.x 的 Kafka 客户端

为了使用该组件, 需要在您的构建描述文件中的依赖配置中添加如下内容:

  • Maven (在您的 pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-kafka-client</artifactId>
 <version>4.1.8</version>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

compile io.vertx:vertx-kafka-client:4.1.8

创建 kafka 客户端

创建 kafka 的消费者和生产者的方式非常详细,它们都基于原生的 kafka 的客户端库工作。

在创建时,需要进行很多配置,这些配置可以参考 Apache Kafka 文档, 参见 消费者生产者.

为了方便配置, 您可以将参数放置在一个 Map 容器中,并在调用 KafkaConsumerKafkaProducer 的静态创建方法时传入。

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 使用消费者和 Apache Kafka 交互
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

在以上代码中,我们传入了一个 Map 容器实例作为创建 KafkaConsumer 对象实例时 的参数,这样可以指定要连接的 kafka 节点列表(这里只有一个)的地址和 每个接收到的消息的键和内容的反序列化器。

创建 kafka 生产者地方法也大致相同。

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("acks", "1");

// 使用生产者和 Apache Kafka 交互
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

加入一个消费者群组并从主题中接收消息

要开始从 kafka 的主题中接收消息, 消费者需要使用 subscribe 方法去 作为一个消费者群组(群组在创建时的属性设置里指定)的一员去订阅一组主题。

您也可以使用 subscribe 方法去 指定一个正则表达式,并订阅所有匹配该正则表达式的主题。

为了注册一个处理器去处理接收到的消息,您需要使用 handler 方法。

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// 订阅一组主题
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics);

// 或使用正则表达式
Pattern pattern = Pattern.compile("topic\\d");
consumer.subscribe(pattern);

// 或仅订阅一个主题
consumer.subscribe("a-single-topic");

您可以在调用 subscribe() 方法的前后注册消息处理器; 直到您调用了该方法并注册了消息处理器后,消息才会 开始被消费。 举个例子,您可以先调用 subscribe() 方法,再调用 seek() 方法,最后调用 handler() 方法 ,这样您可以在一个特定的偏移处开始消费消息。

消息处理器也可以在订阅时注册,这样您就可以获取订阅的结果并当操作完成时 收到通知。

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// 订阅一组主题
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer
  .subscribe(topics)
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

// 或仅订阅一个主题
consumer
  .subscribe("a-single-topic")
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

通过使用消费者群组,Kafka 集群会将同一个消费者群组下的其他消费者正在使用的分区 分配给该消费者, 因此分区可以在消费者群组中传播。

Kafka 集群会在消费者离开集群时(此时原消费者的分区可以分配给其他消费者)或 新的消费者加入集群时(新消费者的需要申请分区来读取)重新平衡分区。

您可以给 KafkaConsumer 注册一个处理器,这样 会在 kafka 集群给消费者分配或撤回主题分区时收到通知,使用 partitionsRevokedHandlerpartitionsAssignedHandler 方法注册该处理器。

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// 注册主题分区撤回和分配的处理器
consumer.partitionsAssignedHandler(topicPartitions -> {
  System.out.println("Partitions assigned");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

consumer.partitionsRevokedHandler(topicPartitions -> {
  System.out.println("Partitions revoked");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

// 订阅主题
consumer
  .subscribe("test")
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

在加入一个消费者群组接收消息后, 消费者可以选择使用 unsubscribe 方法 离开群组,这样就不会再收到消息

consumer.unsubscribe();

您还可以设置一个处理器来处理退出的结果

consumer
  .unsubscribe()
  .onSuccess(v ->
    System.out.println("Consumer unsubscribed")
  );

请求指定主题分区以接收消息

在接收消息时,除了加入消费者群组, 消费者也可以主动请求一个 特定的主题分区。 当消费者并不在一个消费者群组内, 那么应用就不能 依赖 kafka 的重平衡特性。

您可以使用 assign 方法 去请求特定的分区。

consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

//
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(new TopicPartition()
  .setTopic("test")
  .setPartition(0));

// 请求分配特定的分区
consumer
  .assign(topicPartitions)
  .onSuccess(v -> System.out.println("Partition assigned"))
  // 成功后会从该分区获取消息
  .compose(v -> consumer.assignment())
  .onSuccess(partitions -> {
    for (TopicPartition topicPartition : partitions) {
      System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
    }
  });

使用 subscribe() 方法时, 您可以在调用 assign() 方法之前或之后注册接收消息处理器; 因为消息只会在两个方法都生效后才会被消费。 举个例子,您可以先调用 assign() 方法, 再调用 seek() 方法,最后调用 handler() 方法, 这样您就可以只消费特定分区的指定偏移之后的消息。

调用 assignment 可以让您 获取当前分配的消息分区。

通过显式请求获取消息

为了从 Kafka 接收消息,除了使用客户端内部自带的请求机制外, 客户端可以订阅 主题, 并且不注册消息处理器,并使用 poll 方法获取消息。

通过这种方式, 用户的应用可以在其需要时才执行请求以获取消息, 举个例子。

consumer
  .subscribe("test")
  .onSuccess(v -> {
    System.out.println("Consumer subscribed");

    // 每秒请求一次
    vertx.setPeriodic(1000, timerId ->
      consumer
        .poll(Duration.ofMillis(100))
        .onSuccess(records -> {
          for (int i = 0; i < records.size(); i++) {
            KafkaConsumerRecord<String, String> record = records.recordAt(i);
            System.out.println("key=" + record.key() + ",value=" + record.value() +
              ",partition=" + record.partition() + ",offset=" + record.offset());
          }
        })
        .onFailure(cause -> {
          System.out.println("Something went wrong when polling " + cause.toString());
          cause.printStackTrace();

          // 当发生错误时停止请求
          vertx.cancelTimer(timerId);
        })
    );
});

订阅成功后, 应用启动了一个定时器来执行请求并且 周期性地从 kafka 获取消息。

改变订阅或主题分区的分配

您可以在开始消费消息之后修改订阅的主题或主题分区的分配,只需要 重新调用 subscribe() 方法或 assign() 方法。

请记住,由于 kafka 客户端的内部存在消息缓存, 因此很有可能在您 调用 subscribe() 方法或 assign() 方法 之后 ,原先的消息处理器仍然 收到了旧的主题或分区的消息。 但是如果您使用了批处理器就不会发生这种情况: 一旦重新调用订阅或修改方法的完成回调被触发, 那么客户端就只会收到新的主题或分区的消息。

获取主题分区信息

您可以调用 partitionsFor 方法来获取 特定主题的分区信息。

consumer
  .partitionsFor("test")
  .onSuccess(partitions -> {
    for (PartitionInfo partitionInfo : partitions) {
      System.out.println(partitionInfo);
    }
  });

您也可以调用 listTopics 方法获取所有当前主题的 分区信息。

consumer
  .listTopics()
  .onSuccess(partitionsTopicMap ->
    partitionsTopicMap.forEach((topic, partitions) -> {
      System.out.println("topic = " + topic);
      System.out.println("partitions = " + partitions);
    })
  );

手动提交偏移

Apache Kafka 的消费者一般会处理最后一个读取的消息的偏移。

一般情况下,kafka 的客户端会自动地在每次从主题分区获取一批消息 后通过提交操作处理。 配置参数 enable.auto.commit 会在客户端被创建时设置 为 true

手动提交偏移,可以使用 commit 方法。 这样可以确保 至少一次 提交偏移前消息已经被 处理了。

consumer.commit().onSuccess(v ->
  System.out.println("Last read message offset committed")
);

在消息分区内查询

Apache Kafka 可以保存一段时间内的消息数据,并且消费者可以在消息分区内查询 并获取任意一条消息。

您可以使用 seek 方法来改变读取时的偏移,并移动到 特定的位置

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// 移动特定的偏移
consumer
  .seek(topicPartition, 10)
  .onSuccess(v -> System.out.println("Seeking done"));

当消费者需要从开始处重新获取消息时,可以使用 seekToBeginning

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// 移动偏移到分区开头
consumer
  .seekToBeginning(Collections.singleton(topicPartition))
  .onSuccess(v -> System.out.println("Seeking done"));

最后,seekToEnd 可以用于将偏移移动到分区的结尾

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// 移动偏移到分区末尾
consumer
  .seekToEnd(Collections.singleton(topicPartition))
  .onSuccess(v -> System.out.println("Seeking done"));

请记住,由于 kafka 客户端的内部存在消息缓存, 因此很有可能在您 调用完 seek*() 方法 之后 原有的消息处理器仍在获取原先 偏移处的消息。 但是如果您使用了批处理器就不会发生这种情况: 一旦 seek*() 的完成回调被触发, 消息处理器就只会接收到新的偏移处的消息。

查询偏移

您可以使用在 Kafka 0.10.1.1 引入的 beginningOffsets 接口来获取指定分区的 第一个偏移。 与 seekToBeginning 方法不同的是, 该接口并不会改变当前客户端的偏移。

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer
  .beginningOffsets(topicPartitions)
  .onSuccess(results ->
    results.forEach((topic, beginningOffset) ->
      System.out.println(
        "Beginning offset for topic=" + topic.getTopic() + ", partition=" +
          topic.getPartition() + ", beginningOffset=" + beginningOffset
      )
    )
  );

// 方便地获取一个分区的偏移
consumer
  .beginningOffsets(topicPartition)
  .onSuccess(beginningOffset ->
    System.out.println(
      "Beginning offset for topic=" + topicPartition.getTopic() + ", partition=" +
        topicPartition.getPartition() + ", beginningOffset=" + beginningOffset
    )
  );

您可以使用在 Kafka 0.10.1.1 引入的 endOffsets 接口来获取指定分区的 结尾偏移。 与 seekToEnd 方法不同的是, 该接口并不会改变当前客户端的偏移。

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer.endOffsets(topicPartitions)
  .onSuccess(results ->
    results.forEach((topic, beginningOffset) ->
      System.out.println(
        "End offset for topic=" + topic.getTopic() + ", partition=" +
          topic.getPartition() + ", beginningOffset=" + beginningOffset
      )
    )
  );

// 方便地获取一个分区的偏移
consumer
  .endOffsets(topicPartition)
  .onSuccess(endOffset ->
    System.out.println(
      "End offset for topic=" + topicPartition.getTopic() + ", partition=" +
        topicPartition.getPartition() + ", endOffset=" + endOffset
    )
);

您可以使用在 Kafka 0.10.1.1 引入的 endOffsets 接口来根据时间戳获取指定分区的 偏移。查询参数是一个 unix 时间戳,而返回的结果是满足 摄入时间 >= 给定时间条件的最小偏移。

Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);

// 我们想知道 60 秒前摄入消息的偏移
long timestamp = (System.currentTimeMillis() - 60000);

topicPartitionsWithTimestamps.put(topicPartition, timestamp);
consumer
  .offsetsForTimes(topicPartitionsWithTimestamps)
  .onSuccess(results ->
    results.forEach((topic, offset) ->
      System.out.println(
        "Offset for topic=" + topic.getTopic() +
        ", partition=" + topic.getPartition() + "\n" +
        ", timestamp=" + timestamp + ", offset=" + offset.getOffset() +
        ", offsetTimestamp=" + offset.getTimestamp()
      )
    )
);

// 方便地获取一个分区的偏移
consumer.offsetsForTimes(topicPartition, timestamp).onSuccess(offsetAndTimestamp ->
  System.out.println(
    "Offset for topic=" + topicPartition.getTopic() +
    ", partition=" + topicPartition.getPartition() + "\n" +
    ", timestamp=" + timestamp + ", offset=" + offsetAndTimestamp.getOffset() +
    ", offsetTimestamp=" + offsetAndTimestamp.getTimestamp()
  )
);

消息流控制

kafka 的消费者可以控制消息的流入,并且暂停 / 重启从一个主题中读取消息的操作。当消费者需要 更多时间去处理当前消息时,它可以暂停消息流,它也可以重启消息流 去继续处理消息。

为了这么做,您可以使用 pause 方法和 resume 方法。

在对特定的主题分区调用了暂停和重启方法后,消息处理器有可能仍然会从 已经暂停了的主题分区接收消息,即使是在 pause() 方法的完成回调 已经被调用之后 。 如果您使用了批处理器,一旦您调用 pause() 方法的完成回调被调用, 消费者就只能从未被暂停的主题分区 接收消息。

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// 注册消息处理器
consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());

  // 在接收消息的偏移到达 5 之后暂停 / 重启分区 0 的消息流
  if ((record.partition() == 0) && (record.offset() == 5)) {

    // pause the read operations
    consumer.pause(topicPartition)
      .onSuccess(v -> System.out.println("Paused"))
      .onSuccess(v -> vertx.setTimer(5000, timeId ->
        // 重启读操作
        consumer.resume(topicPartition)
      ));
  }
});

关闭消费者

调用 close 方法来关闭消费者。 关闭消费者会关闭其所持有的所有连接并释放它所有的消费者资源。

close 方法是异步的并且在方法返回时可能还未完成。 如果您想在关闭完成后 收到通知,那么可以向其传递一个回调处理器。

该回调处理器会在关闭操作完全完成后被调用。

consumer
  .close()
  .onSuccess(v -> System.out.println("Consumer is now closed"))
  .onFailure(cause -> System.out.println("Close failed: " + cause));

向主题发送消息

您可以使用 write 方法去发送消息 (记录) 给主题。

最简单的发送消息的方法是指定目标主题和相对应的值, 忽略它的键 和分区,这种情况下消息会以轮流循环的方式呗发送给该主题的所有分区。

for (int i = 0; i < 5; i++) {

  // 只设置主题和消息内容的情况下,消息会被循环轮流发送给目的主题的所有分区
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.write(record);
}

在发送消息成功时,您可以接收到该消息在 kafka 中的元数据,例如它的主题,目标分区和它在存储中的偏移。

for (int i = 0; i < 5; i++) {

  // 只设置主题和消息内容的情况下,消息会被循环轮流发送给目的主题的所有分区
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.send(record).onSuccess(recordMetadata ->
    System.out.println(
      "Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
      ", partition=" + recordMetadata.getPartition() +
      ", offset=" + recordMetadata.getOffset()
    )
  );
}

当您需要指定消息发送的分区时,您需要指定它的分区标识符或 消息的键。

for (int i = 0; i < 10; i++) {

  // 指定分区
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", null, "message_" + i, 0);

  producer.write(record);
}

由于消息的生产者使用键的哈希计算对应的主题分区,您可以利用这一点保证拥有相同键的所有 消息都按照顺序被发送给一个相同的分区。

for (int i = 0; i < 10; i++) {

  // 根据奇偶性设置消息的键
  int key = i % 2;

  // 指定一个消息的键,所有键相同的消息会被发给同一个分区
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);

  producer.write(record);
}

请记住:可共用的生产者通过 createShared 方法的第一次调用创建,并且它的配置在此时被设置, 可共用的生产者使用时必须确保配置相同。

公用生产者

有时您需要在多个 verticle 或上下文(context)中共享同一个生产者。

使用 KafkaProducer.createShared 方法 返回一个可以被安全地共用的 producer。

KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);

// 之后您可以关闭它
producer1.close();

通过该方法返回的生产者会共享相同的资源(线程,连接) 。

当您使用完毕该生产者后,可以简单地关闭它。 当所有共用的生产者被关闭后,所有的资源 也会被释放。

关闭生产者

调用 close 方法来关闭生产者。关闭生产者会关闭其打开的连接并释放其所占有的所有资源。

关闭是异步进行的,因此在调用返回时生产者可能还没有完全关闭。 如果您想在 关闭完成时收到通知,那么您可以传入一个回调。

这个回调会在生产者被完全关闭后调用。

producer
  .close()
  .onSuccess(v -> System.out.println("Producer is now closed"))
  .onFailure(cause -> System.out.println("Close failed: " + cause));

获取主题分片信息

您可以调用 partitionsFor 方法来获取 指定主题的分片信息:

producer
  .partitionsFor("test")
  .onSuccess(partitions ->
    partitions.forEach(System.out::println)
  );

处理错误

kafka 客户端(消费者或生产者)和 kafka 集群间的异常处理 (例如连接超时) 需要用到 exceptionHandler 方法或 exceptionHandler 方法

consumer.exceptionHandler(e -> {
  System.out.println("Error = " + e.getMessage());
});

verticle 的自动清理

如果您是在 verticle 中创建 kafka 的消费者和生产者的,那么这些消费者和生产者会在 该 verticle 被取消部署时被自动清理。

使用 Vert.x 的序列化器 / 反序列化器

Vert.x 的 Kafka 客户端的实现自带了对 Buffer 数据类型, json 对象 和 json 对象数组的序列化器和反序列化器的包装。

使用消费者时您可以直接接收 Buffer 数据类型

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 创建一个可以反序列化 json 对象的消费者
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 创建一个可以反序列化 json 对象数组的消费者
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

在生产者端,您也可以这么做

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("acks", "1");

// 创建一个可以序列化 json 对象的生产者
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("acks", "1");

// 创建一个可以序列化 json 对象数组的生产者
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("acks", "1");

您可以在创建时直接指定序列化器/反序列化器:

对于消费者

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 创建一个可以反序列化 Buffer 数据类型的消费者
KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);

// 创建一个可以反序列化 json 对象的消费者
KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);

// 创建一个可以反序列化 json 对象数组的消费者
KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);

而对于生产者

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("acks", "1");

// 创建一个可以序列化 Buffer 数据类型的生产者
KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);

// 创建一个可以序列化 json 对象的生产者
KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);

// 创建一个可以序列化 json 对象数组的生产者
KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);

RxJava 3 接口

Kafka 客户端提供了在原有 API 基础上的响应式接口

Observable<KafkaConsumerRecord<String, Long>> observable = consumer.toObservable();

observable
  .map(record -> record.value())
  .buffer(256)
  .map(
  list -> list.stream().mapToDouble(n -> n).average()
).subscribe(val -> {

  // 获取平均值

});

自动追踪传播

当您配置 Vert.x 开启追踪时 (参见 setTracingOptions), 追踪可以通过 Kafka 的消息自动传播。

Kafka 的生产者会在写入消息时自动添加一个 Span 去追踪,追踪的上下文通过 Kafka 消息头部传递。并且消费者会在收到消息后根据消息头部信息重建 Span。

参考以下信息 OpenTracing semantic convention, Span 的标签包括:

  • span.kind,类型是 consumerproducer

  • peer.address 可以使用 setTracePeerAddress 配置。如果没有设置,那么会使用配置中的 Kafka 服务器地址

  • peer.hostname 通过解析 peer.address 得到

  • peer.port 通过解析 peer.address 得到

  • peer.service 一直是 always kafka

  • message_bus.destination, 会设置为 kafka 消息的主题

Unresolved directive in index.adoc - include::admin.adoc[]