Hadoop-based ingestion

To run a Hadoop-based ingestion task, write an ingestion spec as specified below. Then POST it to the endpoint on the Overlord, or use the bin/post-index-task script included with Druid.

This page contains reference documentation for Hadoop-based ingestion. For a walk-through instead, check out the tutorial.

Task syntax

A sample task is shown below:

Note that Druid automatically computes the classpath for Hadoop job containers that run in the Hadoop cluster. If Hadoop’s and Druid’s dependencies conflict, you can specify the classpath manually by setting the druid.extensions.hadoopContainerDruidClasspath property. See the extensions config for details.

dataSchema

This field is required. See the dataSchema section of the main ingestion page for details on what it should contain.

ioConfig

This field is required.

| Field | Type | Description | Required |
|---|---|---|---|
| type | String | This should always be ‘hadoop’. | yes |
| inputSpec | Object | A specification of where to pull the data in from. See below. | yes |
| segmentOutputPath | String | The path to dump segments into. | Only used by the Command-line Hadoop indexer. This field must be null otherwise. |
| metadataUpdateSpec | Object | A specification of how to update the metadata for the Druid cluster these segments belong to. | Only used by the Command-line Hadoop indexer. This field must be null otherwise. |

There are multiple types of inputSpecs:

static

A type of inputSpec where a static path to the data files is provided.

| Field | Type | Description | Required |
|---|---|---|---|
| inputFormat | String | Specifies the Hadoop InputFormat class to use, e.g. org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat | no |
| paths | String | Comma-separated input paths to the raw data. Druid ingests data only from the configured paths. It does not search recursively for data in subdirectories. | yes |

For example, using the static input paths:

    "paths" : "hdfs://path/to/data/is/here/data.gz,hdfs://path/to/data/is/here/moredata.gz,hdfs://path/to/data/is/here/evenmoredata.gz"

You can also read from cloud storage such as AWS S3 or Google Cloud Storage. To do so, you need to install the necessary library under Druid’s classpath in all MiddleManager or Indexer processes. For S3, you can run the commands below to install the Hadoop AWS module.

    java -classpath "${DRUID_HOME}/lib/*" org.apache.druid.cli.Main tools pull-deps -h "org.apache.hadoop:hadoop-aws:${HADOOP_VERSION}";
    cp ${DRUID_HOME}/hadoop-dependencies/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar ${DRUID_HOME}/extensions/druid-hdfs-storage/

Once you install the Hadoop AWS module in all MiddleManager and Indexer processes, you can put your S3 paths in the inputSpec with the job properties below. For more configurations, see the Hadoop AWS module.

    "paths" : "s3a://billy-bucket/the/data/is/here/data.gz,s3a://billy-bucket/the/data/is/here/moredata.gz,s3a://billy-bucket/the/data/is/here/evenmoredata.gz"

    "jobProperties" : {
      "fs.s3a.impl" : "org.apache.hadoop.fs.s3a.S3AFileSystem",
      "fs.AbstractFileSystem.s3a.impl" : "org.apache.hadoop.fs.s3a.S3A",
      "fs.s3a.access.key" : "YOUR_ACCESS_KEY",
      "fs.s3a.secret.key" : "YOUR_SECRET_KEY"
    }

For Google Cloud Storage, you need to install the GCS Connector jar under ${DRUID_HOME}/hadoop-dependencies in all MiddleManager or Indexer processes. Once you install the GCS Connector jar in all MiddleManager and Indexer processes, you can put your Google Cloud Storage paths in the inputSpec as shown below. For more configurations, see the instructions to configure Hadoop and the GCS core template.

    "paths" : "gs://billy-bucket/the/data/is/here/data.gz,gs://billy-bucket/the/data/is/here/moredata.gz,gs://billy-bucket/the/data/is/here/evenmoredata.gz"

granularity

A type of inputSpec that expects data to be organized in directories according to datetime using the path format: y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX (where date is represented by lowercase and time is represented by uppercase).

| Field | Type | Description | Required |
|---|---|---|---|
| dataGranularity | String | Specifies the granularity to expect the data at, e.g. hour means to expect directories y=XXXX/m=XX/d=XX/H=XX. | yes |
| inputFormat | String | Specifies the Hadoop InputFormat class to use, e.g. org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat | no |
| inputPath | String | Base path to append the datetime path to. | yes |
| filePattern | String | Pattern that files should match to be included. | yes |
| pathFormat | String | Joda datetime format for each directory. Default value is "'y'=yyyy/'m'=MM/'d'=dd/'H'=HH". See the Joda documentation for the format syntax. | no |

For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths:

    s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=00
    s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=01
    ...
    s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=23
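The mapping from an interval to expected directories can be sketched in a few lines of Python. This is only an illustration of the default hourly path layout described above, not Druid's own code; the function name expected_hourly_paths is hypothetical, and the interval is assumed to be half-open (start inclusive, end exclusive).

```python
from datetime import datetime, timedelta


def expected_hourly_paths(base_path, start, end):
    """List the hourly directories expected for the interval [start, end)."""
    paths = []
    current = start
    while current < end:
        # Mirrors the default pathFormat 'y'=yyyy/'m'=MM/'d'=dd/'H'=HH.
        suffix = current.strftime("y=%Y/m=%m/d=%d/H=%H")
        paths.append(f"{base_path}/{suffix}")
        current += timedelta(hours=1)
    return paths


# Interval 2012-06-01/2012-06-02 with hour granularity.
paths = expected_hourly_paths(
    "s3n://billy-bucket/the/data/is/here",
    datetime(2012, 6, 1),
    datetime(2012, 6, 2),
)
for path in paths[:2]:
    print(path)
```

A listing like this can be handy for checking that the input directories exist before submitting a task.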

dataSource

| Field | Type | Description | Required |
|---|---|---|---|
| type | String | This should always be ‘dataSource’. | yes |
| ingestionSpec | JSON object | Specification of Druid segments to be loaded. See below. | yes |
| maxSplitSize | Number | Enables combining multiple segments into a single Hadoop InputSplit according to the size of the segments. With -1, Druid calculates the max split size based on the user-specified number of map tasks (mapred.map.tasks or mapreduce.job.maps). By default, one split is made for one segment. maxSplitSize is specified in bytes. | no |
| useNewAggs | Boolean | If “false”, the list of aggregators in “metricsSpec” of the Hadoop indexing task must be the same as that used in the original indexing task while ingesting raw data. Default value is “false”. This field can be set to “true” when the “inputSpec” type is “dataSource” and not “multi” to enable arbitrary aggregators while reindexing. See below for “multi” type support for delta-ingestion. | no |

Here is what goes inside ingestionSpec:

For example:

    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource": "wikipedia",
          "intervals": ["2014-10-20T00:00:00Z/P2W"]
        }
      },
      ...
    }

multi

This is a composing inputSpec that combines other inputSpecs. It is used for delta ingestion. You can also use a multi inputSpec to combine data from multiple dataSources. However, each dataSource can be specified only once. Note that “useNewAggs” must be left at its default value of false to support delta-ingestion.

| Field | Type | Description | Required |
|---|---|---|---|
| children | Array of JSON objects | List of JSON objects containing other inputSpecs. | yes |

For example:

    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "multi",
        "children": [
          {
            "type" : "dataSource",
            "ingestionSpec" : {
              "dataSource": "wikipedia",
              "intervals": ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"],
              "segments": [
                {
                  "dataSource": "test1",
                  "interval": "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000",
                  "version": "v2",
                  "loadSpec": {
                    "type": "local",
                    "path": "/tmp/index1.zip"
                  },
                  "dimensions": "host",
                  "metrics": "visited_sum,unique_hosts",
                  "shardSpec": {
                    "type": "none"
                  },
                  "binaryVersion": 9,
                  "size": 2,
                  "identifier": "test1_2000-01-01T00:00:00.000Z_3000-01-01T00:00:00.000Z_v2"
                }
              ]
            }
          },
          {
            "type" : "static",
            "paths": "/path/to/more/wikipedia/data/"
          }
        ]
      }
    }

It is STRONGLY RECOMMENDED to provide the list of segments in the dataSource inputSpec explicitly so that your delta ingestion task is idempotent. You can obtain that list of segments by making the following call to the Coordinator:

    POST /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full

Request body: [interval1, interval2, ...], for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
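As a rough sketch, the Coordinator call could be prepared from Python with the standard library. The Coordinator address below is a placeholder, and build_segment_list_request is a hypothetical helper name; only the path and the request body shape come from the description above. The example builds the request without sending it, since sending requires a reachable Coordinator.

```python
import json
import urllib.request


def build_segment_list_request(coordinator_url, data_source, intervals):
    """Build (but do not send) the POST request that lists segments for the intervals."""
    url = (
        f"{coordinator_url}/druid/coordinator/v1/metadata/"
        f"datasources/{data_source}/segments?full"
    )
    body = json.dumps(intervals).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


intervals = [
    "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000",
    "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000",
]
request = build_segment_list_request(
    "http://coordinator.example.com:8081",  # placeholder address, not a real host
    "wikipedia",
    intervals,
)

# To send it against a live cluster:
# with urllib.request.urlopen(request) as response:
#     segments = json.load(response)
```

The returned segment descriptors would then be pasted into the "segments" array of the dataSource inputSpec.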

tuningConfig

The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.

| Field | Type | Description | Required |
|---|---|---|---|
| workingPath | String | The working path to use for intermediate results (results between Hadoop jobs). | Only used by the Command-line Hadoop indexer. The default is ‘/tmp/druid-indexing’. This field must be null otherwise. |
| version | String | The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true. | no (default == datetime that indexing starts at) |
| partitionsSpec | Object | A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See partitionsSpec below. | no (default == ‘hashed’) |
| maxRowsInMemory | Integer | The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows, which may not equal the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally you do not need to set this, but if rows are short in terms of bytes, you may not want to store a million rows in memory, and this value should be set. | no (default == 1000000) |
| maxBytesInMemory | Long | The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and you do not need to set it. This is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). Note that maxBytesInMemory also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of maxBytesInMemory available until the next persist decreases, and the task fails when the sum of bytes of all intermediary persisted artifacts exceeds maxBytesInMemory. | no (default == one-sixth of max JVM memory) |
| leaveIntermediate | Boolean | Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails. | no (default == false) |
| cleanupOnFailure | Boolean | Clean up intermediate files when a job fails (unless leaveIntermediate is on). | no (default == true) |
| overwriteFiles | Boolean | Override existing files found during indexing. | no (default == false) |
| ignoreInvalidRows | Boolean | DEPRECATED. Ignore rows found to have problems. If false, any exception encountered during parsing will be thrown and will halt ingestion; if true, unparseable rows and fields will be skipped. If maxParseExceptions is defined, this property is ignored. | no (default == false) |
| combineText | Boolean | Use CombineTextInputFormat to combine multiple files into a file split. This can speed up Hadoop jobs when processing a large number of small files. | no (default == false) |
| useCombiner | Boolean | Use Hadoop combiner to merge rows at mapper if possible. | no (default == false) |
| jobProperties | Object | A map of properties to add to the Hadoop job configuration. See below for details. | no (default == null) |
| indexSpec | Object | Tune how data is indexed. See indexSpec on the main ingestion page for more information. | no |
| indexSpecForIntermediatePersists | Object | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final published segment. See indexSpec for possible values. | no (default == same as indexSpec) |
| numBackgroundPersistThreads | Integer | The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1. | no (default == 0) |
| forceExtendableShardSpecs | Boolean | Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check Partitioning specification. This option can be useful when you need to append more data to an existing dataSource. | no (default == false) |
| useExplicitVersion | Boolean | Forces HadoopIndexTask to use version. | no (default == false) |
| logParseExceptions | Boolean | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | no (default == false) |
| maxParseExceptions | Integer | The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overrides ignoreInvalidRows if maxParseExceptions is defined. | no (default == unlimited) |
| useYarnRMJobStatusFallback | Boolean | If the Hadoop jobs created by the indexing task are unable to retrieve their completion status from the JobHistory server, and this parameter is true, the indexing task will try to fetch the application status from the YARN ResourceManager at <yarn-rm-address>, where <yarn-rm-address> is the value of yarn.resourcemanager.webapp.address in your Hadoop configuration. This flag is intended as a fallback for cases where an indexing task’s jobs succeed, but the JobHistory server is unavailable, causing the indexing task to fail because it cannot determine the job statuses. | no (default == true) |
| awaitSegmentAvailabilityTimeoutMillis | Long | Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If <= 0, no wait will occur. If > 0, the task will wait for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task will exit as successful, but the segments were not confirmed to have become available for query. | no (default == 0) |
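The heap bound quoted for maxBytesInMemory is plain arithmetic, and a quick calculation can help when sizing task JVMs. The sketch below is illustrative only: the function name and the 6 GiB heap are assumptions, and maxPendingPersists is taken as given from the formula in the table.

```python
def max_indexing_heap_bytes(max_bytes_in_memory, max_pending_persists):
    """Upper bound from the table: maxBytesInMemory * (2 + maxPendingPersists)."""
    return max_bytes_in_memory * (2 + max_pending_persists)


# Assumed example: a 6 GiB JVM heap, so the default maxBytesInMemory
# (one-sixth of max JVM memory) works out to 1 GiB.
max_jvm_heap = 6 * 1024**3
max_bytes_in_memory = max_jvm_heap // 6

# With no pending persists, indexing may use up to twice maxBytesInMemory.
estimate = max_indexing_heap_bytes(max_bytes_in_memory, 0)
print(estimate / 1024**3, "GiB")
```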

jobProperties

    "tuningConfig" : {
      "type": "hadoop",
      "jobProperties": {
        "<hadoop-property-a>": "<value-a>",
        "<hadoop-property-b>": "<value-b>"
      }
    }

Hadoop’s documentation lists the possible configuration parameters.

With some Hadoop distributions, it may be necessary to set mapreduce.job.classpath or mapreduce.job.user.classpath.first to avoid class loading issues. See the working with different Hadoop versions documentation for more details.

partitionsSpec

Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type. Druid supports two types of partitioning strategies: hashed (based on the hash of all dimensions in each row), and single_dim (based on ranges of a single dimension).

Hashed partitioning is recommended in most cases, as it will improve indexing performance and create more uniformly sized data segments relative to single-dimension partitioning.

    "partitionsSpec": {
      "type": "hashed",
      "targetRowsPerSegment": 5000000
    }

Hashed partitioning works by first selecting a number of segments, and then partitioning rows across those segments according to the hash of all dimensions in each row. The number of segments is determined automatically based on the cardinality of the input set and a target partition size.
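To make the two steps concrete, here is a minimal Python sketch of the idea. It is not Druid's implementation: zlib.crc32 stands in for the real hash function, deriving the segment count as ceil(rows / target) is an assumed simplification, and both function names are hypothetical.

```python
import math
import zlib


def choose_num_segments(estimated_rows, target_rows_per_segment):
    """Pick a segment count from the estimated row cardinality (simplified)."""
    return max(1, math.ceil(estimated_rows / target_rows_per_segment))


def partition_for_row(dimension_values, num_segments):
    """Assign a row to a segment by hashing all of its dimension values together."""
    # Serialize every dimension value into one byte array, then hash it.
    serialized = "\x00".join(dimension_values).encode("utf-8")
    # crc32 is a stand-in hash for illustration only.
    return zlib.crc32(serialized) % num_segments


num_segments = choose_num_segments(12_000_000, 5_000_000)
partition = partition_for_row(["a.example.com", "GET", "/index.html"], num_segments)
print(num_segments, partition)
```

Because the partition depends only on the row's dimension values, identical rows always land in the same segment, which is what lets roll-up work within a time bucket.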

The configuration options are:

| Field | Description | Required |
|---|---|---|
| type | Type of partitionSpec to be used. | “hashed” |
| targetRowsPerSegment | Target number of rows to include in a partition; should be a number that targets segments of 500MB~1GB. Defaults to 5000000 if numShards is not set. | either this or numShards |
| targetPartitionSize | Deprecated. Renamed to targetRowsPerSegment. Target number of rows to include in a partition; should be a number that targets segments of 500MB~1GB. | either this or numShards |
| maxRowsPerSegment | Deprecated. Renamed to targetRowsPerSegment. Target number of rows to include in a partition; should be a number that targets segments of 500MB~1GB. | either this or numShards |
| numShards | Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically. | either this or targetRowsPerSegment |
| partitionDimensions | The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards; ignored when targetRowsPerSegment is set. | no |
| partitionFunction | A function to compute the hash of partition dimensions. See Hash partition function below; murmur3_32_abs is the only supported function. | no |

Hash partition function

In hash partitioning, the partition function is used to compute the hash of the partition dimensions. The partition dimension values are first serialized into a byte array as a whole, and then the partition function is applied to compute the hash of the byte array. Druid currently supports only one partition function.

| name | description |
|---|---|
| murmur3_32_abs | Applies an absolute value function to the result of murmur3_32. |

Single-dimension range partitioning

Single-dimension range partitioning works by first selecting a dimension to partition on, and then separating that dimension into contiguous ranges. Each segment will contain all rows with values of that dimension in that range. For example, your segments may be partitioned on the dimension “host” using the ranges “a.example.com” to “f.example.com” and “f.example.com” to “z.example.com”. By default, the dimension to use is determined automatically, although you can override it with a specific dimension.
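The range lookup can be illustrated with a short Python sketch using a sorted list of split points. This is an assumption-laden illustration rather than Druid's code: segment_for_value is a hypothetical name, and ranges are assumed to be half-open with an inclusive start, so a value equal to a split point falls into the upper range.

```python
from bisect import bisect_right


def segment_for_value(value, boundaries):
    """Return the index of the contiguous range that contains value.

    boundaries is a sorted list of split points; range 0 holds values below
    boundaries[0], range 1 holds values from boundaries[0] upward, and so on.
    """
    return bisect_right(boundaries, value)


# One split point at "f.example.com" yields the two ranges from the text:
# "a.example.com" to "f.example.com", and "f.example.com" to "z.example.com".
boundaries = ["f.example.com"]

for host in ["b.example.com", "f.example.com", "m.example.com"]:
    print(host, "-> segment", segment_for_value(host, boundaries))
```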

If you have a remote Hadoop cluster, make sure to include the folder holding your configuration *.xml files in your Druid _common configuration folder.

If you are having dependency problems with your version of Hadoop and the version compiled with Druid, please see these docs.

Elastic MapReduce

If your cluster is running on Amazon Web Services, you can use Elastic MapReduce (EMR) to index data from S3. To do this:

  • Create a persistent, long-running cluster.
  • When creating your cluster, enter the following configuration. If you’re using the wizard, this should be in advanced mode under “Edit software settings”:

        classification=yarn-site,properties=[mapreduce.reduce.memory.mb=6144,mapreduce.reduce.java.opts=-server -Xms2g -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps,mapreduce.map.memory.mb=758,mapreduce.map.java.opts=-server -Xms512m -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps,mapreduce.task.timeout=1800000]

  • Follow the instructions for configuring Druid to connect to Hadoop, using the XML files from /etc/hadoop/conf on your EMR master.

Kerberized Hadoop clusters

By default, Druid can use the existing Kerberos TGT available in the local Kerberos key cache. However, the TGT has a limited lifetime, so you need to run the kinit command periodically to keep it valid. To avoid an external cron job that calls kinit periodically, you can provide the principal name and keytab location, and Druid will authenticate transparently at startup and at job launch time.

| Property | Possible Values | Description | Default |
|---|---|---|---|
| druid.hadoop.security.kerberos.principal | druid@EXAMPLE.COM | Principal user name | empty |
| druid.hadoop.security.kerberos.keytab | /etc/security/keytabs/druid.headlessUser.keytab | Path to keytab file | empty |

  • In the jobProperties field in the tuningConfig section of your Hadoop indexing task, add:

        "jobProperties" : {
          "fs.s3.awsAccessKeyId" : "YOUR_ACCESS_KEY",
          "fs.s3.awsSecretAccessKey" : "YOUR_SECRET_KEY",
          "fs.s3.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
          "fs.s3n.awsAccessKeyId" : "YOUR_ACCESS_KEY",
          "fs.s3n.awsSecretAccessKey" : "YOUR_SECRET_KEY",
          "fs.s3n.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
          "io.compression.codecs" : "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
        }

Note that this method uses Hadoop’s built-in S3 filesystem rather than Amazon’s EMRFS, and is not compatible with Amazon-specific features such as S3 encryption and consistent views. If you need to use these features, you will need to make the Amazon EMR Hadoop JARs available to Druid through one of the mechanisms described in the Using other Hadoop distributions section.

Using other Hadoop distributions

Druid works out of the box with many Hadoop distributions.

If you are having dependency conflicts between Druid and your version of Hadoop, you can try searching for a solution in the Druid user groups, or reading the Druid documentation.

Command line (non-task) version

To run:

    java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:<hadoop_config_dir> org.apache.druid.cli.Main index hadoop <spec_file>

Options

  • “--coordinate” - provide a version of Apache Hadoop to use. This property overrides the default Hadoop coordinates. Once specified, Apache Druid looks for those Hadoop dependencies in the location specified by druid.extensions.hadoopDependenciesDir.
  • “--no-default-hadoop” - don’t pull down the default Hadoop version.

The spec file needs to contain a JSON object whose contents are the same as the “spec” field in the Hadoop index task. See the sections above for details on the spec format.

In addition, metadataUpdateSpec and segmentOutputPath fields need to be added to the ioConfig:

    "ioConfig" : {
      ...
      "metadataUpdateSpec" : {
        "type":"mysql",
        "connectURI" : "jdbc:mysql://localhost:3306/druid",
        "password" : "diurd",
        "segmentTable" : "druid_segments",
        "user" : "druid"
      },
      "segmentOutputPath" : "/MyDirectory/data/index/output"
    },

and a workingPath field needs to be added to the tuningConfig:

    "tuningConfig" : {
      ...
      "workingPath": "/tmp",
      ...
    }

Metadata Update Job Spec

This is a specification of the properties that tell the job how to update metadata such that the Druid cluster will see the output segments and load them.

| Field | Type | Description | Required |
|---|---|---|---|
| type | String | “metadata” is the only value available. | yes |
| connectURI | String | A valid JDBC URL to metadata storage. | yes |
| user | String | Username for the database. | yes |
| password | String | Password for the database. | yes |
| segmentTable | String | Table to use in the database. | yes |

These properties should match the metadata storage settings configured for your cluster.

segmentOutputPath Config

| Field | Type | Description | Required |
|---|---|---|---|
| segmentOutputPath | String | The path to dump segments into. | yes |

workingPath Config

| Field | Type | Description | Required |
|---|---|---|---|
| workingPath | String | The working path to use for intermediate results (results between Hadoop jobs). | no (default == ‘/tmp/druid-indexing’) |

Please note that the command-line Hadoop indexer doesn’t have the locking capabilities of the indexing service, so if you choose to use it, you must take care not to overwrite segments created by real-time processing (if you have a real-time pipeline set up).