08、Java 任务调度 - ElasticJob 入门实战(ElasticJob-Lite使用)
ElasticJob 是一个分布式调度解决方案,由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。本文主要介绍 ElasticJob-Lite 的基本使用,文中所使用到的软件版本:Spring Boot 2.4.4、jdk1.8.0_181、elasticjob-lite 3.0.0-RC1。
1、ElasticJob-Lite 简介
ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务。架构图如下:
详细的介绍请参考官网文档:https://shardingsphere.apache.org/elasticjob/current/cn/overview/
2、使用
2.1、Zookeeper 环境准备
ElasticJob-Lite 使用 Zookeeper作为注册中心,需先安装 Zookeeper;安装方法可参考:https://www.cnblogs.com/wuyongyin/p/12485181.html
2.2、单独使用
2.2.1、引入依赖
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>3.0.0-RC1</version>
</dependency>
2.2.2、样例
开发Job:
package com.abc.demo.solo;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyJob implements SimpleJob {
private static Logger logger = LoggerFactory.getLogger(MyJob.class);
@Override
public void execute(ShardingContext shardingContext) {
logger.info(shardingContext.getJobName() + "|" + shardingContext.getShardingItem() + "|" + shardingContext.getShardingParameter());
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行Job:
package com.abc.demo.solo;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import javax.sql.DataSource;
public class MyJobDemo {
public static void main(String[] args) {
CoordinatorRegistryCenter coordinatorRegistryCenter = createRegistryCenter();
DataSource dataSource = getDataSource();
TracingConfiguration tracingConfig = new TracingConfiguration<>("RDB", dataSource);
new ScheduleJobBootstrap(coordinatorRegistryCenter, new MyJob(), createJobConfiguration("job1", tracingConfig)).schedule();
new ScheduleJobBootstrap(coordinatorRegistryCenter, new MyJob(), createJobConfiguration("job2", tracingConfig)).schedule();
new ScheduleJobBootstrap(coordinatorRegistryCenter, new MyJob(), createJobConfiguration("job3", tracingConfig)).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("10.49.196.10:2181", "my-job");
zookeeperConfiguration.setMaxSleepTimeMilliseconds(1000 * 30);
zookeeperConfiguration.setConnectionTimeoutMilliseconds(1000 * 30);
CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
coordinatorRegistryCenter.init();
return coordinatorRegistryCenter;
}
private static JobConfiguration createJobConfiguration(String jobName, TracingConfiguration tracingConfig) {
JobConfiguration jobConfiguration = JobConfiguration.newBuilder(jobName, 2)
.shardingItemParameters("0=Beijing,1=Shanghai")
.cron("0/20 * * * * ?").build();
//配置事件追踪,即记录任务执行日志
jobConfiguration.getExtraConfigurations().add(tracingConfig);
return jobConfiguration;
}
//这里使用 Hikari 连接池,使用 Druid 有时会报错
private static DataSource getDataSource() {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver");
hikariConfig.setJdbcUrl("jdbc:mysql://10.49.196.10:3306/itest?useUnicode=true&characterEncoding=UTF-8");
hikariConfig.setUsername("root");
hikariConfig.setPassword("Root_123!");
hikariConfig.setMinimumIdle(2);
hikariConfig.setMaximumPoolSize(5);
hikariConfig.setConnectionTestQuery("select 1");
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
return hikariDataSource;
}
}
2.3、Spring Boot 中整合 ElasticJob-Lite
2.3.1、引入依赖
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-spring-boot-starter</artifactId>
<version>3.0.0-RC1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
2.3.2、application.yml
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://10.49.196.10:3306/itest?useUnicode=true&characterEncoding=UTF-8
username: root
password: 123456
elasticjob:
reg-center:
server-lists: 10.49.196.10:2181
namespace: my-job
max-sleep-time-milliseconds: 30000
connection-timeout-milliseconds: 30000
jobs:
FirstJob:
elasticJobClass: com.abc.demo.job.FirstJob
cron: 0/10 * * * * ?
shardingTotalCount: 2
shardingItemParameters: 0=Beijing,1=Shanghai
ScriptJob:
elasticJobType: SCRIPT
cron: 0/20 * * * * ?
shardingTotalCount: 2
props:
script.command.line: "/home/demo/test.sh"
tracing:
type: RDB
2.3.3、开发任务
com.abc.demo.job.FirstJob:
package com.abc.demo.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class FirstJob implements SimpleJob {
private static Logger logger = LoggerFactory.getLogger(FirstJob.class);
@Override
public void execute(ShardingContext shardingContext) {
logger.info(shardingContext.getJobName() + "|" + shardingContext.getShardingItem() + "|" + shardingContext.getShardingParameter());
try {
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
脚本任务(/home/demo/test.sh):
echo 'hello'
echo 'hello2'
echo 'hello3'
2.3.4、启动应用
应用启动后,任务开始运行。
2.4、部署控制台
下载ElasticJob-Lite-UI 二进制包并解压:https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
在lib 目录下增加 MySQL、Druid 的驱动包,然后执行 bin/start.sh。
访问地址为:http://10.49.196.10:8088/ (root/root)