Apache Kafka SQL Connector

    The Kafka connector allows for reading data from and writing data into Kafka topics.

    In order to use the Kafka connector, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

    The Kafka connector is not part of the binary distribution. See the documentation on how to link with it for cluster execution.

    How to create a Kafka table

    The example below shows how to create a Kafka table:

    CREATE TABLE KafkaTable (
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    )

    The following connector metadata can be accessed as metadata columns in a table definition.

    The R/W column defines whether a metadata field is readable (R) and/or writable (W). Read-only columns must be declared VIRTUAL to exclude them during an INSERT INTO operation.

    The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

    CREATE TABLE KafkaTable (
      `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
      `partition` BIGINT METADATA VIRTUAL,
      `offset` BIGINT METADATA VIRTUAL,
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );

    Format Metadata

    The connector is able to expose metadata of the value format for reading. Format metadata keys are prefixed with 'value.'.

    The following example shows how to access both Kafka and Debezium metadata fields:
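    A minimal sketch of such a table, assuming the topic carries Debezium-encoded change events: the 'value.source.*' keys are exposed by the debezium-json value format, while 'partition' and 'offset' come from the Kafka connector itself (topic, servers, and group id are placeholder values).

    CREATE TABLE KafkaTable (
      -- metadata from the debezium-json value format
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL,
      -- metadata from the Kafka connector
      `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,
      `offset` BIGINT METADATA VIRTUAL,
      -- payload columns
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );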

    Connector Options

    Both the key and value part of a Kafka record can be serialized to and deserialized from raw bytes using one of the given formats.

    Value Format

    Since a key is optional in Kafka records, the following statement reads and writes records with a configured value format but without a key format. The 'format' option is a synonym for 'value.format'. All format options are prefixed with the format identifier.

    CREATE TABLE KafkaTable (
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'format' = 'json',
      'json.ignore-parse-errors' = 'true'
    )

    The value format will be configured with the following data type (the metadata column `ts` is not part of it):

    ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>

    Key and Value Format

    The following example shows how to specify and configure key and value formats. The format options are prefixed with either the 'key' or 'value' plus format identifier.
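    A sketch of such a table, reusing the topic and broker settings from the earlier examples and using json for both the key and the value:

    CREATE TABLE KafkaTable (
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      -- key format: built from the columns listed in 'key.fields'
      'key.format' = 'json',
      'key.json.ignore-parse-errors' = 'true',
      'key.fields' = 'user_id;item_id',
      -- value format: 'ALL' means key columns are also part of the value
      'value.format' = 'json',
      'value.json.fail-on-missing-field' = 'false',
      'value.fields-include' = 'ALL'
    );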

    The key format includes the fields listed in 'key.fields' (using ';' as the delimiter) in the same order. Thus, it will be configured with the following data type:

    ROW<`user_id` BIGINT, `item_id` BIGINT>

    Since the value format is configured with 'value.fields-include' = 'ALL', key fields will also end up in the value format’s data type:

    ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>

    Overlapping Format Fields

    The connector cannot split the table’s columns into key and value fields based on schema information if both key and value formats contain fields of the same name. The 'key.fields-prefix' option allows giving key columns a unique name in the table schema while keeping the original names when configuring the key format.

    The following example shows a key and value format that both contain a version field:
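    A sketch of such a table, assuming json for both formats and a 'k_' prefix for the key columns in the table schema:

    CREATE TABLE KafkaTable (
      `k_version` INT,
      `k_user_id` BIGINT,
      `k_item_id` BIGINT,
      `version` INT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'key.format' = 'json',
      -- the prefix is stripped again when configuring the key format
      'key.fields-prefix' = 'k_',
      'key.fields' = 'k_version;k_user_id;k_item_id',
      'value.format' = 'json',
      -- required so that the key columns do not end up in the value format
      'value.fields-include' = 'EXCEPT_KEY'
    );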

    The value format must be configured in 'EXCEPT_KEY' mode. The formats will be configured with the following data types:

    key format:
    ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>

    value format:
    ROW<`version` INT, `behavior` STRING>

    Topic and Partition Discovery

    The config options topic and topic-pattern specify the topics or the topic pattern to consume for the source. The config option topic accepts a topic list using a semicolon separator, like ‘topic-1;topic-2’. The config option topic-pattern uses a regular expression to discover the matched topics. For example, if topic-pattern is test-topic-[0-9], then all topics with names that match the specified regular expression (starting with test-topic- and ending with a single digit) will be subscribed by the consumer when the job starts running.

    To allow the consumer to discover dynamically created topics after the job has started running, set a non-negative value for scan.topic-partition-discovery.interval. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.

    Please refer to the Kafka DataStream connector documentation for more about topic and partition discovery.

    Note that topic list and topic pattern only work in sources. In sinks, Flink currently only supports a single topic.
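    For illustration, a sketch of a source table that subscribes to every topic matching the pattern and periodically checks for newly created matching topics (topic pattern, discovery interval, and schema are placeholders):

    CREATE TABLE KafkaPatternSource (
      `user_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      -- subscribe to all topics whose names match the regular expression
      'topic-pattern' = 'test-topic-[0-9]',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      -- check for new matching topics and partitions every 10 seconds
      'scan.topic-partition-discovery.interval' = '10s',
      'format' = 'json'
    );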

    Start Reading Position

    The config option scan.startup.mode specifies the startup mode for the Kafka consumer. The valid enumerations are:

    • `group-offsets`: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
    • `earliest-offset`: start from the earliest offset possible.
    • `latest-offset`: start from the latest offset.
    • `timestamp`: start from user-supplied timestamp for each partition.
    • `specific-offsets`: start from user-supplied specific offsets for each partition.

    The default option value is group-offsets, which indicates consuming from the last committed offsets in ZK / Kafka brokers.

    If specific-offsets is specified, another config option scan.startup.specific-offsets is required to specify specific startup offsets for each partition, e.g. an option value partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300 for partition 1.
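    For example, a sketch of the relevant options (topic, schema, and offsets are placeholders):

    CREATE TABLE KafkaOffsetSource (
      `user_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'specific-offsets',
      -- offset 42 for partition 0 and offset 300 for partition 1
      'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
      'format' = 'csv'
    );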

    CDC Changelog Source

    Flink natively supports Kafka as a CDC changelog source. If messages in a Kafka topic are change events captured from other databases using a CDC tool, you can use the corresponding Flink CDC format to interpret the messages as INSERT/UPDATE/DELETE rows in a Flink SQL table.

    The changelog source is a very useful feature in many cases, such as synchronizing incremental data from databases to other systems, auditing logs, materialized views on databases, temporal joins against the changing history of a database table, and so on.

    Flink provides several CDC formats, including debezium, canal, and maxwell.
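    As a sketch, a table that interprets Debezium JSON change events from a hypothetical topic 'products_binlog' as a changelog (columns and topic name are placeholders):

    CREATE TABLE products (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      -- interpret messages as INSERT/UPDATE/DELETE changelog rows
      'format' = 'debezium-json'
    );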

    Sink Partitioning

    The config option sink.partitioner specifies output partitioning from Flink’s partitions into Kafka’s partitions. By default, Flink uses the Kafka default partitioner, which applies a sticky partition strategy for records with null keys and uses a murmur2 hash to compute the partition for records with a key defined.

    In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The ‘fixed’ partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
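    A sketch of a sink table that uses the 'fixed' partitioner (topic and schema are placeholders; a custom partitioner can instead be specified by its fully qualified class name):

    CREATE TABLE KafkaSink (
      `user_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior_sink',
      'properties.bootstrap.servers' = 'localhost:9092',
      -- each Flink partition writes into at most one Kafka partition
      'sink.partitioner' = 'fixed',
      'format' = 'json'
    );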

    Consistency guarantees

    By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.

    With Flink’s checkpointing enabled, the Kafka connector can provide exactly-once delivery guarantees.

    Besides enabling Flink’s checkpointing, you can also choose among three different modes of operation by passing the appropriate sink.delivery-guarantee option (a configuration sketch follows the list below):

    • none: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
    • at-least-once (default): Flink will guarantee that no records are lost, although they can be duplicated.
    • exactly-once: Kafka transactions will be used to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget about setting the desired isolation.level (read_committed or read_uncommitted - the latter one is the default value) for any application consuming records from Kafka.
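    A sketch of an exactly-once sink configuration, assuming checkpointing is enabled and that the connector version in use supports the 'sink.transactional-id-prefix' option; topic, prefix, and timeout are placeholder values:

    CREATE TABLE KafkaExactlyOnceSink (
      `user_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior_sink',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json',
      -- use Kafka transactions; requires Flink checkpointing to be enabled
      'sink.delivery-guarantee' = 'exactly-once',
      -- prefix for the transactional ids used by the sink
      'sink.transactional-id-prefix' = 'my-sink-prefix',
      -- should not exceed the broker's transaction.max.timeout.ms
      'properties.transaction.timeout.ms' = '900000'
    );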

    Please refer to Kafka documentation for more caveats about delivery guarantees.

    Source Per-Partition Watermarks

    Flink supports emitting per-partition watermarks for Kafka. Watermarks are generated inside the Kafka consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming shuffles. The output watermark of the source is determined by the minimum watermark among the partitions it reads. If some partitions in the topics are idle, the watermark generator will not advance. You can alleviate this problem by setting the 'table.exec.source.idle-timeout' option in the table configuration, as sketched below.
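    A sketch of a source with an event-time watermark defined on the Kafka record timestamp, together with the idle-timeout setting (the 5-second bound and 1-minute timeout are placeholder values):

    CREATE TABLE KafkaTable (
      `user_id` BIGINT,
      `behavior` STRING,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      -- per-partition watermarks are generated from this event-time column
      WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'format' = 'json'
    );

    -- let the overall watermark advance even if some partitions are idle
    SET 'table.exec.source.idle-timeout' = '1 min';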

    Please refer to Kafka watermark strategies for more details.

    Data Type Mapping

    Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or data types. The Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to Formats pages for more details.