Doris On ES

  1. Multi-index Distributed Join Query in ES
  2. Joint Query of Tables in Doris and ES, More Complex Full-Text Retrieval and Filtering

This document mainly introduces the realization principle and usage of this function.

  • FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
  • BE: Backend, Doris’s back-end node. Responsible for query execution and data storage.

Noun in ES

  • DataNode: The data storage and computing node of ES.
  • MasterNode: The Master node of ES, which manages metadata, nodes, data distribution, etc.
  • scroll: The built-in data set cursor feature of ES for streaming scanning and filtering of data.
  • _source: contains the original JSON document body that was passed at index time
  • doc_values: store the same values as the _source but in a column-oriented fashion
  • keyword: string datatype in ES, but the content not analyzed by analyzer
  • text: string datatype in ES, the content analyzed by analyzer

Create ES Index

Add JSON documents to ES index

  1. POST /_bulk
  2. {"index":{"_index":"test","_type":"doc"}}
  3. { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Elasticsearch", "k4": "Trying out Elasticsearch", "k5": 10.0}
  4. {"index":{"_index":"test","_type":"doc"}}
  5. { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Doris", "k4": "Trying out Doris", "k5": 10.0}
  6. {"index":{"_index":"test","_type":"doc"}}
  7. { "k1" : 100, "k2": "2020-01-01", "k3": "Doris On ES", "k4": "Doris On ES", "k5": 10.0}
  8. {"index":{"_index":"test","_type":"doc"}}
  9. { "k1" : 100, "k2": "2020-01-01", "k3": "Doris", "k4": "Doris", "k5": 10.0}
  10. {"index":{"_index":"test","_type":"doc"}}
  11. { "k1" : 100, "k2": "2020-01-01", "k3": "ES", "k4": "ES", "k5": 10.0}
  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH // ENGINE must be Elasticsearch
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "type" = "doc",
  12. "user" = "root",
  13. "password" = "root"
  14. );

The following parameters are accepted by ES table:

  • For clusters before 7.x, please pay attention to choosing the correct type when building the table
  • The authentication method only supports Http Basic authentication, need to ensure that this user has access to: /_cluster/state/, _nodes/http and other paths and index read permissions;The cluster has not turned on security authentication, and the user name and password do not need to be set
  • The column names in the Doris table need to exactly match the field names in the ES, and the field types should be as consistent as possible
  • ENGINE must be: Elasticsearch
Filter to push down

An important ability of Doris On ES is the push-down of filter conditions: The filtering conditions are pushed to ES, so that only the data that really meets the conditions will be returned, which can significantly improve query performance and reduce CPU, memory, and IO utilization of Doris and ES

The following operators (Operators) will be optimized to the following ES Query:

SQL syntaxES 5.x+ syntax
=term query
interms query
> , < , >= , ⇐range query
andbool.filter
orbool.should
notbool.must_not
not inbool.must_not + terms query
is_not_nullexists query
is_nullbool.must_not + exists query
esqueryQueryDSL in ES native json form
Data type mapping

Enable column scan to optimize query speed(enable_docvalue_scan=true)

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "type" = "doc",
  12. "user" = "root",
  13. "password" = "root",
  14. "enable_docvalue_scan" = "true"
  15. );

Parameter Description:

ParameterDescription
enable_docvalue_scanwhether to enable ES/Lucene column storage to get the value of the query field, the default is false

Doris obtains data from ES following the following two principles:

  • Best effort: Automatically detect whether the column to be read has column storage enabled (doc_value: true).If all the fields obtained have column storage, Doris will obtain the values ​​of all fields from the column storage(doc_values)
  • Automatic downgrade: If the field to be obtained has one or more field that is not have doc_value, the values ​​of all fields will be parsed from the line store _source
Advantage:

By default, Doris On ES will get all the required columns from the row storage, which is _source, and the storage of _source is the origin json format document, Inferior to column storage in batch read performance, Especially obvious when only a few columns are needed, When only a few columns are obtained, the performance of docvalue is about ten times that of _source

Tip
  1. Fields of type text are not column-stored in ES, so if the value of the field to be obtained has a field of type text, it will be automatically downgraded to get from _source
  2. In the case of too many fields obtained (>= 25), the performance of getting field values ​​from docvalue will be basically the same as getting field values ​​from _source

Detect keyword type field(enable_keyword_sniff=true)

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "type" = "doc",
  12. "user" = "root",
  13. "password" = "root",
  14. "enable_keyword_sniff" = "true"
  15. );

