All methods of the producer, consumer, and reader objects in the Python client are thread-safe.

pdoc-generated API documentation for the Python client is also available.

You can install the pulsar-client library either via PyPI, using pip, or by building the library from source.

To install the pulsar-client library as a pre-built package using the pip package manager, run:

    $ pip install pulsar-client

Installation via PyPI is available for the following Python versions:

Install from source

To install the pulsar-client library by building from source, follow the instructions for compiling the Pulsar C++ client library. That process also builds the Python bindings for the library.

To install the built Python bindings:

    $ git clone https://github.com/apache/pulsar
    $ cd pulsar/pulsar-client-cpp/python
    $ sudo python setup.py install

The following sections contain a variety of Python code examples for the pulsar-client library.

Producer example

The following example creates a Python producer for the my-topic topic and sends 10 messages on that topic:

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')

    for i in range(10):
        producer.send(('Hello-%d' % i).encode('utf-8'))

    client.close()
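Without a schema, producer.send() expects the payload as bytes, which is why the example above encodes each string as UTF-8. The sketch below is one way to keep that encoding in one place and to attach message properties, which send() accepts as a dict of string keys and values through its properties argument. send_text() and main() are hypothetical helpers, the 'index' property key is purely illustrative, and main() assumes a broker at pulsar://localhost:6650; send_text() itself works with any object that has a compatible send() method.

```python
def send_text(producer, text, properties=None):
    """Encode `text` as UTF-8 and publish it with `producer`.

    `producer` is assumed to come from client.create_producer() without a
    schema, so its send() method takes bytes. `properties` is an optional
    dict of str -> str message properties.
    """
    payload = text.encode('utf-8')
    return producer.send(payload, properties=properties)


def main():
    import pulsar  # needs the pulsar-client package and a running broker

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')
    try:
        for i in range(10):
            # 'index' is an illustrative property key, not a Pulsar convention
            send_text(producer, 'Hello-%d' % i, properties={'index': str(i)})
    finally:
        client.close()
```

The block does not call main() on its own; call it explicitly once a broker is running.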

The following example creates a consumer with the my-subscription subscription name on the my-topic topic, receives incoming messages, prints the content and ID of each message that arrives, and acknowledges each message to the Pulsar broker. If processing a message fails, the consumer negatively acknowledges it so that the broker redelivers it later.

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('my-topic', 'my-subscription')

    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)

    client.close()
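The receive, process, then acknowledge-or-nack cycle of the consumer loop can also be factored into a helper, which makes the acknowledgment rule easy to see and to test. In this sketch, handle_one() and main() are hypothetical names, process is any function you supply, and main() assumes a broker at pulsar://localhost:6650. Unlike the loop above, the helper acknowledges outside the try block, so only a failure inside process triggers negative_acknowledge().

```python
def handle_one(consumer, process):
    """Receive one message, pass its payload to `process`, then ack or nack it.

    `consumer` is assumed to come from client.subscribe(). Returns True if
    the message was acknowledged and False if it was negatively
    acknowledged, which asks the broker to redeliver it later.
    """
    msg = consumer.receive()
    try:
        process(msg.data())
    except Exception:
        # Processing failed: request redelivery instead of dropping the message
        consumer.negative_acknowledge(msg)
        return False
    # Only acknowledge once processing has succeeded
    consumer.acknowledge(msg)
    return True


def main():
    import pulsar  # needs the pulsar-client package and a running broker

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('my-topic', 'my-subscription')
    try:
        for _ in range(10):
            handle_one(consumer, lambda data: print(data.decode('utf-8')))
    finally:
        client.close()
```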

Reader interface example

You can access the Pulsar reader interface through the Pulsar Python API.
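A reader starts from a message ID that you choose and does not acknowledge messages. A minimal sketch, assuming a broker at pulsar://localhost:6650 and the my-topic topic used earlier: main() creates a reader with client.create_reader(), starting from pulsar.MessageId.earliest, and the hypothetical read_messages() helper collects a fixed number of payloads with read_next().

```python
def read_messages(reader, limit):
    """Read exactly `limit` messages from `reader` and return their payloads.

    `reader` is assumed to come from client.create_reader(). Readers track
    their own position, so there is nothing to acknowledge.
    """
    payloads = []
    for _ in range(limit):
        msg = reader.read_next()  # blocks until a message is available
        payloads.append(msg.data())
    return payloads


def main():
    import pulsar  # needs the pulsar-client package and a running broker

    client = pulsar.Client('pulsar://localhost:6650')
    # Start from the oldest message still retained on the topic
    reader = client.create_reader('my-topic', pulsar.MessageId.earliest)
    try:
        for data in read_messages(reader, 10):
            print("Read message '{}'".format(data.decode('utf-8')))
    finally:
        client.close()
```

To resume from a known position instead, pass the message ID of a previously received message (from msg.message_id()) as the second argument to create_reader().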

Multi-topic subscriptions

In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously. To use multi-topic subscriptions, supply either a regular expression (regex) or a list of topics. If you select topics by regex, all the topics must be in the same Pulsar namespace.

The following is an example.

    import re
    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    # Match every topic in public/default whose name starts with "topic-"
    consumer = client.subscribe(re.compile('persistent://public/default/topic-.*'), 'my-subscription')

    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)

    client.close()
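The topic pattern passed to re.compile() is a regular expression, not a shell-style glob, so * repeats the character before it: topic-* means "topic" followed by zero or more hyphens, while topic-.* means "topic-" followed by any characters. This section does not say whether the client matches a pattern against the entire topic name or only its beginning, so the sketch below uses re.fullmatch() to show the stricter case. The topic names are hypothetical, and the code uses only Python's re module, not Pulsar.

```python
import re

# Hypothetical topic names, used only to compare the two patterns
topics = [
    'persistent://public/default/topic',
    'persistent://public/default/topic-1',
    'persistent://public/default/topic-orders',
    'persistent://public/default/other-topic',
]

# '*' repeats the previous character: "topic" plus zero or more hyphens
star = re.compile('persistent://public/default/topic-*')
# '.*' matches any sequence of characters after "topic-"
dot_star = re.compile('persistent://public/default/topic-.*')


def full_matches(pattern, names):
    """Return the names that `pattern` matches in their entirety."""
    return [name for name in names if pattern.fullmatch(name)]
```

Under full matching, star accepts only persistent://public/default/topic, while dot_star accepts topic-1 and topic-orders and rejects both topic and other-topic.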

You can declare a schema by passing a class that inherits from pulsar.schema.Record and defines the fields as class variables. For example:

    import pulsar
    from pulsar.schema import AvroSchema, Boolean, Integer, Record, String

    class Example(Record):
        a = String()
        b = Integer()
        c = Boolean()

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer(
        topic='my-topic',
        schema=AvroSchema(Example))
    producer.send(Example(a='Hello', b=1))

When the producer is created, the Pulsar broker validates that the existing topic schema is of the Avro type and that its format is compatible with the schema definition of the Example class.

If there is a mismatch, producer creation fails with an exception.

Once a producer is created with a certain schema definition, it will only accept objects that are instances of the declared schema class.

Similarly, a consumer or reader returns an object that is an instance of the schema record class, rather than the raw bytes.
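On the consumer side, passing the same schema to client.subscribe() makes msg.value() return a record object instead of bytes. A minimal sketch, assuming an Example definition with fields a, b, and c like the one the producer uses, the my-topic topic, and a broker at pulsar://localhost:6650. describe() and main() are hypothetical helpers, and describe() only needs an object whose value() returns something with a and b attributes.

```python
def describe(msg):
    """Format a message received by a consumer created with AvroSchema(Example).

    With a schema, msg.value() returns the decoded record object
    instead of the raw bytes returned by msg.data().
    """
    record = msg.value()
    return "Received a={} b={}".format(record.a, record.b)


def main():
    # Needs the pulsar-client package and a running broker
    import pulsar
    from pulsar.schema import AvroSchema, Boolean, Integer, Record, String

    class Example(Record):  # assumed to match the producer's definition
        a = String()
        b = Integer()
        c = Boolean()

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe(
        topic='my-topic',
        subscription_name='my-subscription',
        schema=AvroSchema(Example))
    try:
        msg = consumer.receive()
        print(describe(msg))
        consumer.acknowledge(msg)
    finally:
        client.close()
```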

Supported schema types

You can use several built-in schema types in Pulsar. All the definitions are in the pulsar.schema package.

Schema definition reference

You define a schema through a class that inherits from pulsar.schema.Record.

Each field of this class is either of a pulsar.schema.Field type or another nested Record. All the field types are defined in the pulsar.schema package and correspond to the Avro field types.

Field parameters

When adding a field, you can use these parameters in the constructor.

Schema definition examples

Simple definition

    from pulsar.schema import Array, Integer, Map, Record, String

    class Example(Record):
        a = String()
        b = Integer()
        c = Array(String())
        i = Map(String())
Using enums

    from enum import Enum
    from pulsar.schema import Record, String

    class Color(Enum):
        red = 1
        green = 2
        blue = 3

    class Example(Record):
        name = String()
        color = Color
Complex types

    from pulsar.schema import Integer, Long, Record, String

    class MySubRecord(Record):
        x = Integer()
        y = Long()
        z = String()

    class Example(Record):
        sub = MySubRecord()