黑马程序员技术交流社区

标题: 【上海校区】分布式作业系统 Elastic-Job-Lite 源码分析 —— ... [打印本页]

作者: 不二晨    时间: 2018-9-7 09:41
标题: 【上海校区】分布式作业系统 Elastic-Job-Lite 源码分析 —— ...
1. 概述
本文主要分享 Elastic-Job-Lite 作业初始化。
涉及到主要类的类图如下( 打开大图 ):
你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门
2. 作业注册表
作业注册表( JobRegistry ),维护了单个 Elastic-Job-Lite 进程内作业相关信息,可以理解成其专属的 Spring IOC 容器。因此,其本身是一个单例。
public final class JobRegistry {    /**     * 单例     */    private static volatile JobRegistry instance;    /**     * 作业调度控制器集合     * key:作业名称     */    private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();    /**     * 注册中心集合     * key:作业名称     */    private Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap<>();    /**     * 作业运行实例集合     * key:作业名称     */    private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();    /**     * 运行中作业集合     * key:作业名字     */    private Map<String, Boolean> jobRunningMap = new ConcurrentHashMap<>();    /**     * 作业总分片数量集合     * key:作业名字     */    private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();        /**     * 获取作业注册表实例.     *      * @return 作业注册表实例     */    public static JobRegistry getInstance() {        if (null == instance) {            synchronized (JobRegistry.class) {                if (null == instance) {                    instance = new JobRegistry();                }            }        }        return instance;    }        // .... 省略方法}3. 作业调度器
作业调度器( JobScheduler ),创建并初始化后,进行作业调度。
Elastic-Job-Lite 使用 Quartz 作为调度内核。
3.1 创建public class JobScheduler {    /**     * Lite作业配置     */    private final LiteJobConfiguration liteJobConfig;    /**     * 注册中心     */    private final CoordinatorRegistryCenter regCenter;    /**     * 调度器门面对象     */    @Getter    private final SchedulerFacade schedulerFacade;    /**     * 作业门面对象     */    private final JobFacade jobFacade;        public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {        this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);    }        public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,                         final ElasticJobListener... elasticJobListeners) {        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);    }        private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {        // 添加 作业运行实例        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());        // 设置 Lite作业配置        this.liteJobConfig = liteJobConfig;        this.regCenter = regCenter;        // 设置 作业监听器        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);        // 设置 调度器门面对象        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);        // 设置 作业门面对象        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);    }}
SchedulerFacade 和 LiteJobFacade,看起来很相近,实际差别很大。它们分别为调度器、作业提供需要的方法。下文也会体现这一特点。
3.2 初始化
作业调度器创建后,调用 #init() 方法初始化,作业方开始调度。
/*** 初始化作业.*/public void init() {   // 更新 作业配置   LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);   // 设置 当前作业分片总数   JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());   // 创建 作业调度控制器   JobScheduleController jobScheduleController = new JobScheduleController(           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());   // 添加 作业调度控制器   JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);   // 注册 作业启动信息   schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());   // 调度作业   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}3.2.1 更新作业配置// SchedulerFacade.java/*** 更新作业配置.** @param liteJobConfig 作业配置* @return 更新后的作业配置*/public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {   // 更新 作业配置   configService.persist(liteJobConfig);   // 读取 作业配置   return configService.load(false);}3.2.2 设置当前作业分片总数// JobRegistry.javaprivate Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();/*** 设置当前分片总数.** @param jobName 作业名称* @param currentShardingTotalCount 当前分片总数*/public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {   currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);}3.2.3 创建作业调度控制器public void init() {   // .... 省略   // 创建 作业调度控制器   JobScheduleController jobScheduleController = new JobScheduleController(           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());   // .... 省略}3.2.4 注册作业启动信息/*** 注册作业启动信息.* * @param enabled 作业是否启用*/public void registerStartUpInfo(final boolean enabled) {   // 开启 所有监听器   listenerManager.startAllListeners();   // 选举 主节点   leaderService.electLeader();   // 持久化 作业服务器上线信息   serverService.persistOnline(enabled);   // 持久化 作业运行实例上线相关信息   instanceService.persistOnline();   // 设置 需要重新分片的标记   shardingService.setReshardingFlag();   // 初始化 作业监听服务   monitorService.listen();   // 初始化 调解作业不一致状态服务   if (!reconcileService.isRunning()) {       reconcileService.startAsync();   }}3.2.5 调度作业// JobScheduler.javapublic void init() {   // .... 省略部分代码   // 调度作业   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}// JobScheduleController.java/*** 调度作业.** @param cron CRON表达式*/public void scheduleJob(final String cron) {   try {       if (!scheduler.checkExists(jobDetail.getKey())) {           scheduler.scheduleJob(jobDetail, createTrigger(cron));       }       scheduler.start();   } catch (final SchedulerException ex) {       throw new JobSystemException(ex);   }}

作者: 不二晨    时间: 2018-9-13 16:19

很不错,受教了
作者: zhengyh    时间: 2020-7-5 00:38
6666666666666




欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2