1 分布式任务调度

1.1 分布式任务调度介绍

​ 很多时候,我们需要定时执行一些程序完成一些预定要完成的操作,如果手动处理,一旦任务量过大,就非常麻烦,所以用定时任务去操作是个非常不错的选项。

​ 现在的应用多数是分布式或者微服务,所以我们需要的是分布式任务调度,那么现在分布式任务调度流行的主要有elastic-job、xxl-job、quartz等,我们这里做一个对比:

featurequartzelastic-jobxxl-jobantaresopencron
依赖mysqljdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+mysql ,jdk1.7+ , maven3.0+jdk 1.7+ , redis , zookeeperjdk1.7+ , Tomcat8.0+
HA多节点部署,通过竞争数据库锁来保证只有一个节点执行任务通过zookeeper的注册与发现,可以动态的添加服务器。 支持水平扩容集群部署集群部署
任务分片支持支持支持
文档完善完善完善完善文档略少文档略少
管理界面支持支持支持支持
难易程度简单简单简单一般一般
公司OpenSymphony当当网个人个人个人
高级功能弹性扩容,多种作业模式,失效转移,运行状态收集,多线程处理数据,幂等性,容错处理,spring命名空间支持弹性扩容,分片广播,故障转移,Rolling实时日志,GLUE(支持在线编辑代码,免发布),任务进度监控,任务依赖,数据加密,邮件报警,运行报表,国际化任务分片, 失效转移,弹性扩容 ,时间规则支持quartz和crontab ,kill任务, 现场执行,查询任务运行状态
使用企业大众化产品,对分布式调度要求不高的公司大面积使用36氪,当当网,国美,金柚网,联想,唯品会,亚信,平安,猪八戒大众点评,运满满,优信二手车,拍拍贷

1.2 elastic-job讲解

官网:http://elasticjob.io/index_zh.html

1.2.1 静态任务案例

​ 使用elastic-job很容易,我们接下来学习下elastic-job的使用,这里的案例我们先实现静态任务案例,静态任务案例也就是执行时间事先写好。

实现步骤:

1.引入依赖包
2.配置zookeeper节点以及任务名称命名空间
3.实现自定义任务,需要实现SimpleJob接口

1)在seckill-goods中引入依赖

<!-- ElasticJobAutoConfiguration自动配置类作用-->
<dependency>
    <groupId>com.github.kuhn-he</groupId>
    <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
    <version>2.1.5</version>
</dependency>

2)配置elastic-job

bootstrap.yml中配置elastic-job,如下:

elaticjob:
  zookeeper:
    server-lists: zk-server:3181
    namespace: updatetask

讲解:

server-lists:zookeeper的地址
namespace:定时任务命名空间

这里我们只展示了部分常用的参数,还有很多参数,但不一定常用,大家可以参考下面地址学习:

3)任务创建

创建com.seckill.goods.task.statictask.ElasticjobTask,代码如下:

@ElasticSimpleJob(
        cron = "5/10 * * * * ?",
        jobName = "updateTask",
        shardingTotalCount = 1
)
@Component
public class ElasticjobTask implements SimpleJob {
 
    /***
     * 执行任务
     * @param shardingContext
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("-----------执行!");
    }
}

讲解:

cron:定时表达式
jobName:这里和bootstrap.yml中的namespace保持一致
shardingTotalCount:分片数量

1.2.2 动态任务案例

​ 参考地址:https://github.com/LuoLiangDSGA/spring-learning/tree/master/boot-elasticjob

​ 动态任务案例主要是讲解程序在运行时,动态添加定时任务,这种场景应用非常广泛。使用elastic-job实现动态添加定时任务的实现有点复杂,我们接下来实际操作一次。

步骤:

1.配置初始化的zookeeper地址
2.配置的定时任务命名空间(不一定会使用)
3.注册初始化数据
4.监听器->任务执行前后监听(可有可无)
5.动态添加定时任务实现
6.自定义任务处理过程-实现SimpleJob
  1. 监听器创建

​ 监听器采用AOP模式,类似前置通知和后置通知,doBeforeJobExecutedAtLastStarteddoAfterJobExecutedAtLastCompleted分别会在任务执行前和执行后调用,我们创建一个监听器实现任务调度前后拦截,com.seckill.goods.task.dynamic.ElasticJobListener:

public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
 
    /****
     * 构造函数
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }
 
    /***
     * 任务初始化前要做的事情,类似前置通知
     * @param shardingContexts
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("========doBeforeJobExecutedAtLastStarted========"+ TimeUtil.date2FormatHHmmss(new Date()));
    }
 
    /***
     * 任务执行完成后要做的事情,类似后置通知
     * @param shardingContexts
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.out.println("=======doAfterJobExecutedAtLastCompleted============="+ TimeUtil.date2FormatHHmmss(new Date()));
    }
}

2)注册中心配置

在bootstrap.yml中配置zk和namespace

#配置动态任务案例的zk和namespace
zk: zk-server:3181
namesp: autotask

创建配置类配置注册中心信息,com.seckill.goods.task.dynamic.ElasticJobConfig:

@Configuration
public class ElasticJobConfig {
 
    //配置文件中的zookeeper的ip和端口
    @Value(value = "${zk}")
    private String serverlists;
    //指定一个命名空间
    @Value("${namesp}")
    private String namespace;
 
    /***
     * 配置Zookeeper和namespace
     * @return
     */
    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(serverlists, namespace);
    }
 
    /***
     * 向zookeeper注册初始化信息
     * @param config
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }
 
    /****
     * 创建ElasticJob的监听器实例
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        //初始化要给定超时多少秒重连
        return new ElasticJobListener(100L,100L);
    }
}

3)任务构建

​ 我们创建一个动态配置任务的类,任何逻辑代码需要创建定时任务,可以直接调用该类的指定方法即可。创建类:com.seckill.goods.task.dynamic.ElasticJobHandler,代码如下:

@Component
public class ElasticJobHandler {
 
    @Resource
    private ZookeeperRegistryCenter registryCenter;
 
    @Resource
    private ElasticJobListener elasticJobListener;
 
    /**
     * @param jobName:任务的命名空间
     * @param jobClass:执行的定时任务对象
     * @param shardingTotalCount:分片个数
     * @param cron:定时周期表达式
     * @param id:自定义参数
     * @return
     */
    private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
                                                                       Class<? extends SimpleJob> jobClass,
                                                                       int shardingTotalCount,
                                                                       String cron,
                                                                       String id) {
        //创建任务构建对象
        LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.
                        //任务命名空间名字、任务执行周期表达式、分片个数
                        newBuilder(jobName, cron, shardingTotalCount).
                        //自定义参数
                        jobParameter(id).
                        build(),
                jobClass.getCanonicalName()));
        //本地配置是否可覆盖注册中心配置
        builder.overwrite(true);
        return builder;
    }
 
    /**
     * 添加一个定时任务
     * @param cron:周期执行表达式
     * @param id:自定义参数
     * @param jobName:命名空间
     * @param instance:任务对象
     */
    public void addPublishJob(String cron,String id,String jobName,SimpleJob instance) {
        LiteJobConfiguration jobConfig = simpleJobConfigBuilder(
                jobName,
                instance.getClass(),
                1,
                cron,
                id).overwrite(true).build();
        //DynamicTask为具体的任务执行逻辑类
        new SpringJobScheduler(instance, registryCenter, jobConfig, elasticJobListener).init();
    }
 
    /***
     * Date转cron表达式
     */
    public static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
 
    /**
     * 获得定时
     * @param date
     * @return
     */
    public static String getCron(final Date date) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(CRON_DATE_FORMAT);
        return simpleDateFormat.format(date);
    }
}

4)执行逻辑

我们接着创建一个类,用于执行自己所需要操作的逻辑,com.seckill.goods.task.dynamic.DynamicTask,代码如下:

public class DynamicTask implements SimpleJob {
 
    @Override
    public void execute(ShardingContext shardingContext) {
        //传递的参数
        String id = shardingContext.getJobParameter();
        try {
            //具体任务逻辑
            System.out.println("执行你的逻辑代码!param:"+id);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5)调用测试

创建com.seckill.goods.controller.TaskController动态调用创建任务的方法,代码如下:

@RestController
@RequestMapping(value = "/task")
public class TaskController {
 
    @Autowired
    ElasticJobHandler elasticJobHandler;
 
    /***
     * 动态创建任务
     * @param times:延迟时间,为了测试到效果,所以在当前时间往后延迟
     * @param jobname:任务名字
     * @param param:自定义参数
     * @return
     */
    @GetMapping
    public Result add(Long times,String jobname,String param){
        //在当前指定时间内延迟times毫秒执行任务
        Date date = new Date(System.currentTimeMillis()+times);
        //需要传递给定时任务的参数
        String cron = ElasticJobHandler.getCron(date);
 
        //执行任务
        elasticJobHandler.addPublishJob(cron,param,jobname,new DynamicTask());
        return new Result(true, StatusCode.OK,"添加任务成功!");
    }
}

6)测试

访问:http://localhost:18081/task?times=15000&jobname=asyncname&param=No001

后台执行效果如下: