Query execution
Druid’s approach to query execution varies depending on the kind of datasource you are querying.
Queries that operate directly on are executed using a scatter-gather approach led by the Broker process. The process looks like this:
The Broker identifies which segments are relevant to the query based on the
"intervals"
parameter. Segments are always partitioned by time, so any segment whose interval overlaps the query interval is potentially relevant.The Broker may additionally further prune the segment list based on the
"filter"
, if the input data was partitioned by range using the , and if the filter matches the dimension used for partitioning.The Broker, having pruned the list of segments for the query, forwards the query to data servers (like Historicals and tasks running on MiddleManagers) that are currently serving those segments.
For all query types except Scan, data servers process each segment in parallel and generate partial results for each segment. The specific processing that is done depends on the query type. These partial results may be cached if is enabled. For Scan queries, segments are processed in order by a single thread, and results are not cached.
lookup
Queries that operate directly on (without a join) are executed on the Broker that received the query, using its local copy of the lookup. All registered lookup tables are preloaded in-memory on the Broker. The query runs single-threaded.
Execution of queries that use lookups as right-hand inputs to a join are executed in a way that depends on their “base” (bottom-leftmost) datasource, as described in the join section below.
Queries that operate directly on union datasources are split up on the Broker into a separate query for each table that is part of the union. Each of these queries runs separately, and the Broker merges their results together.
inline
Queries that operate directly on inline datasources are executed on the Broker that received the query. The query runs single-threaded.
Execution of queries that use inline datasources as right-hand inputs to a join are executed in a way that depends on their “base” (bottom-leftmost) datasource, as described in the section below.
are subqueries. Each subquery is executed as if it was its own query and the results are brought back to the Broker. Then, the Broker continues on with the rest of the query as if the subquery was replaced with an inline datasource.
There is one exception: if the outer query and all subqueries are the groupBy type, then subquery results can be processed in a streaming fashion and the druid.server.http.maxSubqueryRows
limit does not apply.
join
Join datasources are handled using a broadcast hash-join approach.
The Broker executes any subqueries that are inputs the join, as described in the section, and replaces them with inline datasources.
The Broker flattens a join tree, if present, into a “base” datasource (the bottom-leftmost one) and other leaf datasources (the rest).
Query execution proceeds using the same structure that the base datasource would use on its own. If the base datasource is a table, segments are pruned based on as usual, and the query is executed on the cluster by forwarding it to all relevant data servers in parallel. If the base datasource is a or inline datasource (including an inline datasource that was the result of inlining a subquery), the query is executed on the Broker itself. The base query cannot be a union, because unions are not currently supported as inputs to a join.
Before beginning to process the base datasource, the server(s) that will execute the query first inspect all the non-base leaf datasources to determine if a new hash table needs to be built for the upcoming hash join. Currently, lookups do not require new hash tables to be built (because they are preloaded), but inline datasources do.