Schema AutoUpdate
Schema manual management
By default, if a schema passes the schema compatibility check, the Pulsar producer automatically updates the schema of the topic it produces to.
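AutoUpdate for producer
For a producer, AutoUpdate happens in the following cases:
- If a topic doesn't have a schema, Pulsar registers a schema automatically.
- If a topic has a schema:
  - If a producer doesn't carry a schema:
    - If `isSchemaValidationEnforced` or `schemaValidationEnforced` is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data.
    - If `isSchemaValidationEnforced` or `schemaValidationEnforced` is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.
  - If a producer carries a schema, the broker performs the compatibility check based on the compatibility check strategy configured in the namespace to which the topic belongs.
    - If the schema is registered, the producer is connected to a broker.
    - If the schema is not registered:
      - If `isAllowAutoUpdateSchema` is set to false, the broker rejects the producer's connection.
      - If `isAllowAutoUpdateSchema` is set to true:
        - If the schema passes the compatibility check, the broker registers a new schema for the topic and the producer is connected.
        - If the schema fails the compatibility check, the broker does not register the schema and rejects the producer's connection.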
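The producer rules above form a decision tree. The following Java sketch encodes that tree as a single pure function so every branch can be read in one place. `ProducerAdmission`, `Outcome`, and the boolean parameters are hypothetical names invented for illustration, not Pulsar classes; the sketch deliberately ignores which compatibility strategy the broker applies and simply takes the result of the check as an input.

```java
/**
 * Hypothetical model of the producer AutoUpdate rules; not a Pulsar API.
 * It only mirrors the documented decision tree.
 */
final class ProducerAdmission {

    enum Outcome {
        REGISTER_AND_CONNECT, // broker registers the schema, producer connects
        CONNECT,              // producer connects, nothing new is registered
        REJECT                // broker rejects the producer
    }

    static Outcome decide(boolean topicHasSchema,
                          boolean producerHasSchema,
                          boolean schemaValidationEnforced,
                          boolean schemaAlreadyRegistered,
                          boolean allowAutoUpdateSchema,
                          boolean passesCompatibilityCheck) {
        if (!topicHasSchema) {
            // No schema on the topic yet: Pulsar registers one automatically.
            return Outcome.REGISTER_AND_CONNECT;
        }
        if (!producerHasSchema) {
            // Schema-less producers are allowed only when validation is not enforced.
            return schemaValidationEnforced ? Outcome.REJECT : Outcome.CONNECT;
        }
        if (schemaAlreadyRegistered) {
            return Outcome.CONNECT;
        }
        if (!allowAutoUpdateSchema) {
            // Unregistered schema and AutoUpdate disabled.
            return Outcome.REJECT;
        }
        // AutoUpdate enabled: register only a compatible schema.
        return passesCompatibilityCheck ? Outcome.REGISTER_AND_CONNECT : Outcome.REJECT;
    }
}
```

Passing the check result in as a boolean keeps the sketch focused on the branching; in a real broker the check itself depends on the namespace's compatibility strategy.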
AutoUpdate for consumer
For a consumer, AutoUpdate happens in the following cases:
- If a consumer connects to a topic without carrying a schema (which means the consumer receives raw bytes), the consumer can connect to the topic successfully without any compatibility check.
- If a consumer connects to a topic carrying a schema:
  - If the topic has none of the following: a schema, data, a local consumer, or a local producer:
    - If `isAllowAutoUpdateSchema` is set to true, the consumer registers a schema and is connected to a broker.
    - If `isAllowAutoUpdateSchema` is set to false, the broker rejects the consumer's connection.
  - If the topic has at least one of them (a schema, data, a local consumer, or a local producer), the schema compatibility check is performed:
    - If the schema passes the compatibility check, the consumer is connected to the broker.
    - If the schema fails the compatibility check, the broker rejects the consumer's connection.
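The consumer rules can be sketched the same way. In this hypothetical model (the class, enum, and parameter names are illustrative, not Pulsar APIs), `topicInUse` captures the "has a schema, data, a local consumer, or a local producer" condition, and the compatibility result is passed in rather than computed.

```java
/**
 * Hypothetical model of the consumer AutoUpdate rules; not a Pulsar API.
 */
final class ConsumerAdmission {

    enum Outcome {
        CONNECT,              // consumer connects, nothing is registered
        REGISTER_AND_CONNECT, // consumer registers its schema, then connects
        REJECT                // broker rejects the consumer
    }

    /** True if the topic has a schema, data, a local consumer, or a local producer. */
    static boolean topicInUse(boolean hasSchema, boolean hasData,
                              boolean hasLocalConsumer, boolean hasLocalProducer) {
        return hasSchema || hasData || hasLocalConsumer || hasLocalProducer;
    }

    static Outcome decide(boolean consumerHasSchema,
                          boolean topicIsInUse,
                          boolean allowAutoUpdateSchema,
                          boolean passesCompatibilityCheck) {
        if (!consumerHasSchema) {
            // A schema-less consumer reads raw bytes, so no compatibility check runs.
            return Outcome.CONNECT;
        }
        if (!topicIsInUse) {
            // Nothing to check against: isAllowAutoUpdateSchema alone decides.
            return allowAutoUpdateSchema ? Outcome.REGISTER_AND_CONNECT : Outcome.REJECT;
        }
        // Otherwise the consumer's schema must pass the compatibility check.
        return passesCompatibilityCheck ? Outcome.CONNECT : Outcome.REJECT;
    }
}
```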
Manage AutoUpdate strategy
You can use the `pulsar-admin` command to manage the AutoUpdate strategy as follows:
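Enable AutoUpdate
To enable AutoUpdate on a namespace, you can use the `pulsar-admin` command.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace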
Disable AutoUpdate
To disable AutoUpdate on a namespace, you can use the `pulsar-admin` command.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
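Once AutoUpdate is disabled, you can register a new schema only by using the `pulsar-admin` command.
Adjust compatibility
To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command.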
bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace
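By default, `schemaValidationEnforced` is disabled for producers:
- This means a producer without a schema can produce any kind of message to a topic with schemas, which may result in trash data in the topic.
- This allows non-Java language clients that don't support schema to produce messages to a topic with schemas.
However, if you want a stronger guarantee on topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis.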
Enable schema validation
To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
Disable schema validation
To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
To manage schemas, you can use one of the following methods.
Upload a schema
Admin CLI: use the `upload` subcommand.
$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
The `schema-definition-file` is in JSON format.
{
  "type": "<schema-type>",
  "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
  "properties": {} // the properties associated with the schema
}
The `schema-definition-file` includes the following fields:
| Field | Description |
|---|---|
| `type` | The schema type. |
| `schema` | The schema definition data, encoded in the UTF-8 charset. |
| `properties` | The additional properties associated with the schema. |
Here are examples of the `schema-definition-file`: Example 1 is for a JSON schema and Example 2 is for a STRING schema.
Example 1
{ "type": "JSON", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}", "properties": {}}
Example 2
{ "type": "STRING", "schema": "", "properties": { "key1": "value1" }}
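Because the `schema` field is itself a string, a structured definition such as the record in Example 1 has to be escaped inside the file. As a minimal sketch, assuming you want to generate the file from Java without a JSON library, the hypothetical `SchemaDefinitionFile` helper below escapes the definition and renders the three fields. It is not part of Pulsar; a real project would more likely use a JSON library such as Jackson.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that renders a schema-definition-file; not part of Pulsar. */
final class SchemaDefinitionFile {

    /** Escapes a Java string as a JSON string literal, including the quotes. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters use the 4-digit hex escape form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Renders {"type": ..., "schema": ..., "properties": {...}}. */
    static String render(String type, String schemaDefinition, Map<String, String> properties) {
        StringBuilder props = new StringBuilder("{");
        boolean first = true;
        // TreeMap sorts the keys, which keeps the output deterministic.
        for (Map.Entry<String, String> e : new TreeMap<>(properties).entrySet()) {
            if (!first) {
                props.append(", ");
            }
            props.append(quote(e.getKey())).append(": ").append(quote(e.getValue()));
            first = false;
        }
        props.append('}');
        return "{\"type\": " + quote(type)
                + ", \"schema\": " + quote(schemaDefinition)
                + ", \"properties\": " + props + "}";
    }
}
```

For example, `SchemaDefinitionFile.render("STRING", "", Map.of("key1", "value1"))` produces the same content as Example 2, and the resulting string can be written to the file passed to `--filename`.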
REST API: send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema
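The POST payload is in JSON format.
The POST payload includes the following fields:
| Field | Description |
|---|---|
| `type` | The schema type. |
| `schema` | The schema definition data, encoded in the UTF-8 charset. |
| `properties` | The additional properties associated with the schema. |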
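As a sketch of the REST call, the following builds (but does not send) the POST request with the JDK's `java.net.http` client. The class name `SchemaRestRequests` and the admin URL used in the usage note are assumptions, and authentication headers, which a secured cluster would require, are omitted.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds (but does not send) the schema upload request. */
final class SchemaRestRequests {

    static HttpRequest uploadSchema(String adminUrl, String tenant, String namespace,
                                    String topic, String payloadJson) {
        // POST /admin/v2/schemas/:tenant/:namespace/:topic/schema
        URI uri = URI.create(adminUrl + "/admin/v2/schemas/"
                + tenant + "/" + namespace + "/" + topic + "/schema");
        return HttpRequest.newBuilder(uri)
                // The payload is JSON with the type, schema, and properties fields.
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }
}
```

For example, with an admin URL of `http://localhost:8080`, pass the returned request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and check the response status code.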
Java Admin API: use the following method of the `Schemas` client returned by `admin.schemas()`.
void createSchema(String topic, PostSchemaPayload schemaPayload)
The `PostSchemaPayload` includes the same fields as the REST POST payload: `type`, `schema`, and `properties`.
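Here is an example of `PostSchemaPayload`:
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;

PulsarAdmin admin = …;
// A primitive INT8 schema has an empty schema definition.
PostSchemaPayload payload = new PostSchemaPayload();
payload.setType("INT8");
payload.setSchema("");
admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);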
Get a schema (latest)
To get the latest schema for a topic, you can use one of the following methods.
Admin CLI: use the `get` subcommand.
$ pulsar-admin schemas get <topic-name>

{
  "version": 0,
  "type": "String",
  "timestamp": 0,
  "data": "string",
  "properties": {
    "property1": "string",
    "property2": "string"
  }
}
REST API: send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema
Here is an example of a response, which is returned in JSON format.
{
  "version": "<the-version-number-of-the-schema>",
  "type": "<the-schema-type>",
  "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
  "data": "<an-utf8-encoded-string-of-schema-definition-data>",
  "properties": {} // the properties associated with the schema
}
The response includes the following fields:
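| Field | Description |
|---|---|
| `version` | The schema version, which is a long number. |
| `type` | The schema type. |
| `timestamp` | The timestamp when this version of the schema was created. |
| `data` | The schema definition data, encoded in the UTF-8 charset. |
| `properties` | The additional properties associated with the schema. |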
Java Admin API: use the following method of the `Schemas` client returned by `admin.schemas()`.
SchemaInfo getSchemaInfo(String topic)
The `SchemaInfo` includes the following fields:
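| Field | Description |
|---|---|
| `name` | The schema name. |
| `type` | The schema type. |
| `schema` | A byte array of the schema definition data, encoded in the UTF-8 charset. If the schema is a primitive schema, this byte array should be empty. If the schema is a struct schema, this field should be the JSON string of the Avro schema definition, converted to a byte array. |
| `properties` | The additional properties associated with the schema. |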
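To make the rule for the `schema` byte array concrete, here is a minimal sketch with a hypothetical `SchemaInfoSketch` class. It is a stand-in that mirrors the four fields in the table, not Pulsar's `SchemaInfo`; it only shows that a primitive schema carries an empty array while a struct schema carries the UTF-8 bytes of its Avro JSON definition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for the SchemaInfo fields; not Pulsar's SchemaInfo class. */
final class SchemaInfoSketch {
    final String name;
    final String type;
    final byte[] schema;
    final Map<String, String> properties;

    private SchemaInfoSketch(String name, String type, byte[] schema,
                             Map<String, String> properties) {
        this.name = name;
        this.type = type;
        this.schema = schema;
        this.properties = properties;
    }

    /** Primitive schemas (for example STRING or INT8) carry an empty byte array. */
    static SchemaInfoSketch primitive(String name, String type) {
        return new SchemaInfoSketch(name, type, new byte[0], Collections.emptyMap());
    }

    /** Struct schemas (for example AVRO or JSON) carry the Avro definition as UTF-8 bytes. */
    static SchemaInfoSketch struct(String name, String type, String avroDefinitionJson) {
        byte[] bytes = avroDefinitionJson.getBytes(StandardCharsets.UTF_8);
        return new SchemaInfoSketch(name, type, bytes, Collections.emptyMap());
    }

    /** Decodes the stored definition back into its JSON text. */
    String definition() {
        return new String(schema, StandardCharsets.UTF_8);
    }
}
```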
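Here is an example of getting the latest `SchemaInfo`:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");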
Get a schema (specific)
To get a specific version of a schema, you can use one of the following methods.
Admin CLI: use the `get` subcommand.
$ pulsar-admin schemas get <topic-name> --version=<version>
REST API: send a GET request to this schema endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version
The response is returned in JSON format and has the same fields as the response for the latest schema: `version`, `type`, `timestamp`, `data`, and `properties`.
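A minimal sketch of building both GET variants with the JDK's `java.net.http` client follows. It assumes the latest schema is served at `.../schema` and a specific version at `.../schema/:version`; the class name `SchemaGetRequests` is hypothetical, and the request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.OptionalLong;

/** Hypothetical helper that builds (but does not send) schema GET requests. */
final class SchemaGetRequests {

    static HttpRequest getSchema(String adminUrl, String tenant, String namespace,
                                 String topic, OptionalLong version) {
        StringBuilder path = new StringBuilder("/admin/v2/schemas/")
                .append(tenant).append('/')
                .append(namespace).append('/')
                .append(topic).append("/schema");
        // Without a version the endpoint returns the latest schema;
        // with one, the version number is appended as the last path segment.
        version.ifPresent(v -> path.append('/').append(v));
        return HttpRequest.newBuilder(URI.create(adminUrl + path)).GET().build();
    }
}
```

Using `OptionalLong` keeps a single entry point for both cases instead of two nearly identical methods.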
Java Admin API: use the following method of the `Schemas` client returned by `admin.schemas()`.
SchemaInfo getSchemaInfo(String topic, long version)
The `SchemaInfo` includes the following fields:
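| Field | Description |
|---|---|
| `name` | The schema name. |
| `type` | The schema type. |
| `schema` | A byte array of the schema definition data, encoded in the UTF-8 charset. If the schema is a primitive schema, this byte array should be empty. If the schema is a struct schema, this field should be the JSON string of the Avro schema definition, converted to a byte array. |
| `properties` | The additional properties associated with the schema. |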
Here is an example of getting a specific version of `SchemaInfo`:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);
Extract a schema
To provide a schema for a topic by extracting it from a class in a JAR file, you can use the following method.
Admin CLI: use the `extract` subcommand.
$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name> <topic-name>
Delete a schema
To delete a schema for a topic, you can use one of the following methods.
Admin CLI: use the `delete` subcommand.
$ pulsar-admin schemas delete <topic-name>
REST API: send a DELETE request to this schema endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema
Here is an example of a response, which is returned in JSON format.
{
  "version": "<the-latest-version-number-of-the-schema>"
}
The response includes the following field:
| Field | Description |
|---|---|
| `version` | The schema version, which is a long number. |
Java Admin API: use the following method of the `Schemas` client returned by `admin.schemas()`.
void deleteSchema(String topic)
Here is an example of deleting a schema.
PulsarAdmin admin = …;
admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
Custom schema storage
By default, Pulsar stores various data types of schemas in Apache BookKeeper deployed alongside Pulsar.
However, you can use another storage system if needed.
To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:
SchemaStorage interface
The `SchemaStorage` interface has the following methods:
SchemaStorageFactory interface
The `SchemaStorageFactory` interface has the following method:
public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
}
Deployment
To use your custom schema storage implementation, perform the following steps.
1. Package your implementation in a JAR file.
2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution.