ElasticJob-Lite 调度器分为定时调度和一次性调度两种类型。 每种调度器启动时均需要注册中心配置、作业对象(或作业类型)以及作业配置这 3 个参数。
public static void main(String[] args) {
// 调度基于 class 类型的作业
new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();
// 调度基于 type 类型的作业
new ScheduleJobBootstrap(createRegistryCenter(), "MY_TYPE", createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
// 创建作业配置
...
}
}
一次性调度
public class JobDemo {
public static void main(String[] args) {
OneOffJobBootstrap jobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration());
// 可多次调用一次性调度
jobBootstrap.execute();
jobBootstrap.execute();
jobBootstrap.execute();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
// 创建作业配置
...
}
}
使用 ElasticJob-Lite 过程中可能会碰到一些分布式问题,导致作业运行不稳定。
由于无法在生产环境调试,通过 dump 命令可以把作业内部相关信息导出,方便开发者调试分析;
以下示例用于展示如何通过 SnapshotService 开启用于导出命令的监听端口。
public static void main(final String[] args) {
SnapshotService snapshotService = new SnapshotService(regCenter, 9888).listen();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
// 创建注册中心
}
使用 ElasticJob-Lite 过程中当作业发生异常后,可采用以下错误处理策略。
抛出异常策略
public class JobDemo {
public static void main(String[] args) {
// 定时调度作业
new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
// 一次性调度作业
new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
}
private static JobConfiguration createScheduleJobConfiguration() {
// 创建定时作业配置, 并且使用抛出异常策略
return JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("THROW").build();
}
private static JobConfiguration createOneOffJobConfiguration() {
// 创建一次性作业配置, 并且使用抛出异常策略
return JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("THROW").build();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
// 配置注册中心
...
}
}
public class JobDemo {
public static void main(String[] args) {
// 定时调度作业
new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
// 一次性调度作业
new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
}
private static JobConfiguration createScheduleJobConfiguration() {
// 创建定时作业配置, 并且使用忽略异常策略
return JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("IGNORE").build();
}
private static JobConfiguration createOneOffJobConfiguration() {
// 创建一次性作业配置, 并且使用忽略异常策略
return JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("IGNORE").build();
}
// 配置注册中心
...
}
邮件通知策略
请参考 了解更多。
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-email</artifactId>
<version>${latest.release.version}</version>
</dependency>
请参考 这里 了解更多。
Maven POM:
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-wechat</artifactId>
<version>${latest.release.version}</version>
</dependency>
public class JobDemo {
public static void main(String[] args) {
// 定时调度作业
new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createScheduleJobConfiguration()).schedule();
// 一次性调度作业
new OneOffJobBootstrap(createRegistryCenter(), new MyJob(), createOneOffJobConfiguration()).execute();
}
private static JobConfiguration createScheduleJobConfiguration() {
// 创建定时作业配置, 并且使用企业微信通知策略
JobConfiguration jobConfig = JobConfiguration.newBuilder("myScheduleJob", 3).cron("0/5 * * * * ?").jobErrorHandlerType("WECHAT").build();
setWechatProperties(jobConfig);
return jobConfig;
}
private static JobConfiguration createOneOffJobConfiguration() {
// 创建一次性作业配置, 并且使用企业微信通知策略
JobConfiguration jobConfig = JobConfiguration.newBuilder("myOneOffJob", 3).jobErrorHandlerType("WECHAT").build();
setWechatProperties(jobConfig);
return jobConfig;
}
private static void setWechatProperties(final JobConfiguration jobConfig) {
// 设置企业微信的配置
jobConfig.getProps().setProperty(WechatPropertiesConstants.WEBHOOK, "you_webhook");
}
private static CoordinatorRegistryCenter createRegistryCenter() {
// 配置注册中心
...
}
}
钉钉通知策略
请参考 这里 了解更多。
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-dingtalk</artifactId>
<version>${latest.release.version}</version>