This command is mainly used to import data from remote storage (such as S3 or HDFS) through the Broker service process.
[database.]label_name
Each import needs to specify a unique Label. You can later use this Label to view the progress of the job.
data_desc1
Used to describe a set of files that need to be imported.
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[MERGE|APPEND|DELETE]
Data merge type. The default is APPEND, which indicates that this import is a normal append write. The MERGE and DELETE types are only available for Unique Key model tables. The MERGE type must be used together with the [DELETE ON] clause to mark the Delete Flag column, while the DELETE type indicates that all data in this import is deleted data.
DATA INFILE
Specifies the file paths to be imported. Multiple paths and wildcards are allowed. A path must ultimately match files; if it matches only a directory, the import will fail.
NEGATIVE
This keyword is used to indicate that this import is a batch of “negative” imports. This method is only for aggregate data tables with integer SUM aggregate type. This method will reverse the integer value corresponding to the SUM aggregate column in the imported data. Mainly used to offset previously imported wrong data.
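A minimal sketch of a negative import; the label, file path, and table name are illustrative, and `my_agg_table` is assumed to be an Aggregate model table whose value columns use integer SUM aggregation:

```sql
LOAD LABEL example_db.label_negative
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/input/bad_batch.txt")
    NEGATIVE
    INTO TABLE `my_agg_table`
    COLUMNS TERMINATED BY ","
)
WITH BROKER hdfs
(
    "username" = "hdfs_user",
    "password" = "hdfs_password"
);
```

Re-loading a previously imported erroneous batch with NEGATIVE subtracts its SUM-column values, effectively cancelling the earlier import out.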
PARTITION(p1, p2, ...)
You can specify importing only certain partitions of the table. Data outside the specified partition ranges will be ignored.
COLUMNS TERMINATED BY
Specifies the column separator. Only valid in CSV format. Only single-byte delimiters can be specified.
FORMAT AS
Specifies the file type, CSV, PARQUET and ORC formats are supported. Default is CSV.
column list
Used to specify the column order in the original file. For a detailed introduction to this part, please refer to the Column Mapping, Conversion and Filtering document.
(k1, k2, tmpk1)
COLUMNS FROM PATH AS
Specifies the columns to extract from the import file path.
PRECEDING FILTER predicate
Pre-filter condition. The data is first concatenated into raw data rows, in order, according to the column list and COLUMNS FROM PATH AS, and then filtered by the pre-filter condition. For a detailed introduction to this part, please refer to the Column Mapping, Conversion and Filtering document.
SET (column_mapping)
Specifies the conversion functions for columns.
WHERE predicate
Filter imported data based on conditions. For a detailed introduction to this part, please refer to the Column Mapping, Conversion and Filtering document.
DELETE ON expr
Must be used with the MERGE import mode, and only for Unique Key model tables. Used to specify the column in the imported data that represents the Delete Flag, and the calculated relationship.
ORDER BY
Only for Unique Key model tables. Used to specify the column in the imported data that represents the Sequence Col, mainly to ensure data order during import.
WITH BROKER broker_name
Specifies the Broker service name to be used. In public-cloud Doris, the Broker service name is bos.
broker_properties
Specifies the information required by the broker, usually used so that the broker can access remote storage systems such as BOS or HDFS. See the documentation for the specific properties.
(
"key1" = "val1",
"key2" = "val2",
...
)
load_properties
Specifies import-related parameters. The following parameters are currently supported:
timeout
Import timeout, in seconds. The default is 4 hours (14400 seconds).
max_filter_ratio
The maximum tolerable proportion of data that can be filtered (for reasons such as data irregularity). Zero tolerance by default. The value range is 0 to 1.
exec_mem_limit
Import memory limit, in bytes. The default is 2 GB.
strict_mode
Whether to impose strict restrictions on data. Defaults to false.
timezone
Specifies the time zone for functions affected by time zones, such as strftime/alignment_timestamp/from_unixtime, etc. See the timezone documentation for details. If not specified, the "Asia/Shanghai" time zone is used.
load_parallelism
Allows the user to set the parallelism of the load execution plan on a single node when the broker load is submitted. The default value is 1.
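These load_properties are supplied in a PROPERTIES clause at the end of the LOAD statement; a sketch with illustrative values:

```sql
PROPERTIES
(
    "timeout" = "14400",             -- seconds (4 hours, the default)
    "max_filter_ratio" = "0.1",      -- tolerate up to 10% filtered rows
    "exec_mem_limit" = "4294967296", -- 4 GB, in bytes
    "strict_mode" = "false",
    "timezone" = "Asia/Shanghai",
    "load_parallelism" = "2"
)
```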
Import a batch of data from HDFS
LOAD LABEL example_db.label1
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
)
WITH BROKER hdfs
(
    "username"="hdfs_user",
    "password"="hdfs_password"
);
Import the file file.txt, with columns separated by commas, into the table my_table.
Import data from HDFS, using wildcards to match two batches of files, into two tables separately.
LOAD LABEL example_db.label2
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
INTO TABLE `my_table1`
PARTITION (p1)
COLUMNS TERMINATED BY ","
(k1, tmp_k2, tmp_k3)
SET (
k2 = tmp_k2 + 1,
k3 = tmp_k3 + 1
),
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
INTO TABLE `my_table2`
COLUMNS TERMINATED BY ","
(k1, k2, k3)
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
Import two batches of files, file-10* and file-20*, using wildcard matching, into the two tables my_table1 and my_table2 respectively. my_table1 specifies importing into partition p1, and the values of the second and third columns in the source file will be incremented by 1 before being imported.
Import a batch of data from HDFS. Specify the delimiter as Hive's default delimiter \x01, use the wildcard * to match all files in all directories under the data directory, and use simple authentication while configuring namenode HA.
Import data in Parquet format and specify FORMAT as "parquet". By default, the format is determined by the file suffix.
LOAD LABEL example_db.label4
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file")
INTO TABLE `my_table`
FORMAT AS "parquet"
(k1, k2, k3)
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
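The \x01-delimiter / namenode-HA scenario described earlier has no statement shown in this excerpt; a sketch, where the broker name, nameservice ID, and host names are illustrative:

```sql
LOAD LABEL example_db.label3
(
    DATA INFILE("hdfs://my_ha/user/doris/data/*/*")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY "\\x01"
)
WITH BROKER my_hdfs_broker
(
    -- simple authentication
    "username" = "hdfs_user",
    "password" = "hdfs_password",
    -- namenode HA configuration
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "nn1,nn2",
    "dfs.namenode.rpc-address.my_ha.nn1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.nn2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
```

With HA configured, the path references the nameservice ID (here my_ha) rather than a single namenode host.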
Import the data and extract the partition field in the file path
LOAD LABEL example_db.label10
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
INTO TABLE `my_table`
FORMAT AS "csv"
(k1, k2, k3)
COLUMNS FROM PATH AS (city, utc_date)
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
The directory includes the following files:
hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv
The file contains only the three columns k1, k2, k3; the two columns city and utc_date will be extracted from the file path.
Filter the data to be imported.
LOAD LABEL example_db.label6
(
DATA INFILE("hdfs://host:port/input/file")
INTO TABLE `my_table`
(k1, k2, k3)
PRECEDING FILTER k1 = 1
SET (
k2 = k2 + 1
)
WHERE k1 > k2
)
WITH BROKER hdfs
(
"username"="user",
"password"="pass"
);
Only rows where k1 = 1 in the original data, and where k1 > k2 after transformation, will be imported.
Import data and extract the time partition field from the file path. The time contains %3A (':' is not allowed in an HDFS path, so all ':' characters are replaced with %3A).
There are the following files in the path:
/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt
The table structure is:
data_time DATETIME,
k2 INT,
k3 INT
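This excerpt omits the LOAD statement for this scenario; a sketch that decodes the %3A-encoded value from the path into the DATETIME column (the label and format string are illustrative):

```sql
LOAD LABEL example_db.label7
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/*/test.txt")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    (k2, k3)
    COLUMNS FROM PATH AS (data_time)
    SET (
        -- "%%3A" matches the literal "%3A" that replaced ':' in the path
        data_time = str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
    )
)
WITH BROKER hdfs
(
    "username" = "hdfs_user",
    "password" = "hdfs_password"
);
```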
Import a batch of data from HDFS, specifying the timeout and filter ratio, and using the broker my_hdfs_broker with simple authentication. Rows in the imported data whose v2 value is greater than 100 mark the matching rows in the table for deletion; other rows are imported normally.
LOAD LABEL example_db.label8
(
MERGE DATA INFILE("HDFS://test:802/input/file")
INTO TABLE `my_table`
(k1, k2, k3, v2, v1)
DELETE ON v2 > 100
)
WITH HDFS
(
"hadoop.username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1"
);
Import using the MERGE method. my_table must be a Unique Key table. When the value of the v2 column in a row of the imported data is greater than 100, that row is considered a delete row. The import task timeout is 3600 seconds, and an error rate of up to 10% is allowed.
Specify the source_sequence column when importing to ensure the replacement order in the UNIQUE_KEYS table:
LOAD LABEL example_db.label9
(
DATA INFILE("HDFS://test:802/input/file")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
(k1,k2,source_sequence,v1,v2)
ORDER BY source_sequence
)
WITH HDFS
(
"hadoop.username"="user",
"password"="pass"
);
my_table must be a Unique Key model table with a Sequence Col specified. The data will be ordered according to the value of the source_sequence column in the source data.
Check the import task status
Broker Load is an asynchronous import process. The successful execution of the statement only means that the import task is submitted successfully, and does not mean that the data import is successful. The import status needs to be viewed through the SHOW LOAD command.
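For example, the state of a task can be queried by its Label (the Label is illustrative):

```sql
SHOW LOAD WHERE LABEL = "label1";
```

A task has succeeded only when its State is FINISHED; CANCELLED indicates the task failed or was cancelled.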
Cancel the import task
Import tasks that have been submitted but not yet completed can be canceled with the CANCEL LOAD command. After cancellation, the written data is rolled back and will not take effect.
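A sketch of cancelling by Label (the database and Label names are illustrative):

```sql
CANCEL LOAD FROM example_db WHERE LABEL = "label1";
```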
Label, import transaction, multi-table atomicity
All import tasks in Doris are atomic, and atomicity is also guaranteed when multiple tables are imported in the same import task. Doris also uses the Label mechanism to ensure that imported data is neither lost nor duplicated. For details, see the Import Transactions and Atomicity documentation.
Column mapping, derived columns and filtering
Doris can support very rich column transformation and filtering operations in import statements. Most built-in functions and UDFs are supported. For how to use this function correctly, please refer to the document.
Error data filtering
Doris import tasks can tolerate a portion of malformed data. The tolerance is controlled by the max_filter_ratio setting. The default is 0, which means the entire import task fails when there is any erroneous data. If the user wants to ignore some problematic data rows, this parameter can be set to a value between 0 and 1, and Doris will automatically skip rows with an incorrect data format.
For some calculation methods of the tolerance rate, please refer to the Column Mapping, Conversion and Filtering document.
Strict Mode
The strict_mode attribute sets whether the import task runs in strict mode, which affects the results of column mapping, transformation, and filtering. For a detailed description of strict mode, see the documentation.
Timeout
The default timeout for Broker Load is 4 hours, counted from the time the task is submitted. If the task does not complete within the timeout period, it fails.