Parameter Description:

You can directly import data without creating an index. At this time, ES will automatically create a new index in ES, For a field of type string, a field of type text and field of type keyword will be created meantime, This is the multi-fields feature of ES, mapping is as follows:

  1. "k4": {
  2. "type": "text",
  3. "fields": {
  4. "keyword": {
  5. "type": "keyword",
  6. "ignore_above": 256
  7. }
  8. }
  9. }

When performing conditional filtering on k4, for example =, Doris On ES will convert the query to ES’s TermQuery

SQL filter:

  1. k4 = "Doris On ES"

The query DSL converted into ES is:

Because the first field type of k4 is text, when data is imported, it will perform word segmentation processing according to the word segmentator set by k4 (if it is not set, it is the standard word segmenter) to get three Term of doris, on, and es, as follows ES analyze API analysis:

  1. POST /_analyze
  2. {
  3. "analyzer": "standard",
  4. "text": "Doris On ES"
  5. }
  1. {
  2. "tokens": [
  3. {
  4. "token": "doris",
  5. "start_offset": 0,
  6. "end_offset": 5,
  7. "position": 0
  8. },
  9. {
  10. "token": "on",
  11. "start_offset": 6,
  12. "end_offset": 8,
  13. "type": "<ALPHANUM>",
  14. "position": 1
  15. },
  16. {
  17. "token": "es",
  18. "start_offset": 9,
  19. "end_offset": 11,
  20. "type": "<ALPHANUM>",
  21. "position": 2
  22. }
  23. ]
  24. }

The query uses:

  1. "term" : {
  2. "k4": "Doris On ES"
  3. }

This term does not match any term in the dictionary, and will not return any results, enable enable_keyword_sniff: true will automatically convert k4 = "Doris On ES" into k4.keyword = "Doris On ES"to exactly match SQL semantics, The converted ES query DSL is:

  1. "term" : {
  2. "k4.keyword": "Doris On ES"
  3. }

The type of k4.keyword is keyword, and writing data into ES is a complete term, so it can be matched

Enable node discovery mechanism, default is true(es_nodes_discovery=true)

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "type" = "doc",
  12. "user" = "root",
  13. "password" = "root",
  14. "nodes_discovery" = "true"
  15. );

Parameter Description:

ParameterDescription
es_nodes_discoveryWhether or not to enable ES node discovery. the default is true

Doris would find all available related data nodes (shards allocated on)from ES when this is true. Just set false if address of ES data nodes are not accessed by Doris BE, eg. the ES cluster is deployed in the intranet which isolated from your public Internet, and users access through a proxy

  1. CREATE EXTERNAL TABLE `test` (
  2. `k1` bigint(20) COMMENT "",
  3. `k2` datetime COMMENT "",
  4. `k3` varchar(20) COMMENT "",
  5. `k4` varchar(100) COMMENT "",
  6. `k5` float COMMENT ""
  7. ) ENGINE=ELASTICSEARCH
  8. PROPERTIES (
  9. "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
  10. "index" = "test",
  11. "type" = "doc",
  12. "user" = "root",
  13. "password" = "root",
  14. "http_ssl_enabled" = "true"
  15. );

Parameter Description:

The current FE/BE implementation is to trust all, this is a temporary solution, and the real user configuration certificate will be used later

Query usage

After create the ES external table in Doris, there is no difference except that the data model (rollup, pre-aggregation, materialized view, etc.) with other table in Doris

Basic usage

Extended esquery(field, QueryDSL)

Through the esquery(field, QueryDSL) function, some queries that cannot be expressed in sql, such as match_phrase, geoshape, etc., are pushed down to the ES for filtering. The first column name parameter of esquery is used to associate the index, the second This parameter is the basic JSON expression of ES’s Query DSL, which is contained in curly braces {}, and there can be only one root key of json, such as match_phrase, geo_shape, bool, etc. Match query:

  1. select * from es_table where esquery(k4, '{
  2. "match": {
  3. "k4": "doris on es"
  4. }
  5. }');

Geo related queries:

  1. select * from es_table where esquery(k4, '{
  2. "geo_shape": {
  3. "location": {
  4. "shape": {
  5. "type": "envelope",
  6. "coordinates": [
  7. [
  8. 13,
  9. 53
  10. ],
  11. [
  12. 14,
  13. ]
  14. ]
  15. },
  16. "relation": "within"
  17. }
  18. }
  19. }');

