中间件:定时调度框架-XXL-JOB-源码分析

中间件:定时调度框架-XXL-JOB-源码分析,第1张

中间件:定时调度框架-XXL-JOB-源码分析

一、XXL-JOB 版本:

    
      com.xuxueli
      xxl-job-core
      2.2.0
    

  每个版本差异比较大,大家可以尝试下载几个版本,阅读一下源码,看别人的写法也是一种享受。

二、源码分析:

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
    private static ApplicationContext applicationContext;

    public XxlJobSpringExecutor() {
    }

    public void afterSingletonsInstantiated() {
        //代码入口,bean完成初装配后,织入的代码
        this.initJobHandlerMethodRepository(applicationContext);
        GlueFactory.refreshInstance(1);

        try {
            super.start();//調用XxlJobExecutor.start()方法
        } catch (Exception var2) {
            throw new RuntimeException(var2);
        }
    }

    public void destroy() {
        super.destroy();
    }

    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext != null) {
            String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
            String[] var3 = beanDefinitionNames;
            int var4 = beanDefinitionNames.length;

            label86:
            for(int var5 = 0; var5 < var4; ++var5) {
                String beanDefinitionName = var3[var5];
                Object bean = applicationContext.getBean(beanDefinitionName);
                Map annotatedMethods = null;

                try {
                    //查找方法上有@XxlJob注解的方法
                    annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new metadataLookup() {
                        public XxlJob inspect(Method method) {
                            return (XxlJob)AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
                } catch (Throwable var19) {
                    logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", var19);
                }

                if (annotatedMethods != null && !annotatedMethods.isEmpty()) {
                    Iterator var9 = annotatedMethods.entrySet().iterator();

                    while(true) {
                        Method method;
                        XxlJob xxlJob;
                        do {
                            if (!var9.hasNext()) {
                                continue label86;
                            }

                            Entry methodXxlJobEntry = (Entry)var9.next();
                            method = (Method)methodXxlJobEntry.getKey();
                            xxlJob = (XxlJob)methodXxlJobEntry.getValue();
                        } while(xxlJob == null);

                        String name = xxlJob.value();
                        if (name.trim().length() == 0) {
                            throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                        }
                        //判段是否已經存在相同的處理器
                        if (loadJobHandler(name) != null) {
                            throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                        }

                        if (method.getParameterTypes().length != 1 || !method.getParameterTypes()[0].isAssignableFrom(String.class)) {
                            throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like " public ReturnT execute(String param) " .");
                        }

                        if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                            throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like " public ReturnT execute(String param) " .");
                        }

                        method.setAccessible(true);
                        Method initMethod = null;
                        Method destroyMethod = null;
                        if (xxlJob.init().trim().length() > 0) {
                            try {
                                initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                                initMethod.setAccessible(true);
                            } catch (NoSuchMethodException var18) {
                                throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                            }
                        }

                        if (xxlJob.destroy().trim().length() > 0) {
                            try {
                                destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                                destroyMethod.setAccessible(true);
                            } catch (NoSuchMethodException var17) {
                                throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                            }
                        }
                        注冊到job處理容器中
                        registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
                    }
                }
            }

        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //第一步,容器启动完成,初始化上下文对象
        XxlJobSpringExecutor.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}

重點查看XxlJobExecutor的start()方法: 

public class XxlJobExecutor {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
    private String adminAddresses;
    private String accessToken;
    private String appname;
    private String address;
    private String ip;
    private int port;
    private String logPath;
    private int logRetentionDays;
    private static List adminBizList;
    private EmbedServer embedServer = null;
    private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap();
    private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap();

    public XxlJobExecutor() {
    }

    public void setAdminAddresses(String adminAddresses) {
        this.adminAddresses = adminAddresses;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public void setAppname(String appname) {
        this.appname = appname;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public void setLogRetentionDays(int logRetentionDays) {
        this.logRetentionDays = logRetentionDays;
    }

    public void start() throws Exception {
        XxlJobFileAppender.initLogPath(this.logPath);
        this.initAdminBizList(this.adminAddresses, this.accessToken);
        JobLogFileCleanThread.getInstance().start((long)this.logRetentionDays);
        TriggerCallbackThread.getInstance().start();
        //使用netty起用服務端监听
        this.initEmbedServer(this.address, this.ip, this.port, this.appname, this.accessToken);
    }

    public void destroy() {
        this.stopEmbedServer();
        if (jobThreadRepository.size() > 0) {
            Iterator var1 = jobThreadRepository.entrySet().iterator();

            while(var1.hasNext()) {
                Entry item = (Entry)var1.next();
                JobThread oldJobThread = removeJobThread((Integer)item.getKey(), "web container destroy and kill the job.");
                if (oldJobThread != null) {
                    try {
                        oldJobThread.join();
                    } catch (InterruptedException var5) {
                        logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), var5);
                    }
                }
            }

            jobThreadRepository.clear();
        }

        jobHandlerRepository.clear();
        JobLogFileCleanThread.getInstance().toStop();
        TriggerCallbackThread.getInstance().toStop();
    }

    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses != null && adminAddresses.trim().length() > 0) {
            String[] var3 = adminAddresses.trim().split(",");
            int var4 = var3.length;

            for(int var5 = 0; var5 < var4; ++var5) {
                String address = var3[var5];
                if (address != null && address.trim().length() > 0) {
                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
                    if (adminBizList == null) {
                        adminBizList = new ArrayList();
                    }

                    adminBizList.add(adminBiz);
                }
            }
        }

    }

    public static List getAdminBizList() {
        return adminBizList;
    }

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
        port = port > 0 ? port : NetUtil.findAvailablePort(9999);
        ip = ip != null && ip.trim().length() > 0 ? ip : IpUtil.getIp();
        if (address == null || address.trim().length() == 0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        this.embedServer = new EmbedServer();
        this.embedServer.start(address, port, appname, accessToken);
    }

    private void stopEmbedServer() {
        try {
            this.embedServer.stop();
        } catch (Exception var2) {
            logger.error(var2.getMessage(), var2);
        }

    }

    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return (IJobHandler)jobHandlerRepository.put(name, jobHandler);
    }

    public static IJobHandler loadJobHandler(String name) {
        return (IJobHandler)jobHandlerRepository.get(name);
    }

    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
        JobThread oldJobThread = (JobThread)jobThreadRepository.put(jobId, newJobThread);
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }

        return newJobThread;
    }

    public static JobThread removeJobThread(int jobId, String removeOldReason) {
        JobThread oldJobThread = (JobThread)jobThreadRepository.remove(jobId);
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
            return oldJobThread;
        } else {
            return null;
        }
    }

    public static JobThread loadJobThread(int jobId) {
        JobThread jobThread = (JobThread)jobThreadRepository.get(jobId);
        return jobThread;
    }
}

 

