In an existing application, change the regular Kafka client dependency and replace it with the Pulsar Kafka wrapper. Remove the following dependency in :

引入 Pulsar Kafka 包装器

现存的代码无需任何更改即可使用新的依赖。 需要修改的是配置信息,应确保将生产者和消费者使用 Pulsar 服务,而不是Kafka,并使用特定的 Pulsar 主题。

使用 Pulsar Kafka 兼容包装器和现有 kafka 客户端

When using this dependency, construct producers using org.apache.kafka.clients.producer.PulsarKafkaProducer instead of org.apache.kafka.clients.producer.KafkaProducer and org.apache.kafka.clients.producer.PulsarKafkaConsumer for consumers.

消费者示例

查看完整的生产者和消费者示例

兼容性列表

目前,Pulsar Kafka 包装器支持 Kafka API 提供的大多数操作。

Producer

属性:

配置属性支持备注
acks已忽略持久性和法定写作已在命名空间级别上配置
auto.offset.resetYes若用户为提供特定设置,则默认值为 latest
batch.size已忽略
bootstrap.serversYes
buffer.memory已忽略
client.id已忽略
compression.typeYesAllows gzip and lz4. No snappy.
connections.max.idle.msYes仅支持最多2 147 483 647 000 (Integrer.MAX_VALUE * 1000)毫秒的闲置时间
interceptor.classesYes
key.serializerYes
linger.msYes批处理时控制提交时间
max.block.ms已忽略
max.in.flight.requests.per.connection已忽略Pulsar中即使有多个请求也会保证顺序
max.request.size已忽略
metric.reporters已忽略
metrics.num.samples已忽略
metrics.sample.window.ms已忽略
partitioner.classYes
receive.buffer.bytes已忽略
reconnect.backoff.ms已忽略
request.timeout.ms已忽略
retries已忽略Pulsar client retries with exponential backoff until the send timeout expires.
send.buffer.bytes已忽略
timeout.msYes
value.serializerYes

Consumer

下述表格列出了consumer 接口

属性:

配置属性支持备注
group.idYes到一个Pulsar订阅名称的映射
max.poll.recordsYes
max.poll.interval.ms已忽略来自消息服务器“推送”的消息
session.timeout.ms已忽略
heartbeat.interval.ms已忽略
bootstrap.serversYes指定一个pulsar服务地址
enable.auto.commitYes
auto.commit.interval.ms已忽略自动提交后, acks 立即发送给 broker.
partition.assignment.strategy已忽略
auto.offset.resetYes仅支持最早和最新的。
fetch.min.bytes已忽略
fetch.max.bytes已忽略
fetch.max.wait.ms已忽略
interceptor.classesYes
metadata.max.age.ms已忽略
max.partition.fetch.bytes已忽略
已忽略
receive.buffer.bytes已忽略
client.id已忽略
配置属性默认值备注
指定 producer 名称。
pulsar.producer.initial.sequence.id为producer指定序列id的基线。
1000设置等待接收broker确认的消息队列最大值。
pulsar.producer.max.pending.messages.across.partitions50000设置所有分区挂起消息的最大值。
true是否允许自动批量接收消息。
pulsar.producer.batching.max.messages1000批量中的消息的最大数量。
Specify the block producer if queue is full.