ROUTINE LOAD

    1. [merge_type]
    2. [load_properties]
    3. [job_properties]
    4. FROM data_source
    5. [data_source_properties]
    6. 1. [db.]job_name
    7. 导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。
    8. 2. tbl_name
    9. 指定需要导入的表的名称。
    10. 3. merge_type
    11. 数据的合并类型,一共支持三种类型APPENDDELETEMERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete on条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理, 语法为[WITH MERGE|APPEND|DELETE]
    12. 4. load_properties
    13. 用于描述导入数据。语法:
    14. [column_separator],
    15. [columns_mapping],
    16. [where_predicates],
    17. [delete_on_predicates],
    18. [source_sequence],
    19. [partitions],
    20. [preceding_predicates]
    21. 1. column_separator:
    22. 指定列分隔符,如:
    23. COLUMNS TERMINATED BY ","
    24. 默认为:\t
    25. 2. columns_mapping:
    26. 指定源数据中列的映射关系,以及定义衍生列的生成方式。
    27. 1. 映射列:
    28. 按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。
    29. 假设目的表有三列 k1, k2, v1。源数据有4列,其中第124列分别对应 k2, k1, v1。则书写如下:
    30. COLUMNS (k2, k1, xxx, v1)
    31. 其中 xxx 为不存在的一列,用于跳过源数据中的第三列。
    32. 2. 衍生列:
    33. col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。
    34. 衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。
    35. 接上一个示例,假设目的表还有第4 v2v2 k1 k2 的和产生。则可以书写如下:
    36. COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
    37. 3. where_predicates
    38. 用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。
    39. 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:
    40. WHERE k1 > 100 and k2 = 1000
    41. 4. partitions
    42. 指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
    43. 示例:
    44. PARTITION(p1, p2, p3)
    45. 5. delete_on_predicates
    46. 表示删除条件,仅在 merge type MERGE 时有意义,语法与where 相同
    47. 6. source_sequence:
    48. 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。
    49. 7. preceding_predicates
    50. PRECEDING FILTER predicate
    51. 用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。
    52. 5. job_properties
    53. 用于指定例行导入作业的通用参数。
    54. 语法:
    55. PROPERTIES (
    56. "key1" = "val1",
    57. )
    58. 目前我们支持以下参数:
    59. 1. desired_concurrent_number
    60. 期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3
    61. 这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。
    62. 例:
    63. "desired_concurrent_number" = "3"
    64. 2. max_batch_interval/max_batch_rows/max_batch_size
    65. 这三个参数分别表示:
    66. 1)每个子任务最大执行时间,单位是秒。范围为 5 60。默认为10
    67. 2)每个子任务最多读取的行数。必须大于等于200000。默认是200000
    68. 3)每个子任务最多读取的字节数。单位是字节,范围是 100MB 1GB。默认是 100MB
    69. 这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。
    70. 例:
    71. "max_batch_interval" = "20",
    72. "max_batch_rows" = "300000",
    73. "max_batch_size" = "209715200"
    74. 3. max_error_number
    75. 采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。
    76. 采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。
    77. where 条件过滤掉的行不算错误行。
    78. 4. strict_mode
    79. 是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为 "strict_mode" = "true"
    80. 5. timezone
    81. 指定导入作业所使用的时区。默认为使用 Session timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。
    82. 6. format
    83. 指定导入数据格式,默认是csv,支持json格式。
    84. 7. jsonpaths
    85. jsonpaths: 导入json方式分为:简单模式和匹配模式。如果设置了jsonpath则为匹配模式导入,否则为简单模式导入,具体可参考示例。
    86. 8. strip_outer_array
    87. 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false
    88. 9. json_root
    89. json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""
    90. 10. send_batch_parallelism
    91. 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。
    92. 6. data_source
    93. 数据源的类型。当前支持:
    94. KAFKA
    95. 7. data_source_properties
    96. 指定数据源相关的信息。
    97. 语法:
    98. (
    99. "key1" = "val1",
    100. "key2" = "val2"
    101. )
    102. 1. KAFKA 数据源
    103. 1. kafka_broker_list
    104. Kafka broker 连接信息。格式为 ip:host。多个broker之间以逗号分隔。
    105. 示例:
    106. "kafka_broker_list" = "broker1:9092,broker2:9092"
    107. 2. kafka_topic
    108. 指定要订阅的 Kafka topic
    109. 示例:
    110. "kafka_topic" = "my_topic"
    111. 3. kafka_partitions/kafka_offsets
    112. 指定需要订阅的 kafka partition,以及对应的每个 partition 的起始 offset
    113. offset 可以指定从大于等于 0 的具体 offset,或者:
    114. 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
    115. 2) OFFSET_END: 从末尾开始订阅。
    116. 3) 时间戳,格式必须如:"2021-05-11 10:00:00",系统会自动定位到大于等于该时间戳的第一个消息的offset。注意,时间戳格式的offset不能和数字类型混用,只能选其一。
    117. 如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition
    118. 示例:
    119. "kafka_partitions" = "0,1,2,3",
    120. "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
    121. "kafka_partitions" = "0,1",
    122. "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
    123. 4. property
    124. 指定自定义kafka参数。
    125. 功能等同于kafka shell "--property" 参数。
    126. 当参数的 value 为一个文件时,需要在 value 前加上关键词:"FILE:"
    127. 关于如何创建文件,请参阅 "HELP CREATE FILE;"
    128. 更多支持的自定义参数,请参阅 librdkafka 的官方 CONFIGURATION 文档中,client 端的配置项。
    129. 示例:
    130. "property.client.id" = "12345",
    131. "property.ssl.ca.location" = "FILE:ca.pem"
    132. 1.使用 SSL 连接 Kafka 时,需要指定以下参数:
    133. "property.security.protocol" = "ssl",
    134. "property.ssl.ca.location" = "FILE:ca.pem",
    135. "property.ssl.certificate.location" = "FILE:client.pem",
    136. "property.ssl.key.location" = "FILE:client.key",
    137. "property.ssl.key.password" = "abcdefg"
    138. 其中:
    139. "property.security.protocol" "property.ssl.ca.location" 为必须,用于指明连接方式为 SSL,以及 CA 证书的位置。
    140. 如果 Kafka server 端开启了 client 认证,则还需设置:
    141. "property.ssl.certificate.location"
    142. "property.ssl.key.location"
    143. "property.ssl.key.password"
    144. 分别用于指定 client public keyprivate key 以及 private key 的密码。
    145. 2.指定kafka partition的默认起始offset
    146. 如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。默认为 OFFSET_END,即从末尾开始订阅。
    147. 值为
    148. 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
    149. 2) OFFSET_END: 从末尾开始订阅。
    150. 3) 时间戳,格式同 kafka_offsets
    151. 示例:
    152. "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    153. "property.kafka_default_offsets" = "2021-05-11 10:00:00"
    154. 8. 导入数据格式样例
    155. 整型类(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
    156. 浮点类(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
    157. 日期类(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03
    158. 字符串类(CHAR/VARCHAR)(无引号):I am a student, a
    159. NULL值:\N
    1. 6. 用户指定根节点json_root
    2. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    3. COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    4. PROPERTIES
    5. (
    6. "desired_concurrent_number"="3",
    7. "max_batch_interval" = "20",
    8. "max_batch_rows" = "300000",
    9. "max_batch_size" = "209715200",
    10. "strict_mode" = "false",
    11. "format" = "json",
    12. "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    13. "strip_outer_array" = "true",
    14. "json_root" = "$.RECORDS"
    15. )
    16. FROM KAFKA
    17. (
    18. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    19. "kafka_topic" = "my_topic",
    20. "kafka_partitions" = "0,1,2",
    21. "kafka_offsets" = "0,0,0"
    22. );