SQL-based ingestion and multi-stage query task API

The Query view in the web console provides a friendly experience for the multi-stage query task engine (MSQ task engine) and multi-stage query architecture. We recommend using the web console if you do not need a programmatic interface.

When using the API for the MSQ task engine, the action you want to take determines the endpoint you use:

  • /druid/v2/sql/task endpoint: Submit a query for ingestion.
  • /druid/indexer/v1/task endpoint: Interact with a query, including getting its status, getting its details, or canceling it. This page describes a few of the Overlord task APIs that you can use with the MSQ task engine. For information about all Druid APIs, see the Druid API reference.

Submit a query

You submit queries to the MSQ task engine using the POST /druid/v2/sql/task/ endpoint.

Request

The SQL task endpoint accepts SQL requests in JSON-over-HTTP form. It uses the query, context, and parameters fields and ignores the resultFormat, header, typesHeader, and sqlTypesHeader fields.

This endpoint accepts INSERT and REPLACE statements.

As an experimental feature, this endpoint also accepts SELECT queries. SELECT query results are collected from workers by the controller and written into the task report as an array of arrays. The behavior and result format of plain SELECT queries (without INSERT or REPLACE) is subject to change.
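For example, a request body using all three accepted fields might look like the following. This is a minimal sketch; the parameter object format shown follows the standard Druid SQL API, and the filter value is illustrative:

import json

# Only `query`, `context`, and `parameters` are read by the SQL task endpoint;
# `resultFormat`, `header`, `typesHeader`, and `sqlTypesHeader` are ignored.
payload = json.dumps({
    "query": "SELECT * FROM wikipedia WHERE channel = ? LIMIT 10",  # experimental plain SELECT
    "context": {"maxNumTasks": 3},
    "parameters": [{"type": "VARCHAR", "value": "#en.wikipedia"}]
})

print(payload)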

HTTP

POST /druid/v2/sql/task/

{
  "query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY",
  "context": {
    "maxNumTasks": 3
  }
}

curl

# Make sure you replace `username`, `password`, `your-instance`, and `port` with the values for your deployment.
curl --location --request POST 'https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/' \
  --header 'Content-Type: application/json' \
  --data-raw '{ "query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '\''{\"type\": \"http\", \"uris\": [\"https://druid.apache.org/data/wikipedia.json.gz\"]}'\'',\n '\''{\"type\": \"json\"}'\'',\n '\''[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\''\n )\n)\nPARTITIONED BY DAY", "context": { "maxNumTasks": 3 } }'

Python

import json
import requests

# Make sure you replace `username`, `password`, `your-instance`, and `port` with the values for your deployment.
url = "https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/"

payload = json.dumps({
  "query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY",
  "context": {
    "maxNumTasks": 3
  }
})
headers = {
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Response

{
  "taskId": "query-f795a235-4dc7-4fef-abac-3ae3f9686b79",
  "state": "RUNNING"
}
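
The taskId in the response identifies the query_controller task, and you pass it to the Overlord task endpoints described in the following sections. A minimal sketch of deriving those paths:

# The submission response, as shown above.
submission = {
    "taskId": "query-f795a235-4dc7-4fef-abac-3ae3f9686b79",
    "state": "RUNNING"
}

# Build paths for follow-up calls against the Overlord task APIs.
task_id = submission["taskId"]
status_path = f"/druid/indexer/v1/task/{task_id}/status"
reports_path = f"/druid/indexer/v1/task/{task_id}/reports"
print(status_path)
print(reports_path)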

Response fields

Field | Description
----- | -----------
taskId | Controller task ID. You can use Druid's standard task APIs to interact with this controller task.
state | Initial state for the query, which is RUNNING.

Get the status for a query task

You can retrieve the status of a query task to see whether it is still running, completed successfully, failed, or was canceled.

Request

HTTP

GET /druid/indexer/v1/task/<taskId>/status

curl

# Make sure you replace `username`, `password`, `hostname`, `port`, and `taskId` with the values for your deployment.
curl --location --request GET 'https://<username>:<password>@<hostname>:<port>/druid/indexer/v1/task/<taskId>/status'

Python

import requests

# Make sure you replace `username`, `password`, `hostname`, `port`, and `taskId` with the values for your deployment.
url = "https://<username>:<password>@<hostname>:<port>/druid/indexer/v1/task/<taskId>/status"

payload = {}
headers = {}

response = requests.request("GET", url, headers=headers, data=payload)

print(response.text)

Response

{
  "task": "query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e",
  "status": {
    "id": "query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e",
    "groupId": "query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e",
    "type": "query_controller",
    "createdTime": "2022-09-14T22:12:00.183Z",
    "queueInsertionTime": "1970-01-01T00:00:00.000Z",
    "statusCode": "RUNNING",
    "status": "RUNNING",
    "runnerStatusCode": "RUNNING",
    "duration": -1,
    "location": {
      "host": "localhost",
      "port": 8100,
      "tlsPort": -1
    },
    "dataSource": "kttm_simple",
    "errorMsg": null
  }
}
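
Because the query runs asynchronously as a task, a typical client polls this endpoint until status.statusCode leaves the RUNNING state. A minimal sketch, assuming the same placeholder deployment values as in the examples above:

import time
import requests

# Hypothetical placeholders; replace as in the examples above.
url = "https://<username>:<password>@<hostname>:<port>/druid/indexer/v1/task/<taskId>/status"

# Poll until the task reaches a terminal state.
while True:
    status = requests.get(url).json()["status"]
    if status["statusCode"] != "RUNNING":
        break
    time.sleep(5)  # avoid polling the Overlord too aggressively

print(status["statusCode"], status["errorMsg"])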

Get the report for a query task

A report provides detailed information about a query task, including its stages, warnings, and errors.

Keep the following in mind when using the task API to view reports:

  • The task report for an entire job is associated with the query_controller task. The query_worker tasks do not have their own reports; their information is incorporated into the controller report.
  • The task report API may report 404 Not Found temporarily while the task is in the process of starting up.
  • As an experimental feature, the MSQ task engine supports running SELECT queries. SELECT query results are written into the multiStageQuery.payload.results.results task report key as an array of arrays. The behavior and result format of plain SELECT queries (without INSERT or REPLACE) is subject to change.

For an explanation of the fields in a report, see the response fields table below.
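
For instance, the following sketch retrieves a report, retrying while the report API returns 404 during task startup, and prints SELECT results from the multiStageQuery.payload.results.results key when present. The placeholder values are hypothetical:

import time
import requests

# Hypothetical placeholders; replace as in the examples above.
url = "https://<username>:<password>@<hostname>:<port>/druid/indexer/v1/task/<taskId>/reports"

# The report API may return 404 Not Found while the task is still starting up.
response = requests.get(url)
for _ in range(10):
    if response.status_code != 404:
        break
    time.sleep(2)
    response = requests.get(url)

payload = response.json()["multiStageQuery"]["payload"]
print(payload["status"]["status"])

# Experimental SELECT queries write their results into the report as an array of arrays.
results = payload.get("results", {}).get("results")
if results is not None:
    for row in results:  # each row is an array of column values
        print(row)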

Request

HTTP

GET /druid/indexer/v1/task/<taskId>/reports

curl

# Make sure you replace `username`, `password`, `hostname`, `port`, and `taskId` with the values for your deployment.
curl --location --request GET 'https://<username>:<password>@<hostname>:<port>/druid/indexer/v1/task/<taskId>/reports'

Python

import requests

# Make sure you replace `username`, `password`, `hostname`, `port`, and `taskId` with the values for your deployment.
url = "https://<username>:<password>@<hostname>:<port>/druid/indexer/v1/task/<taskId>/reports"

headers = {}

response = requests.request("GET", url, headers=headers)

print(response.text)

Response

The response shows an example report for a query.

{
  "multiStageQuery": {
    "type": "multiStageQuery",
    "taskId": "query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e",
    "payload": {
      "status": {
        "status": "SUCCESS",
        "startTime": "2022-09-14T22:12:09.266Z",
        "durationMs": 28227
      },
      "stages": [
        {
          "stageNumber": 0,
          "definition": {
            "id": "71ecb11e-09d7-42f8-9225-1662c8e7e121_0",
            "input": [
              {
                "type": "external",
                "inputSource": {
                  "type": "http",
                  "uris": [
                    "https://static.imply.io/example-data/kttm-v2/kttm-v2-2019-08-25.json.gz"
                  ],
                  "httpAuthenticationUsername": null,
                  "httpAuthenticationPassword": null
                },
                "inputFormat": {
                  "type": "json",
                  "flattenSpec": null,
                  "featureSpec": {},
                  "keepNullColumns": false
                },
                "signature": [
                  {
                    "name": "timestamp",
                    "type": "STRING"
                  },
                  {
                    "name": "agent_category",
                    "type": "STRING"
                  },
                  {
                    "name": "agent_type",
                    "type": "STRING"
                  }
                ]
              }
            ],
            "processor": {
              "type": "scan",
              "query": {
                "queryType": "scan",
                "dataSource": {
                  "type": "inputNumber",
                  "inputNumber": 0
                },
                "intervals": {
                  "type": "intervals",
                  "intervals": [
                    "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
                  ]
                },
                "resultFormat": "compactedList",
                "columns": [
                  "agent_category",
                  "timestamp"
                ],
                "context": {
                  "finalize": false,
                  "finalizeAggregations": false,
                  "groupByEnableMultiValueUnnesting": false,
                  "scanSignature": "[{\"name\":\"agent_category\",\"type\":\"STRING\"},{\"name\":\"agent_type\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"}]",
                  "sqlInsertSegmentGranularity": "{\"type\":\"all\"}",
                  "sqlQueryId": "3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e",
                  "sqlReplaceTimeChunks": "all"
                },
                "granularity": {
                  "type": "all"
                }
              }
            },
            "signature": [
              {
                "name": "__boost",
                "type": "LONG"
              },
              {
                "name": "agent_category",
                "type": "STRING"
              },
              {
                "name": "agent_type",
                "type": "STRING"
              },
              {
                "name": "timestamp",
                "type": "STRING"
              }
            ],
            "shuffleSpec": {
              "type": "targetSize",
              "clusterBy": {
                "columns": [
                  {
                    "columnName": "__boost"
                  }
                ]
              },
              "targetSize": 3000000
            },
            "maxWorkerCount": 1,
            "shuffleCheckHasMultipleValues": true
          },
          "phase": "FINISHED",
          "workerCount": 1,
          "partitionCount": 1,
          "startTime": "2022-09-14T22:12:11.663Z",
          "duration": 19965,
          "sort": true
        },
        {
          "stageNumber": 1,
          "definition": {
            "id": "71ecb11e-09d7-42f8-9225-1662c8e7e121_1",
            "input": [
              {
                "type": "stage",
                "stage": 0
              }
            ],
            "processor": {
              "type": "segmentGenerator",
              "dataSchema": {
                "dataSource": "kttm_simple",
                "timestampSpec": {
                  "column": "__time",
                  "format": "millis",
                  "missingValue": null
                },
                "dimensionsSpec": {
                  "dimensions": [
                    {
                      "type": "string",
                      "name": "timestamp",
                      "multiValueHandling": "SORTED_ARRAY",
                      "createBitmapIndex": true
                    },
                    {
                      "type": "string",
                      "name": "agent_category",
                      "multiValueHandling": "SORTED_ARRAY",
                      "createBitmapIndex": true
                    },
                    {
                      "type": "string",
                      "name": "agent_type",
                      "multiValueHandling": "SORTED_ARRAY",
                      "createBitmapIndex": true
                    }
                  ],
                  "dimensionExclusions": [
                    "__time"
                  ],
                  "includeAllDimensions": false
                },
                "metricsSpec": [],
                "granularitySpec": {
                  "type": "arbitrary",
                  "queryGranularity": {
                    "type": "none"
                  },
                  "rollup": false,
                  "intervals": [
                    "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
                  ]
                },
                "transformSpec": {
                  "filter": null
                }
              },
              "columnMappings": [
                {
                  "queryColumn": "timestamp",
                  "outputColumn": "timestamp"
                },
                {
                  "queryColumn": "agent_category",
                  "outputColumn": "agent_category"
                },
                {
                  "queryColumn": "agent_type",
                  "outputColumn": "agent_type"
                }
              ],
              "tuningConfig": {
                "maxNumWorkers": 1,
                "maxRowsInMemory": 100000,
                "rowsPerSegment": 3000000
              }
            },
            "signature": [],
            "maxWorkerCount": 1
          },
          "phase": "FINISHED",
          "workerCount": 1,
          "partitionCount": 1,
          "startTime": "2022-09-14T22:12:31.602Z",
          "duration": 5891
        }
      ],
      "counters": {
        "0": {
          "0": {
            "input0": {
              "type": "channel",
              "rows": [
                465346
              ],
              "files": [
                1
              ],
              "totalFiles": [
                1
              ]
            },
            "output": {
              "type": "channel",
              "rows": [
                465346
              ],
              "bytes": [
                43694447
              ],
              "frames": [
                7
              ]
            },
            "shuffle": {
              "type": "channel",
              "rows": [
                465346
              ],
              "bytes": [
                41835307
              ],
              "frames": [
                73
              ]
            },
            "sortProgress": {
              "type": "sortProgress",
              "totalMergingLevels": 3,
              "levelToTotalBatches": {
                "0": 1,
                "1": 1,
                "2": 1
              },
              "levelToMergedBatches": {
                "0": 1,
                "1": 1,
                "2": 1
              },
              "totalMergersForUltimateLevel": 1,
              "progressDigest": 1
            }
          }
        },
        "1": {
          "0": {
            "input0": {
              "type": "channel",
              "rows": [
                465346
              ],
              "bytes": [
                41835307
              ],
              "frames": [
                73
              ]
            }
          }
        }
      }
    }
  }
}

Response fields

The following table describes the response fields when you retrieve a report for an MSQ task engine query using the /druid/indexer/v1/task/<taskId>/reports endpoint:

Field | Description
----- | -----------
multiStageQuery.taskId | Controller task ID.
multiStageQuery.payload.status | Query status container.
multiStageQuery.payload.status.status | RUNNING, SUCCESS, or FAILED.
multiStageQuery.payload.status.startTime | Start time of the query in ISO format. Only present if the query has started running.
multiStageQuery.payload.status.durationMs | Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet.
multiStageQuery.payload.status.errorReport | Error object. Only present if there was an error.
multiStageQuery.payload.status.errorReport.taskId | The task that reported the error, if known. May be a controller task or a worker task.
multiStageQuery.payload.status.errorReport.host | The hostname and port of the task that reported the error, if known.
multiStageQuery.payload.status.errorReport.stageNumber | The stage number that reported the error, if it happened during execution of a specific stage.
multiStageQuery.payload.status.errorReport.error | Error object. Contains errorCode at a minimum, and may contain other fields as described in the error code table. Always present if there is an error.
multiStageQuery.payload.status.errorReport.error.errorCode | One of the error codes from the error code table. Always present if there is an error.
multiStageQuery.payload.status.errorReport.error.errorMessage | User-friendly error message. Not always present, even if there is an error.
multiStageQuery.payload.status.errorReport.exceptionStackTrace | Java stack trace in string form, if the error was due to a server-side exception.
multiStageQuery.payload.stages | Array of query stages.
multiStageQuery.payload.stages[].stageNumber | Each stage has a number that differentiates it from other stages.
multiStageQuery.payload.stages[].phase | Either NEW, READING_INPUT, POST_READING, RESULTS_COMPLETE, or FAILED. Only present if the stage has started.
multiStageQuery.payload.stages[].workerCount | Number of parallel tasks that this stage is running on. Only present if the stage has started.
multiStageQuery.payload.stages[].partitionCount | Number of output partitions generated by this stage. Only present if the stage has started and has computed its number of output partitions.
multiStageQuery.payload.stages[].startTime | Start time of this stage. Only present if the stage has started.
multiStageQuery.payload.stages[].duration | The number of milliseconds that the stage has been running. Only present if the stage has started.
multiStageQuery.payload.stages[].sort | A boolean that is set to true if the stage does a sort as part of its execution.
multiStageQuery.payload.stages[].definition | The object defining what the stage does.
multiStageQuery.payload.stages[].definition.id | The unique identifier of the stage.
multiStageQuery.payload.stages[].definition.input | Array of inputs that the stage has.
multiStageQuery.payload.stages[].definition.broadcast | Array of input indexes that get broadcasted. Only present if there are inputs that get broadcasted.
multiStageQuery.payload.stages[].definition.processor | An object defining the processor logic.
multiStageQuery.payload.stages[].definition.signature | The output signature of the stage.
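
As a worked example of navigating these fields, the sketch below prints one summary line per stage from a report saved locally (for example, by the requests call above); the file name is hypothetical, and the field names follow the table:

import json

# Assumes the report JSON was saved to a local file.
with open("report.json") as f:
    report = json.load(f)

for stage in report["multiStageQuery"]["payload"]["stages"]:
    processor_type = stage["definition"]["processor"]["type"]
    # phase, workerCount, partitionCount, and duration are only present once a stage starts.
    print(
        f"stage {stage['stageNumber']} ({processor_type}): "
        f"phase={stage.get('phase')} workers={stage.get('workerCount')} "
        f"partitions={stage.get('partitionCount')} durationMs={stage.get('duration')}"
    )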

Cancel a query task

To cancel a query task, send a shutdown request for its taskId.

Request

HTTP

POST /druid/indexer/v1/task/<taskId>/shutdown

curl

# Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment.
curl --location --request POST 'https://<username>:<password>@<your-instance>:<port>/druid/indexer/v1/task/<taskId>/shutdown'

Python

import requests

# Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment.
url = "https://<username>:<password>@<your-instance>:<port>/druid/indexer/v1/task/<taskId>/shutdown"

payload = {}
headers = {}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Response

{
}