The Routine Load feature lets users submit a resident import job that imports data into Doris by continuously reading from a specified data source.

    Currently, only CSV or JSON data can be imported, and only from Kafka, either without authentication or with SSL authentication.

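    Syntax:

      CREATE ROUTINE LOAD [db.]job_name ON tbl_name
      [merge_type]
      [load_properties]
      [job_properties]
      FROM data_source [data_source_properties]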

    • job_name

      The name of the import job. Within the same database, only one job with a given name can be running at a time.

    • tbl_name

      Specifies the name of the table to import into.

    • merge_type

      Data merge type. The default is APPEND, meaning the imported data is written as ordinary appends. The MERGE and DELETE types are only available for Unique Key model tables. MERGE must be used together with the [DELETE ON] clause, which marks the Delete Flag column. DELETE means that all imported data is treated as deletions.
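      The three merge types can be illustrated with a toy in-memory model of a Unique Key table. This is a sketch under stated assumptions, not how Doris stores data: the table is a Python dict keyed by the unique key, the helper name apply_load is hypothetical, and the DELETE ON condition is passed in as a Python predicate.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, int]


def apply_load(
    table: Dict[int, Row],
    rows: List[Row],
    key: str,
    merge_type: str = "APPEND",
    delete_on: Optional[Callable[[Row], bool]] = None,
) -> Dict[int, Row]:
    """Toy model of APPEND / MERGE / DELETE on a Unique Key table (hypothetical helper)."""
    if merge_type == "MERGE" and delete_on is None:
        # MERGE needs a DELETE ON condition to mark the Delete Flag
        raise ValueError("MERGE requires a DELETE ON condition")
    result = dict(table)
    for row in rows:
        k = row[key]
        if merge_type == "APPEND":
            # Ordinary write: a row with an existing key replaces the old row
            result[k] = row
        elif merge_type == "DELETE":
            # Every imported row is treated as a deletion of its key
            result.pop(k, None)
        elif merge_type == "MERGE":
            # Rows matching DELETE ON are deletions, the rest are ordinary writes
            if delete_on(row):
                result.pop(k, None)
            else:
                result[k] = row
        else:
            raise ValueError(f"unknown merge_type: {merge_type}")
    return result


table = {1: {"k1": 1, "v3": 5}, 2: {"k1": 2, "v3": 6}}
batch = [{"k1": 1, "v3": 200}, {"k1": 3, "v3": 7}]
print(apply_load(table, batch, "k1", "MERGE", delete_on=lambda r: r["v3"] > 100))
```

      The predicate lambda r: r["v3"] > 100 plays the role of DELETE ON v3 > 100: key 1 is deleted, key 3 is written, and key 2 is untouched.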

    • load_properties

      Describes the imported data. It consists of the following clauses:

      [column_separator],
      [columns_mapping],
      [preceding_filter],
      [where_predicates],
      [partitions],
      [DELETE ON],
      [ORDER BY]

      • column_separator

        Specifies the column separator. The default is \t.

        COLUMNS TERMINATED BY ","

      • columns_mapping

        Specifies the mapping between the columns in the source data and the columns in the table, as well as any column transformations. For details, refer to the [Column Mapping, Transformation and Filtering] document.

        (k1, k2, tmpk1, k3 = tmpk1 + 1)

      • preceding_filter

        Filters the raw source data. For details, refer to the [Column Mapping, Transformation and Filtering] document.

      • where_predicates

        Filters the imported data based on conditions. For details, refer to the [Column Mapping, Transformation and Filtering] document.

        WHERE k1 > 100 and k2 = 1000
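        The difference between preceding_filter and where_predicates is when each one is evaluated. The Python sketch below assumes the ordering PRECEDING FILTER on the raw source columns, then column mapping, then WHERE on the mapped columns. It reuses the mapping (k1, k2, tmpk1, k3 = tmpk1 + 1) and the condition WHERE k1 > 100 and k2 = 1000 from above; the function load_rows and the raw-data filter are hypothetical.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, int]


def load_rows(
    raw_rows: Iterable[List[int]],
    preceding_filter: Callable[[Row], bool],
    where: Callable[[Row], bool],
) -> List[Row]:
    """Hypothetical pipeline: PRECEDING FILTER -> column mapping -> WHERE."""
    loaded: List[Row] = []
    for values in raw_rows:
        # Source columns named as in COLUMNS (k1, k2, tmpk1, ...)
        src = dict(zip(["k1", "k2", "tmpk1"], values))
        # preceding_filter sees the raw source columns, before any transformation
        if not preceding_filter(src):
            continue
        # Column mapping k3 = tmpk1 + 1; tmpk1 itself is not a table column
        row = {"k1": src["k1"], "k2": src["k2"], "k3": src["tmpk1"] + 1}
        # where_predicates are evaluated on the mapped columns
        if where(row):
            loaded.append(row)
    return loaded


rows = load_rows(
    [[150, 1000, 9], [50, 1000, 9], [200, 7, 9], [300, 1000, -1]],
    preceding_filter=lambda r: r["tmpk1"] >= 0,        # hypothetical raw-data filter
    where=lambda r: r["k1"] > 100 and r["k2"] == 1000,  # WHERE k1 > 100 and k2 = 1000
)
print(rows)  # [{'k1': 150, 'k2': 1000, 'k3': 10}]
```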

      • partitions

        Specifies which partitions of the destination table to import into. If not specified, data is automatically imported into the corresponding partitions.

        PARTITION(p1, p2, p3)

      • DELETE ON

        Can only be used with the MERGE import type, and only for Unique Key model tables. Specifies the column in the imported data that represents the Delete Flag and the condition used to compute it.

        DELETE ON v3 > 100

      • ORDER BY

        Only for Unique Key model tables. Specifies the column in the imported data that represents the sequence column (Sequence Col), which is mainly used to preserve data order during import.
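        To see why a sequence column helps when rows arrive out of order, here is a Python sketch of replace-by-sequence semantics for one Unique Key table. It rests on a simplifying assumption: an incoming row replaces the stored row only when its sequence value is greater than or equal to the stored one. The helper name and the tuple layout are hypothetical.

```python
from typing import Dict, Iterable, Tuple


def apply_with_sequence(
    table: Dict[int, Tuple[int, str]],
    rows: Iterable[Tuple[int, int, str]],
) -> Dict[int, Tuple[int, str]]:
    """rows are (key, source_sequence, value); table maps key -> (sequence, value)."""
    result = dict(table)
    for key, seq, value in rows:
        current = result.get(key)
        # Assumption: replace only if the new sequence value is not smaller
        if current is None or seq >= current[0]:
            result[key] = (seq, value)
    return result


# The row with sequence 5 is read after the row with sequence 9 but does not win.
state = apply_with_sequence({}, [(1, 9, "new"), (1, 5, "old"), (2, 3, "x")])
print(state)  # {1: (9, 'new'), 2: (3, 'x')}
```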

    • job_properties

      Common parameters of the routine import job:

      PROPERTIES (
          "key1" = "val1",
          "key2" = "val2"
      )

      1. desired_concurrent_number

        Desired concurrency. A routine import job is split into multiple subtasks for execution; this parameter specifies the maximum number of subtasks the job can run concurrently. It must be greater than 0. The default is 3.

        This is not necessarily the actual concurrency. The actual concurrency is determined by taking into account the number of nodes in the cluster, the cluster load, and the data source.

        "desired_concurrent_number" = "3"
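        One rough way to picture how the desired value gets capped is to take a minimum over several limits. The Python sketch below is a simplified model, not the exact Doris scheduling rule: it assumes the cap comes from the number of Kafka partitions, the number of alive backend nodes, and a hypothetical cluster-wide task limit, and it ignores the current load.

```python
def actual_concurrency(
    desired_concurrent_number: int,
    kafka_partition_count: int,
    alive_backend_count: int,
    cluster_task_limit: int,
) -> int:
    """Simplified model: the desired value is only an upper bound."""
    if desired_concurrent_number <= 0:
        raise ValueError("desired_concurrent_number must be greater than 0")
    # Assumption of this sketch: running more subtasks than partitions or
    # nodes would leave some of them idle, so those counts cap the result.
    return min(
        desired_concurrent_number,
        kafka_partition_count,
        alive_backend_count,
        cluster_task_limit,
    )


print(actual_concurrency(3, kafka_partition_count=2, alive_backend_count=5, cluster_task_limit=5))
```

        With only two partitions, a desired concurrency of 3 would be reduced to 2 under this model.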

      2. max_batch_interval/max_batch_rows/max_batch_size

        These three parameters specify, respectively:

        1. The maximum execution time of each subtask, in seconds. The range is 5 to 60. The default is 10.
        2. The maximum number of rows read by each subtask. Must be greater than or equal to 200000. The default is 200000.
        3. The maximum number of bytes read by each subtask. The unit is bytes and the range is 100MB to 1GB. The default is 100MB.

        Together they control the execution time and the amount of data processed by a subtask. As soon as any one of them reaches its threshold, the subtask ends.

        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200"
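        The rule that a subtask ends as soon as any one of the three thresholds is reached can be sketched in Python. This is a simplified model under stated assumptions: each Kafka message is one row, the deadline is only checked when a message arrives, and the function run_subtask with its injectable clock is hypothetical.

```python
import time
from typing import Callable, Iterable, List


def run_subtask(
    messages: Iterable[bytes],
    max_batch_interval: float = 10,            # seconds
    max_batch_rows: int = 200_000,
    max_batch_size: int = 100 * 1024 * 1024,   # bytes (100MB)
    clock: Callable[[], float] = time.monotonic,
) -> List[bytes]:
    """Consume messages until any one threshold is reached (simplified model)."""
    start = clock()
    batch: List[bytes] = []
    size = 0
    for msg in messages:
        batch.append(msg)      # assumption: one message == one row
        size += len(msg)
        if (
            len(batch) >= max_batch_rows
            or size >= max_batch_size
            or clock() - start >= max_batch_interval
        ):
            break              # the first threshold reached ends the subtask
    return batch


print(len(run_subtask([b"1,a"] * 10, max_batch_rows=3)))  # 3
```

        Passing a fake clock makes the time limit testable without sleeping.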
      3. max_error_number

        The maximum number of error rows allowed within the sampling window. Must be greater than or equal to 0. The default is 0, meaning no error rows are allowed.

        The sampling window is max_batch_rows * 10. If the number of error rows within the sampling window exceeds max_error_number, the routine load job is paused, and manual intervention is required to check for data quality problems.

        Rows filtered out by the WHERE condition are not counted as error rows.
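        The sampling-window check can be sketched in Python. The sketch assumes a tumbling window of max_batch_rows * 10 rows whose error count is reset at each window boundary, and that rows filtered out by WHERE are never passed in; should_pause is a hypothetical name.

```python
from typing import Iterable


def should_pause(
    row_is_error: Iterable[bool],
    max_batch_rows: int = 200_000,
    max_error_number: int = 0,
) -> bool:
    """Return True if the job would be paused (simplified tumbling-window model).

    row_is_error holds one flag per row that reached the quality check;
    rows dropped by WHERE are not passed in, so they never count as errors.
    """
    window = max_batch_rows * 10
    seen = errors = 0
    for is_error in row_is_error:
        seen += 1
        errors += int(is_error)
        if errors > max_error_number:
            return True            # too many error rows inside the current window
        if seen == window:
            seen = errors = 0      # start a new sampling window
    return False


print(should_pause([True, False, True], max_error_number=1))  # True
```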

      4. strict_mode

        Whether to enable strict mode. The default is off. If enabled, a row is filtered out when a non-null source value becomes NULL after column type conversion. Specify as:

        "strict_mode" = "true"

        Strict mode means strict filtering of column type conversions during the load process. The filtering rules are as follows:

        1. For column type conversion, if strict mode is true, erroneous data is filtered out. Erroneous data here means a value that is not null in the source data but becomes NULL after column type conversion.
        2. When a loaded column is generated by a function transformation, strict mode has no effect on it.
        3. For a column type with a range limit, if the source data passes type conversion but fails the range limit, strict mode has no effect on it. For example, if the type is decimal(1,0) and the source value is 10, the value can be converted but falls outside the range declared for the column. Strict mode does not affect such data.

        [Tables: relationship between strict mode and source data, for a column of type TinyInt and for a column of type Decimal(1,0)]
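        Rule 1 above can be condensed into a small Python sketch for a TinyInt column (rules 2 and 3 are not modeled). It is illustrative only: to_tinyint is a hypothetical stand-in that treats anything that is not an integer within the TinyInt range (-128 to 127) as a conversion that produces NULL, and None stands for a NULL source value.

```python
from typing import Optional, Tuple

TINYINT_MIN, TINYINT_MAX = -128, 127


def to_tinyint(raw: str) -> Optional[int]:
    """Hypothetical conversion: None means the conversion produced NULL."""
    try:
        value = int(raw)
    except ValueError:
        return None
    # Assumption: a value outside the TinyInt range also converts to NULL
    return value if TINYINT_MIN <= value <= TINYINT_MAX else None


def load_value(raw: Optional[str], strict_mode: bool) -> Tuple[str, Optional[int]]:
    """Return ("ok", value) or ("filtered", None); raw=None models a NULL source value."""
    if raw is None:
        return ("ok", None)        # NULL source data stays NULL in either mode
    value = to_tinyint(raw)
    if value is None and strict_mode:
        return ("filtered", None)  # non-null source became NULL: filtered in strict mode
    return ("ok", value)           # otherwise load the value (NULL in non-strict mode)


for raw in [None, "aaa", "2000", "1"]:
    print(raw, load_value(raw, strict_mode=True), load_value(raw, strict_mode=False))
```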

      5. timezone

        Specifies the time zone used by the import job. The default is to use the Session’s timezone parameter. This parameter affects the results of all time zone-related functions involved in the import.

      6. format

        Specifies the import data format. The default is csv; json is also supported.

      7. jsonpaths

        When the imported data format is json, jsonpaths specifies which fields to extract from the JSON data.

        "jsonpaths" = "[\"$.k2\", \"$.k1\"]"

      8. strip_outer_array

        When the imported data format is json, setting strip_outer_array to true indicates that the JSON data is presented as an array, and each element of the array is treated as one row. The default is false.

        "strip_outer_array" = "true"

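      9. json_root

        When the imported data format is json, json_root specifies the root node of the JSON document from which data is parsed.

        "json_root" = "$.RECORDS"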
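        How json_root, strip_outer_array and jsonpaths combine can be sketched in Python with the standard json module. The resolver here is a deliberately minimal, hypothetical one that only understands paths of the form $.a.b, and it assumes that a field missing from a record yields NULL (None). The sample message is made up to match the json_root value $.RECORDS.

```python
import json
from typing import Any, List, Optional


def resolve(doc: Any, path: str) -> Any:
    """Minimal resolver for paths like "$" or "$.a.b" (no arrays or wildcards)."""
    if path == "$":
        return doc
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    node = doc
    for part in path[2:].split("."):
        if not isinstance(node, dict) or part not in node:
            return None  # assumption: a missing field becomes NULL
        node = node[part]
    return node


def extract_rows(
    message: str,
    jsonpaths: List[str],
    json_root: Optional[str] = None,
    strip_outer_array: bool = False,
) -> List[List[Any]]:
    doc = json.loads(message)
    if json_root:
        doc = resolve(doc, json_root)          # start parsing from the root node
    if strip_outer_array:
        if not isinstance(doc, list):
            raise ValueError("strip_outer_array is true but the data is not an array")
        records = doc                          # each array element is one row
    else:
        records = [doc]                        # the whole document is one row
    # jsonpaths picks the fields, in order, for each row
    return [[resolve(rec, p) for p in jsonpaths] for rec in records]


msg = '{"RECORDS": [{"category": "book", "author": "a1", "price": 10}, {"category": "pen", "author": "a2"}]}'
print(extract_rows(msg, ["$.category", "$.author", "$.price"], json_root="$.RECORDS", strip_outer_array=True))
```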

    • FROM data_source [data_source_properties]

      The type of data source. Currently, Kafka is supported:

      FROM KAFKA
      (
          "key1" = "val1",
          "key2" = "val2"
      )

      data_source_properties supports the following data source properties:

      1. kafka_broker_list

        Kafka’s broker connection information, in the format host:port. Separate multiple brokers with commas.

        "kafka_broker_list" = "broker1:9092,broker2:9092"

      2. kafka_topic

        Specifies the Kafka topic to subscribe to.

        "kafka_topic" = "my_topic"

      3. kafka_partitions/kafka_offsets

        Specifies the Kafka partitions to subscribe to and the starting offset of each partition. If a point in time is specified, consumption starts from the nearest offset whose time is greater than or equal to that time.

        Each offset can be a specific offset (0 or greater), or one of:

        • OFFSET_BEGINNING: subscribe from the earliest available data.
        • OFFSET_END: subscribe from the end.
        • A point in time, such as “2021-05-22 11:00:00”

        If not specified, all partitions of the topic are subscribed from OFFSET_END by default.

        "kafka_partitions" = "0,1,2,3",
        "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

        Note that a point in time cannot be mixed with OFFSET values.
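        The constraints on kafka_partitions and kafka_offsets (one offset per partition, named offsets allowed alongside numeric ones, no mixing of times with offsets) can be checked with a short Python sketch. The function name is hypothetical, and it assumes points in time use exactly the YYYY-MM-DD HH:MM:SS form shown above.

```python
from datetime import datetime
from typing import Dict

NAMED_OFFSETS = {"OFFSET_BEGINNING", "OFFSET_END"}


def _is_time(value: str) -> bool:
    try:
        datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
        return True
    except ValueError:
        return False


def parse_start_positions(kafka_partitions: str, kafka_offsets: str) -> Dict[int, str]:
    """Map each partition to its starting position, enforcing the rules above."""
    partitions = [int(p) for p in kafka_partitions.split(",")]
    offsets = [o.strip() for o in kafka_offsets.split(",")]
    if len(partitions) != len(offsets):
        raise ValueError("kafka_offsets needs exactly one entry per partition")
    kinds = set()
    for off in offsets:
        if off in NAMED_OFFSETS or off.isdigit():
            kinds.add("offset")      # numeric offset (0 or greater) or a named offset
        elif _is_time(off):
            kinds.add("time")
        else:
            raise ValueError(f"invalid offset: {off!r}")
    if len(kinds) > 1:
        raise ValueError("a point in time cannot be mixed with OFFSET values")
    return dict(zip(partitions, offsets))


print(parse_start_positions("0,1,2,3", "101,0,OFFSET_BEGINNING,OFFSET_END"))
```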

      4. property

        Specifies custom Kafka parameters. This is equivalent to the “--property” parameter in the Kafka shell.

        When the value of a parameter is a file, prefix the value with the keyword “FILE:”.

        For how to create a file, refer to the [CREATE FILE] command documentation.

        For more supported custom parameters, refer to the client-side configuration items in the official librdkafka CONFIGURATION document. For example:

        "property.client.id" = "12345",
        "property.ssl.ca.location" = "FILE:ca.pem"

        1. When connecting to Kafka using SSL, you need to specify the following parameters:

          "property.security.protocol" = "ssl",
          "property.ssl.ca.location" = "FILE:ca.pem",
          "property.ssl.key.password" = "abcdefg"

          Where:

          property.security.protocol and property.ssl.ca.location are required; they indicate that the connection uses SSL and give the location of the CA certificate.

          If client authentication is enabled on the Kafka server side, also set:

          "property.ssl.certificate.location"
          "property.ssl.key.location"
          "property.ssl.key.password"

          They specify the client’s public key, the client’s private key, and the password for the private key, respectively.
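          The SSL rules above (protocol and CA location always, the three client-side settings only when the server requires client authentication, and the FILE: prefix for file values) can be captured in a small Python helper that builds the property map. The helper name and the file names are hypothetical; the files themselves would have to be created with CREATE FILE first.

```python
from typing import Dict, Optional, Tuple


def kafka_ssl_properties(
    ca_file: str,
    client_auth: Optional[Tuple[str, str, str]] = None,
) -> Dict[str, str]:
    """Build SSL data_source_properties. client_auth = (cert_file, key_file, key_password)."""
    props = {
        # Always required: connection method and CA certificate location
        "property.security.protocol": "ssl",
        "property.ssl.ca.location": f"FILE:{ca_file}",
    }
    if client_auth is not None:
        cert_file, key_file, key_password = client_auth
        # Only needed when the Kafka server enables client authentication
        props["property.ssl.certificate.location"] = f"FILE:{cert_file}"
        props["property.ssl.key.location"] = f"FILE:{key_file}"
        props["property.ssl.key.password"] = key_password  # a value, not a file
    return props


for key, value in kafka_ssl_properties("ca.pem", ("client.pem", "client.key", "abcdefg")).items():
    print(f'"{key}" = "{value}",')
```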

        2. Specify the default starting offset of Kafka partitions

          If kafka_partitions/kafka_offsets is not specified, all partitions are consumed by default.

          In this case, kafka_default_offsets can be used to specify the starting offset. The default is OFFSET_END, i.e. subscribe from the end.

          Example:

          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
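    Putting the pieces of the syntax together, a statement consists of the job and table names, an optional merge type and load properties, PROPERTIES, and the FROM KAFKA block. The Python helper below is a hypothetical convenience for generating such statements as text; it only does string assembly (escaping backslashes and double quotes inside values) and does not validate anything against Doris.

```python
from typing import Dict, List, Optional


def _quote(value: str) -> str:
    # Escape backslashes and double quotes, as in "[\"$.k2\", \"$.k1\"]"
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def _props(props: Dict[str, str]) -> str:
    body = ",\n".join(f"    {_quote(k)} = {_quote(v)}" for k, v in props.items())
    return "(\n" + body + "\n)"


def create_routine_load(
    job: str,
    table: str,
    load_properties: List[str],
    job_properties: Dict[str, str],
    kafka_properties: Dict[str, str],
    merge_type: Optional[str] = None,
) -> str:
    lines = [f"CREATE ROUTINE LOAD {job} ON {table}"]
    if merge_type:
        lines.append(f"WITH {merge_type}")
    if load_properties:
        # Load properties are separated by commas
        lines.append(",\n".join(load_properties))
    lines += ["PROPERTIES", _props(job_properties), "FROM KAFKA", _props(kafka_properties) + ";"]
    return "\n".join(lines)


stmt = create_routine_load(
    "example_db.test1",
    "example_tbl",
    ['COLUMNS TERMINATED BY ","', "COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)"],
    {"desired_concurrent_number": "3", "jsonpaths": '["$.k1"]'},
    {"kafka_broker_list": "broker1:9092", "kafka_topic": "my_topic"},
)
print(stmt)
```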
    Examples:

    1. Create a Kafka routine import job named test1 for example_tbl in example_db. Specify the column separator, group.id, and client.id, consume all partitions automatically, and start subscribing from the earliest available data (OFFSET_BEGINNING).

       CREATE ROUTINE LOAD example_db.test1 ON example_tbl
       COLUMNS TERMINATED BY ",",
       COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
       PROPERTIES
       (
           "desired_concurrent_number"="3",
           "max_batch_interval" = "20",
           "max_batch_rows" = "300000",
           "max_batch_size" = "209715200",
           "strict_mode" = "false"
       )
       FROM KAFKA
       (
           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
           "kafka_topic" = "my_topic",
           "property.group.id" = "xxx",
           "property.client.id" = "xxx",
           "property.kafka_default_offsets" = "OFFSET_BEGINNING"
       );

    2. Create a Kafka routine import job named test1 for example_tbl in example_db, running in strict mode.

    3. Import data from a Kafka cluster using SSL authentication, and also set the client.id parameter. The job runs in non-strict mode with the time zone Africa/Abidjan.

       CREATE ROUTINE LOAD example_db.test1 ON example_tbl
       COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
       WHERE k1 > 100 and k2 like "%doris%"
       PROPERTIES
       (
           "desired_concurrent_number"="3",
           "max_batch_interval" = "20",
           "max_batch_rows" = "300000",
           "max_batch_size" = "209715200",
           "strict_mode" = "false",
           "timezone" = "Africa/Abidjan"
       )
       FROM KAFKA
       (
           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
           "kafka_topic" = "my_topic",
           "property.security.protocol" = "ssl",
           "property.ssl.ca.location" = "FILE:ca.pem",
           "property.ssl.certificate.location" = "FILE:client.pem",
           "property.ssl.key.location" = "FILE:client.key",
           "property.ssl.key.password" = "abcdefg",
           "property.client.id" = "my_client_id"
       );

    4. Import data in JSON format. By default, the field names in the JSON data are mapped to column names. Consume partitions 0, 1, and 2, each starting from offset 0.

       CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
       COLUMNS(category,price,author)
       PROPERTIES
       (
           "desired_concurrent_number"="3",
           "max_batch_interval" = "20",
           "max_batch_rows" = "300000",
           "max_batch_size" = "209715200",
           "strict_mode" = "false",
           "format" = "json"
       )
       FROM KAFKA
       (
           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
           "kafka_topic" = "my_topic",
           "kafka_partitions" = "0,1,2",
           "kafka_offsets" = "0,0,0"
       );

    5. Import JSON data, extracting fields with jsonpaths and specifying the root node of the JSON document.

       CREATE ROUTINE LOAD example_db.test1 ON example_tbl
       COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
       PROPERTIES
       (
           "desired_concurrent_number"="3",
           "max_batch_interval" = "20",
           "max_batch_rows" = "300000",
           "max_batch_size" = "209715200",
           "strict_mode" = "false",
           "format" = "json",
           "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
           "json_root" = "$.RECORDS",
           "strip_outer_array" = "true"
       )
       FROM KAFKA
       (
           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
           "kafka_topic" = "my_topic",
           "kafka_partitions" = "0,1,2",
           "kafka_offsets" = "0,0,0"
       );

    6. Create a Kafka routine import job named test1 for example_tbl in example_db, using conditional filtering and the MERGE type: rows with v3 > 100 are marked as deletions.

       CREATE ROUTINE LOAD example_db.test1 ON example_tbl
       WITH MERGE
       COLUMNS(k1, k2, k3, v1, v2, v3),
       WHERE k1 > 100 and k2 like "%doris%",
       DELETE ON v3 > 100
       PROPERTIES
       (
           "desired_concurrent_number"="3",
           "max_batch_interval" = "20",
           "max_batch_rows" = "300000",
           "max_batch_size" = "209715200",
           "strict_mode" = "false"
       )
       FROM KAFKA
       (
           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
           "kafka_topic" = "my_topic",
           "kafka_partitions" = "0,1,2,3",
           "kafka_offsets" = "101,0,0,200"
       );

    7. Import data into a Unique Key model table that has a sequence column.

       CREATE ROUTINE LOAD example_db.test_job ON example_tbl
       COLUMNS TERMINATED BY ",",
       COLUMNS(k1,k2,source_sequence,v1,v2),
       ORDER BY source_sequence
       PROPERTIES
       (
           "desired_concurrent_number"="3",
           "max_batch_interval" = "30",
           "max_batch_rows" = "300000",
           "max_batch_size" = "209715200"
       ) FROM KAFKA
       (
           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
           "kafka_topic" = "my_topic",
           "kafka_partitions" = "0,1,2,3",
           "kafka_offsets" = "101,0,0,200"
       );

    8. Consume from a specified point in time.

    Keywords:

      CREATE, ROUTINE, LOAD, CREATE LOAD

    Specifying the Partition and Offset to consume from

    Doris supports starting consumption from a specified partition and offset, as well as from a specified point in time. This section describes how the corresponding parameters relate to each other.

    There are three relevant parameters:

    • kafka_partitions: specifies the list of partitions to consume, such as “0, 1, 2, 3”.
    • kafka_offsets: specifies the starting offset of each partition; it must contain the same number of entries as the kafka_partitions list. For example: “1000, 1000, 2000, 2000”