Broker Load

    因为 Doris 表里的数据是有序的,所以 Broker load 在导入数据的时是要利用doris 集群资源对数据进行排序,相对于 Spark load 来完成海量历史数据迁移,对 Doris 的集群资源占用要比较大,这种方式是在用户没有 Spark 这种计算资源的情况下使用,如果有 Spark 计算资源建议使用 Spark load

    用户需要通过 MySQL 协议 创建 导入,并通过查看导入命令检查导入结果。

    • 源数据在 Broker 可以访问的存储系统中,如 HDFS。
    • 数据量在 几十到百GB 级别。

    基本原理

    用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。

    BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。

    开始导入

    下面我们通过几个实际的场景示例来看 的使用

    1. 创建 Hive 表
    1. CREATE TABLE `ods_demo_detail`(
    2. `id` string,
    3. `store_id` string,
    4. `company_id` string,
    5. `tower_id` string,
    6. `commodity_id` string,
    7. `commodity_name` string,
    8. `commodity_price` double,
    9. `member_price` double,
    10. `cost_price` double,
    11. `unit` string,
    12. `quantity` double,
    13. `actual_price` double
    14. )
    15. PARTITIONED BY (day string)
    16. row format delimited fields terminated by ','
    17. lines terminated by '\n'

    然后使用 Hive 的 Load 命令将你的数据导入到 Hive 表中

    1. load data local inpath '/opt/custorm' into table ods_demo_detail;
    1. 创建 Doris 表,具体建表语法参照:
    1. CREATE TABLE `doris_ods_test_detail` (
    2. `rq` date NULL,
    3. `id` varchar(32) NOT NULL,
    4. `store_id` varchar(32) NULL,
    5. `company_id` varchar(32) NULL,
    6. `tower_id` varchar(32) NULL,
    7. `commodity_id` varchar(32) NULL,
    8. `commodity_name` varchar(500) NULL,
    9. `commodity_price` decimal(10, 2) NULL,
    10. `member_price` decimal(10, 2) NULL,
    11. `cost_price` decimal(10, 2) NULL,
    12. `unit` varchar(50) NULL,
    13. `quantity` int(11) NULL,
    14. `actual_price` decimal(10, 2) NULL
    15. ) ENGINE=OLAP
    16. UNIQUE KEY(`rq`, `id`, `store_id`)
    17. PARTITION BY RANGE(`rq`)
    18. (
    19. PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    20. PROPERTIES (
    21. "replication_allocation" = "tag.location.default: 3",
    22. "dynamic_partition.enable" = "true",
    23. "dynamic_partition.time_unit" = "MONTH",
    24. "dynamic_partition.start" = "-2147483648",
    25. "dynamic_partition.end" = "2",
    26. "dynamic_partition.prefix" = "P_",
    27. "dynamic_partition.buckets" = "1",
    28. "in_memory" = "false",
    29. "storage_format" = "V2"
    30. );
    1. 开始导入数据

      具体语法参照: Broker Load

    1. LOAD LABEL broker_load_2022_03_23
    2. (
    3. DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
    4. INTO TABLE doris_ods_test_detail
    5. COLUMNS TERMINATED BY ","
    6. COLUMNS FROM PATH AS (`day`)
    7. SET
    8. (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
    9. )
    10. WITH BROKER "broker_name_1"
    11. (
    12. "username" = "hdfs",
    13. "password" = ""
    14. )
    15. PROPERTIES
    16. (
    17. "timeout"="1200",
    18. "max_filter_ratio"="0.1"
    19. );

    Hive 分区表导入(ORC格式)

    1. 创建Hive分区表,ORC格式
    1. 创建Doris表,这里的建表语句和上面的Doris建表语句一样,请参考上面的.

    2. 使用 Broker Load 导入数据

      1. LOAD LABEL dish_2022_03_23
      2. (
      3. DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
      4. INTO TABLE doris_ods_test_detail
      5. COLUMNS TERMINATED BY ","
      6. FORMAT AS "orc"
      7. (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
      8. COLUMNS FROM PATH AS (`day`)
      9. SET
      10. (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
      11. )
      12. WITH BROKER "broker_name_1"
      13. (
      14. "username" = "hdfs",
      15. "password" = ""
      16. )
      17. PROPERTIES
      18. (
      19. "timeout"="1200",
      20. "max_filter_ratio"="0.1"
      21. );

      注意:

      • FORMAT AS "orc" : 这里我们指定了要导入的数据格式
      • SET : 这里我们定义了 Hive 表和 Doris 表之间的字段映射关系及字段转换的一些操作

    HDFS文件系统数据导入

    我们继续以上面创建好的 Doris 表为例,演示通过 Broker Load 从 HDFS 上导入数据。

    导入作业的语句如下:

    1. LOAD LABEL demo.label_20220402
    2. (
    3. INTO TABLE `ods_dish_detail_test`
    4. COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    5. )
    6. with HDFS (
    7. "fs.defaultFS"="hdfs://10.220.147.151:8020",
    8. "hadoop.username"="root"
    9. )
    10. PROPERTIES
    11. (
    12. "timeout"="1200",
    13. "max_filter_ratio"="0.1"
    14. );

    这里的具体 参数可以参照: 及 Broker Load 文档

    我们可以通过下面的命令查看上面导入任务的状态信息,

    具体的查看导入状态的语法参考 SHOW LOAD

    1. mysql> show load order by createtime desc limit 1\G;
    2. *************************** 1. row ***************************
    3. Label: broker_load_2022_03_23
    4. State: FINISHED
    5. Progress: ETL:100%; LOAD:100%
    6. Type: BROKER
    7. EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
    8. TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
    9. ErrorMsg: NULL
    10. CreateTime: 2022-04-01 18:59:06
    11. EtlStartTime: 2022-04-01 18:59:11
    12. EtlFinishTime: 2022-04-01 18:59:11
    13. LoadStartTime: 2022-04-01 18:59:11
    14. LoadFinishTime: 2022-04-01 18:59:11
    15. URL: NULL
    16. JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
    17. 1 row in set (0.01 sec)

    取消导入

    当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 CANCEL LOAD 查看。

    例如:撤销数据库 demo 上, label 为 broker_load_2022_03_23 的导入作业

    1. CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

    相关系统配置

    FE 配置

    下面几个配置属于 Broker load 的系统级别配置,也就是作用于所有 Broker load 导入任务的配置。主要通过修改 fe.conf来调整配置值。

    • min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency

      前两个配置限制了单个 BE 处理的数据量的最小和最大值。第三个配置限制了一个作业的最大的导入并发数。最小处理的数据量,最大并发数,源文件的大小和当前集群 BE 的个数 共同决定了本次导入的并发数

      通常一个导入作业支持的最大数据量为 max_bytes_per_broker_scanner * BE 节点数。如果需要导入更大数据量,则需要适当调整 max_bytes_per_broker_scanner 参数的大小。

      默认配置:

      1. 参数名:min_bytes_per_broker_scanner 默认 64MB,单位bytes
      2. 参数名:max_broker_concurrency 默认 10
      3. 参数名:max_bytes_per_broker_scanner,默认 3G,单位bytes

    应用场景

    使用 Broker load 最适合的场景就是原始数据在文件系统(HDFS,BOS,AFS)中的场景。其次,由于 Broker load 是单次导入中唯一的一种异步导入的方式,所以如果用户在导入大文件中,需要使用异步接入,也可以考虑使用 Broker load。

    这里仅讨论单个 BE 的情况,如果用户集群有多个 BE 则下面标题中的数据量应该乘以 BE 个数来计算。比如:如果用户有3个 BE,则 3G 以下(包含)则应该乘以 3,也就是 9G 以下(包含)。

    • 3G 以下(包含)

      用户可以直接提交 Broker load 创建导入请求。

    • 3G 以上

      由于单个导入 BE 最大的处理量为 3G,超过 3G 的待导入文件就需要通过调整 Broker load 的导入参数来实现大文件的导入。

      1. 根据当前 BE 的个数和原始文件的大小修改单个 BE 的最大扫描量和最大并发数。

        1. 修改 fe.conf 中配置
        2. max_broker_concurrency = BE 个数
        3. 当前导入任务单个 BE 处理的数据量 = 原始文件大小 / max_broker_concurrency
        4. max_bytes_per_broker_scanner >= 当前导入任务单个 BE 处理的数据量
        5. 比如一个 100G 的文件,集群的 BE 个数为 10
        6. max_broker_concurrency = 10
        7. max_bytes_per_broker_scanner >= 10G = 100G / 10

        修改后,所有的 BE 会并发的处理导入任务,每个 BE 处理原始文件的一部分。

        注意:上述两个 FE 中的配置均为系统配置,也就是说其修改是作用于所有的 Broker load的任务的。

      2. 在创建导入的时候自定义当前导入任务的 timeout 时间

        1. 当前导入任务单个 BE 处理的数据量 / 用户 Doris 集群最慢导入速度(MB/s) >= 当前导入任务的 timeout 时间 >= 当前导入任务单个 BE 处理的数据量 / 10M/s
        2. 比如一个 100G 的文件,集群的 BE 个数为 10
        3. timeout >= 1000s = 10G / 10M/s
      3. 当用户发现第二步计算出的 timeout 时间超过系统默认的导入最大超时时间 4小时

        这时候不推荐用户将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入最大超时时间4小时,最好是通过切分待导入文件并且分多次导入来解决问题。主要原因是:单次导入超过4小时的话,导入失败后重试的时间成本很高。

        可以通过如下公式计算出 Doris 集群期望最大导入文件数据量:

        1. 期望最大导入文件数据量 = 14400s * 10M/s * BE 个数
        2. 比如:集群的 BE 个数为 10
        3. 期望最大导入文件数据量 = 14400s * 10M/s * 10 = 1440000M 1440G
        4. 注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。

    作业调度

    首先, FE 的配置参数:desired_max_waiting_jobs 会限制一个集群内,未开始或正在运行(作业状态为 PENDING 或 LOADING)的 Broker Load 作业数量。默认为 100。如果超过这个阈值,新提交的作业将会被直接拒绝。

    一个 Broker Load 作业会被分为 pending task 和 loading task 阶段。其中 pending task 负责获取导入文件的信息,而 loading task 会发送给BE执行具体的导入任务。

    FE 的配置参数 async_pending_load_task_pool_size 用于限制同时运行的 pending task 的任务数量。也相当于控制了实际正在运行的导入任务数量。该参数默认为 10。也就是说,假设用户提交了100个Load作业,同时只会有10个作业会进入 LOADING 状态开始执行,而其他作业处于 PENDING 等待状态。

    FE 的配置参数 async_loading_load_task_pool_size 用于限制同时运行的 loading task 的任务数量。一个 Broker Load 作业会有 1 个 pending task 和多个 loading task (等于 LOAD 语句中 DATA INFILE 子句的个数)。所以 async_loading_load_task_pool_size 应该大于等于 async_pending_load_task_pool_size

    性能分析

    可以在提交 LOAD 作业前,先执行 set enable_profile=true 打开会话变量。然后提交导入作业。待导入作业完成后,可以在 FE 的 web 页面的 Queris 标签中查看到导入作业的 Profile。

    可以查看 帮助文档,获取更多使用帮助信息。

    这个 Profile 可以帮助分析导入作业的运行状态。

    当前只有作业成功执行后,才能查看 Profile

    常见问题

    • 导入报错:Scan bytes per broker scanner exceed limit:xxx

      请参照文档中最佳实践部分,修改 FE 配置项 max_bytes_per_broker_scannermax_broker_concurrency

    • 导入报错:failed to send batchTabletWriter add batch with unknown id

      适当修改 query_timeoutstreaming_load_rpc_max_alive_time_sec

      streaming_load_rpc_max_alive_time_sec:

      在导入过程中,Doris 会为每一个 Tablet 开启一个 Writer,用于接收数据并写入。这个参数指定了 Writer 的等待超时时间。如果在这个时间内,Writer 没有收到任何数据,则 Writer 会被自动销毁。当系统处理速度较慢时,Writer 可能长时间接收不到下一批数据,导致导入报错:TabletWriter add batch with unknown id。此时可适当增大这个配置。默认为 600 秒

    • 导入报错:

      如果是PARQUET或者ORC格式的数据,需要再文件头的列名与doris表中的列名一致,如 :

      代表获取在 parquet 或 orc 中以(tmp_c1, tmp_c2)为列名的列,映射到 doris 表中的(id, name)列。如果没有设置set, 则以column中的列作为映射。

      注:如果使用某些 hive 版本直接生成的 orc 文件,orc 文件中的表头并非 hive meta 数据,而是(_col0, _col1, _col2, …), 可能导致 Invalid Column Name 错误,那么则需要使用 set 进行映射

    更多帮助