Native batch ingestion

    To run either kind of native batch indexing task, write an ingestion spec as specified below. Then POST it to the /druid/indexer/v1/task endpoint on the Overlord, or use the bin/post-index-task script included with Druid.
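
As a rough illustration of the submission step, the sketch below builds (but does not send) the POST request using only Python's standard library. The Overlord address `http://localhost:8081` and the helper name `build_submit_request` are assumptions made for this example; substitute the address of your own Overlord.

```python
import json
import urllib.request

# Hypothetical Overlord address; replace with your own deployment's host and port.
OVERLORD_URL = "http://localhost:8081"


def build_submit_request(spec: dict, overlord_url: str = OVERLORD_URL) -> urllib.request.Request:
    """Build a POST request that submits an ingestion spec to the Overlord."""
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        url=overlord_url.rstrip("/") + "/druid/indexer/v1/task",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# An empty "spec" is used here only to keep the example short.
request = build_submit_request({"type": "index_parallel", "spec": {}})
print(request.get_method(), request.full_url)
```

Passing the returned object to `urllib.request.urlopen` would actually send it; that call is left out so the sketch runs without a cluster.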

    This page contains reference documentation for native batch ingestion. For a walk-through instead, check out the tutorial, which demonstrates the “simple” (single-task) mode.

    The Parallel task (type index_parallel) is a task for parallel batch indexing. This task uses only Druid’s resources and doesn’t depend on external systems such as Hadoop. The index_parallel task is a supervisor task that orchestrates the whole indexing process. The supervisor task splits the input data and creates worker tasks to process those splits. The created worker tasks are issued to the Overlord so that they can be scheduled and run on MiddleManagers or Indexers. Once a worker task successfully processes its assigned input split, it reports the generated segment list to the supervisor task. The supervisor task periodically checks the status of the worker tasks. If one of them fails, the supervisor retries the failed task until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes ingestion.

    The detailed behavior of the Parallel task is different depending on the partitionsSpec. See each partitionsSpec for more details.

    To use this task, the inputSource in the ioConfig should be splittable and maxNumConcurrentSubTasks should be set to a value larger than 1 in the tuningConfig. Otherwise, this task runs sequentially: the index_parallel task reads each input file one by one and creates segments by itself. The currently supported splittable input sources are:

    • s3 reads data from AWS S3 storage.
    • google reads data from Google Cloud Storage.
    • azure reads data from Azure Blob Storage.
    • hdfs reads data from HDFS storage.
    • http reads data from HTTP servers.
    • local reads data from local storage.
    • druid reads data from a Druid datasource.
    • sql reads data from an RDBMS source.

    Some other cloud storage types are supported with the legacy firehose. The below firehose types are also splittable. Note that only text formats are supported with the firehose.

    The supported compression formats for native batch ingestion are bz2, gz, xz, zip, sz (Snappy), and zst (ZSTD).

    Consider the following points:

    • You may want to control the amount of input data each worker task processes. This can be controlled using different configurations depending on the phase in parallel ingestion (see partitionsSpec for more details). For the tasks that read data from the inputSource, you can set the splitHintSpec in the tuningConfig. For the tasks that merge shuffled segments, you can set the totalNumMergeTasks in the tuningConfig.
    • The number of concurrent worker tasks in parallel ingestion is determined by maxNumConcurrentSubTasks in the tuningConfig. The supervisor task checks the number of currently running worker tasks and creates more if that number is smaller than maxNumConcurrentSubTasks, no matter how many task slots are currently available. This may affect the performance of other ingestion tasks. See the Capacity planning section below for more details.
    • By default, batch ingestion replaces all data (in your granularitySpec’s intervals) in any segment that it writes to. If you’d like to add to the segment instead, set the appendToExisting flag in the ioConfig. Note that it only replaces data in segments where it actively adds data: if there are segments in your granularitySpec’s intervals that have no data written by this task, they will be left alone. If any existing segments partially overlap with the granularitySpec’s intervals, the portion of those segments outside the new segments’ intervals will still be visible.

    Task syntax

    A sample task is shown below:
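
As a minimal sketch of what such a Parallel task can look like, the snippet below assembles a spec as a Python dict and prints it as JSON. The datasource name, input path, and column names are placeholder assumptions borrowed from the simple-task sample elsewhere on this page; only property names documented in the sections that follow are used.

```python
import json

# Placeholder values (datasource, paths, columns) are illustrative assumptions.
parallel_spec = {
    "type": "index_parallel",
    "spec": {
        "dataSchema": {
            "dataSource": "wikipedia",
            "timestampSpec": {"column": "timestamp", "format": "auto"},
            "dimensionsSpec": {"dimensions": ["page", "language", "user"]},
            "metricsSpec": [{"type": "count", "name": "count"}],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "DAY",
                "queryGranularity": "NONE",
                "intervals": ["2013-08-31/2013-09-01"],
            },
        },
        "ioConfig": {
            "type": "index_parallel",
            "inputSource": {
                "type": "local",
                "baseDir": "examples/indexing/",
                "filter": "wikipedia_data.json",
            },
            "inputFormat": {"type": "json"},
            "appendToExisting": False,
        },
        "tuningConfig": {
            "type": "index_parallel",
            "partitionsSpec": {"type": "dynamic"},
            # A value larger than 1 is needed for the task to run in parallel.
            "maxNumConcurrentSubTasks": 2,
        },
    },
}

print(json.dumps(parallel_spec, indent=2))
```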

    dataSchema

    This field is required.

    See the dataSchema section of the ingestion documentation for details.

    If you specify intervals explicitly in your dataSchema’s granularitySpec, batch ingestion will lock the full intervals specified when it starts up, and you will learn quickly if the specified interval overlaps with locks held by other tasks (e.g., Kafka ingestion). Otherwise, batch ingestion will lock each interval as it is discovered, so you may only learn that the task overlaps with a higher-priority task later in ingestion. If you specify intervals explicitly, any rows outside the specified intervals will be thrown away. We recommend setting intervals explicitly if you know the time range of the data so that locking failure happens faster, and so that you don’t accidentally replace data outside that range if there’s some stray data with unexpected timestamps.

    ioConfig

    | property | description | default | required? |
    |---|---|---|---|
    | type | The task type. This should always be index_parallel. | none | yes |
    | inputFormat | The inputFormat that specifies how to parse input data. | none | yes |
    | appendToExisting | Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. The current limitation is that you can append to any datasource regardless of its original partitioning scheme, but the appended segments must be partitioned using the dynamic partitionsSpec. | false | no |

    tuningConfig

    The tuningConfig is optional and default parameters will be used if no tuningConfig is specified. See below for more details.

    | property | description | default | required? |
    |---|---|---|---|
    | type | The task type. This should always be index_parallel. | none | yes |
    | maxRowsPerSegment | Deprecated. Use partitionsSpec instead. Used in sharding. Determines how many rows are in each segment. | 5000000 | no |
    | maxRowsInMemory | Used in determining when intermediate persists to disk should occur. Normally you do not need to set this. However, if rows are short in terms of bytes, you may not want to store a million rows in memory, and this value should be set. | 1000000 | no |
    | maxBytesInMemory | Used in determining when intermediate persists to disk should occur. Normally this is computed internally and you do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. It is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). | 1/6 of max JVM memory | no |
    | maxColumnsToMerge | Limits how many segments can be merged in a single phase when merging segments for publishing. This limit is imposed on the total number of columns present in a set of segments being merged. If the limit is exceeded, segment merging will occur in multiple phases. At least 2 segments will be merged in a single phase, regardless of this setting. | -1 (unlimited) | no |
    | maxTotalRows | Deprecated. Use partitionsSpec instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur. | 20000000 | no |
    | numShards | Deprecated. Use partitionsSpec instead. Directly specify the number of shards to create when using a hashed partitionsSpec. If this is specified and intervals is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set. | null | no |
    | splitHintSpec | Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See Split Hint Spec for more details. | size-based split hint spec | no |
    | partitionsSpec | Defines how to partition data in each timeChunk. See PartitionsSpec. | dynamic if forceGuaranteedRollup = false, hashed or single_dim if forceGuaranteedRollup = true | no |
    | indexSpec | Defines segment storage format options to be used at indexing time. See IndexSpec. | null | no |
    | indexSpecForIntermediatePersists | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See IndexSpec for possible values. | same as indexSpec | no |
    | maxPendingPersists | Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). | 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up) | no |
    | forceGuaranteedRollup | Forces guaranteeing the perfect rollup. Perfect rollup optimizes the total size of generated segments and querying time, while indexing time will be increased. If this is set to true, intervals in granularitySpec must be set and hashed or single_dim must be used for partitionsSpec. This flag cannot be used with appendToExisting of IOConfig. For more details, see the Segment pushing modes section. | false | no |
    | reportParseExceptions | If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. | false | no |
    | pushTimeout | Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever. | 0 | no |
    | segmentWriteOutMediumFactory | Segment write-out medium to use when creating segments. See SegmentWriteOutMediumFactory. | Not specified, the value from druid.peon.defaultSegmentWriteOutMediumFactory.type is used | no |
    | maxNumConcurrentSubTasks | Maximum number of worker tasks that can run in parallel at the same time. The supervisor task spawns worker tasks up to maxNumConcurrentSubTasks regardless of the currently available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks can be created, which might block other ingestion. Check Capacity planning for more details. | 1 | no |
    | maxRetry | Maximum number of retries on task failures. | 3 | no |
    | maxNumSegmentsToMerge | Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only when forceGuaranteedRollup is set. | 100 | no |
    | totalNumMergeTasks | Total number of tasks to merge segments in the merge phase when partitionsSpec is set to hashed or single_dim. | 10 | no |
    | taskStatusCheckPeriodMs | Polling period in milliseconds to check running task statuses. | 1000 | no |
    | chatHandlerTimeout | Timeout for reporting the pushed segments in worker tasks. | PT10S | no |
    | chatHandlerNumRetries | Retries for reporting the pushed segments in worker tasks. | 5 | no |
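
The heap bound mentioned for maxBytesInMemory, read as maxBytesInMemory multiplied by (2 + maxPendingPersists), can be worked through with a small calculation. This is only an arithmetic sketch: the 6 GiB JVM heap is a hypothetical figure, and the function name is made up for the example.

```python
GIB = 1024 ** 3


def max_indexing_heap_bytes(max_bytes_in_memory: int, max_pending_persists: int = 0) -> int:
    """Upper bound on indexing heap usage: maxBytesInMemory * (2 + maxPendingPersists)."""
    return max_bytes_in_memory * (2 + max_pending_persists)


# Hypothetical JVM with a 6 GiB max heap; the default maxBytesInMemory is 1/6 of it.
jvm_max_heap = 6 * GIB
default_max_bytes_in_memory = jvm_max_heap // 6

# Default maxPendingPersists is 0, so the bound is twice maxBytesInMemory.
default_bound = max_indexing_heap_bytes(default_max_bytes_in_memory)
# Allowing one queued persist raises the bound to three times maxBytesInMemory.
bound_with_one_pending = max_indexing_heap_bytes(default_max_bytes_in_memory, 1)

print(default_bound / GIB, bound_with_one_pending / GIB)
```

With these assumed numbers the default bound is 2 GiB, and raising maxPendingPersists to 1 raises it to 3 GiB.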

    Split Hint Spec

    The split hint spec is used to give a hint when the supervisor task creates input splits. Note that each worker task processes a single input split. You can control the amount of data each worker task will read during the first phase.

    Size-based Split Hint Spec

    The size-based split hint spec is respected by all splittable input sources except for the HTTP input source and SQL input source.

    | property | description | default | required? |
    |---|---|---|---|
    | type | This should always be maxSize. | none | yes |
    | maxSplitSize | Maximum number of bytes of input files to process in a single subtask. If a single file is larger than this number, it will be processed by itself in a single subtask (files are never split across tasks yet). Note that one subtask will not process more files than maxNumFiles even when their total size is smaller than maxSplitSize. Human-readable format is supported. | 1GiB | no |
    | maxNumFiles | Maximum number of input files to process in a single subtask. This limit is to avoid task failures when the ingestion spec is too long. There are two known limits on the max size of a serialized ingestion spec: the max ZNode size in ZooKeeper (jute.maxbuffer) and the max packet size in MySQL (max_allowed_packet). These can make ingestion tasks fail if the serialized ingestion spec size hits one of them. Note that one subtask will not process more data than maxSplitSize even when the total number of files is smaller than maxNumFiles. | 1000 | no |
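
To make the interaction between maxSplitSize and maxNumFiles concrete, here is a simplified greedy packing sketch. It is an illustration of the two limits as described in the table, not Druid's actual split-planning code; the file names, sizes, and the `plan_splits` helper are assumptions.

```python
from typing import List, Tuple

MIB = 1024 * 1024


def plan_splits(files: List[Tuple[str, int]], max_split_size: int, max_num_files: int) -> List[List[str]]:
    """Greedily pack whole files into splits without exceeding either limit."""
    splits: List[List[str]] = []
    current: List[str] = []
    current_bytes = 0
    for name, size in files:
        too_big = current_bytes + size > max_split_size
        too_many = len(current) >= max_num_files
        # Close the current split before it would break a limit. An oversized
        # file still gets a split of its own because files are never divided.
        if current and (too_big or too_many):
            splits.append(current)
            current, current_bytes = [], 0
        current.append(name)
        current_bytes += size
    if current:
        splits.append(current)
    return splits


# Hypothetical input files as (name, size in bytes).
files = [("a.json", 400 * MIB), ("b.json", 500 * MIB), ("c.json", 300 * MIB),
         ("d.json", 1500 * MIB), ("e.json", 100 * MIB)]

by_size = plan_splits(files, max_split_size=1024 * MIB, max_num_files=1000)
by_count = plan_splits(files, max_split_size=1024 * MIB, max_num_files=1)
print(by_size)
print(by_count)
```

In the first call the 1500 MiB file ends up alone in its split because it exceeds maxSplitSize; in the second call maxNumFiles = 1 forces one file per split even though some files would fit together by size.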

    Segments Split Hint Spec

    The segments split hint spec is used only for DruidInputSource (and its legacy firehose counterpart).

    | property | description | default | required? |
    |---|---|---|---|
    | type | This should always be segments. | none | yes |
    | maxInputSegmentBytesPerTask | Maximum number of bytes of input segments to process in a single subtask. If a single segment is larger than this number, it will be processed by itself in a single subtask (input segments are never split across tasks). Note that one subtask will not process more segments than maxNumSegments even when their total size is smaller than maxInputSegmentBytesPerTask. Human-readable format is supported. | 1GiB | no |
    | maxNumSegments | Maximum number of input segments to process in a single subtask. This limit is to avoid task failures when the ingestion spec is too long. There are two known limits on the max size of a serialized ingestion spec: the max ZNode size in ZooKeeper (jute.maxbuffer) and the max packet size in MySQL (max_allowed_packet). These can make ingestion tasks fail if the serialized ingestion spec size hits one of them. Note that one subtask will not process more data than maxInputSegmentBytesPerTask even when the total number of segments is smaller than maxNumSegments. | 1000 | no |

    partitionsSpec

    PartitionsSpec is used to describe the secondary partitioning method. You should use different partitionsSpec depending on the rollup mode you want. For perfect rollup, you should use either hashed (partitioning based on the hash of dimensions in each row) or single_dim (based on ranges of a single dimension). For best-effort rollup, you should use dynamic.

    The three partitionsSpec types have different characteristics.

    | PartitionsSpec | Ingestion speed | Partitioning method | Supported rollup mode | Secondary partition pruning at query time |
    |---|---|---|---|---|
    | dynamic | Fastest | Partitioning based on the number of rows in a segment. | Best-effort rollup | N/A |
    | hashed | Moderate | Partitioning based on the hash value of partition dimensions. This partitioning may reduce your datasource size and query latency by improving data locality. See Partitioning for more details. | Perfect rollup | The Broker can use the partition information to prune segments early to speed up queries. Since the Broker knows how to hash partitionDimensions values to locate a segment, given a query including a filter on all the partitionDimensions, the Broker can pick up only the segments holding the rows satisfying the filter on partitionDimensions for query processing. Note that partitionDimensions must be set at ingestion time to enable secondary partition pruning at query time. |
    | single_dim | Slowest | Range partitioning based on the value of the partition dimension. Segment sizes may be skewed depending on the partition key distribution. This may reduce your datasource size and query latency by improving data locality. See Partitioning for more details. | Perfect rollup | The Broker can use the partition information to prune segments early to speed up queries. Since the Broker knows the range of partitionDimension values in each segment, given a query including a filter on the partitionDimension, the Broker can pick up only the segments holding the rows satisfying the filter on partitionDimension for query processing. |

    The recommended use case for each partitionsSpec is:

    • If your data has a uniformly distributed column which is frequently used in your queries, consider using single_dim partitionsSpec to maximize the performance of most of your queries.
    • If your data doesn’t have a uniformly distributed column, but is expected to have a high rollup ratio when you roll up with some dimensions, consider using hashed partitionsSpec. It could reduce the size of the datasource and query latency by improving data locality.
    • If the above two scenarios are not the case or you don’t need to roll up your datasource, consider using dynamic partitionsSpec.

    Dynamic partitioning

    | property | description | default | required? |
    |---|---|---|---|
    | type | This should always be dynamic. | none | yes |
    | maxRowsPerSegment | Used in sharding. Determines how many rows are in each segment. | 5000000 | no |
    | maxTotalRows | Total number of rows across all segments waiting to be pushed. Used in determining when an intermediate segment push should occur. | 20000000 | no |

    With dynamic partitioning, the parallel index task runs in a single phase: it spawns multiple worker tasks (type single_phase_sub_task), each of which creates segments. Each worker task creates segments as follows:

    • The task creates a new segment whenever the number of rows in the current segment exceeds maxRowsPerSegment.
    • Once the total number of rows in all segments across all time chunks reaches maxTotalRows, the task pushes all segments created so far to deep storage and creates new ones.
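
The two rules above can be traced with a toy simulation of a single worker task. This is a deliberately simplified model (one time chunk, maxRowsPerSegment treated as a hard cap, tiny row counts), and `simulate_worker` is a hypothetical helper, not part of Druid.

```python
from typing import List


def simulate_worker(total_rows: int, max_rows_per_segment: int, max_total_rows: int) -> List[List[int]]:
    """Return the pushes made by one worker; each push lists the row count per segment."""
    pushes: List[List[int]] = []
    segments = [0]      # row counts of segments created since the last push
    pending_rows = 0    # rows held in segments that have not been pushed yet
    for _ in range(total_rows):
        # Rule 1: start a new segment once the current one is full.
        if segments[-1] >= max_rows_per_segment:
            segments.append(0)
        segments[-1] += 1
        pending_rows += 1
        # Rule 2: push everything created so far once maxTotalRows is reached.
        if pending_rows >= max_total_rows:
            pushes.append(segments)
            segments, pending_rows = [0], 0
    if pending_rows > 0:
        pushes.append(segments)  # final push of the remaining rows
    return pushes


pushes = simulate_worker(total_rows=25, max_rows_per_segment=4, max_total_rows=10)
print(pushes)
```

With these small assumed limits, 25 rows produce three pushes: two pushes of three segments each (4, 4, and 2 rows) followed by a final push of two segments (4 and 1 rows).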

    Hash-based partitioning

    | property | description | default | required? |
    |---|---|---|---|
    | type | This should always be hashed. | none | yes |
    | numShards | Directly specify the number of shards to create. If this is specified and intervals is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. This property and targetRowsPerSegment cannot both be set. | none | no |
    | targetRowsPerSegment | A target row count for each partition. If numShards is left unspecified, the Parallel task will determine a partition count automatically such that each partition has a row count close to the target, assuming evenly distributed keys in the input data. A target per-segment row count of 5 million is used if both numShards and targetRowsPerSegment are null. | null (or 5,000,000 if both numShards and targetRowsPerSegment are null) | no |
    | partitionDimensions | The dimensions to partition on. Leave blank to select all dimensions. | null | no |
    | partitionFunction | A function to compute the hash of partition dimensions. See Hash partition function. | murmur3_32_abs | no |

    The Parallel task with hash-based partitioning is similar to MapReduce. The task runs in up to 3 phases: partial dimension cardinality, partial segment generation, and partial segment merge.

    • The partial dimension cardinality phase is an optional phase that only runs if numShards is not specified. The Parallel task splits the input data and assigns them to worker tasks based on the split hint spec. Each worker task (type partial_dimension_cardinality) gathers estimates of partitioning dimensions cardinality for each time chunk. The Parallel task will aggregate these estimates from the worker tasks and determine the highest cardinality across all of the time chunks in the input data, dividing this cardinality by targetRowsPerSegment to automatically determine numShards.
    • In the partial segment generation phase, just like the Map phase in MapReduce, the Parallel task splits the input data based on the split hint spec and assigns each split to a worker task. Each worker task (type partial_index_generate) reads the assigned split, and partitions rows by the time chunk from segmentGranularity (primary partition key) in the granularitySpec and then by the hash value of partitionDimensions (secondary partition key) in the partitionsSpec. The partitioned data is stored in local storage of the MiddleManager or the Indexer.
    • The partial segment merge phase is similar to the Reduce phase in MapReduce. The Parallel task spawns a new set of worker tasks (type partial_index_generic_merge) to merge the partitioned data created in the previous phase. Here, the partitioned data is shuffled based on the time chunk and the hash value of partitionDimensions to be merged; each worker task reads the data falling in the same time chunk and the same hash value from multiple MiddleManager/Indexer processes and merges them to create the final segments. Finally, they push the final segments to the deep storage at once.
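
The shard-count estimate and the secondary (hash) partition key can be sketched as follows. Several details here are assumptions for illustration only: rounding the division up, serializing the values with JSON, and using CRC32 as a stand-in hash (Druid's own partition function is murmur3_32_abs, which is not in Python's standard library).

```python
import json
import math
import zlib
from typing import Dict, List


def estimate_num_shards(cardinality_by_time_chunk: Dict[str, int], target_rows_per_segment: int) -> int:
    """Divide the highest per-time-chunk cardinality by targetRowsPerSegment (rounded up here)."""
    highest = max(cardinality_by_time_chunk.values())
    return max(1, math.ceil(highest / target_rows_per_segment))


def assign_bucket(partition_values: List[str], num_shards: int) -> int:
    """Serialize the partition dimension values as a whole, hash them, and pick a bucket."""
    serialized = json.dumps(partition_values).encode("utf-8")
    # CRC32 is only a stand-in for the real hash function.
    return zlib.crc32(serialized) % num_shards


# Hypothetical cardinality estimates gathered by the first-phase worker tasks.
estimates = {"2013-08-31": 12_000_000, "2013-09-01": 4_000_000}
num_shards = estimate_num_shards(estimates, target_rows_per_segment=5_000_000)

bucket = assign_bucket(["Main_Page", "en"], num_shards)
print(num_shards, bucket)
```

Rows with identical partition dimension values always land in the same bucket within a time chunk, which is what lets the merge phase combine them into one final segment.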

    Hash partition function

    In hash partitioning, the partition function is used to compute hash of partition dimensions. The partition dimension values are first serialized into a byte array as a whole, and then the partition function is applied to compute hash of the byte array. Druid currently supports only one partition function.

    | name | description |
    |---|---|
    | murmur3_32_abs | Applies an absolute value function to the result of murmur3_32. |

    Single-dimension range partitioning

    | property | description | default | required? |
    |---|---|---|---|
    | type | This should always be single_dim. | none | yes |
    | partitionDimension | The dimension to partition on. Only rows with a single dimension value are allowed. | none | yes |
    | targetRowsPerSegment | Target number of rows to include in a partition. This should be a number that targets segments of 500MB to 1GB. | none | either this or maxRowsPerSegment |
    | maxRowsPerSegment | Soft max for the number of rows to include in a partition. | none | either this or targetRowsPerSegment |
    | assumeGrouped | Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated. | false | no |

    With single-dim partitioning, the Parallel task runs in 3 phases, i.e., partial dimension distribution, partial segment generation, and partial segment merge. The first phase is to collect some statistics to find the best partitioning and the other 2 phases are to create partial segments and to merge them, respectively, as in hash-based partitioning.

    • In the partial dimension distribution phase, the Parallel task splits the input data and assigns them to worker tasks based on the split hint spec. Each worker task (type partial_dimension_distribution) reads the assigned split and builds a histogram for partitionDimension. The Parallel task collects those histograms from worker tasks and finds the best range partitioning based on partitionDimension to evenly distribute rows across partitions. Note that either targetRowsPerSegment or maxRowsPerSegment will be used to find the best partitioning.
    • In the partial segment generation phase, the Parallel task spawns new worker tasks (type partial_range_index_generate) to create partitioned data. Each worker task reads a split created as in the previous phase, partitions rows by the time chunk from the segmentGranularity (primary partition key) in the granularitySpec and then by the range partitioning found in the previous phase. The partitioned data is stored in local storage of the MiddleManager or the Indexer.
    • In the partial segment merge phase, the parallel index task spawns a new set of worker tasks (type partial_index_generic_merge) to merge the partitioned data created in the previous phase. Here, the partitioned data is shuffled based on the time chunk and the value of partitionDimension; each worker task reads the segments falling in the same partition of the same range from multiple MiddleManager/Indexer processes and merges them to create the final segments. Finally, they push the final segments to the deep storage.
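
The idea behind the first two phases, finding range boundaries on partitionDimension and then routing each row to a range, can be sketched as below. This toy version sorts the exact values instead of merging per-worker histograms, so it is only an approximation of the approach described above; the sample values and helper names are assumptions.

```python
import bisect
from typing import List


def find_range_boundaries(values: List[str], target_rows_per_segment: int) -> List[str]:
    """Pick a boundary value roughly every target_rows_per_segment rows."""
    ordered = sorted(values)
    boundaries: List[str] = []
    for i in range(target_rows_per_segment, len(ordered), target_rows_per_segment):
        candidate = ordered[i]
        # Skip duplicates so that one dimension value never spans two ranges.
        if not boundaries or candidate != boundaries[-1]:
            boundaries.append(candidate)
    return boundaries


def partition_for(value: str, boundaries: List[str]) -> int:
    """Return the index of the range that contains the value."""
    return bisect.bisect_right(boundaries, value)


# Hypothetical partitionDimension values for the rows of one time chunk.
countries = ["US", "FR", "US", "DE", "JP", "US", "BR", "FR", "IN", "US", "DE", "JP"]
boundaries = find_range_boundaries(countries, target_rows_per_segment=4)

sizes = [0] * (len(boundaries) + 1)
for country in countries:
    sizes[partition_for(country, boundaries)] += 1
print(boundaries, sizes)
```

The resulting partitions hold 3, 5, and 4 rows rather than exactly 4 each, a small instance of the skew that the comparison table warns about when the partition key is not evenly distributed.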

    Because the task with single-dimension range partitioning makes two passes over the input in partial dimension distribution and partial segment generation phases, the task may fail if the input changes in between the two passes.

    HTTP status endpoints

    The supervisor task provides some HTTP endpoints to get running status.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/mode

    Returns ‘parallel’ if the indexing task is running in parallel. Otherwise, it returns ‘sequential’.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/phase

    Returns the name of the current phase if the task is running in the parallel mode.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress

    Returns the estimated progress of the current phase if the supervisor task is running in the parallel mode.

    An example of the result is:

    {
      "running": 10,
      "succeeded": 0,
      "failed": 0,
      "complete": 0,
      "total": 10,
      "estimatedExpectedSucceeded": 10
    }
    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtasks/running

    Returns the task IDs of running worker tasks, or an empty list if the supervisor task is running in the sequential mode.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs

    Returns all worker task specs, or an empty list if the supervisor task is running in the sequential mode.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/running

    Returns running worker task specs, or an empty list if the supervisor task is running in the sequential mode.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/complete

    Returns complete worker task specs, or an empty list if the supervisor task is running in the sequential mode.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}

    Returns the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.

    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/state

    Returns the state of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode. The returned result contains the worker task spec, a current task status if exists, and task attempt history.

    An example of the result is:

    {
      "spec": {
        "id": "index_parallel_lineitem_2018-04-20T22:12:43.610Z_2",
        "groupId": "index_parallel_lineitem_2018-04-20T22:12:43.610Z",
        "supervisorTaskId": "index_parallel_lineitem_2018-04-20T22:12:43.610Z",
        "context": null,
        "inputSplit": {
          "split": "/path/to/data/lineitem.tbl.5"
        },
        "ingestionSpec": {
          "dataSchema": {
            "dataSource": "lineitem",
            "timestampSpec": {
              "column": "l_shipdate",
              "format": "yyyy-MM-dd"
            },
            "dimensionsSpec": {
              "dimensions": [
                "l_orderkey",
                "l_partkey",
                "l_suppkey",
                "l_linenumber",
                "l_returnflag",
                "l_linestatus",
                "l_shipdate",
                "l_commitdate",
                "l_receiptdate",
                "l_shipinstruct",
                "l_shipmode",
                "l_comment"
              ]
            },
            "metricsSpec": [
              {
                "type": "count",
                "name": "count"
              },
              {
                "type": "longSum",
                "name": "l_quantity",
                "fieldName": "l_quantity",
                "expression": null
              },
              {
                "type": "doubleSum",
                "name": "l_extendedprice",
                "fieldName": "l_extendedprice",
                "expression": null
              },
              {
                "type": "doubleSum",
                "name": "l_discount",
                "fieldName": "l_discount",
                "expression": null
              },
              {
                "type": "doubleSum",
                "name": "l_tax",
                "fieldName": "l_tax",
                "expression": null
              }
            ],
            "granularitySpec": {
              "type": "uniform",
              "segmentGranularity": "YEAR",
              "queryGranularity": {
                "type": "none"
              },
              "rollup": true,
              "intervals": [
                "1980-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"
              ]
            },
            "transformSpec": {
              "filter": null,
              "transforms": []
            }
          },
          "ioConfig": {
            "type": "index_parallel",
            "inputSource": {
              "type": "local",
              "baseDir": "/path/to/data/",
              "filter": "lineitem.tbl.5"
            },
            "inputFormat": {
              "format": "tsv",
              "delimiter": "|",
              "columns": [
                "l_orderkey",
                "l_partkey",
                "l_suppkey",
                "l_linenumber",
                "l_quantity",
                "l_extendedprice",
                "l_discount",
                "l_tax",
                "l_returnflag",
                "l_linestatus",
                "l_shipdate",
                "l_commitdate",
                "l_receiptdate",
                "l_shipinstruct",
                "l_shipmode",
                "l_comment"
              ]
            },
            "appendToExisting": false
          },
          "tuningConfig": {
            "type": "index_parallel",
            "maxRowsPerSegment": 5000000,
            "maxRowsInMemory": 1000000,
            "maxTotalRows": 20000000,
            "numShards": null,
            "indexSpec": {
              "bitmap": {
                "type": "roaring"
              },
              "metricCompression": "lz4",
              "longEncoding": "longs"
            },
            "indexSpecForIntermediatePersists": {
              "bitmap": {
                "type": "roaring"
              },
              "dimensionCompression": "lz4",
              "metricCompression": "lz4",
              "longEncoding": "longs"
            },
            "maxPendingPersists": 0,
            "reportParseExceptions": false,
            "pushTimeout": 0,
            "segmentWriteOutMediumFactory": null,
            "maxNumConcurrentSubTasks": 4,
            "maxRetry": 3,
            "taskStatusCheckPeriodMs": 1000,
            "chatHandlerTimeout": "PT10S",
            "chatHandlerNumRetries": 5,
            "maxParseExceptions": 2147483647,
            "maxSavedParseExceptions": 0,
            "forceGuaranteedRollup": false,
            "buildV9Directly": true
          }
        }
      },
      "currentStatus": {
        "id": "index_sub_lineitem_2018-04-20T22:16:29.922Z",
        "type": "index_sub",
        "createdTime": "2018-04-20T22:16:29.925Z",
        "queueInsertionTime": "2018-04-20T22:16:29.929Z",
        "statusCode": "RUNNING",
        "duration": -1,
        "location": {
          "host": null,
          "port": -1,
          "tlsPort": -1
        },
        "dataSource": "lineitem",
        "errorMsg": null
      },
      "taskHistory": []
    }
    • http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/history

    Returns the task attempt history of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.
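
A small helper can make the endpoints above easier to work with. The sketch below builds endpoint URLs from the documented pattern and summarizes a progress response; the host, port, and helper names are placeholders, and reading succeeded / estimatedExpectedSucceeded as a completion fraction is an interpretation made for this example rather than something the endpoint documents.

```python
import json


def chat_endpoint(peon_host: str, peon_port: int, supervisor_task_id: str, path: str) -> str:
    """Build a supervisor task status URL following the pattern listed above."""
    base = f"http://{peon_host}:{peon_port}/druid/worker/v1/chat/{supervisor_task_id}"
    return f"{base}/{path}"


def succeeded_fraction(progress: dict) -> float:
    """Fraction of the expected worker tasks that have succeeded so far (assumed reading)."""
    expected = progress["estimatedExpectedSucceeded"]
    return progress["succeeded"] / expected if expected else 0.0


# The sample progress response shown earlier in this section.
sample_progress = json.loads(
    '{"running":10,"succeeded":0,"failed":0,"complete":0,"total":10,"estimatedExpectedSucceeded":10}'
)

# Hypothetical peon address; the task ID is taken from the sample state response.
url = chat_endpoint("localhost", 8100, "index_parallel_lineitem_2018-04-20T22:12:43.610Z", "progress")
print(url)
print(f"{succeeded_fraction(sample_progress):.0%} of expected worker tasks succeeded")
```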

    Capacity planning

    The supervisor task can create up to maxNumConcurrentSubTasks worker tasks no matter how many task slots are currently available. As a result, the total number of tasks that can run at the same time is (maxNumConcurrentSubTasks + 1), including the supervisor task. Note that this can be even larger than the total number of task slots (the sum of the capacity of all workers). If maxNumConcurrentSubTasks is larger than n (available task slots), then maxNumConcurrentSubTasks tasks are created by the supervisor task, but only n tasks are started. The others wait in the pending state until a running task finishes.

    If you are using the Parallel Index Task together with stream ingestion, we recommend limiting the max capacity for batch ingestion to prevent stream ingestion from being blocked by batch ingestion. Suppose you have t Parallel Index Tasks to run at the same time, but want to limit the max number of tasks for batch ingestion to b. Then, (sum of maxNumConcurrentSubTasks of all Parallel Index Tasks + t (for supervisor tasks)) must be smaller than b.

    If you have some tasks of a higher priority than others, you may set their maxNumConcurrentSubTasks to a higher value than lower priority tasks. This may help the higher priority tasks to finish earlier than lower priority tasks by assigning more task slots to them.
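
The capacity rules in this section reduce to simple arithmetic, sketched below. The function names and the sample numbers are assumptions for illustration; the strict "smaller than b" comparison follows the wording above.

```python
from typing import List


def batch_slots_needed(max_num_concurrent_sub_tasks: List[int]) -> int:
    """Sum of maxNumConcurrentSubTasks over all Parallel tasks, plus one supervisor per task."""
    return sum(max_num_concurrent_sub_tasks) + len(max_num_concurrent_sub_tasks)


def fits_batch_budget(max_num_concurrent_sub_tasks: List[int], batch_limit: int) -> bool:
    """The total must be smaller than the batch ingestion limit b."""
    return batch_slots_needed(max_num_concurrent_sub_tasks) < batch_limit


def pending_worker_tasks(max_num_concurrent_sub_tasks: int, available_slots: int) -> int:
    """Worker tasks that are created but wait in the pending state for a free slot."""
    return max(0, max_num_concurrent_sub_tasks - available_slots)


# Two hypothetical Parallel tasks (t = 2), each with maxNumConcurrentSubTasks = 4.
needed = batch_slots_needed([4, 4])
print(needed, fits_batch_budget([4, 4], batch_limit=12), fits_batch_budget([4, 4], batch_limit=10))

# One task asking for 10 worker tasks when only 6 slots are available.
print(pending_worker_tasks(10, available_slots=6))
```

Here the two tasks need 4 + 4 + 2 = 10 slots, which fits a limit of b = 12 but not b = 10, and a task asking for 10 workers with 6 free slots leaves 4 worker tasks pending.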

    Task syntax

    A sample task for the simple (single-task) mode, type index, is shown below:

    {
      "type" : "index",
      "spec" : {
        "dataSchema" : {
          "dataSource" : "wikipedia",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : []
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            },
            {
              "type" : "doubleSum",
              "name" : "added",
              "fieldName" : "added"
            },
            {
              "type" : "doubleSum",
              "name" : "deleted",
              "fieldName" : "deleted"
            },
            {
              "type" : "doubleSum",
              "name" : "delta",
              "fieldName" : "delta"
            }
          ],
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "DAY",
            "queryGranularity" : "NONE",
            "intervals" : [ "2013-08-31/2013-09-01" ]
          }
        },
        "ioConfig" : {
          "type" : "index",
          "inputSource" : {
            "type" : "local",
            "baseDir" : "examples/indexing/",
            "filter" : "wikipedia_data.json"
          },
          "inputFormat": {
            "type": "json"
          }
        },
        "tuningConfig" : {
          "type" : "index",
          "maxRowsPerSegment" : 5000000,
          "maxRowsInMemory" : 1000000
        }
      }
    }
    propertydescriptionrequired?
    typeThe task type, this should always be index.yes
    idThe task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp.no
    specThe ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details.yes
    contextContext containing various task configuration parameters. See below for more details.no
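
    As noted at the top of this page, a task is submitted by POSTing its spec to the /druid/indexer/v1/task endpoint on the Overlord. A minimal sketch with the Python standard library follows. The Overlord address is a placeholder, the helper name is invented here, and the task dict is a trimmed-down stand-in for the full sample above rather than a complete spec.

```python
import json
import urllib.request


def build_submit_request(overlord_url, task_spec):
    """Prepare (but do not send) a POST of the task spec to the Overlord."""
    body = json.dumps(task_spec).encode("utf-8")
    return urllib.request.Request(
        url=overlord_url.rstrip("/") + "/druid/indexer/v1/task",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed-down stand-in for the sample task; not a complete spec.
sample_task = {"type": "index", "spec": {"dataSchema": {"dataSource": "wikipedia"}}}

# "http://localhost:8081" is an assumed address; use your Overlord's URL.
request = build_submit_request("http://localhost:8081", sample_task)

# To actually submit the task:
#   with urllib.request.urlopen(request) as response:
#       print(response.read().decode("utf-8"))
```

    Building the request separately from sending it makes it easy to inspect the payload before it reaches the cluster.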

    dataSchema

    This field is required.

    See the dataSchema section of the ingestion docs for details.

    If you do not specify intervals explicitly in your dataSchema’s granularitySpec, the Local Index Task will do an extra pass over the data to determine the range to lock when it starts up. If you specify intervals explicitly, any rows outside the specified intervals will be thrown away. We recommend setting intervals explicitly if you know the time range of the data because it allows the task to skip the extra pass, and so that you don’t accidentally replace data outside that range if there’s some stray data with unexpected timestamps.

    ioConfig

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | The task type, this should always be “index”. | none | yes |
    | inputFormat | inputFormat to specify how to parse input data. | none | yes |
    | appendToExisting | Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. The current limitation is that you can append to any datasources regardless of their original partitioning scheme, but the appended segments should be partitioned using the dynamic partitionsSpec. | false | no |

    tuningConfig

    The tuningConfig is optional and default parameters will be used if no tuningConfig is specified. See below for more details.

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | The task type, this should always be “index”. | none | yes |
    | maxRowsPerSegment | Deprecated. Use partitionsSpec instead. Used in sharding. Determines how many rows are in each segment. | 5000000 | no |
    | maxRowsInMemory | Used in determining when intermediate persists to disk should occur. Normally you do not need to set this, but depending on the nature of the data, if rows are short in terms of bytes, you may not want to store a million rows in memory and this value should be set. | 1000000 | no |
    | maxBytesInMemory | Used in determining when intermediate persists to disk should occur. Normally this is computed internally and you do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). | 1/6 of max JVM memory | no |
    | maxTotalRows | Deprecated. Use partitionsSpec instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur. | 20000000 | no |
    | numShards | Deprecated. Use partitionsSpec instead. Directly specify the number of shards to create. If this is specified and intervals is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set. | null | no |
    | partitionDimensions | Deprecated. Use partitionsSpec instead. The dimensions to partition on. Leave blank to select all dimensions. Only used with forceGuaranteedRollup = true, will be ignored otherwise. | null | no |
    | partitionsSpec | Defines how to partition data in each timeChunk, see PartitionsSpec. | dynamic if forceGuaranteedRollup = false, hashed if forceGuaranteedRollup = true | no |
    | indexSpec | Defines segment storage format options to be used at indexing time, see IndexSpec. | null | no |
    | indexSpecForIntermediatePersists | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See IndexSpec for possible values. | same as indexSpec | no |
    | maxPendingPersists | Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). | 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up) | no |
    | forceGuaranteedRollup | Forces guaranteeing the perfect rollup. The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: once to find the optimal number of partitions per time chunk and once to generate segments. Note that the resulting segments would be hash-partitioned. This flag cannot be used with appendToExisting of IOConfig. For more details, see the Segment pushing modes section below. | false | no |
    | reportParseExceptions | DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting reportParseExceptions to true will override existing configurations for maxParseExceptions and maxSavedParseExceptions, setting maxParseExceptions to 0 and limiting maxSavedParseExceptions to no more than 1. | false | no |
    | pushTimeout | Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever. | 0 | no |
    | segmentWriteOutMediumFactory | Segment write-out medium to use when creating segments. See SegmentWriteOutMediumFactory. | Not specified, the value from druid.peon.defaultSegmentWriteOutMediumFactory.type is used | no |
    | logParseExceptions | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | false | no |
    | maxParseExceptions | The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if reportParseExceptions is set. | unlimited | no |
    | maxSavedParseExceptions | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. “maxSavedParseExceptions” limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the task completion report. Overridden if reportParseExceptions is set. | 0 | no |
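
    The heap bound given for maxBytesInMemory can be worked through numerically. The sketch below reads the expression in the table as a product, maxBytesInMemory times (2 + maxPendingPersists), and uses the documented default of 1/6 of max JVM memory. The function names are invented for this example, and the result is only the rough estimate the table describes, not a measurement.

```python
def default_max_bytes_in_memory(max_jvm_heap_bytes):
    """Documented default: 1/6 of max JVM memory (integer division here)."""
    return max_jvm_heap_bytes // 6


def max_indexing_heap_bytes(max_bytes_in_memory, max_pending_persists=0):
    """Upper bound from the table: maxBytesInMemory * (2 + maxPendingPersists)."""
    return max_bytes_in_memory * (2 + max_pending_persists)


GIB = 1024 ** 3

# A hypothetical task JVM with a 6 GiB heap and default tuning.
default_limit = default_max_bytes_in_memory(6 * GIB)
heap_bound = max_indexing_heap_bytes(default_limit)
```

    For the hypothetical 6 GiB heap, the default limit is 1 GiB and the indexing bound is 2 GiB; raising maxPendingPersists to 1 would raise the bound to 3 GiB.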

    partitionsSpec

    PartitionsSpec describes the secondary partitioning method. Use a different partitionsSpec depending on the rollup mode you want. For perfect rollup, you should use hashed.

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should always be hashed | none | yes |
    | maxRowsPerSegment | Used in sharding. Determines how many rows are in each segment. | 5000000 | no |
    | numShards | Directly specify the number of shards to create. If this is specified and intervals is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set. | null | no |
    | partitionDimensions | The dimensions to partition on. Leave blank to select all dimensions. | null | no |
    | partitionFunction | A function to compute hash of partition dimensions. See Hash partition function | murmur3_32_abs | no |

    For best-effort rollup, you should use dynamic.
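
    To illustrate the choice between the two types, the sketch below builds a partitionsSpec as a Python dict and enforces the one constraint stated in the hashed table (numShards and maxRowsPerSegment are mutually exclusive). The function names and the "perfect" / "best-effort" mode labels are assumptions of this example, and the dynamic spec is shown with its type only because its other properties are not listed here.

```python
def hashed_partitions_spec(num_shards=None, max_rows_per_segment=None,
                           partition_dimensions=None):
    """Build a hashed partitionsSpec, enforcing the documented exclusivity rule."""
    if num_shards is not None and max_rows_per_segment is not None:
        raise ValueError("numShards cannot be specified if maxRowsPerSegment is set")
    spec = {"type": "hashed"}
    if num_shards is not None:
        spec["numShards"] = num_shards
    if max_rows_per_segment is not None:
        spec["maxRowsPerSegment"] = max_rows_per_segment
    if partition_dimensions:
        spec["partitionDimensions"] = list(partition_dimensions)
    return spec


def partitions_spec_for(rollup_mode, **hashed_options):
    """Pick hashed for perfect rollup and dynamic for best-effort rollup."""
    if rollup_mode == "perfect":
        return hashed_partitions_spec(**hashed_options)
    if rollup_mode == "best-effort":
        return {"type": "dynamic"}
    raise ValueError(f"unknown rollup mode: {rollup_mode}")


perfect = partitions_spec_for("perfect", num_shards=4, partition_dimensions=["page"])
best_effort = partitions_spec_for("best-effort")
```

    The resulting dicts can be serialized with json.dumps and placed under partitionsSpec in the tuningConfig.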

    segmentWriteOutMediumFactory

    | Field | Type | Description | Required |
    | --- | --- | --- | --- |
    | type | String | See Additional Peon Configuration: SegmentWriteOutMediumFactory for explanation and available options. | yes |

    Segment pushing modes

    While ingesting data, the Index task creates segments from the input data and pushes them. The Index task supports two segment pushing modes: bulk pushing mode for perfect rollup and incremental pushing mode for best-effort rollup.

    In the bulk pushing mode, every segment is pushed at the very end of the index task. Until then, created segments are stored in the memory and local storage of the process running the index task. As a result, this mode might cause problems due to limited storage capacity, and it is not recommended for production use.

    In contrast, in the incremental pushing mode, segments are pushed incrementally; that is, they can be pushed in the middle of the index task. More precisely, the index task collects data and stores created segments in the memory and on the disks of the process running that task until the total number of collected rows exceeds maxTotalRows. Once the limit is exceeded, the index task immediately pushes all segments created up to that moment, cleans up the pushed segments, and continues to ingest the remaining data.

    To enable bulk pushing mode, set forceGuaranteedRollup in the TuningConfig. Note that this option cannot be used with appendToExisting of IOConfig.
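
    The relationship between the two flags and the pushing mode can be expressed as a small pre-submission check. This is only an illustrative sketch: the function name is invented, and it encodes just the two rules stated above (forceGuaranteedRollup selects bulk pushing, and it cannot be combined with appendToExisting).

```python
def segment_pushing_mode(tuning_config, io_config):
    """Return "bulk" or "incremental" for a tuningConfig / ioConfig pair."""
    force_rollup = bool(tuning_config.get("forceGuaranteedRollup", False))
    append = bool(io_config.get("appendToExisting", False))
    if force_rollup and append:
        raise ValueError(
            "forceGuaranteedRollup cannot be used with appendToExisting"
        )
    return "bulk" if force_rollup else "incremental"


mode_default = segment_pushing_mode({"type": "index"}, {"type": "index"})
mode_rollup = segment_pushing_mode(
    {"type": "index", "forceGuaranteedRollup": True}, {"type": "index"}
)
```

    Both flags default to false in the tables above, which is why a config that sets neither falls into the incremental mode.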

    The input source defines where your index task reads data from. Only the native Parallel task and Simple task support the input source.

    S3 Input Source

    You need to include the druid-s3-extensions as an extension to use the S3 input source.

    The S3 input source supports reading objects directly from S3. Objects can be specified either via a list of S3 URI strings or a list of S3 location prefixes, in which case the input source attempts to list the contents and ingest all objects contained in the locations. The S3 input source is splittable and can be used by the Parallel task, where each worker task of index_parallel will read one or multiple objects.

    Sample specs:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "s3",
          "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "s3",
          "prefixes": ["s3://foo/bar", "s3://bar/foo"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "s3",
          "objects": [
            { "bucket": "foo", "path": "bar/file1.json"},
            { "bucket": "bar", "path": "foo/file2.json"}
          ]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be s3. | None | yes |
    | uris | JSON array of URIs where S3 objects to be ingested are located. | None | uris or prefixes or objects must be set |
    | prefixes | JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped. | None | uris or prefixes or objects must be set |
    | objects | JSON array of S3 Objects to be ingested. | None | uris or prefixes or objects must be set |
    | properties | Properties Object for overriding the default S3 configuration. See below for more information. | None | No (defaults will be used if not given) |

    Note that the S3 input source will skip all empty objects only when prefixes is specified.

    S3 Object:

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | bucket | Name of the S3 bucket | None | yes |
    | path | The path where data is located. | None | yes |

    Properties Object:

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | accessKeyId | The Password Provider or plain text string of this S3 InputSource’s access key | None | yes if secretAccessKey is given |
    | secretAccessKey | The Password Provider or plain text string of this S3 InputSource’s secret key | None | yes if accessKeyId is given |

    Note: If accessKeyId and secretAccessKey are not given, the default S3 credentials provider chain is used.
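
    The "required?" rules in the S3 tables can be checked before a spec is submitted. The helper below is a hypothetical sketch, not part of Druid: it requires at least one of uris, prefixes, or objects, and requires accessKeyId and secretAccessKey to be given together. Whether several location fields may be combined is not stated on this page, so the sketch does not check that.

```python
def s3_input_source(uris=None, prefixes=None, objects=None,
                    access_key_id=None, secret_access_key=None):
    """Assemble an S3 inputSource dict, checking the rules from the tables."""
    locations = {"uris": uris, "prefixes": prefixes, "objects": objects}
    given = {name: value for name, value in locations.items() if value}
    if not given:
        raise ValueError("uris or prefixes or objects must be set")
    if (access_key_id is None) != (secret_access_key is None):
        raise ValueError("accessKeyId and secretAccessKey must be given together")
    source = {"type": "s3", **given}
    if access_key_id is not None:
        # Values may be plain strings or Password Provider objects.
        source["properties"] = {
            "accessKeyId": access_key_id,
            "secretAccessKey": secret_access_key,
        }
    return source


source = s3_input_source(prefixes=["s3://foo/bar", "s3://bar/foo"])
```

    When no credentials are passed, the properties object is omitted entirely, which corresponds to falling back to the default S3 credentials provider chain.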

    Google Cloud Storage Input Source

    The Google Cloud Storage input source supports reading objects directly from Google Cloud Storage. Objects can be specified as a list of Google Cloud Storage URI strings. The Google Cloud Storage input source is splittable and can be used by the Parallel task, where each worker task of index_parallel will read one or multiple objects.

    Sample specs:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "google",
          "uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "google",
          "prefixes": ["gs://foo/bar", "gs://bar/foo"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "google",
          "objects": [
            { "bucket": "foo", "path": "bar/file1.json"},
            { "bucket": "bar", "path": "foo/file2.json"}
          ]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be google. | None | yes |
    | uris | JSON array of URIs where Google Cloud Storage objects to be ingested are located. | None | uris or prefixes or objects must be set |
    | prefixes | JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped. | None | uris or prefixes or objects must be set |
    | objects | JSON array of Google Cloud Storage objects to be ingested. | None | uris or prefixes or objects must be set |

    Note that the Google Cloud Storage input source will skip all empty objects only when prefixes is specified.

    Google Cloud Storage object:

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | bucket | Name of the Google Cloud Storage bucket | None | yes |
    | path | The path where data is located. | None | yes |

    Azure Input Source

    You need to include the druid-azure-extensions as an extension to use the Azure input source.

    The Azure input source supports reading objects directly from Azure Blob store. Objects can be specified as a list of Azure Blob store URI strings. The Azure input source is splittable and can be used by the Parallel task, where each worker task of index_parallel will read a single object.

    Sample specs:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "azure",
          "uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "azure",
          "prefixes": ["azure://container/prefix1", "azure://container/prefix2"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be azure. | None | yes |
    | uris | JSON array of URIs where Azure Blob objects to be ingested are located. Should be in form “azure://<container>/<path-to-file>” | None | uris or prefixes or objects must be set |
    | prefixes | JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form “azure://<container>/<prefix>”. Empty objects starting with one of the given prefixes will be skipped. | None | uris or prefixes or objects must be set |
    | objects | JSON array of Azure Blob objects to be ingested. | None | uris or prefixes or objects must be set |

    Note that the Azure input source will skip all empty objects only when prefixes is specified.

    Azure Blob object:

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | bucket | Name of the Azure Blob Storage container | None | yes |
    | path | The path where data is located. | None | yes |

    HDFS Input Source

    You need to include the druid-hdfs-storage as an extension to use the HDFS input source.

    The HDFS input source supports reading files directly from HDFS storage. File paths can be specified as an HDFS URI string or a list of HDFS URI strings. The HDFS input source is splittable and can be used by the Parallel task, where each worker task of index_parallel will read one or multiple files.

    Sample specs:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "hdfs",
          "paths": "hdfs://foo/bar/,hdfs://bar/foo"
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "hdfs",
          "paths": ["hdfs://foo/bar", "hdfs://bar/foo"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "hdfs",
          "paths": "hdfs://foo/bar/file.json,hdfs://bar/foo/file2.json"
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "hdfs",
          "paths": ["hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be hdfs. | None | yes |
    | paths | HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like * are supported in these paths. Empty files located under one of the given paths will be skipped. | None | yes |
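
    Because paths accepts two shapes, tooling that generates specs may want to normalize them to one. The helper below is a hypothetical illustration of the two accepted forms (a JSON array or a single comma-separated string); it is not Druid's own parsing logic, and trimming whitespace around entries is a choice made for this sketch.

```python
def normalize_hdfs_paths(paths):
    """Return a list of paths from either a list or a comma-separated string."""
    if isinstance(paths, str):
        candidates = paths.split(",")
    else:
        candidates = list(paths)
    # Drop empty entries, such as those produced by a trailing comma.
    return [p.strip() for p in candidates if p.strip()]


from_string = normalize_hdfs_paths("hdfs://foo/bar/file.json,hdfs://bar/foo/file2.json")
from_array = normalize_hdfs_paths(["hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"])
```

    Both calls describe the same two files, which is the equivalence the paired samples above are meant to show.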

    You can also ingest from cloud storage using the HDFS input source. However, if you want to read from AWS S3 or Google Cloud Storage, consider using the S3 input source or the Google Cloud Storage input source instead.

    HTTP Input Source

    The HTTP input source supports reading files directly from remote sites via HTTP. The HTTP input source is splittable and can be used by the Parallel task, where each worker task of index_parallel will read only one file. This input source does not support Split Hint Spec.

    Sample specs:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "http",
          "uris": ["http://example.com/uri1", "http://example2.com/uri2"]
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    Example with authentication fields using the DefaultPassword provider (this requires the password to be in the ingestion spec):

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "http",
          "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
          "httpAuthenticationUsername": "username",
          "httpAuthenticationPassword": "password123"
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "http",
          "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
          "httpAuthenticationUsername": "username",
          "httpAuthenticationPassword": {
            "type": "environment",
            "variable": "HTTP_INPUT_SOURCE_PW"
          }
        },
        "inputFormat": {
          "type": "json"
        },
        ...
      },
    ...

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be http | None | yes |
    | uris | URIs of the input files. | None | yes |
    | httpAuthenticationUsername | Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header. | None | no |
    | httpAuthenticationPassword | PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header. | None | no |
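
    For readers unfamiliar with the Basic Authentication Header mentioned in the table, the sketch below shows how such a header value is formed from a username and password under the standard scheme (RFC 7617). It is an illustration of the wire format only; the function name is made up, and nothing here describes how Druid itself assembles the header.

```python
import base64


def basic_auth_header(username, password):
    """Return the value of an HTTP Basic Authorization header (RFC 7617)."""
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return f"Basic {token}"


# Placeholder credentials matching the sample spec above.
header_value = basic_auth_header("username", "password123")
```

    Base64 is an encoding, not encryption, so credentials sent this way are only protected when the URIs use HTTPS.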

    Inline Input Source

    The Inline input source can be used to read the data inlined in its own spec. It can be used for demos or for quickly testing out parsing and schema.

    Sample spec:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "inline",
          "data": "0,values,formatted\n1,as,CSV"
        },
        "inputFormat": {
          "type": "csv"
        },
        ...
      },
    ...

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “inline”. | yes |
    | data | Inlined data to ingest. | yes |

    Local Input Source

    The Local input source supports reading files directly from local storage, and is mainly intended for proof-of-concept testing. The Local input source is splittable and can be used by the Parallel task, where each worker task of index_parallel will read one or multiple files.

    Sample spec:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "local",
          "filter" : "*.csv",
          "baseDir": "/data/directory",
          "files": ["/bar/foo", "/foo/bar"]
        },
        "inputFormat": {
          "type": "csv"
        },
        ...
      },
    ...

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “local”. | yes |
    | filter | A wildcard filter for files. See the Apache Commons IO WildcardFileFilter documentation for more information. | yes if baseDir is specified |
    | baseDir | Directory to search recursively for files to be ingested. Empty files under the baseDir will be skipped. | At least one of baseDir or files should be specified |
    | files | File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified baseDir. Empty files will be skipped. | At least one of baseDir or files should be specified |

    Druid Input Source

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “druid”. | yes |
    | dataSource | A String defining the Druid datasource to fetch rows from | yes |
    | interval | A String representing an ISO-8601 interval, which defines the time range to fetch the data over. | yes |
    | dimensions | A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned. | no |
    | metrics | The list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned. | no |
    | filter | See Filters. Only rows that match the filter, if specified, will be returned. | no |

    A minimal example DruidInputSource spec is shown below:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "druid",
          "dataSource": "wikipedia",
          "interval": "2013-01-01/2013-01-02"
        }
        ...
      },
    ...

    The spec above will read all existing dimension and metric columns from the wikipedia datasource, including all rows with a timestamp (the __time column) within the interval 2013-01-01/2013-01-02.

    A spec that applies a filter and reads a subset of the original datasource’s columns is shown below.

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "druid",
          "dataSource": "wikipedia",
          "interval": "2013-01-01/2013-01-02",
          "dimensions": [
            "page",
            "user"
          ],
          "metrics": [
            "added"
          ],
          "filter": {
            "type": "selector",
            "dimension": "page",
            "value": "Druid"
          }
        }
        ...
      },
    ...

    The spec above returns only the page and user dimensions and the added metric. Only rows where page = Druid are returned.

    SQL Input Source

    The SQL input source is used to read data directly from an RDBMS. The SQL input source is splittable and can be used by the Parallel task, where each worker task reads from one SQL query from the list of queries. This input source does not support Split Hint Spec. Since this input source has a fixed input format for reading events, no inputFormat field needs to be specified in the ingestion spec when using this input source. Please refer to the Recommended practices section below before using this input source.

    An example SqlInputSource spec is shown below:

    The spec above will read all events from two separate SQLs for the interval 2013-01-01/2013-01-02. Each of the SQL queries will be run in its own sub-task and thus for the above example, there would be two sub-tasks.

    Recommended practices

    Compared to the other native batch InputSources, SQL InputSource behaves differently in terms of reading the input data and so it would be helpful to consider the following points before using this InputSource in a production environment:

    • During indexing, each sub-task would execute one of the SQL queries and the results are stored locally on disk. The sub-tasks then proceed to read the data from these local input files and generate segments. Presently, there isn’t any restriction on the size of the generated files and this would require the MiddleManagers or Indexers to have sufficient disk capacity based on the volume of data being indexed.

    • Filtering the SQL queries based on the intervals specified in the granularitySpec can avoid unwanted data being retrieved and stored locally by the indexing sub-tasks. For example, if the intervals specified in the granularitySpec is ["2013-01-01/2013-01-02"] and the SQL query is SELECT * FROM table1, SqlInputSource will read all the data for table1 based on the query, even though only data between the intervals specified will be indexed into Druid.

    • Pagination may be used on the SQL queries to ensure that each query pulls a similar amount of data, thereby improving the efficiency of the sub-tasks.

    • Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the granularitySpec.
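
    Two of the practices above, filtering queries by the ingestion interval and splitting the work across several queries, can be combined by generating one query per time window. The sketch below is an assumption-laden illustration: the function name, the table name, and the timestamp column are hypothetical, the SQL dialect is generic, and values are interpolated into strings only for readability (real code should validate identifiers). Equal time windows do not guarantee equal row counts, so this is only a starting point for balancing sub-tasks.

```python
from datetime import datetime


def windowed_queries(table, time_column, interval, windows):
    """Split an ISO-8601 interval into equal windows, one SELECT per window."""
    start_text, end_text = interval.split("/")
    start = datetime.fromisoformat(start_text)
    end = datetime.fromisoformat(end_text)
    step = (end - start) / windows
    queries = []
    for i in range(windows):
        lower = start + step * i
        # Use the exact interval end for the last window to avoid rounding drift.
        upper = end if i == windows - 1 else start + step * (i + 1)
        queries.append(
            f"SELECT * FROM {table} "
            f"WHERE {time_column} >= '{lower:%Y-%m-%d %H:%M:%S}' "
            f"AND {time_column} < '{upper:%Y-%m-%d %H:%M:%S}'"
        )
    return queries


# Hypothetical table and column names, for illustration only.
queries = windowed_queries("table1", "timestamp", "2013-01-01/2013-01-02", 2)
```

    Each generated query would be run by its own sub-task, and the half-open windows ensure that no row is read by two queries.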

    Combining Input Source

    The Combining input source is used to read data from multiple InputSources. This input source should only be used if all the delegate input sources are splittable and can be used by the Parallel task. This input source identifies the splits from its delegates, and each split is processed by a worker task. Like other input sources, this input source supports a single inputFormat. Therefore, delegate input sources that require an inputFormat must all use the same format for their input data.

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “combining”. | Yes |
    | delegates | List of splittable InputSources to read data from. | Yes |

    Sample spec:

    ...
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": {
          "type": "combining",
          "delegates" : [
            {
              "type": "local",
              "filter" : "*.csv",
              "baseDir": "/data/directory",
              "files": ["/bar/foo", "/foo/bar"]
            },
            {
              "type": "druid",
              "dataSource": "wikipedia",
              "interval": "2013-01-01/2013-01-02"
            }
          ]
        },
        "inputFormat": {
          "type": "csv"
        },
        ...
      },
    ...

    Firehoses are deprecated in 0.17.0. It’s highly recommended to use the Input source instead. There are several firehoses readily available in Druid; some are meant for examples, while others can be used directly in a production environment.

    StaticS3Firehose

    This firehose ingests events from a predefined list of S3 objects. This firehose is splittable and can be used by the Parallel task. Since each split represents an object in this firehose, each worker task of index_parallel will read an object.

    Sample spec:

    "firehose" : {
      "type" : "static-s3",
      "uris": ["s3://foo/bar/file.gz", "s3://bar/foo/file2.gz"]
    }

    This firehose provides caching and prefetching features. In the Simple task, a firehose can be read twice if intervals or shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow. Note that prefetching or caching isn’t that useful in the Parallel task.

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be static-s3. | None | yes |
    | uris | JSON array of URIs where s3 files to be ingested are located. | None | uris or prefixes must be set |
    | prefixes | JSON array of URI prefixes for the locations of s3 files to be ingested. | None | uris or prefixes must be set |
    | maxCacheCapacityBytes | Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes. | 1073741824 | no |
    | maxFetchCapacityBytes | Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read. | 1073741824 | no |
    | prefetchTriggerBytes | Threshold to trigger prefetching s3 objects. | maxFetchCapacityBytes / 2 | no |
    | fetchTimeout | Timeout for fetching an s3 object. | 60000 | no |
    | maxFetchRetry | Maximum retry for fetching an s3 object. | 3 | no |

    StaticGoogleBlobStoreFirehose

    You need to include the druid-google-extensions as an extension to use the StaticGoogleBlobStoreFirehose.

    This firehose ingests events, similar to the StaticS3Firehose, but from Google Cloud Storage.

    As with the S3 blobstore, an object is assumed to be gzipped if its extension ends in .gz.

    This firehose is splittable and can be used by the Parallel task. Since each split represents an object in this firehose, each worker task of index_parallel will read an object.

    Sample spec:

    "firehose" : {
      "type" : "static-google-blobstore",
      "blobs": [
        {
          "bucket": "foo",
          "path": "/path/to/your/file.json"
        },
        {
          "bucket": "bar",
          "path": "/another/path.json"
        }
      ]
    }

    This firehose provides caching and prefetching features. In the Simple task, a firehose can be read twice if intervals or shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow. Note that prefetching or caching isn’t that useful in the Parallel task.

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be static-google-blobstore. | None | yes |
    | blobs | JSON array of Google Blobs. | None | yes |
    | maxCacheCapacityBytes | Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes. | 1073741824 | no |
    | maxFetchCapacityBytes | Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read. | 1073741824 | no |
    | prefetchTriggerBytes | Threshold to trigger prefetching Google Blobs. | maxFetchCapacityBytes / 2 | no |
    | fetchTimeout | Timeout for fetching a Google Blob. | 60000 | no |
    | maxFetchRetry | Maximum retry for fetching a Google Blob. | 3 | no |

    Google Blobs:

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | bucket | Name of the Google Cloud bucket. | None | yes |
    | path | The path where data is located. | None | yes |
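
    The optional tuning properties listed above can be combined with the required ones. The following is a minimal sketch, not a recommended configuration: it disables the cache (0), halves the fetch space relative to the default, and raises the fetch timeout and retry count. The bucket, path, and all numeric values are illustrative assumptions.

    ```json
    {
      "type": "static-google-blobstore",
      "blobs": [
        { "bucket": "foo", "path": "/path/to/your/file.json.gz" }
      ],
      "maxCacheCapacityBytes": 0,
      "maxFetchCapacityBytes": 536870912,
      "prefetchTriggerBytes": 268435456,
      "fetchTimeout": 120000,
      "maxFetchRetry": 5
    }
    ```

    Here prefetchTriggerBytes is set explicitly to half of maxFetchCapacityBytes, which matches what the default would compute; it could be omitted.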

    HDFSFirehose

    You need to include the as an extension to use the HDFSFirehose.

    This firehose ingests events from a predefined list of files from the HDFS storage. This firehose is splittable and can be used by the Parallel task. Since each split represents an HDFS file, each worker task of index_parallel will read files.

    Sample spec:

    "firehose" : {
        "type" : "hdfs",
        "paths": "/foo/bar,/foo/baz"
    }

    This firehose provides caching and prefetching features. During native batch indexing, a firehose can be read twice if intervals are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scanning of files is slow. Note that prefetching or caching isn’t that useful in the Parallel task.

    | Property | Description | Default |
    | --- | --- | --- |
    | type | This should be hdfs. | none (required) |
    | paths | HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like * are supported in these paths. | none (required) |
    | maxCacheCapacityBytes | Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes. | 1073741824 |
    | maxFetchCapacityBytes | Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read. | 1073741824 |
    | prefetchTriggerBytes | Threshold to trigger prefetching files. | maxFetchCapacityBytes / 2 |
    | fetchTimeout | Timeout for fetching each file. | 60000 |
    | maxFetchRetry | Maximum number of retries for fetching each file. | 3 |
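
    Since paths also accepts a JSON array and wildcards, the sample spec can be written in array form. The sketch below assumes a hypothetical directory layout (/foo/bar holding .json files) and disables prefetching by setting maxFetchCapacityBytes to 0:

    ```json
    {
      "type": "hdfs",
      "paths": ["/foo/bar/*.json", "/foo/baz"],
      "maxFetchCapacityBytes": 0
    }
    ```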

    LocalFirehose

    This Firehose reads data from files on local disk. It is mainly intended for proof-of-concept testing and works with string typed parsers. This Firehose is splittable and can be used by native parallel index tasks. Since each split represents a file in this Firehose, each worker task of index_parallel will read a file. A sample local Firehose spec is shown below:

    {
        "type": "local",
        "filter" : "*.csv",
        "baseDir": "/data/directory"
    }

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “local”. | yes |
    | filter | A wildcard filter for files. See for more information. | yes |
    | baseDir | Directory to search recursively for files to be ingested. | yes |

    HttpFirehose

    This Firehose reads data from remote sites via HTTP and works with string typed parsers. This Firehose is splittable and can be used by native parallel index tasks. Since each split represents a file in this Firehose, each worker task of index_parallel will read a file. A sample HTTP Firehose spec is shown below:

    {
        "type": "http",
        "uris": ["http://example.com/uri1", "http://example2.com/uri2"]
    }

    The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header. Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.

    | property | description | default |
    | --- | --- | --- |
    | httpAuthenticationUsername | Username to use for authentication with specified URIs. | None |
    | httpAuthenticationPassword | PasswordProvider to use with specified URIs. | None |

    Example with authentication fields using the DefaultPasswordProvider (this requires the password to be in the ingestion spec):

    {
        "type": "http",
        "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
        "httpAuthenticationUsername": "username",
        "httpAuthenticationPassword": "password123"
    }

    You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:

    {
        "type": "http",
        "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
        "httpAuthenticationUsername": "username",
        "httpAuthenticationPassword": {
            "type": "environment",
            "variable": "HTTP_FIREHOSE_PW"
        }
    }

    The below configurations can optionally be used for tuning the Firehose performance. Note that prefetching or caching isn’t that useful in the Parallel task.

    | property | description | default |
    | --- | --- | --- |
    | maxCacheCapacityBytes | Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes. | 1073741824 |
    | maxFetchCapacityBytes | Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read. | 1073741824 |
    | prefetchTriggerBytes | Threshold to trigger prefetching HTTP objects. | maxFetchCapacityBytes / 2 |
    | fetchTimeout | Timeout for fetching an HTTP object. | 60000 |
    | maxFetchRetry | Maximum retries for fetching an HTTP object. | 3 |
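
    The authentication and tuning properties can appear together in one spec. As a rough sketch with illustrative values only, the following reads the password from an environment variable, disables caching, and adjusts the fetch timeout and retry count:

    ```json
    {
      "type": "http",
      "uris": ["http://example.com/uri1"],
      "httpAuthenticationUsername": "username",
      "httpAuthenticationPassword": {
        "type": "environment",
        "variable": "HTTP_FIREHOSE_PW"
      },
      "maxCacheCapacityBytes": 0,
      "fetchTimeout": 30000,
      "maxFetchRetry": 5
    }
    ```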

    IngestSegmentFirehose

    This Firehose reads data from existing Druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment. This Firehose is splittable and can be used by native parallel index tasks. This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. A sample ingest Firehose spec is shown below:

    {
        "type": "ingestSegment",
        "dataSource": "wikipedia",
        "interval": "2013-01-01/2013-01-02"
    }

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “ingestSegment”. | yes |
    | dataSource | A String defining the data source to fetch rows from, very similar to a table in a relational database. | yes |
    | interval | A String representing the ISO-8601 interval. This defines the time range to fetch the data over. | yes |
    | dimensions | The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. | no |
    | metrics | The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected. | no |
    | filter | See Filters. | no |
    | maxInputSegmentBytesPerTask | Deprecated. Use instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB. | no |
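
    The optional dimensions, metrics, and filter properties narrow what is read back from the segments. The sketch below is illustrative only: the dimension names (page, language) and the metric name (count) are hypothetical, and the filter uses Druid's selector filter type as one possible choice.

    ```json
    {
      "type": "ingestSegment",
      "dataSource": "wikipedia",
      "interval": "2013-01-01/2013-01-02",
      "dimensions": ["page", "language"],
      "metrics": ["count"],
      "filter": {
        "type": "selector",
        "dimension": "language",
        "value": "en"
      }
    }
    ```

    Omitting dimensions or metrics entirely selects all of them, whereas an empty list selects none, so the two cases should not be confused when trimming a schema.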

    SqlFirehose

    This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to maxFetchCapacityBytes bytes. This Firehose is splittable and can be used by native parallel index tasks. This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples.

    Requires one of the following extensions:

    {
        "type": "sql",
        "database": {
            "type": "mysql",
            "connectorConfig": {
                "connectURI": "jdbc:mysql://host:port/schema",
                "user": "user",
                "password": "password"
            }
        },
        "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"]
    }

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | This should be “sql”. | | Yes |
    | database | Specifies the database connection details. The database type corresponds to the extension that supplies the connectorConfig support. The specified extension must be loaded into Druid. You can selectively allow JDBC properties in connectURI. See JDBC connections security config for more details. | | Yes |
    | maxCacheCapacityBytes | Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes. | 1073741824 | No |
    | maxFetchCapacityBytes | Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read. | 1073741824 | No |
    | prefetchTriggerBytes | Threshold to trigger prefetching SQL result objects. | maxFetchCapacityBytes / 2 | No |
    | fetchTimeout | Timeout for fetching the result set. | 60000 | No |
    | foldCase | Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results. | false | No |
    | sqls | List of SQL queries where each SQL query would retrieve the data to be indexed. | | Yes |

    Database

    | property | description | default | required? |
    | --- | --- | --- | --- |
    | type | The type of database to query. Valid values are mysql and postgresql. | | Yes |
    | connectorConfig | Specify the database connection properties via connectURI, user and password. | | Yes |
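
    For comparison with the MySQL sample, a PostgreSQL variant might look like the sketch below. The connectURI, user, and password are placeholders, the assumption is that the matching PostgreSQL extension is loaded, and foldCase is enabled purely to illustrate the option.

    ```json
    {
      "type": "sql",
      "database": {
        "type": "postgresql",
        "connectorConfig": {
          "connectURI": "jdbc:postgresql://host:port/database",
          "user": "user",
          "password": "password"
        }
      },
      "foldCase": true,
      "maxFetchCapacityBytes": 536870912,
      "sqls": ["SELECT * FROM table1"]
    }
    ```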

    InlineFirehose

    This Firehose can be used to read the data inlined in its own spec. It can be used for demos or for quickly testing out parsing and schema, and works with string typed parsers. A sample inline Firehose spec is shown below:

    {
        "type": "inline",
        "data": "0,values,formatted\n1,as,CSV"
    }

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “inline”. | yes |
    | data | Inlined data to ingest. | yes |

    CombiningFirehose

    | property | description | required? |
    | --- | --- | --- |
    | type | This should be “combining”. | yes |
    | delegates | List of Firehoses to combine data from. | yes |
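
    No sample spec accompanies the combining type here. Based only on the two properties listed, a spec would presumably nest complete firehose specs inside delegates. The following is a sketch under that assumption, reusing the local and inline samples from this page:

    ```json
    {
      "type": "combining",
      "delegates": [
        { "type": "local", "filter": "*.csv", "baseDir": "/data/directory" },
        { "type": "inline", "data": "0,values,formatted\n1,as,CSV" }
      ]
    }
    ```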