In an existing application, change the regular Kafka client dependency and replace it with the Pulsar Kafka wrapper. Remove the following dependency in :
引入 Pulsar Kafka 包装器
现存的代码无需任何更改即可使用新的依赖。 需要修改的是配置信息,应确保将生产者和消费者使用 Pulsar 服务,而不是Kafka,并使用特定的 Pulsar 主题。
使用 Pulsar Kafka 兼容包装器和现有 kafka 客户端
When using this dependency, construct producers using org.apache.kafka.clients.producer.PulsarKafkaProducer
instead of org.apache.kafka.clients.producer.KafkaProducer
and org.apache.kafka.clients.producer.PulsarKafkaConsumer
for consumers.
消费者示例
查看完整的生产者和消费者示例
兼容性列表
目前,Pulsar Kafka 包装器支持 Kafka API 提供的大多数操作。
Producer
属性:
配置属性 | 支持 | 备注 |
---|---|---|
acks | 已忽略 | 持久性和法定写作已在命名空间级别上配置 |
auto.offset.reset | Yes | 若用户为提供特定设置,则默认值为 latest 。 |
batch.size | 已忽略 | |
bootstrap.servers | Yes | |
buffer.memory | 已忽略 | |
client.id | 已忽略 | |
compression.type | Yes | Allows gzip and lz4 . No snappy . |
connections.max.idle.ms | Yes | 仅支持最多2 147 483 647 000 (Integrer.MAX_VALUE * 1000)毫秒的闲置时间 |
interceptor.classes | Yes | |
key.serializer | Yes | |
linger.ms | Yes | 批处理时控制提交时间 |
max.block.ms | 已忽略 | |
max.in.flight.requests.per.connection | 已忽略 | Pulsar中即使有多个请求也会保证顺序 |
max.request.size | 已忽略 | |
metric.reporters | 已忽略 | |
metrics.num.samples | 已忽略 | |
metrics.sample.window.ms | 已忽略 | |
partitioner.class | Yes | |
receive.buffer.bytes | 已忽略 | |
reconnect.backoff.ms | 已忽略 | |
request.timeout.ms | 已忽略 | |
retries | 已忽略 | Pulsar client retries with exponential backoff until the send timeout expires. |
send.buffer.bytes | 已忽略 | |
timeout.ms | Yes | |
value.serializer | Yes |
Consumer
下述表格列出了consumer 接口
属性:
配置属性 | 支持 | 备注 |
---|---|---|
group.id | Yes | 到一个Pulsar订阅名称的映射 |
max.poll.records | Yes | |
max.poll.interval.ms | 已忽略 | 来自消息服务器“推送”的消息 |
session.timeout.ms | 已忽略 | |
heartbeat.interval.ms | 已忽略 | |
bootstrap.servers | Yes | 指定一个pulsar服务地址 |
enable.auto.commit | Yes | |
auto.commit.interval.ms | 已忽略 | 自动提交后, acks 立即发送给 broker. |
partition.assignment.strategy | 已忽略 | |
auto.offset.reset | Yes | 仅支持最早和最新的。 |
fetch.min.bytes | 已忽略 | |
fetch.max.bytes | 已忽略 | |
fetch.max.wait.ms | 已忽略 | |
interceptor.classes | Yes | |
metadata.max.age.ms | 已忽略 | |
max.partition.fetch.bytes | 已忽略 | |
已忽略 | ||
receive.buffer.bytes | 已忽略 | |
client.id | 已忽略 |
配置属性 | 默认值 | 备注 |
---|---|---|
指定 producer 名称。 | ||
pulsar.producer.initial.sequence.id | 为producer指定序列id的基线。 | |
1000 | 设置等待接收broker确认的消息队列最大值。 | |
pulsar.producer.max.pending.messages.across.partitions | 50000 | 设置所有分区挂起消息的最大值。 |
true | 是否允许自动批量接收消息。 | |
pulsar.producer.batching.max.messages | 1000 | 批量中的消息的最大数量。 |
Specify the block producer if queue is full. |