ElasticJob 的作业分类基于 class 和 type 两种类型。 基于 class 的作业需要开发者自行通过实现接口的方式织入业务逻辑; 基于 type 的作业则无需编码,只需要提供相应配置即可。

基于 class 的作业接口的方法参数 包含作业配置、片和运行时信息。 可通过 getShardingTotalCount(), getShardingItem() 等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

ElasticJob 目前提供 Simple、Dataflow 这两种基于 class 的作业类型,并提供 Script、HTTP 这两种基于 type 的作业类型,用户可通过实现 SPI 接口自行扩展作业类型。

意为简单实现,未经任何封装的类型。需实现 SimpleJob 接口。 该接口仅提供单一方法用于覆盖,此方法将定时执行。 与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

  1. public class MyElasticJob implements DataflowJob<Foo> {
  2. @Override
  3. public List<Foo> fetchData(ShardingContext context) {
  4. switch (context.getShardingItem()) {
  5. case 0:
  6. List<Foo> data = // get data from database by sharding item 0
  7. return data;
  8. List<Foo> data = // get data from database by sharding item 1
  9. return data;
  10. List<Foo> data = // get data from database by sharding item 2
  11. return data;
  12. // case n: ...
  13. }
  14. }
  15. @Override
  16. public void processData(ShardingContext shardingContext, List<Foo> data) {
  17. // ...
  18. }
  19. }

流式处理

可通过属性配置 streaming.process 开启或关闭流式处理。

如果开启流式处理,则作业只有在 fetchData 方法的返回值为 null 或集合容量为空时,才停止抓取,否则作业将一直运行下去; 如果关闭流式处理,则作业只会在每次作业执行过程中执行一次 fetchData 和 processData 方法,随即完成本次作业。

如果采用流式作业处理方式,建议 processData 在处理数据后更新其状态,避免 fetchData 再次抓取到,从而使得作业永不停止。

例如如下脚本:

作业运行时将输出:

  1. sharding execution context is {"jobName":"scriptElasticDemoJob","shardingTotalCount":10,"jobParameter":"","shardingItem":0,"shardingParameter":"A"}

可通过属性配置http.url,http.method,http.data等配置待请求的http信息。 分片信息以Header形式传递,key为shardingContext,值为json格式。

  1. @Controller
  2. @Slf4j
  3. public class HttpJobController {
  4. @RequestMapping(path = "/execute", method = RequestMethod.POST)
  5. public void execute(String source, @RequestHeader String shardingContext) {
  6. log.info("execute from source : {}, shardingContext : {}", source, shardingContext);

execute接口将输出: