Broker Load

    Users need to create Broker load imports through MySQL protocol and check the import results by viewing the import commands.

    • Source data in Broker accessible storage systems, such as HDFS.
    • Data volumes range from tens to hundreds of GB.

    Noun Interpretation

    1. Frontend (FE): Metadata and scheduling nodes of Doris system. In the import process, it is mainly responsible for the generation of import plan and the scheduling of import tasks.
    2. Backend (BE): The computing and storage nodes of Doris system. In the import process, it is mainly responsible for ETL and storage of data.
    3. Broker: Broker is an independent stateless process. It encapsulates the file system interface and provides Doris with the ability to read files in the remote storage system.
    4. Plan: Import the execution plan, and BE executes the import execution plan to import data into Doris system.

    After the user submits the import task, FE generates the corresponding plan and distributes the plan to several BEs according to the number of BEs and the size of the file. Each BE performs part of the import data.

    BE pulls data from Broker and imports it into the system after transforming the data. All BEs complete the import, and the FE decides whether the import is successful or not.

    Basic operations

    Broker load create a data load job

    Grammar:

    1. (data_desc, ...)
    2. WITH BROKER broker_name broker_properties
    3. [PROPERTIES (key1=value1, ... )]
    4. * data_desc:
    5. DATA INFILE ('file_path', ...)
    6. [NEGATIVE]
    7. INTO TABLE tbl_name
    8. [PARTITION (p1, p2)]
    9. [COLUMNS TERMINATED BY separator ]
    10. [(col1, ...)]
    11. [PRECEDING FILTER predicate]
    12. [SET (k1=f1(xx), k2=f2(xx))]
    13. [WHERE predicate]
    14. * broker_properties:
    15. (key1=value1, ...)

    Examples:

    1. LOAD LABEL db1.label1
    2. (
    3. DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    4. INTO TABLE tbl1
    5. COLUMNS TERMINATED BY ","
    6. SET
    7. (
    8. id=tmp_c2,
    9. name=tmp_c1)
    10. ),
    11. DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
    12. INTO TABLE tbl2
    13. COLUMNS TERMINATED BY ","
    14. (col1, col2)
    15. where col1 > 1
    16. )
    17. WITH BROKER 'broker'
    18. (
    19. "username"="user",
    20. "password"="pass"
    21. )
    22. PROPERTIES
    23. (
    24. "timeout" = "3600"

    Create the imported detailed grammar execution HELP BROKER LOAD View grammar help. This paper mainly introduces the parametric meaning and points for attention in Broker load’s creation import grammar.

    Label

    Identity of import task. Each import task has a unique Label within a single database. Label is a user-defined name in the import command. With this Label, users can view the execution of the corresponding import task.

    Another function of Label is to prevent users from repeatedly importing the same data. It is strongly recommended that users use the same label for the same batch of data. Thus, repeated requests for the same batch of data can only be accepted once, guaranteeing at-Most-One semantics

    When the corresponding import job status of Label is CANCELLED, it can be used again to submit the import job.

    Data Description Class Parameters

    Data description class parameters mainly refer to the parameters belonging to data_desc in Broker load creating import statements. Each group of data_desc mainly describes the data source address, ETL function, target table and partition information involved in this import.

    The following is a detailed explanation of some parameters of the data description class:

    • Multi-table import

      Broker load supports a single import task involving multiple tables, and each Broker load import task can implement multiple tables import by declaring multiple tables in multiple data_desc. Each individual data_desc can also specify the data source address belonging to the table. Broker load guarantees atomic success or failure between multiple tables imported at a single time.

    • negative

      data_desc can also set up data fetching and anti-importing. This function is mainly used when aggregated columns in data tables are of SUM type. If you want to revoke a batch of imported data. The negative parameter can be used as a batch of data. Doris automatically retrieves this batch of data on aggregated columns to eliminate the same batch of data.

    • partition

      In data_desc, you can specify the partition information of the table to be imported, but it will not be imported if the data to be imported does not belong to the specified partition. At the same time, data that does not specify a Partition is considered error data.

    • preceding filter predicate

      Used to filter original data. The original data is the data without column mapping and transformation. The user can filter the data before conversion, select the desired data, and then perform the conversion.

    • where predicate

      1. The where statement in ```data_desc``` is responsible for filtering the data that has been transformed. The unselected rows which is filtered by where predicate will not be calculated in ```max_filter_ratio``` . If there are more than one where predicate of the same table , the multi where predicate will be merged from different ```data_desc``` and the policy is AND.
    • merge_type The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND is the default value, which means that all this batch of data needs to be appended to the existing data. DELETE means to delete all rows with the same key as this batch of data. MERGE semantics Need to be used in conjunction with the delete condition, which means that the data that meets the delete condition is processed according to DELETE semantics and the rest is processed according to APPEND semantics

    Import job parameters

    Import job parameters mainly refer to the parameters in Broker load creating import statement that belong to opt_properties. Import operation parameters act on the whole import operation.

    The following is a detailed explanation of some parameters of the import operation parameters:

    • time out

      The time-out of the import job (in seconds) allows the user to set the time-out of each import by himself in opt_properties. If the import task is not completed within the set timeout time, it will be cancelled by the system and become CANCELLED. The default import timeout for Broker load is 4 hours.

      Usually, the user does not need to manually set the timeout of the import task. When the import cannot be completed within the default timeout time, the task timeout can be set manually.

      Because the machine environment of each Doris cluster is different and the concurrent query tasks of the cluster are different, the slowest import speed of the user Doris cluster requires the user to guess the import task speed according to the history.

    • max_filter_ratio

      The maximum tolerance rate of the import task is 0 by default, and the range of values is 0-1. When the import error rate exceeds this value, the import fails.

      If the user wishes to ignore the wrong row, the import can be successful by setting this parameter greater than 0.

      The calculation formula is as follows:

      (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio

      dpp.abnorm.ALL denotes the number of rows whose data quality is not up to standard. Such as type mismatch, column mismatch, length mismatch and so on.

      dpp.norm.ALL refers to the number of correct data in the import process. The correct amount of data for the import task can be queried by the SHOW LOAD command.

    • exec_mem_limit

      Memory limit. Default is 2GB. Unit is Bytes.

    • strict_mode

      Broker load can use strict mode. Use properties ("strict_mode" = "true") to enable strict mode, default is false

      The strict mode means that the column type conversion in the import process is strictly filtered. The strategy of strict filtering is as follows:

      1. For column type conversion, if strict mode is true, the wrong data will be filtered. Error data here refers to the kind of data that the original data is not null and the result is null after participating in column type conversion.

      2. Strict mode does not affect the imported column when it is generated by a function transformation.

      3. For a column type imported that contains scope restrictions, strict mode does not affect it if the original data can normally pass type conversion, but cannot pass scope restrictions. For example, if the type is decimal (1,0) and the original data is 10, it falls within the scope of type conversion but not column declaration. This data strict has no effect on it.

    Import Relation between strict mode source data

    Here’s an example of a column type TinyInt

    Here’s an example of column type Decimal (1,0)

    source datasource data examplestring to intstrict_moderesult
    null\NN/Atrue or falseNULL
    not nullaaaNULLtrueinvalid data(filtered)
    not nullaaaNULLfalseNULL
    not null1 or 101true or falsecorrect data

    View load

    Broker load import mode is asynchronous, so the user must create the imported Label record and use Label in the view Import command to view the import result. View import commands are common in all import modes. The specific syntax can be HELP SHOW LOAD.

    Examples:

    1. mysql> show load order by createtime desc limit 1\G
    2. *************************** 1. row ***************************
    3. JobId: 76391
    4. Label: label1
    5. State: FINISHED
    6. Progress: ETL:N/A; LOAD:100%
    7. Type: BROKER
    8. EtlInfo: dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
    9. TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
    10. ErrorMsg: N/A
    11. CreateTime: 2019-07-27 11:46:42
    12. EtlStartTime: 2019-07-27 11:46:44
    13. EtlFinishTime: 2019-07-27 11:46:44
    14. LoadStartTime: 2019-07-27 11:46:44
    15. LoadFinishTime: 2019-07-27 11:50:16
    16. URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317

    The following is mainly about the significance of viewing the parameters in the return result set of the import command:

    • JobId

      The unique ID of the import task is different for each import task, which is automatically generated by the system. Unlike Label, JobId will never be the same, while Label can be reused after the import task fails.

    • Label

      Identity of import task.

    • State

      Import the current phase of the task. In the Broker load import process, PENDING and LOADING are the two main import states. If the Broker load is in the PENDING state, it indicates that the current import task is waiting to be executed; the LOADING state indicates that it is executing.

      There are two final stages of the import task: CANCELLED and FINISHED. When Load job is in these two stages, the import is completed. CANCELLED is the import failure, FINISHED is the import success.

    • Progress

      Import the progress description of the task. There are two kinds of progress: ETL and LOAD, which correspond to the two stages of the import process, ETL and LOADING. At present, Broker load only has the LOADING stage, so ETL will always be displayed as N/A.

      The progress range of LOAD is 0-100%.

      LOAD Progress = Number of tables currently completed / Number of tables designed for this import task * 100%

      If all import tables complete the import, then the progress of LOAD is 99% import enters the final effective stage, and the progress of LOAD will only be changed to 100% after the entire import is completed.

      Import progress is not linear. So if there is no change in progress over a period of time, it does not mean that the import is not being implemented.

    • Type

      Types of import tasks. The type value of Broker load is only BROKER.

    • Etlinfo

      It mainly shows the imported data quantity indicators unselected.rows, dpp.norm.ALL and dpp.abnorm.ALL. The first value shows the rows which has been filtered by where predicate. Users can verify that the error rate of the current import task exceeds max_filter_ratio based on these two indicators.

    • TaskInfo

      It mainly shows the current import task parameters, that is, the user-specified import task parameters when creating the Broker load import task, including cluster, , and max_filter_ratio.

    • ErrorMsg

      When the import task status is CANCELLED, the reason for the failure is displayed in two parts: type and msg. If the import task succeeds, the N/A is displayed.

      The value meaning of type:

    • CreateTime /EtlStartTime /EtlFinishTime /LoadStartTime /LoadFinishTime

      Broker load import has no ETL stage, so its EtlStartTime, EtlFinishTime, LoadStartTime are set to the same value.

      Import tasks stay in CreateTime for a long time, while LoadStartTime is N/A, which indicates that import tasks are heavily stacked at present. Users can reduce the frequency of import submissions.

      1. LoadFinishTime - CreateTime = Time consumed by the entire import task
      2. LoadFinishTime - LoadStartTime = The entire Broker load import task execution time = the time consumed by the entire import task - the time the import task waits
    • URL

      The error data sample of the import task can be obtained by accessing the URL address. When there is no error data in this import, the URL field is N/A.

    • JobDetails

      Display some details of the running status of the job. Including file number, total file size(Bytes), num of sub tasks, scanned rows, related backend ids and unfinished backend ids.

      1. {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}

      This info will be updated every 5 seconds. the ScannedRows only for displaying the job progress, not indicate the real numbers.

    When the Broker load job status is not CANCELLED or FINISHED, it can be manually cancelled by the user. When canceling, you need to specify a Label for the import task to be cancelled. Canceling Import command syntax can perform HELP CANCEL LOAD view.

    FE configuration

    The following configurations belong to the Broker load system-level configuration, which acts on all Broker load import tasks. Configuration values are adjusted mainly by modifying fe.conf.

    • min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency

      The first two configurations limit the minimum and maximum amount of data processed by a single BE. The third configuration limits the maximum number of concurrent imports for a job. The minimum amount of data processed, the maximum number of concurrency, the size of source files and the number of BEs in the current cluster together determine the concurrency of this import.

      1. The number of concurrent imports = Math. min (source file size / minimum throughput, maximum concurrency, current number of BE nodes)
      2. Processing capacity of this import of a single BE = source file size / concurrency of this import

      Usually the maximum amount of data supported by an import job is max_bytes_per_broker_scanner * number of BE nodes. If you need to import a larger amount of data, you need to adjust the size of the max_bytes_per_broker_scanner parameter appropriately.

    Default configuration:

    1. ```
    2. Parameter name: min_bytes_per_broker_scanner, default 64MB, unit bytes.
    3. Parameter name: max_broker_concurrency, default 10.
    4. Parameter name: max_bytes_per_broker_scanner, default 3G, unit bytes.
    5. ```

    Best Practices

    The most appropriate scenario to use Broker load is the scenario of raw data in a file system (HDFS, BOS, AFS). Secondly, since Broker load is the only way of asynchronous import in a single import, users can also consider using Broker load if they need to use asynchronous access in importing large files.

    Data volume

    We will only discuss the case of a single BE. If the user cluster has more than one BE, the amount of data in the heading below should be multiplied by the number of BEs. For example, if the user has three BEs, then the number below 3G (including) should be multiplied by 3, that is, under 9G (including).

    • Below 3G (including)

      Users can submit Broker load to create import requests directly.

    • Over 3G

      Since the maximum processing capacity of a single imported BE is 3G, the imported files over 3G need to be imported by adjusting the import parameters of Broker load to achieve the import of large files.

      1. Modify the maximum number of scans and concurrency of a single BE according to the current number of BEs and the size of the original file.

        After modification, all BEs process import tasks concurrently, and each BE processes part of the original file.

        Note: The configurations in both FEs are system configurations, that is to say, their modifications work on all Broker load tasks.

      2. Customize the timeout time of the current import task when creating the import

        1. Current import task single BE processing data volume / user Doris cluster slowest import speed (MB/s) >= current import task timeout time >= current import task single BE processing data volume / 10M/s
        2. For example, a 100G file with 10 BEs in the cluster
        3. Timeout > 1000s = 10G / 10M /s
      3. When the user finds that the timeout time calculated in the second step exceeds the default maximum time-out time for importing the system by 4 hours.

        At this time, it is not recommended that users directly increase the maximum time-out to solve the problem. If the single import time exceeds the default maximum import timeout of 4 hours, it is better to solve the problem by splitting the file to be imported and importing it several times. The main reason is that if a single import exceeds 4 hours, the time cost of retry after import failure is very high.

        The maximum amount of imported file data expected by the Doris cluster can be calculated by the following formula:

        1. Expected maximum imported file data = 14400s * 10M / s * BE number
        2. For example, the BE number of clusters is 10.
        3. Expected maximum imported file data volume = 14400 * 10M / s * 10 = 1440000M 1440G
        4. Note: The average user's environment may not reach the speed of 10M/s, so it is recommended that more than 500G files be split and imported.

    You can execute set enable_profile=true to open the load job profile before submitting the import job. After the import job is completed, you can view the profile of the import job in the Queris tab of the FE web page.

    This profile can help analyze the running status of the import job.

    Currently, the profile can only be viewed after the job is successfully executed.

    Complete examples

    Data situation: User data in HDFS, file address is hdfs://abc.com:8888/store_sales, HDFS authentication user name is root, password is password, data size is about 30G, hope to import into database bj_sales table store_sales.

    Cluster situation: The number of BEs in the cluster is about 3, and the Broker name is broker.

    • Step 1: After the calculation of the above method, the single BE import quantity is 10G, then the configuration of FE needs to be modified first, and the maximum amount of single BE import is changed to:

      1. max_bytes_per_broker_scanner = 10737418240
    • Step 2: Calculated, the import time is about 1000s, which does not exceed the default timeout time. No custom timeout time for import can be configured.

    • Step 3: Create import statements

      1. LOAD LABEL bj_sales.store_sales_broker_load_01
      2. (
      3. DATA INFILE("hdfs://abc.com:8888/store_sales")
      4. INTO TABLE store_sales
      5. )
      6. WITH BROKER 'broker'
      7. ("username"="root", "password"="password");
    • failed with: Scan bytes per broker scanner exceed limit:xxx

      Refer to the Best Practices section of the document to modify the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency'.

    • failed with: failed to send batch or TabletWriter add batch with unknown id

      Refer to General System Configuration in BE Configuration in the Import Manual (./load-manual.md), and modify query_timeout and streaming_load_rpc_max_alive_time_sec appropriately.

    • failed with: LOAD_RUN_FAIL; msg: Invalid Column Name: xxx If it is PARQUET or ORC format data, you need to keep the column names in the file header consistent with the column names in the doris table, such as: ` (tmp_c1, tmp_c2) SET ( id = tmp_c2, name = tmp_c1 ) ` Represents getting the column with (tmp_c1, tmp_c2) as the column name in parquet or orc, which is mapped to the (id, name) column in the doris table. If set is not set, the column names in the column are used as the mapping relationship.