public class EmbedServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
    private ExecutorBiz executorBiz;
    private Thread thread;

    public EmbedServer() {
    }

   
    //启用netty服务
    public void start(final String address, final int port, final String appname, final String accessToken) {
        this.executorBiz = new ExecutorBizImpl();
        this.thread = new Thread(new Runnable() {
            public void run() {
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0, 200, 60L, TimeUnit.SECONDS, new linkedBlockingQueue(2000), new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                    }
                }, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                    }
                });

                try {
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    ((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer() {
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new EmbedServer.EmbedHttpServerHandler(EmbedServer.this.executorBiz, accessToken, bizThreadPool)});
                        }
                    }).childOption(ChannelOption.SO_KEEPALIVE, true);
                    ChannelFuture future = bootstrap.bind(port).sync();
                    EmbedServer.logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                    EmbedServer.this.startRegistry(appname, address);
                    future.channel().closeFuture().sync();
                } catch (InterruptedException var14) {
                    if (var14 instanceof InterruptedException) {
                        EmbedServer.logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                    } else {
                        EmbedServer.logger.error(">>>>>>>>>>> xxl-job remoting server error.", var14);
                    }
                } finally {
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception var13) {
                        EmbedServer.logger.error(var13.getMessage(), var13);
                    }

                }

            }
        });
        this.thread.setDaemon(true);
        this.thread.start();
    }

    public void stop() throws Exception {
        if (this.thread != null && this.thread.isAlive()) {
            this.thread.interrupt();
        }

        this.stopRegistry();
        logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");
    }

    public void startRegistry(String appname, String address) {
        ExecutorRegistryThread.getInstance().start(appname, address);
    }

    public void stopRegistry() {
        ExecutorRegistryThread.getInstance().toStop();
    }

    public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler {
        private static final Logger logger = LoggerFactory.getLogger(EmbedServer.EmbedHttpServerHandler.class);
        private ExecutorBiz executorBiz;
        private String accessToken;
        private ThreadPoolExecutor bizThreadPool;

        public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
            this.executorBiz = executorBiz;
            this.accessToken = accessToken;
            this.bizThreadPool = bizThreadPool;
        }

        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
            final String requestData = msg.content().toString(CharsetUtil.UTF_8);
            final String uri = msg.uri();
            final HttpMethod httpMethod = msg.method();
            final boolean keepAlive = HttpUtil.isKeepAlive(msg);
            final String accessTokenReq = msg.headers().get("XXL-JOB-ACCESS-TOKEN");
            this.bizThreadPool.execute(new Runnable() {
                public void run() {
                    Object responseObj = EmbedHttpServerHandler.this.process(httpMethod, uri, requestData, accessTokenReq);
                    String responseJson = GsonTool.toJson(responseObj);
                    EmbedHttpServerHandler.this.writeResponse(ctx, keepAlive, responseJson);
                }
            });
        }

        private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
            if (HttpMethod.POST != httpMethod) {
                return new ReturnT(500, "invalid request, HttpMethod not support.");
            } else if (uri != null && uri.trim().length() != 0) {
                if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.equals(accessTokenReq)) {
                    return new ReturnT(500, "The access token is wrong.");
                } else {
                    try {
                        if ("/beat".equals(uri)) {
                            return this.executorBiz.beat();
                        } else if ("/idleBeat".equals(uri)) {
                            IdleBeatParam idleBeatParam = (IdleBeatParam)GsonTool.fromJson(requestData, IdleBeatParam.class);
                            return this.executorBiz.idleBeat(idleBeatParam);
                        } else if ("/run".equals(uri)) {
                            TriggerParam triggerParam = (TriggerParam)GsonTool.fromJson(requestData, TriggerParam.class);
                            return this.executorBiz.run(triggerParam);
                        } else if ("/kill".equals(uri)) {
                            KillParam killParam = (KillParam)GsonTool.fromJson(requestData, KillParam.class);
                            return this.executorBiz.kill(killParam);
                        } else if ("/log".equals(uri)) {
                            LogParam logParam = (LogParam)GsonTool.fromJson(requestData, LogParam.class);
                            return this.executorBiz.log(logParam);
                        } else {
                            return new ReturnT(500, "invalid request, uri-mapping(" + uri + ") not found.");
                        }
                    } catch (Exception var6) {
                        logger.error(var6.getMessage(), var6);
                        return new ReturnT(500, "request error:" + ThrowableUtil.toString(var6));
                    }
                }
            } else {
                return new ReturnT(500, "invalid request, uri-mapping empty.");
            }
        }

        private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
            if (keepAlive) {
                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }

            ctx.writeAndFlush(response);
        }

        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
            ctx.close();
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.channel().close();
                logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
            } else {
                super.userEventTriggered(ctx, evt);
            }

        }
    }
}

 

配置文件:

# 调度中心部署跟地址:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"
xxl.job.admin.addresses=http://192.168.115.128:8080/xxl-job-admin
# 执行器"AppName"和地址信息配置:AppName执行器心跳注册分组依据;地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999,执行器IP默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用。单机部署多个执行器时,注意要配置不同执行器端口
xxl.job.executor.appname=xxl-job-spring
xxl.job.executor.ip=192.168.0.107
xxl.job.executor.port=9999
# 执行器通讯TOKEN,非空时启用
xxl.job.accessToken=
# 执行器运行日志文件存储的磁盘位置,需要对该路径拥有读写权限
xxl.job.executor.logpath=D:
# 执行器Log文件定期清理功能,指定日志保存天数,日志文件过期自动删除。限制至少保持3天,否则功能不生效;
xxl.job.executor.logretentiondays=-1

代码示范:

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
//import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import org.springframework.stereotype.Component;



@Component
public class DemoJobHandler extends IJobHandler {

    @Override
    @XxlJob(value = "testJobHandler")
    public ReturnT execute(String param) {
        XxlJobLogger.log("XXL-JOB, testJobHandler.");
        System.out.println("XXL-JOB测试");
        return SUCCESS;
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5685324.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存