如果某个 schema 通过了 schema 兼容性检查,Pulsar producer 就会自动将此 schema 更新为 topic 默认创建的 schema。

Producer 会在以下情况中:

  • If a topic doesn’t have a schema, Pulsar registers a schema automatically.

  • If a topic has a schema:

    • If a producer doesn’t carry a schema:

      • If isSchemaValidationEnforced or schemaValidationEnforced is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data.

      • If isSchemaValidationEnforced or schemaValidationEnforced is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.

    • If a producer carries a schema:

      Broker 根据 topic 所属命名空间中已配置的兼容性检查策略执行兼容性检查。

      • 已注册 schema,producer 将连接到 broker。

      • 未注册 schema:

        • If isAllowAutoUpdateSchema sets to false, the producer is rejected to connect to a broker.

        • If isAllowAutoUpdateSchema sets to true:

          • Schema 通过了兼容性检查,则 broker 主动为该 topic 注册一个新的 schema,并连接到 producer。

          • Schema 未通过兼容性检查,则 broker 不会注册 schema,并且 producer 连接到 broker 的请求被拒。

Consumer 的自动更新

Consumer 会在以下情况中自动更新

  • If a consumer connects to a topic without a schema (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.

  • If a consumer connects to a topic with a schema.

    • Topic 中不同时包含 schema / 数据 / 本地 consumer 和本地 producer:

      • If isAllowAutoUpdateSchema sets to true, then the consumer registers a schema and it is connected to a broker.

      • If isAllowAutoUpdateSchema sets to false, then the consumer is rejected to connect to a broker.

    • Topic 中包含 schema / 数据 / 本地 consumer 和本地 producer 中的一个,则执行 schema 兼容性检查。

      • Schema 通过兼容性检查,则 consumer 连接到 broker。

      • Schema 未通过兼容性检查,则拒绝 consumer 连接到 broker。

Consumer 自动更新

可以使用 pulsar-admin 命令来管理 自动更新 策略,如下所示:

启用自动更新

可以使用 pulsar-admin命令在命名空间上启用 自动更新

禁用自动更新

要在名称空间上禁用AutoUpdate,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace

禁用AutoUpdate后,你只能使用pulsar-admin命令注册新的架构。

调整兼容性

要调整名称空间上的架构兼容性级别,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace

Schema 验证

By default, schemaValidationEnforced is disabled for producers:

  • 没有 schema 的 producer 可以使用 schema 向 topic 中生成任何类型的消息,这可能会导致 topic 中有垃圾数据。

  • 这样,不支持 schema 的非 Java 语言客户端就可以使用 schema 向 topic 生成消息。

但是,如果你希望对具有架构的主题有更强的保证,则可以在整个群集中或在每个命名空间的基础上启用schemaValidationEnforced

启用 Schema 验证

要在名称空间上启用schemaValidationEnforced,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace

禁用 Schema 验证

要在名称空间上禁用schemaValidationEnforced,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace

Schema 手动管理

To upload (register) a new schema for a topic, you can use one of the following methods.

Admin CLI

REST API

Use the upload subcommand.

  1. $ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>

The schema-definition-file is in JSON format.

  1. { "type": "<schema-type>", "schema": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The schema-definition-file includes the following fields:

字段Description

type

Schema 类型。

schema

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。

Here are examples of the schema-definition-file for a JSON schema.

示例 1

示例 2

  1. { "type": "STRING", "schema": "", "properties": { "key1": "value1" }}

Send a POST request to this endpoint:

The post payload is in JSON format.

  1. { "type": "<schema-type>", "schema": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The post payload includes the following fields:

字段Description

type

Schema 类型。

schema

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

与 schema 相关的其他属性。
  1. void createSchema(String topic, PostSchemaPayload schemaPayload)

The PostSchemaPayload includes the following fields:

Here is an example of PostSchemaPayload:

  1. PulsarAdmin admin = …;PostSchemaPayload payload = new PostSchemaPayload();payload.setType("INT8");payload.setSchema("");admin.createSchema("my-tenant/my-ns/my-topic", payload); ```bash$ pulsar-admin schemas get <topic-name>{ "version": 0, "type": "String", "timestamp": 0, "data": "string", "properties": { "property1": "string", "property2": "string" }}```<!--REST API-->Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema}Here is an example of a response, which is returned in JSON format.```json{ "version": "<the-version-number-of-the-schema>", "type": "<the-schema-type>", "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>", "data": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The response includes the following fields:

字段Description

version

Schema 版本,是一个长数。

type

Schema 类型。
创建此版本 schema 的时间戳。

data

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。
  1. SchemaInfo createSchema(String topic)

The SchemaInfo includes the following fields:

字段Description

name

Schema 名称。

type

Schema 类型。

schema

Schema 定义数据的字节数组以 UTF 8 字符集编码。

如果该 schema 是“primitive” schema,则此字节数组应为空。

如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

  1. properties
    </td>
    <td> schema 相关的其他属性。</td>

Here is an example of SchemaInfo:

The response includes the following fields:

  1. SchemaInfo createSchema(String topic, long version)

The SchemaInfo includes the following fields:

字段Description

name

Schema 名称。

type

Schema 类型。

schema

Schema 定义数据的字节数组以 UTF 8 字符集编码。

如果该 schema 是“primitive” schema,则此字节数组应为空。

如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

  1. properties
    </td>
    <td> schema 相关的其他属性。</td>

Here is an example of SchemaInfo:

  1. PulsarAdmin admin = …;SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);```bash$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>

删除 schema

To delete a schema for a topic, you can use one of the following methods.

使用 删除 子命令。

  1. $ pulsar-admin schemas delete <topic-name>
  2. ```<!--REST API-->Send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema}
  3. Here is an example of a response, which is returned in JSON format.
  4. ```json
  5. {
  6. "version": "<the-latest-version-number-of-the-schema>",
  7. }

The response includes the following field:

字段Description
Schema 版本,是一个长数。
  1. void deleteSchema(String topic)

Here is an example of deleting a schema.

  1. PulsarAdmin admin = …;
  2. ```<!--END_DOCUSAURUS_CODE_TABS-->## 自定义schema存储
  3. By default, Pulsar stores various data types of schemas in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar.
  4. However, you can use another storage system if needed.
  5. ### 实现
  6. To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:
  7. * [SchemaStorage 接口](#schemastorage-interface)
  8. * [SchemaStorageFactory接口](#schemastoragefactory-interface)
  9. #### SchemaStorage 接口
  10. `SchemaStorage`接口包含以下方法:
  11. ```java
  12. public interface SchemaStorage {
  13. // 如何更新schema
  14. CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
  15. // 如何从存储中获取schema
  16. CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
  17. // 如何删除schema
  18. CompletableFuture<SchemaVersion> delete(String key);
  19. // 用户将schema字节数据转换为SchemaVersion 对象的工具方法
  20. SchemaVersion versionFromBytes(byte[] version);
  21. // 启动schema存储客户端
  22. void start() throws Exception;
  23. // 关闭schema存储客户端
  24. void close() throws Exception;
  25. }

SchemaStorageFactory接口

The SchemaStorageFactory interface has the following method:

To use your custom schema storage implementation, perform the following steps.

  1. 将实现打包到 JAR 文件中。

  2. 将 JAR 文件添加到 Pulsar 二进制或 source 分布中的 lib 文件夹中。

  3. 将 中的 schemaRegistryStorageClassName 配置更改为自定义工厂类。