1、改造目标
- 调度器、执行器全部随机端口,注册到注册中心
- 在执行器随机端口的情况下查看日志功能正常
- 调度器掉线后,执行器能够重新向调度器注册
2、定义CLOUD执行器 XxlJobCloudExecutor
/**
 * xxl-job executor whose admin (scheduler) addresses come from a service
 * registry instead of static configuration.
 *
 * <p>The constructor blocks until at least one instance of the admin service
 * is visible through the {@link DiscoveryClient}, then passes the discovered
 * addresses to {@code setAdminAddresses}.
 */
@Slf4j
public class XxlJobCloudExecutor extends XxlJobSpringExecutor {

    /** Pause between two registry lookups while no admin instance is registered. */
    private static final long ADMIN_LOOKUP_INTERVAL_MILLIS = 2000L;

    /**
     * Creates the executor once the admin service can be discovered.
     *
     * @param discoveryClient   registry client used to look up the admin service
     * @param xxlAdminServiceId service id under which the admin registers itself
     * @throws IllegalStateException if the calling thread is interrupted while
     *                               waiting for the admin service to appear
     */
    public XxlJobCloudExecutor(DiscoveryClient discoveryClient, String xxlAdminServiceId) {
        waitForAdminInstances(discoveryClient, xxlAdminServiceId);
        List<String> addrs = DiscoveryClientUtil.getAddrList(discoveryClient, xxlAdminServiceId);
        setAdminAddresses(StringUtils.join(addrs, ','));
    }

    /** Starts the executor by delegating to the parent implementation. */
    @Override
    public void start() throws Exception {
        super.start();
    }

