Confluent Avro Format

    The Avro Schema Registry (avro-confluent) format allows you to read records that were serialized by the io.confluent.kafka.serializers.KafkaAvroSerializer and to write records that can in turn be read by the io.confluent.kafka.serializers.KafkaAvroDeserializer.

    When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record, while the reader schema is inferred from the table schema.

    When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and used to retrieve a schema id to be encoded with the data. The lookup is performed in the configured Confluent Schema Registry under the subject given in avro-confluent.subject.

    In order to use the Avro Schema Registry format, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

    For Maven, SBT, Gradle, or other build automation tools, please also ensure that Confluent’s Maven repository at https://packages.confluent.io/maven/ is configured in your project’s build files.
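
    As a sketch for a Maven project, assuming the org.apache.flink:flink-avro-confluent-registry artifact and a placeholder version property matching your Flink release, the relevant pom.xml entries might look like this:

        <!-- Confluent Avro format for the Table API / SQL
             (the version should match your Flink release) -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-avro-confluent-registry</artifactId>
          <version>${flink.version}</version>
        </dependency>

        <!-- Confluent's Maven repository, needed to resolve the Schema Registry client -->
        <repositories>
          <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
          </repository>
        </repositories>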

    Example of a table using a raw UTF-8 string as the Kafka key and Avro records registered in the Schema Registry as Kafka values:
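
    A sketch of such a DDL follows; the topic name, broker address, and Schema Registry URL are illustrative placeholders:

        CREATE TABLE user_created (
          -- one column mapped to the Kafka raw UTF-8 key
          the_kafka_key STRING,

          -- a few columns mapped to the Avro fields of the Kafka value
          id STRING,
          name STRING,
          email STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'user_events_example1',
          'properties.bootstrap.servers' = 'localhost:9092',

          -- UTF-8 string as Kafka keys, taken from the 'the_kafka_key' table column
          'key.format' = 'raw',
          'key.fields' = 'the_kafka_key',

          -- Avro records registered in the Schema Registry as Kafka values
          'value.format' = 'avro-confluent',
          'value.avro-confluent.url' = 'http://localhost:8082',
          'value.fields-include' = 'EXCEPT_KEY'
        );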


    Example of a table with both the Kafka key and value registered as Avro records in the Schema Registry:
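
    A sketch, again with placeholder names; the key column is given a prefix so it does not clash with value fields, and explicit subjects are shown as an optional override of the defaults:

        CREATE TABLE user_created (
          -- one column mapped to the 'id' Avro field of the Kafka key
          kafka_key_id STRING,

          -- a few columns mapped to the Avro fields of the Kafka value
          id STRING,
          name STRING,
          email STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'user_events_example2',
          'properties.bootstrap.servers' = 'localhost:9092',

          -- the Kafka key is also an Avro record registered in the Schema Registry
          'key.format' = 'avro-confluent',
          'key.avro-confluent.url' = 'http://localhost:8082',
          'key.fields' = 'kafka_key_id',

          -- both key and value contain a field named 'id'; prefixing the key column avoids a clash
          'key.fields-prefix' = 'kafka_key_',

          'value.format' = 'avro-confluent',
          'value.avro-confluent.url' = 'http://localhost:8082',
          'value.fields-include' = 'EXCEPT_KEY',

          -- optional: override the default '<topic_name>-key' / '<topic_name>-value' subjects
          'key.avro-confluent.subject' = 'user_events_example2-key',
          'value.avro-confluent.subject' = 'user_events_example2-value'
        );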


    Example of a table using the upsert-kafka connector with the Kafka value registered as an Avro record in the Schema Registry:
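
    A sketch with placeholder names; the upsert-kafka connector requires a primary key, which defines the columns used as the Kafka key:

        CREATE TABLE user_created (
          -- one column mapped to the Kafka raw UTF-8 key
          kafka_key_id STRING,

          -- a few columns mapped to the Avro fields of the Kafka value
          id STRING,
          name STRING,
          email STRING,

          -- upsert-kafka requires a primary key; it determines the Kafka key columns
          PRIMARY KEY (kafka_key_id) NOT ENFORCED
        ) WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'user_events_example3',
          'properties.bootstrap.servers' = 'localhost:9092',

          -- UTF-8 string as the Kafka key
          'key.format' = 'raw',

          -- the Kafka value is registered as an Avro record in the Schema Registry
          'value.format' = 'avro-confluent',
          'value.avro-confluent.url' = 'http://localhost:8082'
        );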

    The following format options are supported; each entry lists whether the option is required, whether it is forwarded, its default value, and its type:

    format (required, forwarded: no, default: (none), type: String)
        Specify what format to use; here it should be ‘avro-confluent’.

    avro-confluent.basic-auth.credentials-source (optional, forwarded: yes, default: (none), type: String)
        Basic auth credentials source for the Schema Registry.

    avro-confluent.basic-auth.user-info (optional, forwarded: yes, default: (none), type: String)
        Basic auth user info for the Schema Registry.

    avro-confluent.bearer-auth.credentials-source (optional, forwarded: yes, default: (none), type: String)
        Bearer auth credentials source for the Schema Registry.

    avro-confluent.bearer-auth.token (optional, forwarded: yes, default: (none), type: String)
        Bearer auth token for the Schema Registry.

    avro-confluent.properties (optional, forwarded: yes, default: (none), type: Map)
        Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.

    avro-confluent.ssl.keystore.location (optional, forwarded: yes, default: (none), type: String)
        Location / file of the SSL keystore.

    avro-confluent.ssl.keystore.password (optional, forwarded: yes, default: (none), type: String)
        Password for the SSL keystore.

    avro-confluent.ssl.truststore.location (optional, forwarded: yes, default: (none), type: String)
        Location / file of the SSL truststore.

    avro-confluent.ssl.truststore.password (optional, forwarded: yes, default: (none), type: String)
        Password for the SSL truststore.

    avro-confluent.subject (optional, forwarded: yes, default: (none), type: String)
        The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, the ‘kafka’ and ‘upsert-kafka’ connectors use ‘<topic_name>-value’ or ‘<topic_name>-key’ as the default subject name if this format is used as the value or key format. For other connectors (e.g. ‘filesystem’), the subject option is required when the format is used in a sink.

    avro-confluent.url (required, forwarded: yes, default: (none), type: String)
        The URL of the Confluent Schema Registry to fetch/register schemas.
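
    As a sketch of how these options appear in a DDL, assuming an illustrative secured registry endpoint and credentials (when the format is set via ‘key.format’ or ‘value.format’, prefix the options with ‘key.’ or ‘value.’ accordingly):

        CREATE TABLE user_created (
          id STRING,
          name STRING,
          email STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'user_events',
          'properties.bootstrap.servers' = 'localhost:9092',
          'format' = 'avro-confluent',

          -- registry endpoint and basic auth credentials (illustrative values)
          'avro-confluent.url' = 'https://localhost:8081',
          'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
          'avro-confluent.basic-auth.user-info' = 'user:password',

          -- trust the registry's TLS certificate (illustrative paths and password)
          'avro-confluent.ssl.truststore.location' = '/path/to/truststore.jks',
          'avro-confluent.ssl.truststore.password' = 'secret'
        );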

    Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and the Avro writer schema during serialization. Explicitly defining an Avro schema is not yet supported. See the Apache Avro format documentation for the mapping between Avro and Flink data types.

    You can refer to the Avro Specification for more information about Avro types.