在 的数据结构中定义 Pulsar schema 。
SchemaInfo
在 topic 上存储和执行,不能存储在命名空间或租户中。
SchemaInfo
由以下字段组成:
示例
字符串的 SchemaInfo
。
Pulsar 支持多种 schema 类型,主要可分为两类:
原始类型
复杂类型
目前,Pulsar 支持以下原始类型:
原始类型 | Description |
---|---|
BOOLEAN | 二进制值 |
INT8 | 8 位带符号整数 |
INT16 | 16 位带符号整数 |
INT32 | 32 位带符号整数 |
INT64 | 64 位带符号整数 |
FLOAT | 单精度(32位)IEEE 754 浮点数 |
DOUBLE | 双精度(64位)IEEE 754 浮点数 |
BYTES | 8 位无符号字节序列 |
STRING | Unicode 字符序列 |
TIMESTAMP (DATE , TIME ) | 表示特定时间点的逻辑类型(精度为毫秒)。 以 INT64 值存储自 1970年1月1日,00:00:00 GMT 起的毫秒数。 |
对于原始类型,Pulsar 不在 SchemaInfo
中存储任何 schema 数据。 SchemaInfo
中的类型
用于决定如何序列化和反序列化数据。
在一些原始 schema 的实现中,可以使用属性
来存储单个实现的调节设置。 例如,字符串
schema 可以将编码集存储在属性
中,用于序列化和反序列化字符串。
The conversions between Pulsar schema types and language-specific primitive types are as below.
示例
此示例展示了如何使用字符串 schema。
创建一个带有字符串 schema 的 producer 并发送消息。
Producer<String> producer = client.newProducer(Schema.STRING).create();
producer.newMessage().value("Hello Pulsar!").send();
创建一个带有字符串 schema 的 consumer 并接收消息。
Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
consumer.receive();
目前,Pulsar 支持以下复杂类型:
复杂类型 | Description |
---|---|
keyvalue | 复杂类型的键/值对。 |
struct | Supports AVRO, JSON, and Protobuf. |
keyvalue
键值
schema 定义应用程序中 schema 的键和值。
对于键值
schema 的 schemaInfo
,Pulsar 同时存储键 schema 的 SchemaInfo
和值 schema 的 SchemaInfo
。
Pulsar 编码消息中键/值对的方式有两种:
嵌入
分离
用户可以在创建键/值 schema 时选择编码类型。
嵌入
在消息有效载荷中同时编码键/值对。
分离
键将在消息键中编码,值将在消息有效载荷中编码。
示例
本示例展示了如何创建一个键/值 schema,并使用它来 produce 和 consume 消息。
创建具有
嵌入
编码类型的键/值 schema。Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
KeyValueEncodingType.INLINE
);
可选,创建一个包含
分离
编码类型的键/值 schema。Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
使用键/值 schema consume 消息。
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
...
.topic(TOPIC)
.subscriptionName(SubscriptionName).subscribe();
// receive key/value pair
Message<KeyValue<Integer, String>> msg = consumer.receive();
KeyValue<Integer, String> kv = msg.getValue();
struct
Pulsar 使用 声明结构
schema 的定义。
Pulsar 允许:
使用相同的工具管理 schema 定义
使用不同的序列化/反序列化方法处理数据
使用结构
schema 的方式有两种:
静态
通用
静态
可以预定义结构
schema,可以是 Java 中的 POJO 模式,可以是 Go 中的结构
,也可以是 Avro 或 Protobuf 工具生成的类。
示例
Pulsar 使用 Avro 库从预定义结构
中获取 schema 定义。 Schema 定义是 schema 数据,存储在 SchemaInfo
中。
Create the User class to define the messages sent to Pulsar topics.
public class User {
String name;
int age;
}
用
结构
schema 创建 producer 并发送消息。Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();
用
结构
schema 创建 consumer 并接收消息。Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
User user = consumer.receive();
通用
有时应用程序中不包含预定义 schema,则可以使用此方法来定义 schema 并访问数据。
可以使用 GenericSchemaBuilder
来定义结构
schema,使用 GenericRecordBuilder
来生成通用结构,并 consume 消息到 GenericRecord
。
示例
使用
RecordSchemaBuilder
创建 schema。使用
RecordBuilder
创建结构记录。.set("intField", 32)
.build()).send();
如果事先不知道 Pulsar topic 的 schema 类型,则可以使用 AUTO schema 向 broker 中 produce 或从 broker 中 consume 通用记录。
AUTO_PRODUCE
AUTO_PRODUCE
schema 帮助 producer 验证其发送的字节是否与 topic 的 schema 兼容。
示例
假设:
You have a producer processing messages from a Kafka topic K.
You have a Pulsar topic P, and you do not know its schema type.
Your application reads the messages from K and writes the messages to P.
In this case, you can use AUTO_PRODUCE
to verify whether the bytes produced by K can be sent to P or not.
Produce<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE())
…
.create();
byte[] kafkaMessageBytes = … ;
AUTO_CONSUME
AUTO_CONSUME
schema 帮助 Pulsar topic 验证其发送的字节是否与 consumer 兼容,即 Pulsar topic 使用从 broker 端检索到的 SchemaInfo
将消息反序列化为特定语言的对象。
Currently, AUTO_CONSUME
only supports AVRO and JSON schemas. 将消息反序列化为通用记录
。
示例
假设:
You have a Pulsar topic P.
You have a consumer (for example, MySQL) receiving messages from the topic P.
You application reads the messages from P and writes the messages to MySQL.
In this case, you can use AUTO_CONSUME
to verify whether the bytes produced by P can be sent to MySQL or not.
Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
…
.subscribe();
Message<GenericRecord> msg = consumer.receive() ;
GenericRecord record = msg.getValue();
每个存储在 topic 中的 SchemaInfo
都有一个版本。 Schema 版本管理单个 topic 中 schema 的更改。
将给定 SchemaInfo
产生的消息标记为 schema 版本,这样当消息被 Pulsar 客户端 consume 时,Pulsar 客户端可以使用 schema 版本检索相应的 SchemaInfo
,并使用 SchemaInfo
反序列化数据。
Schema版本是连续的。 Schema 存储在负责处理相关 topic 的 broker 中,以进行版本分配。
一旦分配/提供了schema的版本,后续由该生产者生产的所有消息,都将被恰当的版本所标记。
示例
以下示例展示了 schema 版本如何工作。
假设通过下面的代码创建 Pulsar Java 客户端,尝试连接到 Pulsar 并开始发送消息:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-data")
.sendTimeout(3, TimeUnit.SECONDS)
.create();
下表列出了连接尝试时可能出现的情况以及发生的原因:
场景 | 发生了什么 |
---|---|
| (2)由于现有 schema 与“SensorReading”schema 不兼容,将当前 schema 传输到并存储在 broker 中。 (3)任何使用相同 schema 或 topic 创建的 consumer 都可以 consume 来自 |
| (1)将 schema 传输到 Pulsar broker 中。 (2)Broker 确定此 schema 兼容。 (3)Broker 尝试将此 schema 存储在 中,但又认为其已存储成功,所以此 schema 被用于标记生成的消息。 |
| (1)将 schema 传输到 Pulsar broker 中。 (2)Broker 确定此 schema 兼容,并将新 schema 储存为当前版本(带有新版本号)。 |
Pulsar schemas are applied and enforced at the topic level (schemas cannot be applied at the namespace or tenant level).
Producer 和 consumer 将 schema 上传到 broker,因此 Pulsar schema 同时工作在 producer 端和 consumer 端。
此图表展示了在 producer 中 schema 如何工作。
应用程序使用 schema 实例构造 producer 实例。
Schema 实例定义了使用 producer 实例生成的数据的 schema 模式。
以 AVRO 为例,Pulsar 从 POJO 类中提取 schema 定义,并构建 producer 连接时需要传递给 broker 的
SchemaInfo
。Producer 通过从已通过 schema 实例中提取的
SchemaInfo
连接到 broker。Broker 在 schema 存储中查找 schema 以确认其是否已注册。
如已注册,broker 会跳过 schema 验证(因为已注册 schema 是已知 schema),并将 schema 版本返回给 producer。
如未注册,则 broker 需验证是否可以在此命名空间中自动创建 schema:
If
isAllowAutoUpdateSchema
sets to true, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.If
isAllowAutoUpdateSchema
sets to false, then a schema can not be created, and the producer is rejected to connect to the broker.
Tip:
isAllowAutoUpdateSchema
can be set via Pulsar admin API or REST API.
如何通过 Pulsar admin API 设置 isAllowAutoUpdateSchema
,请参阅自动更新策略管理。
- 如果方案被允许更新,则进行兼容策略检查。
If the schema is compatible, the broker stores it and returns the schema version to the producer.
All the messages produced by this producer are tagged with the schema version.
If the schema is incompatible, the broker rejects it.
This diagram illustrates how does Schema work on the consumer side.
The application uses a schema instance to construct a consumer instance.
The schema instance defines the schema that the consumer uses for decoding messages received from a broker.
The consumer connects to the broker with the
SchemaInfo
extracted from the passed-in schema instance.Broker 决定该 topic 是否中是否包含 schema / 数据 / 本地 consumer 和本地 producer 三者中的一个。
Topic 中不同时包含 schema / 数据 / 本地 consumer 和本地 producer:
If
isAllowAutoUpdateSchema
sets to true, then the consumer registers a schema and it is connected to a broker.If
isAllowAutoUpdateSchema
sets to false, then the consumer is rejected to connect to a broker.
Topic 中包含 schema / 数据 / 本地 consumer 和本地 producer 中的一个,则执行 schema 兼容性检查。
Schema 通过兼容性检查,则 consumer 连接到 broker。
Schema 未通过兼容性检查,则拒绝 consumer 连接到 broker。
Consumer 收到 broker 的信息。