You can manage schemas in two ways:
* Automatically
* Manually
By default, if a schema passes the schema compatibility check, the Pulsar producer automatically updates the schema of the topic it produces to.
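AutoUpdate for producer
For a producer, AutoUpdate happens in the following cases:
* If a topic doesn't have a schema, Pulsar registers a schema automatically.
* If a topic has a schema:
  * If a producer doesn't carry a schema:
    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data.
    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.
  * If a producer carries a schema, the broker performs the compatibility check based on the compatibility check strategy configured for the namespace to which the topic belongs.
    * If the schema is already registered, the producer is connected to the broker.
    * If the schema is not registered:
      * If `isAllowAutoUpdateSchema` is set to `false`, the producer is rejected and cannot connect to the broker.
      * If `isAllowAutoUpdateSchema` is set to `true`:
        * If the schema passes the compatibility check, the broker automatically registers a new schema for the topic and the producer is connected.
        * If the schema does not pass the compatibility check, the broker does not register the schema and the producer is rejected and cannot connect to the broker.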
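The producer-side rules above form a small decision tree. The following is a minimal Java sketch of that tree, assuming every condition is already known as a boolean; `ProducerAutoUpdate`, `ProducerOutcome`, and the parameter names are hypothetical and do not correspond to Pulsar's broker code.

```java
// Hypothetical outcomes of a producer connection attempt (not a Pulsar type).
enum ProducerOutcome { REGISTER_AND_CONNECT, CONNECT, REJECT }

// Hypothetical model of the producer-side AutoUpdate rules; not Pulsar's broker code.
final class ProducerAutoUpdate {
    static ProducerOutcome decide(boolean topicHasSchema,
                                  boolean producerHasSchema,
                                  boolean schemaValidationEnforced, // (is)SchemaValidationEnforced on the namespace
                                  boolean schemaRegistered,         // is the producer's schema already registered?
                                  boolean allowAutoUpdateSchema,    // isAllowAutoUpdateSchema on the namespace
                                  boolean passesCompatibilityCheck) {
        if (!topicHasSchema) {
            // Topic without a schema: Pulsar registers a schema automatically.
            return ProducerOutcome.REGISTER_AND_CONNECT;
        }
        if (!producerHasSchema) {
            // Schema-less producer: allowed only when schema validation is not enforced.
            return schemaValidationEnforced ? ProducerOutcome.REJECT : ProducerOutcome.CONNECT;
        }
        if (schemaRegistered) {
            return ProducerOutcome.CONNECT;
        }
        if (!allowAutoUpdateSchema) {
            return ProducerOutcome.REJECT;
        }
        // AutoUpdate enabled: register only if the namespace's compatibility strategy accepts the schema.
        return passesCompatibilityCheck ? ProducerOutcome.REGISTER_AND_CONNECT : ProducerOutcome.REJECT;
    }
}
```

For example, `ProducerAutoUpdate.decide(true, true, false, false, true, false)` returns `REJECT`: the producer's schema is new and AutoUpdate is enabled, but the schema fails the compatibility check.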
AutoUpdate for consumer
For a consumer, AutoUpdate happens in the following cases:
* If a consumer connects to a topic without a schema (that is, the consumer receives raw bytes), the consumer connects to the topic successfully without any compatibility check.
* If a consumer connects to a topic with a schema:
  * If the topic has none of the following: a schema, data, a local consumer, or a local producer:
    * If `isAllowAutoUpdateSchema` is set to `true`, the consumer registers a schema and is connected to the broker.
    * If `isAllowAutoUpdateSchema` is set to `false`, the consumer is rejected and cannot connect to the broker.
  * If the topic has at least one of the following: a schema, data, a local consumer, or a local producer, the schema compatibility check is performed.
    * If the schema passes the compatibility check, the consumer is connected to the broker.
    * If the schema does not pass the compatibility check, the consumer is rejected and cannot connect to the broker.
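The consumer-side rules can be sketched the same way. In this minimal Java model, the set of `TopicState` values records which of "a schema, data, a local consumer, a local producer" the topic already has; `TopicState`, `ConsumerOutcome`, and `ConsumerAutoUpdate` are hypothetical names, not Pulsar types.

```java
import java.util.Set;

// Hypothetical markers for what the topic already has.
enum TopicState { SCHEMA, DATA, LOCAL_CONSUMER, LOCAL_PRODUCER }

// Hypothetical outcomes of a consumer connection attempt.
enum ConsumerOutcome { CONNECT, REGISTER_AND_CONNECT, REJECT }

// Hypothetical model of the consumer-side AutoUpdate rules; not Pulsar's broker code.
final class ConsumerAutoUpdate {
    static ConsumerOutcome decide(boolean consumerHasSchema,
                                  Set<TopicState> present,          // what the topic already has
                                  boolean allowAutoUpdateSchema,    // isAllowAutoUpdateSchema on the namespace
                                  boolean passesCompatibilityCheck) {
        if (!consumerHasSchema) {
            // Raw-bytes consumer: connects without any compatibility check.
            return ConsumerOutcome.CONNECT;
        }
        if (present.isEmpty()) {
            // Nothing on the topic yet: AutoUpdate decides whether the consumer may register its schema.
            return allowAutoUpdateSchema ? ConsumerOutcome.REGISTER_AND_CONNECT : ConsumerOutcome.REJECT;
        }
        // The topic has at least one of them: the compatibility check decides.
        return passesCompatibilityCheck ? ConsumerOutcome.CONNECT : ConsumerOutcome.REJECT;
    }
}
```

For example, `ConsumerAutoUpdate.decide(true, EnumSet.of(TopicState.DATA), true, false)` returns `REJECT`: the topic already holds data, so the failed compatibility check wins even though AutoUpdate is enabled.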
You can use the `pulsar-admin` command to manage the AutoUpdate strategy as follows:
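Enable AutoUpdate
To enable AutoUpdate on a namespace, you can use the `pulsar-admin` command `bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace`.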
Disable AutoUpdate
To disable AutoUpdate on a namespace, you can use the `pulsar-admin` command.
```bash
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
```
Once AutoUpdate is disabled, you can register a new schema only by using the `pulsar-admin` command.
Adjust compatibility
To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command.
```bash
bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace
```
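Schema validation
By default, `schemaValidationEnforced` is disabled for producers:
* A producer without a schema can produce any kind of message to a topic with schemas, which may result in garbage data in the topic.
* Non-Java clients that don't support schemas can produce messages to a topic with schemas.
However, if you want a stronger guarantee on topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis.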
Enable schema validation
To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
```bash
bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
```
Disable schema validation
To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
```bash
bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
```
Schema manual management
Upload a schema
To upload (register) a new schema for a topic, you can use one of the following methods.
Admin CLI
Use the `upload` subcommand.
```bash
$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
```
The `schema-definition-file` is in JSON format.
```json
{
  "type": "<schema-type>",
  "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
  "properties": {} // the properties associated with the schema
}
```
The `schema-definition-file` includes the following fields:
Field | Description |
---|---|
`type` | The schema type. |
`schema` | The schema definition data, encoded in UTF-8. |
`properties` | The additional properties associated with the schema. |
Here is an example of a `schema-definition-file` for a `STRING` schema with one custom property:
```json
{
  "type": "STRING",
  "schema": "",
  "properties": {
    "key1": "value1"
  }
}
```
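Because the `schema` field is itself a string, a struct schema's Avro definition (which is JSON) must be escaped when it is embedded in the file. Below is a minimal, JDK-only sketch of producing a `schema-definition-file` body; `SchemaDefinitionFile` is a hypothetical helper, not part of Pulsar, and it sorts property keys only so that its output is deterministic.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper (not part of Pulsar) that writes a schema-definition-file body.
final class SchemaDefinitionFile {
    // Quotes a string as a JSON string literal, escaping quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Builds {"type": ..., "schema": ..., "properties": {...}} with property keys in sorted order.
    static String toJson(String type, String schema, Map<String, String> properties) {
        StringJoiner props = new StringJoiner(", ", "{", "}");
        new TreeMap<>(properties).forEach((k, v) -> props.add(quote(k) + ": " + quote(v)));
        return "{\"type\": " + quote(type)
                + ", \"schema\": " + quote(schema)
                + ", \"properties\": " + props + "}";
    }
}
```

`SchemaDefinitionFile.toJson("STRING", "", Map.of("key1", "value1"))` yields the same content as the example above on a single line; writing that string to a file gives you something to pass to `--filename`.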
REST API
Send a `POST` request to this endpoint: `POST /admin/v2/schemas/:tenant/:namespace/:topic/schema`
The POST payload is in JSON format.
```json
{
  "type": "<schema-type>",
  "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
  "properties": {} // the properties associated with the schema
}
```
The POST payload includes the following fields:
Field | Description |
---|---|
`type` | The schema type. |
`schema` | The schema definition data, encoded in UTF-8. |
`properties` | The additional properties associated with the schema. |
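The REST path maps a topic name of the form `tenant/namespace/topic` onto the `:tenant/:namespace/:topic` segments. The sketch below only builds (it does not send) such requests with the JDK's `java.net.http` API (Java 11+). `SchemaRestRequests` is a hypothetical helper, and the web service URL you pass in (for example `http://localhost:8080`) is an assumption about your deployment. The same path also appears in the `GET` and `DELETE` schema requests.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds schema admin requests; it does not send them.
final class SchemaRestRequests {
    // Maps "tenant/namespace/topic" onto /admin/v2/schemas/:tenant/:namespace/:topic/schema.
    static URI schemaUri(String webServiceUrl, String topic) {
        String[] parts = topic.split("/");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic, got: " + topic);
        }
        return URI.create(webServiceUrl + "/admin/v2/schemas/"
                + parts[0] + "/" + parts[1] + "/" + parts[2] + "/schema");
    }

    // POST with a JSON payload shaped like the table above.
    static HttpRequest upload(String webServiceUrl, String topic, String payloadJson) {
        return HttpRequest.newBuilder(schemaUri(webServiceUrl, topic))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }

    // GET on the same path returns the latest schema.
    static HttpRequest getLatest(String webServiceUrl, String topic) {
        return HttpRequest.newBuilder(schemaUri(webServiceUrl, topic)).GET().build();
    }

    // DELETE on the same path deletes the schema.
    static HttpRequest delete(String webServiceUrl, String topic) {
        return HttpRequest.newBuilder(schemaUri(webServiceUrl, topic)).DELETE().build();
    }
}
```

To actually send a request, pass it to a `java.net.http.HttpClient`; authentication, if your cluster requires it, is outside this sketch.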
Java Admin API
```java
void createSchema(String topic, PostSchemaPayload schemaPayload)
```
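The `PostSchemaPayload` includes the following fields:
Field | Description |
---|---|
`type` | The schema type. |
`schema` | The schema definition data, encoded in UTF-8. |
`properties` | The additional properties associated with the schema. |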
Here is an example of `PostSchemaPayload`:
```java
PulsarAdmin admin = …;

PostSchemaPayload payload = new PostSchemaPayload();
payload.setType("INT8");
payload.setSchema("");

// Register the payload as the schema of the topic
admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
```
Get the latest schema
To get the latest schema for a topic, you can use one of the following methods.
Admin CLI
Use the `get` subcommand.
```bash
$ pulsar-admin schemas get <topic-name>
```
Example output:
```json
{
  "version": 0,
  "type": "String",
  "timestamp": 0,
  "data": "string",
  "properties": {
    "property1": "string",
    "property2": "string"
  }
}
```
REST API
Send a `GET` request to this endpoint: `GET /admin/v2/schemas/:tenant/:namespace/:topic/schema`
Here is an example of a response, which is returned in JSON format.
```json
{
  "version": "<the-version-number-of-the-schema>",
  "type": "<the-schema-type>",
  "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
  "data": "<an-utf8-encoded-string-of-schema-definition-data>",
  "properties": {} // the properties associated with the schema
}
```
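The response includes the following fields:
Field | Description |
---|---|
`version` | The schema version, which is a `long` number. |
`type` | The schema type. |
`timestamp` | The timestamp at which this version of the schema was created. |
`data` | The schema definition data, encoded in UTF-8. |
`properties` | The additional properties associated with the schema. |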
Java Admin API
```java
SchemaInfo getSchemaInfo(String topic)
```
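The `SchemaInfo` includes the following fields:
Field | Description |
---|---|
`name` | The schema name. |
`type` | The schema type. |
`schema` | A byte array of the schema definition data, encoded in UTF-8. If the schema is a primitive schema, this byte array should be empty. If the schema is a struct schema, this field should be the JSON string of the Avro schema definition, converted to a byte array. |
`properties` | The additional properties associated with the schema. |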
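The encoding rules in the `schema` row can be illustrated with a few lines of JDK code. `SchemaDefinitionBytes` is a hypothetical helper, not a Pulsar class; it only shows how an empty array (primitive schema) and a UTF-8-encoded Avro definition (struct schema) relate to the definition string.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the byte layout of SchemaInfo's schema field.
final class SchemaDefinitionBytes {
    // Primitive schemas carry no definition, so the byte array is empty.
    static byte[] forPrimitive() {
        return new byte[0];
    }

    // Struct schemas: the Avro schema definition (a JSON string) encoded as UTF-8 bytes.
    static byte[] forStruct(String avroSchemaJson) {
        return avroSchemaJson.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the byte array back to the definition string.
    static String decode(byte[] schemaData) {
        return new String(schemaData, StandardCharsets.UTF_8);
    }
}
```

Decoding with the same charset that was used for encoding is what makes the round trip lossless for non-ASCII field names or docs.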
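Here is an example of getting the latest `SchemaInfo` of a topic, where `admin` is a `PulsarAdmin` instance: `SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");`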
Get a specific version of a schema
Java Admin API
```java
SchemaInfo getSchemaInfo(String topic, long version)
```
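The `SchemaInfo` includes the following fields:
Field | Description |
---|---|
`name` | The schema name. |
`type` | The schema type. |
`schema` | A byte array of the schema definition data, encoded in UTF-8. If the schema is a primitive schema, this byte array should be empty. If the schema is a struct schema, this field should be the JSON string of the Avro schema definition, converted to a byte array. |
`properties` | The additional properties associated with the schema. |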
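Here is an example of getting version 1 of a schema as a `SchemaInfo`:
```java
PulsarAdmin admin = …;

SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);
```
Extract a schema
Admin CLI
Use the `extract` subcommand.
```bash
$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>
```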
Delete a schema
To delete a schema for a topic, you can use one of the following methods.
Admin CLI
Use the `delete` subcommand.
```bash
$ pulsar-admin schemas delete <topic-name>
```
REST API
Send a `DELETE` request to this schema endpoint: `DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema`
Here is an example of a response, which is returned in JSON format.
```json
{
  "version": "<the-latest-version-number-of-the-schema>"
}
```
The response includes the following field:
Field | Description |
---|---|
`version` | The schema version, which is a `long` number. |
Java Admin API
```java
void deleteSchema(String topic)
```
Here is an example of deleting a schema.
```java
PulsarAdmin admin = …;

admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
```
## Custom schema storage
By default, Pulsar stores various data types of schemas in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar.
However, you can use another storage system if needed.
### Implement
To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:
* [SchemaStorage interface](#schemastorage-interface)
* [SchemaStorageFactory interface](#schemastoragefactory-interface)
#### SchemaStorage interface
The `SchemaStorage` interface has the following methods:
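```java
import java.util.concurrent.CompletableFuture;

// SchemaVersion and StoredSchema are Pulsar's own schema types.
public interface SchemaStorage {
    // How schemas are updated
    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);

    // How schemas are fetched from storage
    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

    // How schemas are deleted
    CompletableFuture<SchemaVersion> delete(String key);

    // Utility method for converting a schema version byte array to a SchemaVersion object
    SchemaVersion versionFromBytes(byte[] version);

    // Startup behavior for the schema storage client
    void start() throws Exception;

    // Shutdown behavior for the schema storage client
    void close() throws Exception;
}
```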
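To make the contract of these methods concrete, here is a minimal in-memory sketch. It does not implement Pulsar's real `SchemaStorage` interface: `SimpleVersion`, `StoredEntry`, and `InMemorySchemaStore` are hypothetical stand-ins so that the code compiles with the JDK alone, and the version numbering (0-based per key, 8-byte big-endian encoding in `versionFromBytes`) is an assumption of this sketch. A real implementation would persist data in your storage system and complete the futures asynchronously.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Pulsar's SchemaVersion.
final class SimpleVersion {
    final long value;
    SimpleVersion(long value) { this.value = value; }
}

// Hypothetical stand-in for Pulsar's StoredSchema: the data plus its version.
final class StoredEntry {
    final byte[] data;
    final SimpleVersion version;
    StoredEntry(byte[] data, SimpleVersion version) {
        this.data = data;
        this.version = version;
    }
}

// In-memory store with the same method shapes as SchemaStorage (not the real interface).
final class InMemorySchemaStore {
    // key -> stored versions; the list index is the version number.
    private final Map<String, List<byte[]>> schemas = new HashMap<>();

    // Nothing to connect to for an in-memory store.
    void start() { }

    // Releases all stored schemas.
    synchronized void close() { schemas.clear(); }

    // Appends a new version for the key. The hash argument is ignored in this sketch.
    synchronized CompletableFuture<SimpleVersion> put(String key, byte[] value, byte[] hash) {
        List<byte[]> versions = schemas.computeIfAbsent(key, k -> new ArrayList<>());
        versions.add(value.clone());
        return CompletableFuture.completedFuture(new SimpleVersion(versions.size() - 1));
    }

    // Returns the requested version, or the latest when version is null; completes with null if absent.
    synchronized CompletableFuture<StoredEntry> get(String key, SimpleVersion version) {
        List<byte[]> versions = schemas.get(key);
        if (versions == null || versions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        long v = (version == null) ? versions.size() - 1 : version.value;
        if (v < 0 || v >= versions.size()) {
            return CompletableFuture.completedFuture(null);
        }
        byte[] data = versions.get((int) v).clone();
        return CompletableFuture.completedFuture(new StoredEntry(data, new SimpleVersion(v)));
    }

    // Removes every version of the key; completes with the last removed version, or null if none existed.
    synchronized CompletableFuture<SimpleVersion> delete(String key) {
        List<byte[]> removed = schemas.remove(key);
        if (removed == null || removed.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(new SimpleVersion(removed.size() - 1));
    }

    // Decodes a version stored as an 8-byte big-endian long (this sketch's own encoding).
    SimpleVersion versionFromBytes(byte[] version) {
        return new SimpleVersion(ByteBuffer.wrap(version).getLong());
    }
}
```

Making every mutating method `synchronized` keeps the sketch simple and thread-safe; a production store would more likely rely on the concurrency guarantees of its backing system.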
#### SchemaStorageFactory interface
The `SchemaStorageFactory` interface has the following method:
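### Deploy
To use your custom schema storage implementation, perform the following steps.
1. Package the implementation in a JAR file.
2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution.
3. Change the `schemaRegistryStorageClassName` configuration to your custom factory class.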