一、XXL-JOB 版本:
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>
每个版本差异比较大,大家可以尝试下载几个版本,阅读一下源码,看别人的写法也是一种享受。
二、源码分析:
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class); private static ApplicationContext applicationContext; public XxlJobSpringExecutor() { } public void afterSingletonsInstantiated() { //代码入口,bean完成初装配后,织入的代码 this.initJobHandlerMethodRepository(applicationContext); GlueFactory.refreshInstance(1); try { super.start();//調用XxlJobExecutor.start()方法 } catch (Exception var2) { throw new RuntimeException(var2); } } public void destroy() { super.destroy(); } private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext != null) { String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true); String[] var3 = beanDefinitionNames; int var4 = beanDefinitionNames.length; label86: for(int var5 = 0; var5 < var4; ++var5) { String beanDefinitionName = var3[var5]; Object bean = applicationContext.getBean(beanDefinitionName); Map annotatedMethods = null; try { //查找方法上有@XxlJob注解的方法 annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new metadataLookup() { public XxlJob inspect(Method method) { return (XxlJob)AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class); } }); } catch (Throwable var19) { logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", var19); } if (annotatedMethods != null && !annotatedMethods.isEmpty()) { Iterator var9 = annotatedMethods.entrySet().iterator(); while(true) { Method method; XxlJob xxlJob; do { if (!var9.hasNext()) { continue label86; } Entry methodXxlJobEntry = (Entry)var9.next(); method = (Method)methodXxlJobEntry.getKey(); xxlJob = (XxlJob)methodXxlJobEntry.getValue(); } while(xxlJob == null); String name = xxlJob.value(); if (name.trim().length() == 0) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + 
bean.getClass() + "#" + method.getName() + "] ."); } //判段是否已經存在相同的處理器 if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } if (method.getParameterTypes().length != 1 || !method.getParameterTypes()[0].isAssignableFrom(String.class)) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like " public ReturnT execute(String param) " ."); } if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like " public ReturnT execute(String param) " ."); } method.setAccessible(true); Method initMethod = null; Method destroyMethod = null; if (xxlJob.init().trim().length() > 0) { try { initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException var18) { throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } if (xxlJob.destroy().trim().length() > 0) { try { destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); } catch (NoSuchMethodException var17) { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } 注冊到job處理容器中 registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); } } } } } public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //第一步,容器启动完成,初始化上下文对象 XxlJobSpringExecutor.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return applicationContext; } }
重点查看 XxlJobExecutor 的 start() 方法,以及它最后启动的 EmbedServer(基于 Netty 的内嵌服务端):
public class XxlJobExecutor { private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); private String adminAddresses; private String accessToken; private String appname; private String address; private String ip; private int port; private String logPath; private int logRetentionDays; private static List adminBizList; private EmbedServer embedServer = null; private static ConcurrentMapjobHandlerRepository = new ConcurrentHashMap(); private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); public XxlJobExecutor() { } public void setAdminAddresses(String adminAddresses) { this.adminAddresses = adminAddresses; } public void setAccessToken(String accessToken) { this.accessToken = accessToken; } public void setAppname(String appname) { this.appname = appname; } public void setAddress(String address) { this.address = address; } public void setIp(String ip) { this.ip = ip; } public void setPort(int port) { this.port = port; } public void setLogPath(String logPath) { this.logPath = logPath; } public void setLogRetentionDays(int logRetentionDays) { this.logRetentionDays = logRetentionDays; } public void start() throws Exception { XxlJobFileAppender.initLogPath(this.logPath); this.initAdminBizList(this.adminAddresses, this.accessToken); JobLogFileCleanThread.getInstance().start((long)this.logRetentionDays); TriggerCallbackThread.getInstance().start(); //使用netty起用服務端监听 this.initEmbedServer(this.address, this.ip, this.port, this.appname, this.accessToken); } public void destroy() { this.stopEmbedServer(); if (jobThreadRepository.size() > 0) { Iterator var1 = jobThreadRepository.entrySet().iterator(); while(var1.hasNext()) { Entry item = (Entry)var1.next(); JobThread oldJobThread = removeJobThread((Integer)item.getKey(), "web container destroy and kill the job."); if (oldJobThread != null) { try { oldJobThread.join(); } catch (InterruptedException var5) { logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", 
item.getKey(), var5); } } } jobThreadRepository.clear(); } jobHandlerRepository.clear(); JobLogFileCleanThread.getInstance().toStop(); TriggerCallbackThread.getInstance().toStop(); } private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses != null && adminAddresses.trim().length() > 0) { String[] var3 = adminAddresses.trim().split(","); int var4 = var3.length; for(int var5 = 0; var5 < var4; ++var5) { String address = var3[var5]; if (address != null && address.trim().length() > 0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList(); } adminBizList.add(adminBiz); } } } } public static List getAdminBizList() { return adminBizList; } private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { port = port > 0 ? port : NetUtil.findAvailablePort(9999); ip = ip != null && ip.trim().length() > 0 ? ip : IpUtil.getIp(); if (address == null || address.trim().length() == 0) { String ip_port_address = IpUtil.getIpPort(ip, port); address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } this.embedServer = new EmbedServer(); this.embedServer.start(address, port, appname, accessToken); } private void stopEmbedServer() { try { this.embedServer.stop(); } catch (Exception var2) { logger.error(var2.getMessage(), var2); } } public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) { logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return (IJobHandler)jobHandlerRepository.put(name, jobHandler); } public static IJobHandler loadJobHandler(String name) { return (IJobHandler)jobHandlerRepository.get(name); } public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) { JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> 
xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); JobThread oldJobThread = (JobThread)jobThreadRepository.put(jobId, newJobThread); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; } public static JobThread removeJobThread(int jobId, String removeOldReason) { JobThread oldJobThread = (JobThread)jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); return oldJobThread; } else { return null; } } public static JobThread loadJobThread(int jobId) { JobThread jobThread = (JobThread)jobThreadRepository.get(jobId); return jobThread; } }
public class EmbedServer { private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class); private ExecutorBiz executorBiz; private Thread thread; public EmbedServer() { } //启用netty服务 public void start(final String address, final int port, final String appname, final String accessToken) { this.executorBiz = new ExecutorBizImpl(); this.thread = new Thread(new Runnable() { public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0, 200, 60L, TimeUnit.SECONDS, new linkedBlockingQueue(2000), new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); } }); try { ServerBootstrap bootstrap = new ServerBootstrap(); ((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer() { public void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new EmbedServer.EmbedHttpServerHandler(EmbedServer.this.executorBiz, accessToken, bizThreadPool)}); } }).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future = bootstrap.bind(port).sync(); EmbedServer.logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); EmbedServer.this.startRegistry(appname, address); future.channel().closeFuture().sync(); } catch (InterruptedException var14) { if (var14 instanceof InterruptedException) { 
EmbedServer.logger.info(">>>>>>>>>>> xxl-job remoting server stop."); } else { EmbedServer.logger.error(">>>>>>>>>>> xxl-job remoting server error.", var14); } } finally { try { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } catch (Exception var13) { EmbedServer.logger.error(var13.getMessage(), var13); } } } }); this.thread.setDaemon(true); this.thread.start(); } public void stop() throws Exception { if (this.thread != null && this.thread.isAlive()) { this.thread.interrupt(); } this.stopRegistry(); logger.info(">>>>>>>>>>> xxl-job remoting server destroy success."); } public void startRegistry(String appname, String address) { ExecutorRegistryThread.getInstance().start(appname, address); } public void stopRegistry() { ExecutorRegistryThread.getInstance().toStop(); } public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(EmbedServer.EmbedHttpServerHandler.class); private ExecutorBiz executorBiz; private String accessToken; private ThreadPoolExecutor bizThreadPool; public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) { this.executorBiz = executorBiz; this.accessToken = accessToken; this.bizThreadPool = bizThreadPool; } protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { final String requestData = msg.content().toString(CharsetUtil.UTF_8); final String uri = msg.uri(); final HttpMethod httpMethod = msg.method(); final boolean keepAlive = HttpUtil.isKeepAlive(msg); final String accessTokenReq = msg.headers().get("XXL-JOB-ACCESS-TOKEN"); this.bizThreadPool.execute(new Runnable() { public void run() { Object responseObj = EmbedHttpServerHandler.this.process(httpMethod, uri, requestData, accessTokenReq); String responseJson = GsonTool.toJson(responseObj); EmbedHttpServerHandler.this.writeResponse(ctx, keepAlive, responseJson); } }); } private Object 
process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { if (HttpMethod.POST != httpMethod) { return new ReturnT(500, "invalid request, HttpMethod not support."); } else if (uri != null && uri.trim().length() != 0) { if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.equals(accessTokenReq)) { return new ReturnT(500, "The access token is wrong."); } else { try { if ("/beat".equals(uri)) { return this.executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { IdleBeatParam idleBeatParam = (IdleBeatParam)GsonTool.fromJson(requestData, IdleBeatParam.class); return this.executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { TriggerParam triggerParam = (TriggerParam)GsonTool.fromJson(requestData, TriggerParam.class); return this.executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { KillParam killParam = (KillParam)GsonTool.fromJson(requestData, KillParam.class); return this.executorBiz.kill(killParam); } else if ("/log".equals(uri)) { LogParam logParam = (LogParam)GsonTool.fromJson(requestData, LogParam.class); return this.executorBiz.log(logParam); } else { return new ReturnT(500, "invalid request, uri-mapping(" + uri + ") not found."); } } catch (Exception var6) { logger.error(var6.getMessage(), var6); return new ReturnT(500, "request error:" + ThrowableUtil.toString(var6)); } } } else { return new ReturnT(500, "invalid request, uri-mapping empty."); } } private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); if (keepAlive) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); } 
ctx.writeAndFlush(response); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause); ctx.close(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.channel().close(); logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel."); } else { super.userEventTriggered(ctx, evt); } } } }
配置文件:
# 调度中心部署根地址:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"
xxl.job.admin.addresses=http://192.168.115.128:8080/xxl-job-admin

# 执行器"AppName"和地址信息配置:AppName执行器心跳注册分组依据;地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999,执行器IP默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯使用。单机部署多个执行器时,注意要配置不同执行器端口
xxl.job.executor.appname=xxl-job-spring
xxl.job.executor.ip=192.168.0.107
xxl.job.executor.port=9999

# 执行器通讯TOKEN,非空时启用
xxl.job.accessToken=

# 执行器运行日志文件存储的磁盘位置,需要对该路径拥有读写权限
xxl.job.executor.logpath=D:

# 执行器Log文件定期清理功能,指定日志保存天数,日志文件过期自动删除。限制至少保持3天,否则功能不生效;
xxl.job.executor.logretentiondays=-1
代码示范:
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; //import com.xxl.job.core.handler.annotation.JobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import org.springframework.stereotype.Component; @Component public class DemoJobHandler extends IJobHandler { @Override @XxlJob(value = "testJobHandler") public ReturnTexecute(String param) { XxlJobLogger.log("XXL-JOB, testJobHandler."); System.out.println("XXL-JOB测试"); return SUCCESS; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)