ElasticJob-Lite 调度器分为定时调度和一次性调度两种类型。 每种调度器启动时均需要注册中心配置、作业对象(或作业类型)以及作业配置这 3 个参数。

  1. public static void main(String[] args) {
  2. // 调度基于 class 类型的作业
  3. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();
  4. // 调度基于 type 类型的作业
  5. new ScheduleJobBootstrap(createRegistryCenter(), "MY_TYPE", createJobConfiguration()).schedule();
  6. }
  7. private static CoordinatorRegistryCenter createRegistryCenter() {
  8. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
  9. regCenter.init();
  10. return regCenter;
  11. }
  12. private static JobConfiguration createJobConfiguration() {
  13. // 创建作业配置
  14. ...
  15. }
  16. }

一次性调度

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. OneOffJobBootstrap jobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration());
  4. // 可多次调用一次性调度
  5. jobBootstrap.execute();
  6. jobBootstrap.execute();
  7. jobBootstrap.execute();
  8. }
  9. private static CoordinatorRegistryCenter createRegistryCenter() {
  10. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
  11. regCenter.init();
  12. return regCenter;
  13. }
  14. private static JobConfiguration createJobConfiguration() {
  15. // 创建作业配置
  16. ...
  17. }
  18. }

使用 ElasticJob-Lite 过程中可能会碰到一些分布式问题,导致作业运行不稳定。

由于无法在生产环境调试,通过 dump 命令可以把作业内部相关信息导出,方便开发者调试分析;

以下示例用于展示如何通过 SnapshotService 开启用于导出命令的监听端口。

  1. public static void main(final String[] args) {
  2. SnapshotService snapshotService = new SnapshotService(regCenter, 9888).listen();
  3. }
  4. private static CoordinatorRegistryCenter createRegistryCenter() {
  5. // 创建注册中心
  6. }

使用 ElasticJob-Lite 过程中当作业发生异常后,可采用以下错误处理策略。

抛出异常策略

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用抛出异常策略
  10. return JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("THROW").build();
  11. }
  12. private static JobConfiguration createOneOffJobConfiguration() {
  13. // 创建一次性作业配置, 并且使用抛出异常策略
  14. return JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("THROW").build();
  15. }
  16. private static CoordinatorRegistryCenter createRegistryCenter() {
  17. // 配置注册中心
  18. ...
  19. }
  20. }
  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用忽略异常策略
  10. return JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("IGNORE").build();
  11. }
  12. private static JobConfiguration createOneOffJobConfiguration() {
  13. // 创建一次性作业配置, 并且使用忽略异常策略
  14. return JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("IGNORE").build();
  15. }
  16. // 配置注册中心
  17. ...
  18. }

邮件通知策略

请参考 了解更多。

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-email</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>

请参考 这里 了解更多。

Maven POM:

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-wechat</artifactId>
  4. <version>${latest.release.version}</version>
  5. </dependency>
  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. // 定时调度作业
  4. new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
  5. // 一次性调度作业
  6. new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
  7. }
  8. private static JobConfiguration createScheduleJobConfiguration() {
  9. // 创建定时作业配置, 并且使用企业微信通知策略
  10. JobConfiguration jobConfig = JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("WECHAT").build();
  11. setWechatProperties(jobConfig);
  12. return jobConfig;
  13. }
  14. private static JobConfiguration createOneOffJobConfiguration() {
  15. // 创建一次性作业配置, 并且使用企业微信通知策略
  16. JobConfiguration jobConfig = JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("WECHAT").build();
  17. setWechatProperties(jobConfig);
  18. return jobConfig;
  19. }
  20. private static void setWechatProperties(final JobConfiguration jobConfig) {
  21. // 设置企业微信的配置
  22. jobConfig.getProps().setProperty(WechatPropertiesConstants.WEBHOOK, "you_webhook");
  23. }
  24. private static CoordinatorRegistryCenter createRegistryCenter() {
  25. // 配置注册中心
  26. ...
  27. }
  28. }

钉钉通知策略

请参考 这里 了解更多。

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-dingtalk</artifactId>
  4. <version>${latest.release.version}</version>