08、Java 任务调度 - ElasticJob 入门实战(ElasticJob-Lite使用)
ElasticJob 是一个分布式调度解决方案,由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。本文主要介绍 ElasticJob-Lite 的基本使用,文中所使用到的软件版本:Spring Boot 2.4.4、jdk1.8.0_181、elasticjob-lite 3.0.0-RC1。
1、ElasticJob-Lite 简介
ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务。架构图如下:

详细的介绍请参考官网文档:https://shardingsphere.apache.org/elasticjob/current/cn/overview/
2、使用
2.1、Zookeeper 环境准备
ElasticJob-Lite 使用 Zookeeper作为注册中心,需先安装 Zookeeper;安装方法可参考:https://www.cnblogs.com/wuyongyin/p/12485181.html
2.2、单独使用
2.2.1、引入依赖
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-core</artifactId>
    <version>3.0.0-RC1</version>
</dependency>
2.2.2、样例
开发Job:
package com.abc.demo.solo;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyJob implements SimpleJob {
    private static Logger logger = LoggerFactory.getLogger(MyJob.class);
    @Override
    public void execute(ShardingContext shardingContext) {
        logger.info(shardingContext.getJobName() + "|" + shardingContext.getShardingItem() + "|" + shardingContext.getShardingParameter());
        try {
            Thread.sleep(1000 * 5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
运行Job:
package com.abc.demo.solo;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import javax.sql.DataSource;
public class MyJobDemo {
    public static void main(String[] args) {
        CoordinatorRegistryCenter coordinatorRegistryCenter = createRegistryCenter();
        DataSource dataSource = getDataSource();
        TracingConfiguration tracingConfig = new TracingConfiguration<>("RDB", dataSource);
        new ScheduleJobBootstrap(coordinatorRegistryCenter, new MyJob(), createJobConfiguration("job1", tracingConfig)).schedule();
        new ScheduleJobBootstrap(coordinatorRegistryCenter, new MyJob(), createJobConfiguration("job2", tracingConfig)).schedule();
        new ScheduleJobBootstrap(coordinatorRegistryCenter, new MyJob(), createJobConfiguration("job3", tracingConfig)).schedule();
    }
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("10.49.196.10:2181", "my-job");
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(1000 * 30);
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(1000 * 30);
        CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        coordinatorRegistryCenter.init();
        return coordinatorRegistryCenter;
    }
    private static JobConfiguration createJobConfiguration(String jobName, TracingConfiguration tracingConfig) {
        JobConfiguration jobConfiguration = JobConfiguration.newBuilder(jobName, 2)
                .shardingItemParameters("0=Beijing,1=Shanghai")
                .cron("0/20 * * * * ?").build();
        //配置事件追踪,即记录任务执行日志
        jobConfiguration.getExtraConfigurations().add(tracingConfig);
        return jobConfiguration;
    }
    //这里使用 Hikari 连接池,使用 Druid 有时会报错
    private static DataSource getDataSource() {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver");
        hikariConfig.setJdbcUrl("jdbc:mysql://10.49.196.10:3306/itest?useUnicode=true&characterEncoding=UTF-8");
        hikariConfig.setUsername("root");
        hikariConfig.setPassword("Root_123!");
        hikariConfig.setMinimumIdle(2);
        hikariConfig.setMaximumPoolSize(5);
        hikariConfig.setConnectionTestQuery("select 1");
        HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
        return hikariDataSource;
    }
}
2.3、Spring Boot 中整合 ElasticJob-Lite
2.3.1、引入依赖
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.0-RC1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
2.3.2、application.yml
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://10.49.196.10:3306/itest?useUnicode=true&characterEncoding=UTF-8
    username: root
    password: 123456
elasticjob:
  reg-center:
    server-lists: 10.49.196.10:2181
    namespace: my-job
    max-sleep-time-milliseconds: 30000
    connection-timeout-milliseconds: 30000
  jobs:
    FirstJob:
      elasticJobClass: com.abc.demo.job.FirstJob
      cron: 0/10 * * * * ?
      shardingTotalCount: 2
      shardingItemParameters: 0=Beijing,1=Shanghai
    ScriptJob:
      elasticJobType: SCRIPT
      cron: 0/20 * * * * ?
      shardingTotalCount: 2
      props:
        script.command.line: "/home/demo/test.sh"
  tracing:
    type: RDB
2.3.3、开发任务
com.abc.demo.job.FirstJob:
package com.abc.demo.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class FirstJob implements SimpleJob {
    private static Logger logger = LoggerFactory.getLogger(FirstJob.class);
    @Override
    public void execute(ShardingContext shardingContext) {
        logger.info(shardingContext.getJobName() + "|" + shardingContext.getShardingItem() + "|" + shardingContext.getShardingParameter());
        try {
            Thread.sleep(1000 * 3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
脚本任务(/home/demo/test.sh):
echo 'hello'
echo 'hello2'
echo 'hello3'
2.3.4、启动应用
应用启动后,任务开始运行。
2.4、部署控制台
下载ElasticJob-Lite-UI 二进制包并解压:https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
在lib 目录下增加 MySQL、Druid 的驱动包,然后执行 bin/start.sh。
访问地址为:http://10.49.196.10:8088/ (root/root)