Bool query:

  1. select * from es_table where esquery(k4, ' {
  2. "bool": {
  3. "must": [
  4. "terms": {
  5. "k1": [
  6. 11,
  7. 12
  8. ]
  9. }
  10. },
  11. {
  12. "terms": {
  13. "k2": [
  14. 100
  15. ]
  16. }
  17. }
  18. ]
  19. }
  20. }');
  1. +----------------------------------------------+
  2. | |
  3. | Doris +------------------+ |
  4. | | FE +--------------+-------+
  5. | | | Request Shard Location
  6. | +--+-------------+-+ | |
  7. | ^ ^ | |
  8. | | | | |
  9. | +-------------------+ +------------------+ | |
  10. | | | | | | | | |
  11. | | +----------+----+ | | +--+-----------+ | | |
  12. | | | BE | | | | BE | | | |
  13. | | +---------------+ | | +--------------+ | | |
  14. +----------------------------------------------+ |
  15. | | | | | | |
  16. | | | | | | |
  17. | HTTP SCROLL | | HTTP SCROLL | |
  18. +-----------+---------------------+------------+ |
  19. | | v | | v | | |
  20. | | +------+--------+ | | +------+-------+ | | |
  21. | | | | | | | | | | |
  22. | | | DataNode | | | | DataNode +<-----------+
  23. | | | | | | | | | | |
  24. | | | +<--------------------------------+
  25. | | +---------------+ | | |--------------| | | |
  26. | +-------------------+ +------------------+ | |
  27. | Same Physical Node | |
  28. | | |
  29. | +-----------------------+ | |
  30. | | | | |
  31. | | MasterNode +<-----------------+
  32. | ES | | |
  33. | +-----------------------+ |
  34. +----------------------------------------------+
  1. FE requests the hosts specified by the table to obtain node‘s HTTP port, shards location of the index. If the request fails, it will traverse the host list sequentially until it succeeds or fails completely.

  2. When querying, the query plan will be generated and sent to the corresponding BE node according to some node information obtained by FE and metadata information of index.

  3. After calculating the result, return it to client

Suggestions for using Date type fields

The use of Datetype fields in ES is very flexible, but in Doris On ES, if the type of the Date type field is not set properly, it will cause the filter condition cannot be pushed down.

When creating an index, do maximum format compatibility with the setting of the Date type format:

  1. "dt": {
  2. "type": "date",
  3. "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
  4. }

When creating this field in Doris, it is recommended to set it to date or datetime, and it can also be set to varchar type. The following SQL statements can be used to directly push the filter condition down to ES

  1. select * from doe where k2 > '2020-06-21';
  2. select * from doe where k2 < '2020-06-21 12:00:00';
  3. select * from doe where k2 < 1593497011;
  4. select * from doe where k2 < now();
  5. select * from doe where k2 < date_format(now(), '%Y-%m-%d');

Notice:

  • If you don’t set the format for the time type field In ES, the default format for Date-type field is
  • If the date field indexed into ES is unix timestamp, it needs to be converted to ms, and the internal timestamp of ES is processed according to ms unit, otherwise Doris On ES will display wrong column data

Fetch ES metadata field _id

When indexing documents without specifying _id, ES will assign a globally unique _id field to each document. Users can also specify a _id with special represent some business meaning for the document when indexing; if needed, Doris On ES can get the value of this field by adding the _id field of type varchar when creating the ES external table

  1. CREATE EXTERNAL TABLE `doe` (
  2. `_id` varchar COMMENT "",
  3. `city` varchar COMMENT ""
  4. ) ENGINE=ELASTICSEARCH
  5. PROPERTIES (
  6. "hosts" = "http://127.0.0.1:8200",
  7. "user" = "root",
  8. "password" = "root",
  9. "index" = "doe",
  10. "type" = "doc"
  11. }

Notice:

  1. The filtering condition of the _id field only supports two types: = and in
  2. The _id field can only be of type varchar
  1. ES Version Requirements

    The main version of ES is larger than 5. The scanning mode of ES data before 2. X and after 5. x is different. At present, the scanning mode of ES data after 5. x is supported.

  2. Does ES Cluster Support X-Pack Authentication

    Support all ES clusters using HTTP Basic authentication

  3. Some queries are much slower than requesting ES

    Yes, for example, query related to _count, etc., the ES internal will directly read the number of documents that meet the requirements of the relevant metadata, without the need to filter the real data.