Overall task storage structure

The structure of the database table is shown in the following table:

The process_definition_json field is the core field: it defines the task information of the DAG. Its content is stored as JSON.

The common data structure is shown in the following table.

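| No. | Field | Type | Description |
|---|---|---|---|
| 1 | globalParams | Array | Global parameters |
| 2 | tasks | Array | Set of tasks in the process (see the following sections for the structure of each task type) |
| 3 | tenantId | int | Tenant ID |
| 4 | timeout | int | Timeout |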

Data example:
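As a minimal sketch of the four common fields, the following Python builds the JSON text that would be stored in process_definition_json and reads the task list back. The function names and all values are hypothetical and only illustrate the shape of the data.

```python
import json

def build_process_definition_json(tasks, tenant_id, timeout, global_params=None):
    """Serialize the four common fields into the JSON text stored in process_definition_json."""
    definition = {
        "globalParams": global_params if global_params is not None else [],
        "tasks": tasks,
        "tenantId": tenant_id,
        "timeout": timeout,
    }
    return json.dumps(definition, ensure_ascii=False)

def load_tasks(process_definition_json):
    """Parse the stored JSON text and return its task list."""
    return json.loads(process_definition_json).get("tasks", [])

# Hypothetical values; real task entries follow the per-type structures described below.
stored = build_process_definition_json(
    tasks=[{"id": "tasks-80760", "type": "SHELL", "name": "Shell Task"}],
    tenant_id=1,
    timeout=0,
)
print([task["name"] for task in load_tasks(stored)])  # ['Shell Task']
```

`ensure_ascii=False` keeps non-ASCII task names (for example Chinese names) readable in the stored text.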

Detailed storage structure of each task type

Shell node

The node data structure is as follows:

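| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | SHELL |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | rawScript | String | Shell script | |
| 6 | localParams | Array | Custom parameters | |
| 7 | resourceList | Array | Resource files | |
| 8 | description | String | Description | |
| 9 | runFlag | String | Run flag | |
| 10 | conditionResult | Object | Conditional branch | |
| 11 | successNode | Array | Node to jump to on success | |
| 12 | failedNode | Array | Node to jump to on failure | |
| 13 | dependence | Object | Task dependency | Mutually exclusive with params |
| 14 | maxRetryTimes | String | Maximum number of retries | |
| 15 | retryInterval | String | Retry interval | |
| 16 | timeout | Object | Timeout control | |
| 17 | taskInstancePriority | String | Task priority | |
| 18 | workerGroup | String | Worker group | |
| 19 | preTasks | Array | Predecessor tasks | |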
19preTasksArray前置任务

Node data example:

```json
{
  "type":"SHELL",
  "id":"tasks-80760",
  "name":"Shell Task",
  "params":{
    "resourceList":[
      {
        "id":3,
        "name":"run.sh",
        "res":"run.sh"
      }
    ],
    "localParams":[],
    "rawScript":"echo \"This is a shell script\""
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
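Apart from type and params, every node in this document repeats the same set of fields. A small helper can fill them in; the function name is hypothetical and the default values are simply the ones that appear in the samples here, not necessarily the scheduler's own defaults.

```python
import json

def make_task(task_id, name, task_type, params, pre_tasks=None):
    """Build a task node whose shared fields use the values seen in this document's samples."""
    return {
        "type": task_type,
        "id": task_id,
        "name": name,
        "params": params,
        "description": "",
        "runFlag": "NORMAL",
        "conditionResult": {"successNode": [""], "failedNode": [""]},
        "dependence": {},
        "maxRetryTimes": "0",   # stored as a string, per the field table
        "retryInterval": "1",   # also stored as a string
        "timeout": {"strategy": "", "interval": None, "enable": False},
        "taskInstancePriority": "MEDIUM",
        "workerGroup": "default",
        "preTasks": list(pre_tasks or []),
    }

shell_task = make_task(
    "tasks-80760", "Shell Task", "SHELL",
    {"resourceList": [], "localParams": [], "rawScript": 'echo "This is a shell script"'},
)
print(json.dumps(shell_task, indent=2))
```

json.dumps writes Python's None as null and False as false, matching the timeout object in the sample.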

SQL node

Queries or updates data in a specified data source through SQL.

The node data structure is as follows:

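| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | SQL |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | type | String | Database type | |
| 6 | datasource | Int | Data source ID | |
| 7 | sql | String | SQL query statement | |
| 8 | udfs | String | UDF functions | Comma-separated UDF function IDs |
| 9 | sqlType | String | SQL node type | 0: query, 1: non-query |
| 10 | title | String | Email subject | |
| 11 | receivers | String | Recipients | |
| 12 | receiversCc | String | CC recipients | |
| 13 | showType | String | Email display type | TABLE: table, ATTACHMENT: attachment |
| 14 | connParams | String | Connection parameters | |
| 15 | preStatements | Array | Pre-SQL statements | |
| 16 | postStatements | Array | Post-SQL statements | |
| 17 | localParams | Array | Custom parameters | |
| 18 | description | String | Description | |
| 19 | runFlag | String | Run flag | |
| 20 | conditionResult | Object | Conditional branch | |
| 21 | successNode | Array | Node to jump to on success | |
| 22 | failedNode | Array | Node to jump to on failure | |
| 23 | dependence | Object | Task dependency | Mutually exclusive with params |
| 24 | maxRetryTimes | String | Maximum number of retries | |
| 25 | retryInterval | String | Retry interval | |
| 26 | timeout | Object | Timeout control | |
| 27 | taskInstancePriority | String | Task priority | |
| 28 | workerGroup | String | Worker group | |
| 29 | preTasks | Array | Predecessor tasks | |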

Node data example:

```json
{
  "type":"SQL",
  "id":"tasks-95648",
  "name":"SqlTask-Query",
  "params":{
    "type":"MYSQL",
    "datasource":1,
    "sql":"select id , name , age from emp where id = ${id}",
    "udfs":"",
    "sqlType":"0",
    "title":"xxxx@xxx.com",
    "receivers":"xxxx@xxx.com",
    "receiversCc":"",
    "showType":"TABLE",
    "localParams":[
      {
        "prop":"id",
        "direct":"IN",
        "type":"INTEGER",
        "value":"1"
      }
    ],
    "connParams":"",
    "preStatements":[
      "insert into emp ( id,name ) value (1,'Li' )"
    ],
    "postStatements":[]
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
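The sample query contains a `${id}` placeholder whose value comes from the matching localParams entry. One plausible way to handle this, similar in spirit to prepared statements, is to replace each placeholder with `?` and collect typed values in order. This is a hypothetical sketch: the converter table covers only two types, and it does not claim to reproduce how the scheduler itself binds parameters.

```python
import re

# Hypothetical converters keyed by the localParams "type" field; only two are shown.
CONVERTERS = {"INTEGER": int, "VARCHAR": str}

def to_prepared(sql, local_params):
    """Turn ${prop} placeholders into '?' markers plus an ordered list of typed values."""
    by_prop = {p["prop"]: p for p in local_params if p.get("direct") == "IN"}
    values = []

    def replace(match):
        param = by_prop[match.group(1)]  # KeyError if no IN-direction localParam matches
        convert = CONVERTERS.get(param["type"], str)
        values.append(convert(param["value"]))
        return "?"

    return re.sub(r"\$\{(\w+)\}", replace, sql), values

sql, values = to_prepared(
    "select id , name , age from emp where id = ${id}",
    [{"prop": "id", "direct": "IN", "type": "INTEGER", "value": "1"}],
)
print(sql)     # select id , name , age from emp where id = ?
print(values)  # [1]
```

The resulting pair fits a DB-API driver whose paramstyle is `qmark`, such as Python's sqlite3; MySQL drivers typically use a different placeholder style.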

PROCEDURE (stored procedure) node

SPARK node

The node data structure is as follows:

Node data example:

```json
{
  "type":"SPARK",
  "id":"tasks-87430",
  "name":"SparkTask",
  "params":{
    "mainClass":"org.apache.spark.examples.SparkPi",
    "mainJar":{
      "id":4
    },
    "deployMode":"cluster",
    "resourceList":[
      {
        "id":3,
        "name":"run.sh",
        "res":"run.sh"
      }
    ],
    "localParams":[],
    "driverCores":1,
    "driverMemory":"512M",
    "numExecutors":2,
    "executorMemory":"2G",
    "executorCores":2,
    "mainArgs":"10",
    "others":"",
    "programType":"SCALA"
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
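Most SPARK params correspond naturally to spark-submit options. The mapping below is an assumption for illustration, not the scheduler's actual command builder: the option names are real spark-submit flags, but `main_jar_path` is a hypothetical argument, since mainJar only stores a resource id, and `--master` is omitted because it is not among the params shown.

```python
import shlex

def spark_submit_args(params, main_jar_path):
    """Map SPARK node params onto spark-submit options (illustrative mapping only)."""
    args = [
        "spark-submit",
        "--class", params["mainClass"],
        "--deploy-mode", params["deployMode"],            # spark-submit accepts client or cluster
        "--driver-cores", str(params["driverCores"]),
        "--driver-memory", params["driverMemory"],
        "--num-executors", str(params["numExecutors"]),   # YARN option
        "--executor-memory", params["executorMemory"],
        "--executor-cores", str(params["executorCores"]),
    ]
    args += shlex.split(params.get("others", ""))     # free-form extra options, if any
    args.append(main_jar_path)                        # the application JAR comes after the options
    args += shlex.split(params.get("mainArgs", ""))   # application arguments go last
    return args

params = {
    "mainClass": "org.apache.spark.examples.SparkPi", "deployMode": "cluster",
    "driverCores": 1, "driverMemory": "512M", "numExecutors": 2,
    "executorMemory": "2G", "executorCores": 2, "mainArgs": "10", "others": "",
}
print(" ".join(spark_submit_args(params, "spark-examples.jar")))
```

shlex.split is used so that quoted values inside others or mainArgs stay together as single arguments.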

MapReduce (MR) node

The node data structure is as follows:

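| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | MR |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | mainClass | String | Main class | |
| 6 | mainArgs | String | Program arguments | |
| 7 | others | String | Other parameters | |
| 8 | mainJar | Object | Program JAR | |
| 9 | programType | String | Program type | JAVA, PYTHON |
| 10 | localParams | Array | Custom parameters | |
| 11 | resourceList | Array | Resource files | |
| 12 | description | String | Description | |
| 13 | runFlag | String | Run flag | |
| 14 | conditionResult | Object | Conditional branch | |
| 15 | successNode | Array | Node to jump to on success | |
| 16 | failedNode | Array | Node to jump to on failure | |
| 17 | dependence | Object | Task dependency | Mutually exclusive with params |
| 18 | maxRetryTimes | String | Maximum number of retries | |
| 19 | retryInterval | String | Retry interval | |
| 20 | timeout | Object | Timeout control | |
| 21 | taskInstancePriority | String | Task priority | |
| 22 | workerGroup | String | Worker group | |
| 23 | preTasks | Array | Predecessor tasks | |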
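The MR table restricts programType to JAVA or PYTHON, and the SPARK and FLINK samples show mainJar as an object holding a resource id. A hypothetical validator built only from those two facts might look like this; which fields a real deployment requires is not stated here and is left out.

```python
MR_PROGRAM_TYPES = {"JAVA", "PYTHON"}  # allowed values from the MR field table

def validate_mr_params(params):
    """Return a list of problems in an MR node's params; an empty list means no problems found."""
    problems = []
    program_type = params.get("programType")
    if program_type not in MR_PROGRAM_TYPES:
        problems.append(f"programType must be one of {sorted(MR_PROGRAM_TYPES)}, got {program_type!r}")
    main_jar = params.get("mainJar")
    if not isinstance(main_jar, dict) or "id" not in main_jar:
        problems.append("mainJar must be an object with an 'id' field")
    for key in ("mainArgs", "others"):  # both are String fields in the table
        if key in params and not isinstance(params[key], str):
            problems.append(f"{key} must be a string")
    return problems

print(validate_mr_params({"mainClass": "WordCount", "mainJar": {"id": 5},
                          "programType": "JAVA", "mainArgs": "/in /out"}))  # []
print(len(validate_mr_params({"programType": "SCALA", "mainJar": 5})))      # 2
```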

Node data example:

Python node

The node data structure is as follows:

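| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | PYTHON |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | rawScript | String | Python script | |
| 6 | localParams | Array | Custom parameters | |
| 7 | resourceList | Array | Resource files | |
| 8 | description | String | Description | |
| 9 | runFlag | String | Run flag | |
| 10 | conditionResult | Object | Conditional branch | |
| 11 | successNode | Array | Node to jump to on success | |
| 12 | failedNode | Array | Node to jump to on failure | |
| 13 | dependence | Object | Task dependency | Mutually exclusive with params |
| 14 | maxRetryTimes | String | Maximum number of retries | |
| 15 | retryInterval | String | Retry interval | |
| 16 | timeout | Object | Timeout control | |
| 17 | taskInstancePriority | String | Task priority | |
| 18 | workerGroup | String | Worker group | |
| 19 | preTasks | Array | Predecessor tasks | |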

Node data example:

```json
{
  "type":"PYTHON",
  "id":"tasks-5463",
  "name":"Python Task",
  "params":{
    "resourceList":[
      {
        "id":3,
        "name":"run.sh",
        "res":"run.sh"
      }
    ],
    "localParams":[],
    "rawScript":"print(\"This is a python script\")"
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
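Because rawScript is a JSON string, double quotes inside the script must be escaped as `\"` (and newlines as `\n`). Letting a JSON library produce the text avoids hand-escaping mistakes; a minimal sketch with Python's json module:

```python
import json

script = 'print("This is a python script")'
encoded = json.dumps({"rawScript": script})
print(encoded)                                     # {"rawScript": "print(\"This is a python script\")"}
print(json.loads(encoded)["rawScript"] == script)  # True
```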

Flink node

The node data structure is as follows:

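| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | FLINK |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | mainClass | String | Main class | |
| 6 | mainArgs | String | Program arguments | |
| 7 | others | String | Other parameters | |
| 8 | mainJar | Object | Program JAR | |
| 9 | deployMode | String | Deploy mode | local, client, cluster |
| 10 | slot | String | Number of slots | |
| 11 | taskManager | String | Number of TaskManagers | |
| 12 | taskManagerMemory | String | TaskManager memory | |
| 13 | jobManagerMemory | String | JobManager memory | |
| 14 | programType | String | Program type | JAVA, SCALA, PYTHON |
| 15 | localParams | Array | Custom parameters | |
| 16 | resourceList | Array | Resource files | |
| 17 | description | String | Description | |
| 18 | runFlag | String | Run flag | |
| 19 | conditionResult | Object | Conditional branch | |
| 20 | successNode | Array | Node to jump to on success | |
| 21 | failedNode | Array | Node to jump to on failure | |
| 22 | dependence | Object | Task dependency | Mutually exclusive with params |
| 23 | maxRetryTimes | String | Maximum number of retries | |
| 24 | retryInterval | String | Retry interval | |
| 25 | timeout | Object | Timeout control | |
| 26 | taskInstancePriority | String | Task priority | |
| 27 | workerGroup | String | Worker group | |
| 28 | preTasks | Array | Predecessor tasks | |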

Node data example:

```json
{
  "type":"FLINK",
  "id":"tasks-17135",
  "name":"FlinkTask",
  "params":{
    "mainClass":"com.flink.demo",
    "mainJar":{
      "id":6
    },
    "deployMode":"cluster",
    "resourceList":[
      {
        "id":3,
        "name":"run.sh",
        "res":"run.sh"
      }
    ],
    "localParams":[],
    "slot":1,
    "taskManager":"2",
    "jobManagerMemory":"1G",
    "taskManagerMemory":"2G",
    "executorCores":2,
    "mainArgs":"100",
    "others":"",
    "programType":"SCALA"
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
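In the sample, taskManager is the string "2" while slot is the number 1, and memory sizes carry unit suffixes. A rough estimate of the memory a FLINK node requests therefore needs explicit conversions. This sketch assumes only `M` and `G` suffixes occur and ignores any framework overhead; the helper names are hypothetical.

```python
import re

_UNITS_MB = {"M": 1, "G": 1024}  # assumption: only M and G suffixes are used

def to_mb(size):
    """Convert a memory string such as '512M' or '2G' to megabytes."""
    match = re.fullmatch(r"(\d+)([MG])", size.strip().upper())
    if match is None:
        raise ValueError(f"unsupported memory value: {size!r}")
    return int(match.group(1)) * _UNITS_MB[match.group(2)]

def flink_total_memory_mb(params):
    """JobManager memory plus taskManager * taskManagerMemory, in MB."""
    task_managers = int(params["taskManager"])  # stored as a string in the sample
    return to_mb(params["jobManagerMemory"]) + task_managers * to_mb(params["taskManagerMemory"])

params = {"slot": 1, "taskManager": "2", "jobManagerMemory": "1G", "taskManagerMemory": "2G"}
print(flink_total_memory_mb(params))  # 5120
```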

HTTP node

Node data example:

```json
{
  "type":"HTTP",
  "id":"tasks-60499",
  "name":"HttpTask",
  "params":{
    "localParams":[],
    "httpParams":[
      {
        "prop":"id",
        "httpParametersType":"PARAMETER",
        "value":"1"
      },
      {
        "prop":"name",
        "value":"Bo"
      }
    ],
    "url":"https://www.xxxxx.com:9012",
    "httpCheckCondition":"STATUS_CODE_DEFAULT",
    "condition":""
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
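The httpParams entries pair a prop with a value and, optionally, an httpParametersType. Assuming `PARAMETER` means a URL query parameter (an interpretation, not confirmed by this document), a request URL could be assembled as below. Entries with another or missing httpParametersType, such as the second sample entry, are simply skipped in this hypothetical helper.

```python
from urllib.parse import urlencode

def build_request_url(params):
    """Append httpParams of type PARAMETER to the url as a query string."""
    query = [(p["prop"], p["value"]) for p in params.get("httpParams", [])
             if p.get("httpParametersType") == "PARAMETER"]
    if not query:
        return params["url"]
    separator = "&" if "?" in params["url"] else "?"
    return params["url"] + separator + urlencode(query)

params = {
    "url": "https://www.xxxxx.com:9012",
    "httpParams": [
        {"prop": "id", "httpParametersType": "PARAMETER", "value": "1"},
        {"prop": "name", "value": "Bo"},
    ],
}
print(build_request_url(params))  # https://www.xxxxx.com:9012?id=1
```

urlencode percent-encodes values, so spaces or non-ASCII characters in a value do not break the URL.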

DataX node

The node data structure is as follows:

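| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | DATAX |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | customConfig | Int | Configuration type | 0: predefined, 1: custom |
| 6 | dsType | String | Source database type | |
| 7 | dataSource | Int | Source database ID | |
| 8 | dtType | String | Target database type | |
| 9 | dataTarget | Int | Target database ID | |
| 10 | sql | String | SQL statement | |
| 11 | targetTable | String | Target table | |
| 12 | jobSpeedByte | Int | Rate limit (bytes) | |
| 13 | jobSpeedRecord | Int | Rate limit (records) | |
| 14 | preStatements | Array | Pre-SQL statements | |
| 15 | postStatements | Array | Post-SQL statements | |
| 16 | json | String | Custom configuration | Takes effect when customConfig=1 |
| 17 | localParams | Array | Custom parameters | Takes effect when customConfig=1 |
| 18 | description | String | Description | |
| 19 | runFlag | String | Run flag | |
| 20 | conditionResult | Object | Conditional branch | |
| 21 | successNode | Array | Node to jump to on success | |
| 22 | failedNode | Array | Node to jump to on failure | |
| 23 | dependence | Object | Task dependency | Mutually exclusive with params |
| 24 | maxRetryTimes | String | Maximum number of retries | |
| 25 | retryInterval | String | Retry interval | |
| 26 | timeout | Object | Timeout control | |
| 27 | taskInstancePriority | String | Task priority | |
| 28 | workerGroup | String | Worker group | |
| 29 | preTasks | Array | Predecessor tasks | |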
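Per the table, json and localParams only take effect when customConfig=1; otherwise the source, target, and rate-limit fields describe the job. A hypothetical helper that keeps only the fields relevant to the chosen mode:

```python
FORM_KEYS = ("dsType", "dataSource", "dtType", "dataTarget", "sql", "targetTable",
             "jobSpeedByte", "jobSpeedRecord", "preStatements", "postStatements")

def effective_datax_config(params):
    """Return only the DATAX params that apply for the given customConfig mode."""
    if params.get("customConfig") == 1:
        # Custom mode: the json and localParams fields take effect.
        return {"json": params["json"], "localParams": params.get("localParams", [])}
    # customConfig == 0: the individual source/target/limit fields apply.
    return {key: params[key] for key in FORM_KEYS if key in params}

form = {"customConfig": 0, "dsType": "MYSQL", "dataSource": 1, "dtType": "MYSQL",
        "dataTarget": 2, "sql": "select * from emp", "targetTable": "emp_copy", "json": ""}
print(sorted(effective_datax_config(form)))
# ['dataSource', 'dataTarget', 'dsType', 'dtType', 'sql', 'targetTable']
```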

Node data example:

Sqoop node

The node data structure is as follows:

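| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | SQOOP |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | concurrency | Int | Concurrency | |
| 6 | modelType | String | Direction | import, export |
| 7 | sourceType | String | Data source type | |
| 8 | sourceParams | String | Data source parameters | JSON format |
| 9 | targetType | String | Target data type | |
| 10 | targetParams | String | Target data parameters | JSON format |
| 11 | localParams | Array | Custom parameters | |
| 12 | description | String | Description | |
| 13 | runFlag | String | Run flag | |
| 14 | conditionResult | Object | Conditional branch | |
| 15 | successNode | Array | Node to jump to on success | |
| 16 | failedNode | Array | Node to jump to on failure | |
| 17 | dependence | Object | Task dependency | Mutually exclusive with params |
| 18 | maxRetryTimes | String | Maximum number of retries | |
| 19 | retryInterval | String | Retry interval | |
| 20 | timeout | Object | Timeout control | |
| 21 | taskInstancePriority | String | Task priority | |
| 22 | workerGroup | String | Worker group | |
| 23 | preTasks | Array | Predecessor tasks | |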

Node data example:

```json
{
  "type":"SQOOP",
  "id":"tasks-82041",
  "name":"Sqoop Task",
  "params":{
    "concurrency":1,
    "modelType":"import",
    "sourceType":"MYSQL",
    "targetType":"HDFS",
    "sourceParams":"{\"srcType\":\"MYSQL\",\"srcDatasource\":1,\"srcTable\":\"\",\"srcQueryType\":\"1\",\"srcQuerySql\":\"select id , name from user\",\"srcColumnType\":\"0\",\"srcColumns\":\"\",\"srcConditionList\":[],\"mapColumnHive\":[{\"prop\":\"hivetype-key\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"hivetype-value\"}],\"mapColumnJava\":[{\"prop\":\"javatype-key\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"javatype-value\"}]}",
    "targetParams":"{\"targetPath\":\"/user/hive/warehouse/ods.db/user\",\"deleteTargetDir\":false,\"fileType\":\"--as-avrodatafile\",\"compressionCodec\":\"snappy\",\"fieldsTerminated\":\",\",\"linesTerminated\":\"@\"}",
    "localParams":[]
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
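sourceParams and targetParams are typed String but hold JSON text, so they are JSON nested inside the node's JSON and must be decoded a second time. A minimal sketch with Python's json module, using a shortened copy of the sample values:

```python
import json

def decode_sqoop_params(params):
    """Decode the JSON text held in sourceParams and targetParams."""
    return json.loads(params["sourceParams"]), json.loads(params["targetParams"])

# Raw string so the \" escapes reach the JSON parser unchanged.
node = json.loads(r'''{"params": {
  "sourceParams": "{\"srcType\":\"MYSQL\",\"srcDatasource\":1,\"srcQuerySql\":\"select id , name from user\"}",
  "targetParams": "{\"targetPath\":\"/user/hive/warehouse/ods.db/user\",\"deleteTargetDir\":false}"
}}''')
source, target = decode_sqoop_params(node["params"])
print(source["srcQuerySql"])      # select id , name from user
print(target["deleteTargetDir"])  # False
```

Writing the values back works the same way in reverse: json.dumps the inner object first, then store that string in the outer structure.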

Conditions node

The node data structure is as follows:

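| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | CONDITIONS |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | null |
| 5 | description | String | Description | |
| 6 | runFlag | String | Run flag | |
| 7 | conditionResult | Object | Conditional branch | |
| 8 | successNode | Array | Node to jump to on success | |
| 9 | failedNode | Array | Node to jump to on failure | |
| 10 | dependence | Object | Task dependency | Mutually exclusive with params |
| 11 | maxRetryTimes | String | Maximum number of retries | |
| 12 | retryInterval | String | Retry interval | |
| 13 | timeout | Object | Timeout control | |
| 14 | taskInstancePriority | String | Task priority | |
| 15 | workerGroup | String | Worker group | |
| 16 | preTasks | Array | Predecessor tasks | |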

Node data example:

```json
{
  "type":"CONDITIONS",
  "id":"tasks-96189",
  "name":"Condition",
  "params":{},
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      "test04"
    ],
    "failedNode":[
      "test05"
    ]
  },
  "dependence":{
    "relation":"AND",
    "dependTaskList":[]
  },
  "maxRetryTimes":"0",
  "retryInterval":"1",
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[
    "test01",
    "test02"
  ]
}
```
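In the sample, the conditions node waits for test01 and test02 and then continues with test04 on success or test05 on failure. The sketch below uses a deliberately simplified model, which is an assumption: each predecessor's outcome is a boolean, and the outcomes are combined with dependence.relation. It does not model the contents of dependTaskList.

```python
def choose_branch(node, succeeded):
    """Return the next node names for a CONDITIONS node under a simplified model.

    `succeeded` maps each preTask name to True or False.
    """
    results = [succeeded[name] for name in node["preTasks"]]
    relation = node["dependence"].get("relation", "AND")
    if relation == "AND":
        passed = all(results)
    elif relation == "OR":
        passed = any(results)
    else:
        raise ValueError(f"unknown relation: {relation!r}")
    branch = node["conditionResult"]["successNode" if passed else "failedNode"]
    return [name for name in branch if name]  # drop the "" placeholders seen in other samples

node = {
    "preTasks": ["test01", "test02"],
    "dependence": {"relation": "AND", "dependTaskList": []},
    "conditionResult": {"successNode": ["test04"], "failedNode": ["test05"]},
}
print(choose_branch(node, {"test01": True, "test02": False}))  # ['test05']
print(choose_branch(node, {"test01": True, "test02": True}))   # ['test04']
```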

Sub-process node

The node data structure is as follows:

Node data example:

```json
{
  "type":"SUB_PROCESS",
  "id":"tasks-14806",
  "name":"SubProcessTask",
  "params":{
    "processDefinitionId":2
  },
  "description":"",
  "runFlag":"NORMAL",
  "conditionResult":{
    "successNode":[
      ""
    ],
    "failedNode":[
      ""
    ]
  },
  "dependence":{},
  "timeout":{
    "strategy":"",
    "interval":null,
    "enable":false
  },
  "taskInstancePriority":"MEDIUM",
  "workerGroup":"default",
  "preTasks":[]
}
```
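A SUB_PROCESS node stores only the processDefinitionId of another definition, so definitions can end up referring to each other in a loop. A minimal sketch of detecting such a cycle, assuming a hypothetical in-memory map from definition id to the parsed process_definition_json:

```python
def find_subprocess_cycle(definitions, start_id):
    """Follow SUB_PROCESS references from start_id; return the id path of a cycle, or None."""
    def children(def_id):
        for task in definitions[def_id]["tasks"]:
            if task["type"] == "SUB_PROCESS":
                yield task["params"]["processDefinitionId"]

    path = []  # definition ids on the current depth-first path

    def visit(def_id):
        if def_id in path:
            return path[path.index(def_id):] + [def_id]
        path.append(def_id)
        for child in children(def_id):
            cycle = visit(child)
            if cycle:
                return cycle
        path.pop()
        return None

    return visit(start_id)

definitions = {
    1: {"tasks": [{"type": "SUB_PROCESS", "params": {"processDefinitionId": 2}}]},
    2: {"tasks": [{"type": "SUB_PROCESS", "params": {"processDefinitionId": 1}}]},
}
print(find_subprocess_cycle(definitions, 1))  # [1, 2, 1]
```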

Dependent node

The node data structure is as follows:

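| No. | Parameter | Type | Description | Notes |
|---|---|---|---|---|
| 1 | id | String | Task code | |
| 2 | type | String | Type | DEPENDENT |
| 3 | name | String | Name | |
| 4 | params | Object | Custom parameters | JSON format |
| 5 | rawScript | String | Shell script | |
| 6 | localParams | Array | Custom parameters | |
| 7 | resourceList | Array | Resource files | |
| 8 | description | String | Description | |
| 9 | runFlag | String | Run flag | |
| 10 | conditionResult | Object | Conditional branch | |
| 11 | successNode | Array | Node to jump to on success | |
| 12 | failedNode | Array | Node to jump to on failure | |
| 13 | dependence | Object | Task dependency | Mutually exclusive with params |
| 14 | relation | String | Relation | AND, OR |
| 15 | dependTaskList | Array | List of dependent tasks | |
| 16 | maxRetryTimes | String | Maximum number of retries | |
| 17 | retryInterval | String | Retry interval | |
| 18 | timeout | Object | Timeout control | |
| 19 | taskInstancePriority | String | Task priority | |
| 20 | workerGroup | String | Worker group | |
| 21 | preTasks | Array | Predecessor tasks | |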
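The dependence object combines the entries of dependTaskList using relation (AND or OR). The sketch below evaluates such a structure at two levels, under a loudly stated assumption: each dependTaskList entry is modeled as a hypothetical `{"relation": ..., "results": [bool, ...]}`, whereas real entries describe upstream tasks rather than ready-made booleans.

```python
def combine(relation, results):
    """Combine booleans with AND/OR as named by a relation field."""
    if relation == "AND":
        return all(results)
    if relation == "OR":
        return any(results)
    raise ValueError(f"unknown relation: {relation!r}")

def dependence_satisfied(dependence):
    """Evaluate each dependTaskList entry with its own relation, then combine them with the outer relation."""
    groups = [combine(group["relation"], group["results"]) for group in dependence["dependTaskList"]]
    return combine(dependence["relation"], groups)

# Hypothetical outcomes: the first group fails (AND of True, False), the second passes.
dependence = {
    "relation": "OR",
    "dependTaskList": [
        {"relation": "AND", "results": [True, False]},
        {"relation": "AND", "results": [True, True]},
    ],
}
print(dependence_satisfied(dependence))  # True
```

Note that with this model an empty dependTaskList under AND evaluates to True, because all() of an empty list is True.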