本章将探讨 Pulsar Schema 如何演化以及 Pulsar Schema 的兼容性检查策略。

在 的数据结构中定义 Pulsar schema 。

每个存储在 topic 中的 SchemaInfo 都有一个版本。 版本信息用于管理 topic 内发生的 schema 更改。

使用 SchemaInfo 生成的消息用 schema 版本进行标记。 当消息被 Pulsar 客户端 consume 时,Pulsar 客户端可以使用 schema 版本检索相应的 SchemaInfo,并使用正确的 schema 信息来反序列化数据。

属性和类型的详细信息存储在 Schemas 中。 To satisfy new business requirements, you need to update schemas inevitably over time, which is called schema evolution.

Schema 的任何变化都会影响到下游 consumers。 Schema 演化确保了下游 consumers 能够无缝地处理以旧 schemas 和以新 schemas 编码的数据。

Pulsar schema 应如何演化?

答案就是 Pulsar schema 兼容性检查策略。 这一策略决定了 schema 如何将 topics 中的新、旧 schema 进行比较。

了解更多信息,请参阅 。

Pulsar 对 Schema 演化提供了怎样的支持?

  1. Producer/consumer/ 阅读器将其客户端 SchemaInfo 发送到 broker。

  2. Broker 知道 schema 类型,并找到此类型的 schema 兼容性检查器。

  3. Broker 使用检查器,以兼容性检查策略来检查 SchemaInfo 是否与 topic 的最新 schema 兼容。

    目前,兼容性检查策略是在命名空间级别配置的,适用于命名空间中的所有 topics。

Pulsar 有 8 种 schema 兼容性检查策略,如下所示。

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

兼容性检查策略

定义

Note

ALWAYS_COMPATIBLE

禁用 schema 兼容性检查。

禁用 schema 演化,即不允许 schema 更改。

  • 除 Avro 和 JSON 以外所有类型的 schema,其默认 schema 兼容性检查策略为 ALWAYS_INCOMPATIBLE

  • Avro 和 JSON 的默认 schema 兼容性检查策略为 `FULL’。

示例

  • 示例 1

    在某些情况下,应用程序需要将几种不同类型的事件存储在同一个 Pulsar topic 中。

    尤其是在 事件来源 风格中开发数据模型时,这种情况下会遇到多种影响实体状态的事件。

    例如,对于用户实体来说,有 用户创建用户地址变更用户查询已收到 等事件。 应用程序要求按相同顺序读取这些事件。

    因此,这些事件需要进入同一个 Pulsar 分区以保证顺序。 应用程序可以使用 ALWAYS_COMPATIBLE 策略,将不同类型的事件存储在同一个 topic 中。

BACKWARD 和 BACKWARD_TRANSITIVE

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

BACKWARD | Consumers using the new schema can process data written by producers using the last schema. | 使用 schema V3 的 consumer 可以处理 producer 使用 schema V3 或 V2 写入的数据。 | BACKWARD_TRANSITIVE | Consumers using the new schema can process data written by producers using all previous schemas. | 使用 schema V3 的 consumer 可以处理 producer 使用 schema V3、V2 或 V1 写入的数据。 |

示例

  • 示例 1

    删除字段。

    为处理不含某一字段的事件而构建的 consumer 可以处理旧 schema 编写的、包含此字段的事件,consumer 在处理时会忽略这一字段。

  • 示例 2

    想要将所有 Pulsar 数据加载到 Hive 数据仓库中,并进行 SQL 查询。

    即使数据被更改,仍然可以进行相同的 SQL 查询。 可以通过 BACKWARD 策略演化 schema 以实现上述操作。

FORWARD 和 FORWARD_TRANSITIVE

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

兼容性检查策略定义Description

FORWARD | Consumers using the last schema can process data written by producers using a new schema, even though they may not be able to use the full capabilities of the new schema. | 使用schema V3 或 V2 的 consumer 可以处理 producer 使用 schema V3 写入的数据。 | FORWARD_TRANSITIVE | Consumers using all previous schemas can process data written by producers using a new schema. | 使用 schema V3、V2 或 V1 的 consumer 可以处理 producer 使用 schema V3 写入的数据。

示例

  • 示例 1

    添加字段。

    在大多数数据格式中,写入 consumer 的进程事件中如果不包含新字段,则该 consumer 在收到包含新字段的新事件后仍然可以写入。

  • 示例 2

    如果 consumer 的一个应用程序逻辑绑定了一个完整版本的 schema ,则当 schema 演化时,应用程序逻辑可能无法立即更新。

    在这种情况下,需要用新 schema 将数据投射到应用程序可以理解的旧 schema 上。

    因此,可以使用 FORWARD 策略来演化 schema,以确保旧 schema 能够处理新schema 编码的数据。

假设某 topic 包含三个 schema(V1、V2 和 V3),V1 是最旧的,V3 是最新的:

示例

在某些数据格式中(例如:Avro),可以使用默认值来定义字段。 这样,添加或删除这些字段的更改是完全可兼容的。

当 producer 或 consumer 尝试连接到某个 topic 时,broker 会进行检查以验证 schema。

Producer

当 producer 试图连接到某个 topic 时(假定忽略 schema 自动创建),broker 会进行以下检查:

  • 检查 producer 携带的 schema 是否存在于 schema 注册表中。

    • 如果 schema 已经注册,那么 producer 就与 broker 连接,并 produce 与此 schema 相关的消息。

    • 如果 schema 没有注册,那么 Pulsar 会验证是否允许基于配置的兼容性检查策略对 schema 进行注册。

Consumer

当 consumer 尝试连接到某个 topic 时,broker 会根据配置的 schema 兼容性检查策略来检查 consumer 所携带的 schema 是否与注册的 schema 兼容。

兼容性检查策略

检查逻辑

ALWAYS_COMPATIBLE

全部通过

ALWAYS_INCOMPATIBLE

不能通过

BACKWARD

可读取上一个 schema

BACKWARD_TRANSITIVE

可以读取所有 schemas

FORWARD

可读取上一个 schema

FORWARD_TRANSITIVE

可读取上一个 schema

FULL

可读取上一个 schema

FULL_TRANSITIVE

可以读取所有 schemas

升级客户端应用程序的顺序由兼容性检查策略决定

比如,当producers使用schemas来将数据写入Pulsar,并且consumers使用schemas来从Pulsar读取数据。