1、改造目标

  • 调度器、执行器全部随机端口,注册到注册中心
  • 在执行器随机端口的情况下查看日志功能正常
  • 调度器掉线后,执行器能够重新向调度器注册

2、定义CLOUD执行器 XxlJobCloudExecutor

/**
 * xxl-job executor whose admin (scheduler) addresses come from a service registry
 * instead of static configuration.
 *
 * <p>The constructor blocks until the registry reports at least one instance of the
 * admin service, then passes the resolved addresses to {@code setAdminAddresses}.
 */
@Slf4j
public class XxlJobCloudExecutor extends XxlJobSpringExecutor {

    /** Pause between two registry look-ups while no admin instance is registered. */
    private static final long DISCOVERY_RETRY_INTERVAL_MILLIS = 2000L;

    /**
     * Waits for the admin service to appear in the registry and configures its addresses.
     *
     * @param discoveryClient   registry client used to look up the admin service
     * @param xxlAdminServiceId service id under which the admin registers itself
     * @throws IllegalStateException if the calling thread is interrupted while waiting
     */
    public XxlJobCloudExecutor(DiscoveryClient discoveryClient, String xxlAdminServiceId) {
        while (CollectionUtils.isEmpty(discoveryClient.getInstances(xxlAdminServiceId))) {
            log.info("未查询到调度服务,执行器将等待");
            try {
                Thread.sleep(DISCOVERY_RETRY_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting: looping on would ignore the
                // interruption request and could block forever.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while waiting for admin service " + xxlAdminServiceId, e);
            }
        }
        // NOTE(review): getAddrList presumably returns one address per registered admin instance - confirm.
        List<String> addrs = DiscoveryClientUtil.getAddrList(discoveryClient, xxlAdminServiceId);
        setAdminAddresses(StringUtils.join(addrs, ','));
    }

    /** Starts the executor by delegating to the parent implementation. */
    @Override
    public void start() throws Exception {
        super.start();
    }
}

3、创建并注册CLOUD执行器

    /**
     * Creates the {@code xxlJobExecutor} bean once this service instance has been
     * registered with the registry, then publishes the executor port as instance metadata.
     *
     * <p>The event may be delivered more than once. The bean definition is registered
     * only on the first delivery, because registering the same bean name again would
     * either be rejected or replace the definition of the executor that is already running.
     *
     * @param event the registration event; only its occurrence is used
     */
    @EventListener
    public void createExecuor(InstanceRegisteredEvent event){
        ConfigurableApplicationContext context = (ConfigurableApplicationContext) this.applicationContext;

        DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
        this.discoveryClient = discoveryClient;

        // Executor already created by an earlier event: nothing left to do.
        if (null != this.executor) {
            return;
        }

        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getBeanFactory();
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(XxlJobCloudExecutor.class);
        builder.setInitMethodName("start");
        builder.setDestroyMethodName("destroy");
        builder.addConstructorArgValue(discoveryClient);
        builder.addConstructorArgValue(serviceId);

        builder.addPropertyValue("appName", appName);
        builder.addPropertyValue("accessToken", accessToken);
        builder.addPropertyValue("appTitle", appTitle);
        builder.addPropertyValue("logPath", logPath);
        builder.addPropertyValue("port", executorPort);
        builder.addPropertyValue("logRetentionDays", logRetentionDays);
        logger.info(">>>>>>>>>>> dsp-job config init.");
        defaultListableBeanFactory.registerBeanDefinition("xxlJobExecutor", builder.getBeanDefinition());

        // getBean instantiates the executor, which runs its constructor and the "start" init method.
        this.executor = defaultListableBeanFactory.getBean("xxlJobExecutor", XxlJobCloudExecutor.class);

        // Publish the executor port in the instance metadata and mark the instance info dirty.
        applicationInfoManager.getInfo().getMetadata().put(EXECUTOR_PORT, executorPort.toString());
        applicationInfoManager.getInfo().setIsDirty();
    }

4、调度器掉线执行器重新注册

