Native batch input sources

    For general information on native batch indexing and parallel task indexing, see Native batch ingestion.

    S3 input source

    The S3 input source reads objects directly from S3. You can specify one of the following:

    • a list of S3 URI strings
    • a list of S3 location prefixes; the input source lists the contents of these locations and ingests all objects contained within them
    • a list of S3 objects, each given as a bucket and a path

    The S3 input source is splittable, so you can use it with the Parallel task. Each worker task of index_parallel reads one or more objects.
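    As a rough illustration of these location styles, the following Python sketch assembles an S3 inputSource as a plain dict and serializes it with the json module. The helper s3_input_source is hypothetical, and requiring exactly one of uris, prefixes, or objects is an assumption of the sketch, not a documented Druid rule.

```python
import json


def s3_input_source(uris=None, prefixes=None, objects=None):
    """Build an S3 inputSource dict from exactly one location style.

    Hypothetical helper: it only assembles JSON; it does not contact S3 or Druid.
    """
    given = {
        name: value
        for name, value in (("uris", uris), ("prefixes", prefixes), ("objects", objects))
        if value
    }
    if len(given) != 1:
        raise ValueError("set exactly one of uris, prefixes, or objects")
    name, value = next(iter(given.items()))
    if name == "objects":
        # Each object is a (bucket, path) pair, matching the S3 Object table.
        value = [{"bucket": bucket, "path": path} for bucket, path in value]
    return {"type": "s3", name: list(value)}


io_config = {
    "type": "index_parallel",
    "inputSource": s3_input_source(prefixes=["s3://foo/bar/", "s3://bar/foo/"]),
    "inputFormat": {"type": "json"},
}
print(json.dumps(io_config, indent=2))
```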

    Sample specs:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "prefixes": ["s3://foo/bar/", "s3://bar/foo/"]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "objects": [
          { "bucket": "foo", "path": "bar/file1.json"},
          { "bucket": "bar", "path": "foo/file2.json"}
        ]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
        "properties": {
          "accessKeyId": "KLJ78979SDFdS2",
          "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd"
        }
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
        "properties": {
          "accessKeyId": "KLJ78979SDFdS2",
          "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
          "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
        }
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
        "endpointConfig": {
          "url": "s3-store.aws.com",
          "signingRegion": "us-west-2"
        },
        "clientConfig": {
          "protocol": "http",
          "disableChunkedEncoding": true,
          "enablePathStyleAccess": true,
          "forceGlobalBucketAccessEnabled": false
        },
        "proxyConfig": {
          "host": "proxy-s3.aws.com",
          "port": 8888,
          "username": "admin",
          "password": "admin"
        },
        "properties": {
          "accessKeyId": "KLJ78979SDFdS2",
          "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
          "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
        }
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    Note that the S3 input source will skip all empty objects only when prefixes is specified.

    S3 Object:

    Property | Description | Default | Required
    bucket | Name of the S3 bucket | None | yes
    path | The path where data is located. | None | yes
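    The objects form and the URI form describe the same locations: the bucket is the host part of an s3:// URI and the path is the key after it. A minimal sketch of that conversion, assuming object keys contain no "?" or "#" characters; the helper name s3_uri_to_object is hypothetical.

```python
from urllib.parse import urlsplit


def s3_uri_to_object(uri):
    """Convert an s3://bucket/path URI into the {"bucket", "path"} form of an S3 Object.

    Assumes the key contains no "?" or "#", which urlsplit would treat as a
    query string or fragment.
    """
    parts = urlsplit(uri)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError(f"not an s3:// URI: {uri}")
    # urlsplit keeps the "/" between bucket and key; the S3 Object path omits it.
    path = parts.path[1:] if parts.path.startswith("/") else parts.path
    return {"bucket": parts.netloc, "path": path}


objects = [s3_uri_to_object(u) for u in ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]]
print(objects)
```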

    Properties Object:

    Property | Description | Default | Required
    accessKeyId | The Password Provider or plain text string of this S3 input source's access key | None | yes, if secretAccessKey is given
    secretAccessKey | The Password Provider or plain text string of this S3 input source's secret key | None | yes, if accessKeyId is given
    assumeRoleArn | AWS ARN of the role to assume. You can use assumeRoleArn with either the AWS credentials in the ingestion spec or the default S3 credentials. | None | no
    assumeRoleExternalId | A unique identifier that might be required when you assume a role in another account. | None | no

    Note: If accessKeyId and secretAccessKey are not given, the default S3 credentials are used.
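    The pairing rules in this table can be checked before submitting a spec. The following sketch is hypothetical (describe_s3_credentials is not a Druid API). It also shows accessKeyId and secretAccessKey given as Password Provider objects of type environment, the provider type used in the HTTP input source examples; the environment variable names are made up.

```python
def describe_s3_credentials(properties):
    """Report where credentials come from, following the Properties Object table.

    `properties` is the "properties" dict of an S3 inputSource (possibly empty).
    """
    has_key = "accessKeyId" in properties
    has_secret = "secretAccessKey" in properties
    if has_key != has_secret:
        # Each key is required when the other one is given.
        raise ValueError("accessKeyId and secretAccessKey must be given together")
    source = "ingestion spec" if has_key else "default S3 credentials"
    role = properties.get("assumeRoleArn")
    # assumeRoleArn works with either credential source.
    return source if role is None else f"{source}, assuming {role}"


# Password Provider objects instead of plain text; the variable names are hypothetical.
properties = {
    "accessKeyId": {"type": "environment", "variable": "S3_ACCESS_KEY_ID"},
    "secretAccessKey": {"type": "environment", "variable": "S3_SECRET_ACCESS_KEY"},
    "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3",
}
summary = describe_s3_credentials(properties)
print(summary)
```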

    Google Cloud Storage input source

    The Google Cloud Storage input source reads objects directly from Google Cloud Storage. You can specify objects as a list of Google Cloud Storage URI strings, a list of URI prefixes, or a list of objects. The Google Cloud Storage input source is splittable and can be used by the Parallel task, where each worker task of index_parallel reads one or more objects.

    Sample specs:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "google",
        "uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "google",
        "prefixes": ["gs://foo/bar/", "gs://bar/foo/"]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    Property | Description | Default | Required
    type | Set the value to google. | None | yes
    uris | JSON array of URIs where Google Cloud Storage objects to be ingested are located. | None | uris or prefixes or objects must be set
    prefixes | JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes are skipped. | None | uris or prefixes or objects must be set
    objects | JSON array of Google Cloud Storage objects to be ingested. | None | uris or prefixes or objects must be set

    Note that the Google Cloud Storage input source will skip all empty objects only when prefixes is specified.
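    The prefix rule can be modeled locally. In this sketch, a dict mapping gs:// URIs to object sizes stands in for a real bucket listing, and objects_to_ingest is a hypothetical helper. It illustrates the documented behavior (objects under a prefix are ingested, empty ones are skipped) and does not call Google Cloud Storage.

```python
def objects_to_ingest(listing, prefixes):
    """Keep the non-empty objects whose URI starts with any of the prefixes.

    `listing` maps gs:// URIs to object sizes in bytes; it is a stand-in for
    a real bucket listing.
    """
    return sorted(
        uri
        for uri, size in listing.items()
        if size > 0 and any(uri.startswith(prefix) for prefix in prefixes)
    )


listing = {
    "gs://foo/bar/a.json": 120,
    "gs://foo/bar/empty.json": 0,  # empty: skipped
    "gs://bar/foo/b.json": 64,
    "gs://other/c.json": 10,  # not under any prefix
}
selected = objects_to_ingest(listing, ["gs://foo/bar/", "gs://bar/foo/"])
print(selected)
```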

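    Google Cloud Storage object:

    Property | Description | Default | Required
    bucket | Name of the Google Cloud Storage bucket | None | yes
    path | The path where data is located. | None | yes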

    Azure input source

    You need to include the Azure extension to use the Azure input source.

    The Azure input source reads objects directly from Azure Blob store or Azure Data Lake sources. You can specify objects as a list of file URI strings, a list of URI prefixes, or a list of objects. The Azure input source is splittable, so you can use it with Parallel task indexing; each worker task reads one chunk of the split data.

    Sample specs:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "azure",
        "uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "azure",
        "prefixes": ["azure://container/prefix1/", "azure://container/prefix2/"]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "azure",
        "objects": [
          { "bucket": "container", "path": "prefix1/file1.json"},
          { "bucket": "container", "path": "prefix2/file2.json"}
        ]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    Property | Description | Default | Required
    type | Set the value to azure. | None | yes
    uris | JSON array of URIs where the Azure objects to be ingested are located, in the form azure://<container>/<path-to-file> | None | uris or prefixes or objects must be set
    prefixes | JSON array of URI prefixes for the locations of Azure objects to ingest, in the form azure://<container>/<prefix>. Empty objects starting with one of the given prefixes are skipped. | None | uris or prefixes or objects must be set
    objects | JSON array of Azure objects to ingest. | None | uris or prefixes or objects must be set

    Note that the Azure input source skips all empty objects only when prefixes is specified.

    Azure object:

    Property | Description | Default | Required
    bucket | Name of the Azure Blob Storage or Azure Data Lake container | None | yes
    path | The path where data is located. | None | yes

    HDFS input source

    The HDFS input source reads files directly from HDFS storage. You can specify file paths as an HDFS URI string or a list of HDFS URI strings. The HDFS input source is splittable and can be used by the Parallel task, where each worker task of index_parallel reads one or more files.

    Sample specs:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "hdfs",
        "paths": "hdfs://namenode_host/foo/bar/,hdfs://namenode_host/bar/foo"
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "hdfs",
        "paths": ["hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "hdfs",
        "paths": "hdfs://namenode_host/foo/bar/file.json,hdfs://namenode_host/bar/foo/file2.json"
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "hdfs",
        "paths": ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"]
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    Property | Description | Default | Required
    type | Set the value to hdfs. | None | yes
    paths | HDFS paths. Can be either a JSON array or a comma-separated string of paths. Wildcards like * are supported in these paths. Empty files located under one of the given paths are skipped. | None | yes
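    Because paths accepts either a JSON array or a comma-separated string, a tool that generates specs may want to normalize both forms to a list. A minimal sketch with a hypothetical helper; treating commas inside glob braces such as {a,b} as part of a pattern, rather than as separators, follows Hadoop glob syntax and is an assumption about how the string is split.

```python
def normalize_hdfs_paths(paths):
    """Turn a `paths` value (JSON array or comma-separated string) into a list."""
    if isinstance(paths, list):
        return list(paths)
    if not isinstance(paths, str):
        raise TypeError("paths must be a string or a list of strings")
    result, current, depth = [], [], 0
    for ch in paths:
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth = max(depth - 1, 0)
        if ch == "," and depth == 0:
            # A comma outside braces separates two paths.
            result.append("".join(current))
            current = []
        else:
            current.append(ch)
    result.append("".join(current))
    return [p for p in result if p]


as_string = normalize_hdfs_paths("hdfs://namenode_host/foo/bar/,hdfs://namenode_host/bar/foo")
as_array = normalize_hdfs_paths(["hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"])
print(as_string == as_array)
```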

    You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage. However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage. If you want to use a non-hdfs protocol with the HDFS input source, include the protocol in druid.ingestion.hdfs.allowedProtocols. See for more details.

    HTTP input source

    The HTTP input source reads files directly from remote sites via HTTP.

    Security notes: Ingestion tasks run under the operating system account that runs the Druid processes, for example the Indexer, Middle Manager, and Peon. This means any user who can submit an ingestion task can specify an input source referring to any location that the Druid process can access. For example, with the http input source, users may be able to reach internal network servers.

    The http input source is not limited to the HTTP or HTTPS protocols. It uses the Java URI class that supports HTTP, HTTPS, FTP, file, and jar protocols by default.

    For more information about security best practices, see .

    The HTTP input source is splittable and can be used by the Parallel task, where each worker task of index_parallel reads only one file. This input source does not support Split Hint Spec.

    Sample specs:

    Example with authentication fields using the DefaultPassword provider (this requires the password to be in the ingestion spec):

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "http",
        "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
        "httpAuthenticationUsername": "username",
        "httpAuthenticationPassword": "password123"
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "http",
        "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
        "httpAuthenticationUsername": "username",
        "httpAuthenticationPassword": {
          "type": "environment",
          "variable": "HTTP_INPUT_SOURCE_PW"
        }
      },
      "inputFormat": {
        "type": "json"
      },
      ...
    },
    ...

    You can only use protocols listed in the druid.ingestion.http.allowedProtocols property as HTTP input sources. The http and https protocols are allowed by default. See for more details.
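    A spec generator can reject URIs whose scheme is not allowed before submitting a task. The sketch below assumes the default allow-list of http and https described above; disallowed_uris is a hypothetical helper, and Druid still enforces druid.ingestion.http.allowedProtocols itself at ingestion time.

```python
from urllib.parse import urlsplit

# Default value of druid.ingestion.http.allowedProtocols, per the text above.
DEFAULT_ALLOWED_PROTOCOLS = frozenset({"http", "https"})


def disallowed_uris(uris, allowed=DEFAULT_ALLOWED_PROTOCOLS):
    """Return the URIs whose scheme is not in the allowed set.

    urlsplit lowercases the scheme, so "HTTP://..." counts as "http".
    """
    allowed = {protocol.lower() for protocol in allowed}
    return [uri for uri in uris if urlsplit(uri).scheme not in allowed]


uris = [
    "http://example.com/uri1",
    "https://example2.com/uri2",
    "file:///etc/passwd",  # the Java URI class can read local files
    "ftp://example.com/data.json",
]
rejected = disallowed_uris(uris)
print(rejected)
```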

    Inline input source

    The Inline input source reads data inlined in its own spec. It is useful for demos or for quickly testing parsing and schema.

    Sample spec:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "inline",
        "data": "0,values,formatted\n1,as,CSV"
      },
      "inputFormat": {
        "type": "csv"
      },
      ...
    },
    ...

    Property | Description | Required
    type | Set the value to inline. | yes
    data | Inlined data to ingest. | yes
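    The data value is the entire input as a single string, so line breaks between rows appear as \n escapes in the JSON spec. The sketch below uses Python's csv module to show the two rows in the sample data; it only illustrates the string's layout and is not Druid's CSV parser.

```python
import csv
import io
import json

inline_source = {"type": "inline", "data": "0,values,formatted\n1,as,CSV"}

# Each newline-separated line of the "data" string is one CSV row.
rows = list(csv.reader(io.StringIO(inline_source["data"])))
print(rows)

# Serialized to JSON, the newline becomes the two-character escape \n.
print(json.dumps(inline_source))
```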

    Local input source

    The Local input source reads files directly from local storage and is mainly intended for proof-of-concept testing. The Local input source is splittable and can be used by the Parallel task, where each worker task of index_parallel reads one or more files.

    Sample spec:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "filter": "*.csv",
        "baseDir": "/data/directory",
        "files": ["/bar/foo", "/foo/bar"]
      },
      "inputFormat": {
        "type": "csv"
      },
      ...
    },
    ...

    Property | Description | Required
    type | Set the value to local. | yes
    filter | A wildcard filter for files. See for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored. | yes, if baseDir is specified
    baseDir | Directory to search recursively for files to be ingested. Empty files under the baseDir are skipped. | At least one of baseDir or files should be specified
    files | File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified baseDir. Empty files are skipped. | At least one of baseDir or files should be specified
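    The selection rules in this table can be modeled with pathlib. The sketch below is hypothetical and only approximates Druid: it matches the wildcard against file names with fnmatch (an assumption about how filter is applied), searches baseDir recursively, adds entries from files that were not already found under baseDir, and drops empty files. It builds a throwaway directory to demonstrate.

```python
import fnmatch
import tempfile
from pathlib import Path


def local_files_to_ingest(base_dir=None, file_filter=None, files=()):
    """Model the Local input source's documented file selection (not Druid's code)."""
    if base_dir is None and not files:
        raise ValueError("specify at least one of baseDir or files")
    if base_dir is not None and file_filter is None:
        raise ValueError("filter is required when baseDir is specified")
    selected = []
    if base_dir is not None:
        # Search baseDir recursively; match the wildcard against file names only.
        for path in sorted(Path(base_dir).rglob("*")):
            if path.is_file() and fnmatch.fnmatchcase(path.name, file_filter):
                selected.append(path.resolve())
    for name in files:
        path = Path(name).resolve()
        if path not in selected:  # already found under baseDir: ingest it once
            selected.append(path)
    # Empty files are skipped.
    return [path for path in selected if path.stat().st_size > 0]


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / "sub").mkdir()
    (base / "a.csv").write_text("1,2\n")
    (base / "empty.csv").write_text("")
    (base / "sub" / "b.csv").write_text("3,4\n")
    (base / "notes.txt").write_text("x\n")
    picked = local_files_to_ingest(
        base_dir=tmp,
        file_filter="*.csv",
        files=[str(base / "a.csv"), str(base / "notes.txt")],
    )
    names = [path.name for path in picked]

print(names)
```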

    Druid input source

    The Druid input source reads data directly from existing Druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, and so on of the segment. The Druid input source is splittable and can be used by the Parallel task. This input source has a fixed input format for reading from Druid segments, so you don't need to specify an inputFormat field in the ingestion spec when using it.

    Property | Description | Required
    type | Set the value to druid. | yes
    dataSource | A string defining the Druid datasource to fetch rows from | yes
    interval | A string representing an ISO-8601 interval, which defines the time range to fetch the data over. | yes
    filter | See Filters. Only rows that match the filter, if specified, are returned. | no

    You can use the Druid input source for purposes such as:

    • Creating new datasources that are rolled-up copies of existing datasources.
    • Changing the partitioning or sorting of a datasource to improve performance.
    • Updating or removing rows using a transformSpec.

    When using the Druid input source, the timestamp column shows up as a numeric field named __time, set to the number of milliseconds since the epoch (January 1, 1970 00:00:00 UTC). If you want the output timestamp to be equivalent to the input timestamp, use this field in the timestampSpec: set the timestamp column to __time and the format to auto or millis.
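    Because __time is an integer count of milliseconds, converting it to and from a UTC datetime is plain arithmetic. A minimal sketch using integer timedelta math to avoid floating-point rounding; the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def time_millis_to_utc(millis):
    """Convert a __time value (milliseconds since the epoch) to a UTC datetime."""
    # Integer timedelta arithmetic keeps every millisecond exact.
    return EPOCH + timedelta(milliseconds=millis)


def utc_to_time_millis(dt):
    """Convert a timezone-aware datetime back to epoch milliseconds."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


start = time_millis_to_utc(1466985600000)
print(start.isoformat())
```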

    It is OK for the input and output datasources to be the same. In this case, newly generated data will overwrite the previous data for the intervals specified in the granularitySpec. Generally, if you are going to do this, it is a good idea to test out your reindexing by writing to a separate datasource before overwriting your main one. Alternatively, if your goals can be satisfied by , consider that instead as a simpler approach.

    An example task spec is shown below. It reads from a hypothetical raw datasource wikipedia_raw and creates a new rolled-up datasource wikipedia_rollup by grouping on hour, “countryName”, and “page”.

    {
      "type": "index_parallel",
      "spec": {
        "dataSchema": {
          "dataSource": "wikipedia_rollup",
          "timestampSpec": {
            "column": "__time",
            "format": "millis"
          },
          "dimensionsSpec": {
            "dimensions": [
              "countryName",
              "page"
            ]
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "cnt"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "queryGranularity": "HOUR",
            "segmentGranularity": "DAY",
            "intervals": ["2016-06-27/P1D"],
            "rollup": true
          }
        },
        "ioConfig": {
          "type": "index_parallel",
          "inputSource": {
            "type": "druid",
            "dataSource": "wikipedia_raw",
            "interval": "2016-06-27/P1D"
          }
        },
        "tuningConfig": {
          "type": "index_parallel",
          "partitionsSpec": {
            "type": "hashed"
          },
          "forceGuaranteedRollup": true,
          "maxNumConcurrentSubTasks": 1
        }
      }
    }
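    To see what the spec above does to the data, here is a small Python model of the rollup: rows are grouped by the hour of __time (queryGranularity HOUR), countryName, and page, and the count metric cnt counts the rows in each group. The sample rows are made up, and the code is an illustration, not Druid's ingestion path.

```python
from collections import Counter

HOUR_MS = 3_600_000


def hourly_rollup(rows):
    """Count rows per (hour, countryName, page), mirroring the "cnt" count metric."""
    counts = Counter()
    for row in rows:
        # queryGranularity HOUR: truncate __time down to the start of its hour.
        hour = row["__time"] - row["__time"] % HOUR_MS
        counts[(hour, row["countryName"], row["page"])] += 1
    return counts


# Made-up rows standing in for wikipedia_raw; 1466985600000 is 2016-06-27T00:00:00Z.
raw = [
    {"__time": 1466985600000 + 60_000, "countryName": "France", "page": "Paris"},
    {"__time": 1466985600000 + 120_000, "countryName": "France", "page": "Paris"},
    {"__time": 1466985600000 + HOUR_MS + 5, "countryName": "France", "page": "Paris"},
]
rolled = hourly_rollup(raw)
print(rolled)
```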

    SQL input source

    The SQL input source reads data directly from an RDBMS. It is splittable and can be used by the Parallel task, where each worker task reads from one SQL query in the list of queries. This input source does not support Split Hint Spec. Because it has a fixed input format for reading events, you don't need to specify an inputFormat field in the ingestion spec when using it. Read the Recommended practices section below before using this input source.

    The following is an example of an SQL input source spec:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "sql",
        "database": {
          "type": "mysql",
          "connectorConfig": {
            "connectURI": "jdbc:mysql://host:port/schema",
            "user": "user",
            "password": "password"
          }
        },
        "sqls": ["SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 23:59:59'", "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 23:59:59'"]
      }
    },
    ...

    The spec above reads all events from two separate SQL queries for the interval 2013-01-01/2013-01-02. Each SQL query runs in its own sub-task, so this example produces two sub-tasks.

    Recommended practices

    Compared to the other native batch input sources, the SQL input source reads input data differently. Consider the following points before using it in a production environment:

    • During indexing, each sub-task executes one of the SQL queries and stores the results locally on disk. The sub-tasks then read the data from these local files and generate segments. Currently there is no limit on the size of the generated files, so the MiddleManagers or Indexers need enough disk capacity for the volume of data being indexed.

    • Filter the SQL queries on the intervals specified in the granularitySpec to avoid retrieving and storing unwanted data locally in the indexing sub-tasks. For example, if the intervals specified in the granularitySpec are ["2013-01-01/2013-01-02"] and the SQL query is SELECT * FROM table1, SqlInputSource reads all the data in table1, even though only the data within the specified intervals is indexed into Druid.

    • You can paginate the SQL queries so that each query pulls a similar amount of data, which improves the efficiency of the sub-tasks.

    • As with file-based input sources, any updates to existing data replace the data in the segments for the intervals specified in the granularitySpec.
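    One way to apply the filtering and balancing advice is to generate the sqls list from the granularitySpec interval, optionally splitting it into equal time windows. This is time-window splitting, not LIMIT/OFFSET pagination. The helper interval_queries is hypothetical, the timestamp column name comes from the example above, the half-open >= / < bounds are a design choice so that windows never overlap, and the table name is interpolated directly, so use it only with trusted input.

```python
from datetime import datetime


def interval_queries(table, start, end, pieces=1):
    """Split [start, end) into equal windows and return one bounded query per window."""
    step = (end - start) / pieces
    fmt = "%Y-%m-%d %H:%M:%S"
    queries = []
    for i in range(pieces):
        lo = start + step * i
        # Use the exact end for the last window so rounding cannot drop rows.
        hi = end if i == pieces - 1 else start + step * (i + 1)
        queries.append(
            f"SELECT * FROM {table} "
            f"WHERE timestamp >= '{lo:{fmt}}' AND timestamp < '{hi:{fmt}}'"
        )
    return queries


# One query per table, each bounded by the granularitySpec interval 2013-01-01/2013-01-02.
sqls = [
    query
    for table in ("table1", "table2")
    for query in interval_queries(table, datetime(2013, 1, 1), datetime(2013, 1, 2))
]
print(sqls)
```

    With pieces=2, each table yields two queries covering 00:00 to 12:00 and 12:00 to 24:00, and each query runs in its own sub-task.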

    Combining input source

    The Combining input source lets you read data from multiple input sources. It identifies the splits from the delegate input sources and uses a worker task to process each split. Use the Combining input source only if all the delegates are splittable and can be used by the Parallel task.

    Similar to other input sources, the Combining input source supports a single inputFormat. Delegate input sources that require an inputFormat must have the same format for input data.

    Property | Description | Required
    type | Set the value to combining. | Yes
    delegates | List of splittable input sources to read data from. | Yes
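    Since the Combining input source takes a single inputFormat, a spec builder can check whether any delegate needs it. In this hypothetical sketch, the druid and sql types are treated as having a fixed input format, as their sections state; treating every other type as needing the shared inputFormat is an assumption.

```python
# Per the Druid and SQL sections, these input sources have a fixed input format.
FIXED_FORMAT_TYPES = frozenset({"druid", "sql"})


def combining_io_config(delegates, input_format=None):
    """Build an ioConfig with a combining input source and one shared inputFormat."""
    needs_format = [d["type"] for d in delegates if d["type"] not in FIXED_FORMAT_TYPES]
    if needs_format and input_format is None:
        raise ValueError(f"delegates {needs_format} need an inputFormat")
    io_config = {
        "type": "index_parallel",
        "inputSource": {"type": "combining", "delegates": list(delegates)},
    }
    if input_format is not None:
        io_config["inputFormat"] = input_format
    return io_config


delegates = [
    {"type": "local", "filter": "*.csv", "baseDir": "/data/directory", "files": ["/bar/foo", "/foo/bar"]},
    {"type": "druid", "dataSource": "wikipedia", "interval": "2013-01-01/2013-01-02"},
]
io_config = combining_io_config(delegates, {"type": "csv"})
print(io_config)
```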

    The following is an example of a Combining input source spec:

    ...
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "combining",
        "delegates": [
          {
            "type": "local",
            "filter": "*.csv",
            "baseDir": "/data/directory",
            "files": ["/bar/foo", "/foo/bar"]
          },
          {
            "type": "druid",
            "dataSource": "wikipedia",
            "interval": "2013-01-01/2013-01-02"
          }
        ]
      },
      "inputFormat": {
        "type": "csv"
      },
      ...
    },
    ...

    The secondary partitioning method determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source. Set this value in maxNumConcurrentSubTasks in tuningConfig based on the secondary partitioning method:

    • range or single_dim partitioning: greater than or equal to 1

    For more information on the maxNumConcurrentSubTasks field, see .