Confluent Avro Format
Avro Schema Registry () 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer
序列化的记录,以及可以写入成能被 反序列化的记录。
当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。
当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.subject
参数来制定。
Avro Schema Registry 格式只能与 或 Upsert Kafka SQL 连接器一起使用。
For Maven, SBT, Gradle, or other build automation tools, please also ensure that Confluent’s maven repository at is configured in your project’s build files.
以下是一个使用 Kafka 连接器和 Confluent Avro 格式创建表的示例。
SQL
使用原始的 UTF-8 字符串作为 Kafka 的 key,Schema Registry 中注册的 Avro 记录作为 Kafka 的 values 的表的示例:
Kafka 的 key 和 value 在 Schema Registry 中都注册为 Avro 记录的表的示例:
使用 upsert-kafka 连接器,Kafka 的 value 在 Schema Registry 中注册为 Avro 记录的表的示例:
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | required | (none) | String | Specify what format to use, here should be ‘avro-confluent’ . |
avro-confluent.basic-auth.credentials-source | optional | (none) | String | Basic auth credentials source for Schema Registry |
avro-confluent.basic-auth.user-info | optional | (none) | String | Basic auth user info for schema registry |
avro-confluent.bearer-auth.credentials-source | optional | (none) | String | Bearer auth credentials source for Schema Registry |
avro-confluent.bearer-auth.token | optional | (none) | String | Bearer auth token for Schema Registry |
avro-confluent.properties | optional | (none) | Map | Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence. |
avro-confluent.ssl.keystore.location | optional | (none) | String | Location / File of SSL keystore |
avro-confluent.ssl.keystore.password | optional | (none) | String | Password for SSL keystore |
avro-confluent.ssl.truststore.location | optional | (none) | String | Location / File of SSL truststore |
avro-confluent.ssl.truststore.password | optional | (none) | String | Password for SSL truststore |
avro-confluent.subject | optional | (none) | String | The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, ‘kafka’ and ‘upsert-kafka’ connectors use ‘<topic_name>-value’ or ‘<topic_name>-key’ as the default subject name if this format is used as the value or key format. But for other connectors (e.g. ‘filesystem’), the subject option is required when used as sink. |
avro-confluent.url | required | (none) | String | The URL of the Confluent Schema Registry to fetch/register schemas. |
目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 中描述了 Flink 数据类型和 Avro 类型的对应关系。
除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro , 其中 something
是从 Flink 类型转换的 Avro 类型。