ROUTINE LOAD

    Syntax:

    1. [db.]job_name

      The name of the load job, in the same database, only one job can run with the same name.

    2. tbl_name

      Specifies the name of the table that needs to be loaded.

    3. merge_type

      The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND is the default value, which means that all this batch of data needs to be appended to the existing data. DELETE means to delete all rows with the same key as this batch of data. MERGE semantics Need to be used in conjunction with the delete condition, which means that the data that meets the delete on condition is processed according to DELETE semantics and the rest is processed according to APPEND semantics

    4. load_properties

      Used to describe the load data. grammar:

      1. [columns_mapping],
      2. [where_predicates],
      3. [delete_on_predicates]
      4. [partitions],
      5. [preceding_predicates]
      1. column_separator:

        Specify column separators, such as:

        COLUMNS TERMINATED BY ","

        The default is: \t

      2. columns_mapping:

        Specifies the mapping of columns in the source data and defines how the derived columns are generated.

        1. Map column:

          Specify in order, which columns in the source data correspond to which columns in the destination table. For columns that you want to skip, you can specify a column name that does not exist.

          Suppose the destination table has three columns k1, k2, v1. The source data has 4 columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1, respectively. Write as follows:

          COLUMNS (k2, k1, xxx, v1)

          Where xxx is a column that does not exist and is used to skip the third column in the source data.

        2. Derived columns:

          A column represented in the form of col_name = expr, which we call a derived column. That is, the value of the corresponding column in the destination table is calculated by expr.

          Derived columns are usually arranged after the mapped column. Although this is not mandatory, Doris always parses the mapped columns first and then parses the derived columns.

          Following an example, assume that the destination table also has column 4, v2, which is generated by the sum of k1 and k2. You can write as follows:

          COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

      3. where_predicates

        Used to specify filter criteria to filter out unwanted columns. Filter columns can be either mapped columns or derived columns.

        For example, if we only want to load a column with k1 greater than 100 and k2 equal to 1000, we would write as follows:

        WHERE k1 > 100 and k2 = 1000

      4. partitions

        Specifies which partitions of the load destination table. If not specified, it will be automatically loaded into the corresponding partition.

        Example:

        PARTITION(p1, p2, p3)

      5. delete_on_predicates:

        Only used when merge type is MERGE

      6. preceding_predicates

        Used to filter original data. The original data is the data without column mapping and transformation. The user can filter the data before conversion, select the desired data, and then perform the conversion.

    5. job_properties

      A generic parameter that specifies a routine load job.

      syntax:

      1. PROPERTIES (
      2. "key1" = "val1",
      3. "key2" = "val2"
      4. )

      Currently we support the following parameters:

      1. desired_concurrent_number

        The degree of concurrency desired. A routine load job is split into multiple subtasks. This parameter specifies how many tasks can be executed simultaneously in a job. Must be greater than 0. The default is 3.

        example:

        "desired_concurrent_number" = "3"

      2. max_batch_interval/max_batch_rows/max_batch_size

        These three parameters represent:

        1. The maximum execution time of each subtask, in seconds. The range is 5 to 60. The default is 10.

        2. The maximum number of rows read per subtask. Must be greater than or equal to 200,000. The default is 200000.

        3. The maximum number of bytes read per subtask. The unit is byte and the range is 100MB to 1GB. The default is 100MB.

        These three parameters are used to control the execution time and throughput of a subtask. When either one reaches the threshold, the task ends.

        example:

        1. "max_batch_interval" = "20",
        2. "max_batch_rows" = "300000",
        3. "max_batch_size" = "209715200"
      3. max_error_number

        The maximum number of error lines allowed in the sampling window. Must be greater than or equal to 0. The default is 0, which means that no error lines are allowed.

        The sampling window is max_batch_rows * 10. That is, if the number of error lines is greater than max_error_number in the sampling window, the routine job will be suspended, and manual intervention is required to check the data quality problem.

        Lines that are filtered by the where condition are not counted as error lines.

      4. strict_mode

        Whether to enable strict mode, the default is disabled. If turned on, the column type transformation of non-null raw data is filtered if the result is NULL. Specified as “strict_mode” = “true”

      5. timezone

        Specifies the time zone in which the job will be loaded. The default by using session variable’s timezone. This parameter affects all function results related to the time zone involved in the load.

      6. format

        Specifies the format of the imported data. Support csv and json, the default is csv.

      7. jsonpaths

        There are two ways to import json: simple mode and matched mode. If jsonpath is set, it will be the matched mode import, otherwise it will be the simple mode import, please refer to the example for details.

      8. strip_outer_array Boolean type, true to indicate that json data starts with an array object and flattens objects in the array object, default value is false.

      9. json_root json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is “”.

      10. send_batch_parallelism Integer, Used to set the default parallelism for sending batch, if the value for parallelism exceed max_send_batch_parallelism_per_job in BE config, then the coordinator BE will use the value of max_send_batch_parallelism_per_job.

    6. data_source

      The type of data source. Current support:

      KAFKA

    7. data_source_properties

      Specify information about the data source.

      syntax:

      1. (
      2. "key1" = "val1",
      3. "key2" = "val2"
      4. )
      1. KAFKA data source

        Kafka_broker_list

        Kafka’s broker connection information. The format is ip:host. Multiple brokers are separated by commas.

        Example:

        "kafka_broker_list" = "broker1:9092,broker2:9092"

      2. kafka_topic

        Specify the topic of Kafka to subscribe to.

        Example:

        "kafka_topic" = "my_topic"

      3. kafka_partitions/kafka_offsets

        Specify the kafka partition to be subscribed to, and the corresponding star offset for each partition.

        Offset can specify a specific offset from 0 or greater, or:

        1. OFFSET_BEGINNING: Subscribe from the location where the data is available.

        2. Timestamp, the format must be like: “2021-05-11 10:00:00”, the system will automatically locate the offset of the first message greater than or equal to the timestamp. Note that the offset of the timestamp format cannot be mixed with the number type, only one of them can be selected.

        If not specified, all partitions under topic are subscribed by default fromSET_END.

        Example:

      4. property

        Specify custom kafka parameters.

        The function is equivalent to the “—property” parameter in the kafka shel

        When the value of the parameter is a file, you need to add the keyword: “FILE” before the value.

        For information on how to create a file, see “HELP CREATE FILE;”

        For more supported custom parameters, see the configuration items on the nt side in the official CONFIGURATION documentation for librdkafka.

        Example:

        1. "property.client.id" = "12345",
        2. "property.ssl.ca.location" = "FILE:ca.pem"
        1. When connecting to Kafka using SSL, you need to specify the following parameters:

          1. "property.ssl.ca.location" = "FILE:ca.pem",
          2. "property.ssl.certificate.location" = "FILE:client.pem",
          3. "property.ssl.key.location" = "FILE:client.key",
          4. "property.ssl.key.password" = "abcdefg"

          among them:

          “property.security.protocol” and “property.ssl.ca.location” are required to indicate the connection method is SSL and the location of the CA certificate.

          If the client authentication is enabled on the Kafka server, you also need to set:

          1. "property.ssl.certificate.location"
          2. "property.ssl.key.password"

          Used to specify the public key of the client, the private key, and the word of the private key.

        2. Specify the default starting offset for kafka partition

          If kafka_partitions/kafka_offsets is not specified, all partitions are unanmed by default, and you can specify kafka_default_offsets to specify the star offset. The default is OFFSET_END, which starts at the end of the subscription.

          Values:

          1. OFFSET_BEGINNING: Subscribe from the location where the data is available.

          2. OFFSET_END: Subscribe from the end.

          3. Timestamp, the format is the same as kafka_offsets

          Example:

          "property.kafka_default_offsets" = "OFFSET_BEGINNING" "property.kafka_default_offsets" = "2021-05-11 10:00:00"

    8. load data format sample

      Integer class (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234

      Floating point class (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356

      Date class (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03.

      String class (CHAR/VARCHAR) (without quotes): I am a student, a

      NULL value: \N

    1. Create a Kafka routine load task named test1 for the example_tbl of example_db. Specify group.id and client.id, and automatically consume all partitions by default, with subscriptions starting at the end (OFFSET_END)

      1. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
      2. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      3. PROPERTIES
      4. (
      5. "desired_concurrent_number"="3",
      6. "max_batch_interval" = "20",
      7. "max_batch_rows" = "300000",
      8. "max_batch_size" = "209715200",
      9. "strict_mode" = "false"
      10. )
      11. FROM KAFKA
      12. (
      13. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      14. "kafka_topic" = "my_topic",
      15. "property.group.id" = "xxx",
      16. "property.client.id" = "xxx"
      17. );
    2. Create a Kafka routine load task named test1 for the example_tbl of example_db. The load task is in strict mode.

    3. load data from Kafka clusters via SSL authentication. Also set the client.id parameter. The load task is in non-strict mode and the time zone is Africa/Abidjan

      1. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
      2. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
      3. WHERE k1 > 100 and k2 like "%doris%"
      4. PROPERTIES
      5. (
      6. "desired_concurrent_number"="3",
      7. "max_batch_interval" = "20",
      8. "max_batch_rows" = "300000",
      9. "max_batch_size" = "209715200",
      10. "strict_mode" = "false",
      11. "timezone" = "Africa/Abidjan"
      12. )
      13. FROM KAFKA
      14. (
      15. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      16. "kafka_topic" = "my_topic",
      17. "property.security.protocol" = "ssl",
      18. "property.ssl.ca.location" = "FILE:ca.pem",
      19. "property.ssl.certificate.location" = "FILE:client.pem",
      20. "property.ssl.key.location" = "FILE:client.key",
      21. "property.ssl.key.password" = "abcdefg",
      22. "property.client.id" = "my_client_id"
      23. );
    4. Create a Kafka routine load task named test1 for the example_tbl of example_db. The load data is a simple json.

      1. CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
      2. COLUMNS(category,price,author)
      3. (
      4. "desired_concurrent_number"="3",
      5. "max_batch_interval" = "20",
      6. "max_batch_size" = "209715200",
      7. "strict_mode" = "false",
      8. "format" = "json"
      9. )
      10. FROM KAFKA
      11. (
      12. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      13. "kafka_topic" = "my_topic",
      14. "kafka_partitions" = "0,1,2",
      15. "kafka_offsets" = "0,0,0"
      16. );

      It support two kinds data style: 1){“category”:”a9jadhx”,”author”:”test”,”price”:895} 2)[ {“category”:”a9jadhx”,”author”:”test”,”price”:895}, {“category”:”axdfa1”,”author”:”EvelynWaugh”,”price”:1299} ]

    5. Matched load json by jsonpaths.

      1. CREATE TABLE `example_tbl` (
      2. `category` varchar(24) NULL COMMENT "",
      3. `author` varchar(24) NULL COMMENT "",
      4. `timestamp` bigint(20) NULL COMMENT "",
      5. `dt` int(11) NULL COMMENT "",
      6. `price` double REPLACE
      7. ) ENGINE=OLAP
      8. AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
      9. COMMENT "OLAP"
      10. PARTITION BY RANGE(`dt`)
      11. (PARTITION p0 VALUES [("-2147483648"), ("20200509")),
      12. PARTITION p20200509 VALUES [("20200509"), ("20200510")),
      13. PARTITION p20200510 VALUES [("20200510"), ("20200511")),
      14. PARTITION p20200511 VALUES [("20200511"), ("20200512")))
      15. DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
      16. PROPERTIES (
      17. "replication_num" = "1"
      18. );
      19. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
      20. COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
      21. PROPERTIES
      22. (
      23. "desired_concurrent_number"="3",
      24. "max_batch_interval" = "20",
      25. "max_batch_rows" = "300000",
      26. "max_batch_size" = "209715200",
      27. "strict_mode" = "false",
      28. "format" = "json",
      29. "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
      30. "strip_outer_array" = "true"
      31. )
      32. FROM KAFKA
      33. (
      34. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      35. "kafka_topic" = "my_topic",
      36. "kafka_partitions" = "0,1,2",
      37. "kafka_offsets" = "0,0,0"

      For example json data: [ {“category”:”11”,”title”:”SayingsoftheCentury”,”price”:895,”timestamp”:1589191587}, {“category”:”22”,”author”:”2avc”,”price”:895,”timestamp”:1589191487}, {“category”:”33”,”author”:”3avc”,”title”:”SayingsoftheCentury”,”timestamp”:1589191387} ]

    Tips: 1)If the json data starts as an array and each object in the array is a record, you need to set the strip_outer_array to true to represent the flat array. 2)If the json data starts with an array, and each object in the array is a record, our ROOT node is actually an object in the array when we set jsonpath.

    1. User specifies the json_root node CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, ‘%Y%m%d’)) PROPERTIES ( “desired_concurrent_number”=”3”, “max_batch_interval” = “20”, “max_batch_rows” = “300000”, “max_batch_size” = “209715200”, “strict_mode” = “false”, “format” = “json”, “jsonpaths” = “[“$.category”,”$.author”,”$.price”,”$.timestamp”]“, “strip_outer_array” = “true”, “json_root” = “$.RECORDS” ) FROM KAFKA ( “kafka_broker_list” = “broker1:9092,broker2:9092,broker3:9092”, “kafka_topic” = “my_topic”, “kafka_partitions” = “0,1,2”, “kafka_offsets” = “0,0,0” ); For example json data: { “RECORDS”:[ {“category”:”11”,”title”:”SayingsoftheCentury”,”price”:895,”timestamp”:1589191587}, {“category”:”22”,”author”:”2avc”,”price”:895,”timestamp”:1589191487}, {“category”:”33”,”author”:”3avc”,”title”:”SayingsoftheCentury”,”timestamp”:1589191387} ] }

      1. Create a Kafka routine load task named test1 for the example_tbl of example_db. delete all data key columns match v3 >100 key columns.

        CREATE ROUTINE LOAD example_db.test1 ON example_tbl WITH MERGE COLUMNS(k1, k2, k3, v1, v2, v3), WHERE k1 > 100 and k2 like “%doris%”, DELETE ON v3 >100 PROPERTIES ( “desired_concurrent_number”=”3”, “max_batch_interval” = “20”, “max_batch_rows” = “300000”, “max_batch_size” = “209715200”, “strict_mode” = “false” ) FROM KAFKA ( “kafka_broker_list” = “broker1:9092,broker2:9092,broker3:9092”, “kafka_topic” = “my_topic”, “kafka_partitions” = “0,1,2,3”, “kafka_offsets” = “101,0,0,200” );

      2. Filter original data

        CREATE ROUTINE LOAD example_db.test_job ON example_tbl COLUMNS TERMINATED BY “,”, COLUMNS(k1,k2,source_sequence,v1,v2), PRECEDING FILTER k1 > 2 PROPERTIES ( “desired_concurrent_number”=”3”, “max_batch_interval” = “30”, “max_batch_rows” = “300000”, “max_batch_size” = “209715200” ) FROM KAFKA ( “kafka_broker_list” = “broker1:9092,broker2:9092,broker3:9092”, “kafka_topic” = “my_topic”, “kafka_partitions” = “0,1,2,3”, “kafka_offsets” = “101,0,0,200” );

    1. CREATE, ROUTINE, LOAD