    /**
     * On every heartbeat, re-resolves the admin addresses and hands them to the
     * executor when they differ from the addresses it currently holds.
     *
     * <p>Does nothing until both the discovery client and the executor are set.
     *
     * @param event the heartbeat event; only its occurrence is used
     */
    @EventListener
    public void refreshAdminAddr(HeartbeatEvent event) {
        boolean initialized = discoveryClient != null && this.executor != null;
        if (!initialized) {
            return;
        }

        List<String> resolved = DiscoveryClientUtil.getAddrList(discoveryClient,serviceId);
        String current = this.executor.getAdminAddresses();
        String latest = StringUtils.join(resolved, ',');

        // Same address list as before: keep the executor untouched.
        if (current.equals(latest)) {
            return;
        }
        this.executor.resetAdminBizList(latest);
    }

5、执行器随机端口查看日志

/**
 * Returns a slice of a job's execution log, locating the executor instance that
 * holds the log file through the service registry.
 *
 * <p>Each registered instance of the job group's application is asked whether it has
 * the log. The first one that answers successfully is then asked for the content.
 *
 * @param executorAddress not used by this implementation; the address comes from the registry
 * @param triggerTime     trigger timestamp of the job run, passed on to the executor
 * @param logId           id of the job log record
 * @param fromLineNum     first line of the log to return
 * @return the executor's log result, or a failure result carrying an error message
 */
@RequestMapping("/logDetailCat")
    @ResponseBody
    public ReturnT<LogResult> logDetailCat(String executorAddress, Long triggerTime, Integer logId, Integer fromLineNum){
        try {

            XxlJobLog xxlJobLog = xxlJobLogDao.load(logId);
            if (xxlJobLog == null) {
                return new ReturnT<LogResult>(ReturnT.FAIL_CODE, "未找到日志,日志id为"+logId);
            }
            XxlJobGroup xxlJobGroup = xxlJobGroupDao.load(xxlJobLog.getJobGroup());
            if (xxlJobGroup == null) {
                return new ReturnT<LogResult>(ReturnT.FAIL_CODE, "未找到日志所属的执行器,日志id为"+logId);
            }
            String serviceName = xxlJobGroup.getAppName();

            // Skip instances that did not publish an executor port; the stream is lazy,
            // so probing stops at the first instance that reports having the log.
            String address = discoveryClient.getInstances(serviceName).stream()
                    .filter(instance -> instance.getMetadata().get(EXECUTOR_PORT) != null)
                    .map(instance -> instance.getHost() + ":" + instance.getMetadata().get(EXECUTOR_PORT))
                    .filter(addr -> executorHasLog(addr, triggerTime, logId))
                    .findFirst()
                    .orElse(null);

            if (address == null) {
                String message = "未找到日志,日志id为"+logId;
                logger.error(message);
                return new ReturnT<LogResult>(ReturnT.FAIL_CODE, message);
            }

            ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
            ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);

            // is end
            if (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) {
                // Reload the record so the handle code read here is the current one.
                XxlJobLog jobLog = xxlJobLogDao.load(logId);
                if (jobLog.getHandleCode() > 0) {
                    logResult.getContent().setEnd(true);
                }
            }

            return logResult;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<LogResult>(ReturnT.FAIL_CODE, e.getMessage());
        }
    }

    /**
     * Asks the executor at {@code addr} whether it has the log file for the given run.
     * Any failure while probing is logged and treated as "no log on this instance".
     */
    private boolean executorHasLog(String addr, Long triggerTime, Integer logId) {
        log.debug("addr={}",addr);
        try {
            ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(addr);
            ReturnT<String> res = executorBiz.hasLog(triggerTime, logId);
            log.debug("has log ::{}", res);
            return res != null && res.getCode() == ReturnT.SUCCESS_CODE;
        } catch (Exception e) {
            log.warn("hasLog probe failed, addr={}", addr, e);
            return false;
        }
    }

6、执行器判断日志是否存在

    /**
     * Reports whether the log file for the given job run is readable on this executor.
     *
     * @param logDateTim trigger time of the run in epoch milliseconds, used to build the log file name
     * @param logId      id of the job log record
     * @return {@code ReturnT.SUCCESS} when the log file is readable, otherwise {@code ReturnT.FAIL}
     */
    @Override
    public ReturnT<String> hasLog(long logDateTim, int logId) {
        Date triggerDate = new Date(logDateTim);
        String logPath = XxlJobFileAppender.makeLogFileName(triggerDate, logId);
        return Files.isReadable(Paths.get(logPath)) ? ReturnT.SUCCESS : ReturnT.FAIL;
    }


SPRING CLOUD      xxl-job

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!