    /**
     * Polls the registry until it returns at least one instance of the admin service.
     */
    private static void waitForAdminInstances(DiscoveryClient discoveryClient, String xxlAdminServiceId) {
        while (CollectionUtils.isEmpty(discoveryClient.getInstances(xxlAdminServiceId))) {
            log.info("未查询到调度服务,执行器将等待");
            try {
                Thread.sleep(ADMIN_LOOKUP_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and abort: swallowing the interrupt here would
                // keep the loop spinning and make shutdown impossible.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while waiting for xxl-job admin service '" + xxlAdminServiceId + "'", e);
            }
        }
    }
}
3、服务注册完成后创建并注册CLOUD执行器
/**
 * Creates the {@code xxlJobExecutor} bean when this service instance has been
 * registered, and publishes the executor port in the instance metadata.
 *
 * <p>The bean definition is registered programmatically with {@code start} as
 * init method and {@code destroy} as destroy method. The method does nothing
 * if the executor has already been created, so a repeated
 * {@link InstanceRegisteredEvent} cannot re-register the definition over the
 * running executor.
 *
 * @param event the registration event that triggers the creation
 */
@EventListener
public void createExecuor(InstanceRegisteredEvent event) {
    if (null != this.executor) {
        // Already created by an earlier event; registering the definition again
        // would either fail or replace the bean backing the live executor.
        return;
    }

    ConfigurableApplicationContext context = (ConfigurableApplicationContext) this.applicationContext;
    DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
    this.discoveryClient = discoveryClient;
    DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getBeanFactory();

    BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(XxlJobCloudExecutor.class);
    builder.setInitMethodName("start");
    builder.setDestroyMethodName("destroy");
    // Constructor arguments: (DiscoveryClient, admin service id).
    builder.addConstructorArgValue(discoveryClient);
    builder.addConstructorArgValue(serviceId);
    builder.addPropertyValue("appName", appName);
    builder.addPropertyValue("accessToken", accessToken);
    builder.addPropertyValue("appTitle", appTitle);
    builder.addPropertyValue("logPath", logPath);
    builder.addPropertyValue("port", executorPort);
    builder.addPropertyValue("logRetentionDays", logRetentionDays);
    logger.info(">>>>>>>>>>> dsp-job config init.");
    defaultListableBeanFactory.registerBeanDefinition("xxlJobExecutor", builder.getBeanDefinition());

    // getBean instantiates the executor, which runs its constructor and init method.
    this.executor = defaultListableBeanFactory.getBean("xxlJobExecutor", XxlJobCloudExecutor.class);
    // Store the executor port in the instance metadata and mark the info dirty.
    applicationInfoManager.getInfo().getMetadata().put(EXECUTOR_PORT, executorPort.toString());
    applicationInfoManager.getInfo().setIsDirty();
}
4、调度器掉线执行器重新注册
/**
 * Re-reads the admin addresses from the registry on every heartbeat and
 * resets the executor's admin list when they differ from the current ones.
 *
 * <p>Does nothing until both the discovery client and the executor are set.
 *
 * @param event the heartbeat event that triggers the refresh
 */
@EventListener
public void refreshAdminAddr(HeartbeatEvent event) {
    if (null == discoveryClient || null == this.executor) {
        return;
    }
    List<String> addrs = DiscoveryClientUtil.getAddrList(discoveryClient, serviceId);
    String origin = this.executor.getAdminAddresses();
    String now = StringUtils.join(addrs, ',');
    // Null-safe comparison: the executor may not have any admin address yet.
    boolean changed = (origin == null) ? (now != null) : !origin.equals(now);
    if (changed) {
        this.executor.resetAdminBizList(now);
    }
}
5、执行器随机端口查看日志
/**
 * Returns a slice of an execution log, locating the executor that holds it
 * through the service registry.
 *
 * <p>The {@code executorAddress} argument is not used. The method instead asks
 * each registered instance of the job group's application whether it has the
 * log file, and reads the log from the first one that does.
 *
 * @param executorAddress unused; kept so the request signature stays unchanged
 * @param triggerTime     trigger time of the execution, passed to the executor
 * @param logId           id of the log record
 * @param fromLineNum     first line to read
 * @return the log slice, or a failure result carrying an error message
 */
@RequestMapping("/logDetailCat")
@ResponseBody
public ReturnT<LogResult> logDetailCat(String executorAddress, Long triggerTime, Integer logId, Integer fromLineNum) {
    try {
        XxlJobLog xxlJobLog = xxlJobLogDao.load(logId);
        if (xxlJobLog == null) {
            return new ReturnT<LogResult>(ReturnT.FAIL_CODE, "未找到日志,日志id为" + logId);
        }
        XxlJobGroup xxlJobGroup = xxlJobGroupDao.load(xxlJobLog.getJobGroup());
        if (xxlJobGroup == null) {
            return new ReturnT<LogResult>(ReturnT.FAIL_CODE, "未找到执行器分组,日志id为" + logId);
        }
        String serviceName = xxlJobGroup.getAppName();

        // Build each candidate address once, skip instances that did not publish
        // an executor port, and stop at the first executor that reports the log.
        Optional<String> optional = discoveryClient.getInstances(serviceName).stream()
                .filter(instance -> instance.getMetadata().get(EXECUTOR_PORT) != null)
                .map(instance -> instance.getHost() + ":" + instance.getMetadata().get(EXECUTOR_PORT))
                .filter(addr -> executorHasLog(addr, triggerTime, logId))
                .findFirst();
        if (!optional.isPresent()) {
            String message = "未找到日志,日志id为" + logId;
            logger.error(message);
            return new ReturnT<LogResult>(ReturnT.FAIL_CODE, message);
        }

        ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(optional.get());
        ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);

        // is end
        if (logResult.getContent() != null
                && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) {
            // Load the record again after the remote call rather than reusing the earlier one.
            XxlJobLog jobLog = xxlJobLogDao.load(logId);
            if (jobLog != null && jobLog.getHandleCode() > 0) {
                logResult.getContent().setEnd(true);
            }
        }
        return logResult;
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<LogResult>(ReturnT.FAIL_CODE, e.getMessage());
    }
}

/**
 * Asks the executor at {@code addr} whether it has the given log file.
 * Any failure while contacting the executor counts as "does not have it".
 */
private boolean executorHasLog(String addr, Long triggerTime, Integer logId) {
    log.debug("addr={}", addr);
    try {
        ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(addr);
        ReturnT<String> res = executorBiz.hasLog(triggerTime, logId);
        log.debug("has log ::{}", res);
        return res != null && res.getCode() == 200;
    } catch (Exception e) {
        log.warn("hasLog probe failed, addr={}, logId={}", addr, logId, e);
        return false;
    }
}
6、执行器判断日志是否存在
/**
 * Reports whether the log file for the given execution is readable on this executor.
 *
 * @param logDateTim trigger time in epoch milliseconds, used to build the log file name
 * @param logId      id of the log record
 * @return {@code ReturnT.SUCCESS} if the file is readable, otherwise {@code ReturnT.FAIL}
 */
@Override
public ReturnT<String> hasLog(long logDateTim, int logId) {
    Date triggerDate = new Date(logDateTim);
    String path = XxlJobFileAppender.makeLogFileName(triggerDate, logId);
    if (!Files.isReadable(Paths.get(path))) {
        return ReturnT.FAIL;
    }
    return ReturnT.SUCCESS;
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!