目录
背景
代码地址
方案灵感来源
Zookeeper结构设计
流程图
接入方式
说明
背景
将一个大任务划分成几个小任务,由几个实例分别运行这些小任务,可以提高任务的执行效率。
举例:
clock.ip.fdd闹钟服务,需要将闹钟数据从mysql表读取到内存中,多个实例的情况下,是用分布式锁来解决读取数据的冲突。
这样是能解决问题,但是读取数据到内存中就变成一个串行的 *** 作,当某一刻积累了大量的闹钟数据,可能会出现加载不及时导致闹钟延迟的情况。
如果把存储闹钟的表拆分成8个表,如果2个实例,则每个实例负责读取4个表的数据;如果4个实例,则每个实例负责读取2个表的数据。
多个实例之间是并行的,就能提高加载任务的执行效率。
目前组件处于测试阶段~~~
代码地址sharding-job: 将一个大任务划分成几个小任务,由几个实例分别运行这些小任务,可以提高任务的执行效率。
方案灵感来源灵感来源ElasticJob-Lite——无中心的分布式定时任务调度
官方文档:ElasticJob-Lite :: ElasticJob
其实直接使用ElasticJob-Lite,在闹钟服务,是能满足需求的,因为闹钟服务的场景是定时执行,完全符合ElasticJob-Lite的场景。
但是,ElasticJob-Lite中存在servers目录,每增加一个ip,就会在servers目录下增加一个持久节点。
而我们的服务是部署在docker上,每次启动,ip地址都会发生变化,这就会导致servers的子节点一直增加,不合适。
Zookeeper结构设计 流程图 接入方式添加maven依赖
pom.xml
com.zidongxiangxi sharding-job-spring-boot-starter1.0-SNAPSHOT
添加配置
application.yml
sharding-job: application-name: bcc.bp.fdd zookeeper: address: zk1.esf.fdd:2181,zk2.esf.fdd:2181,zk3.esf.fdd:2181
创建分片任务执行类
ShardingJobTest
package com.zidongxiangxi.bp.bcc.server; import com.zidongxiangxi.sharding.job.core.executor.runnable.ShardingJobRunnable; import com.zidongxiangxi.sharding.job.starter.annotation.ShardingJobRunner; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Component @Slf4j @ShardingJobRunner(jobName = "test", shardingItems = {"table1", "table2", "table3", "table4"}, override = true) public class ShardingJobTest implements ShardingJobRunnable { @Override public void run(String shardingItem) throws InterruptedException { log.warn("执行分片:" + shardingItem); Thread.sleep(30000L); } }说明
@ShardingJobRunner注解有三个属性:
jobName:任务名,同个服务里任务名相同,任务是同一个任务,启动会失败
shardingItems:分片项,例如上面举例的闹钟服务,分片项就看是表名
override:启动的时候,是否覆盖配置,默认为true。减少或增加分片项的时候,必须指定为true
加了@ShardingJobRunner注解的类,必须实现ShardingJobRunnable接口。目前任务的执行是交给ThreadPoolShardingJobExecutor类来运行,每个分片任务,会启动一个线程独立运行,该独立线程会不停调用ShardingJobRunnable.run方法,直到被中断
ThreadPoolShardingJobExecutor
@Slf4j public class ThreadPoolShardingJobExecutor implements ShardingJobExecutor { ...... @Override public synchronized void execute(String shardingItem) { if (shutdown) { log.warn("任务【{}】执行器已经关闭", jobName); return; } if (shardingItems.contains(shardingItem)) { log.warn("任务【{}】的分片【{}】已经存在", jobName, shardingItem); return; } FutureTask futureTask = (FutureTask)threadPool.submit(() -> { shardingNode.setShardingItemRunning(shardingItem); while (!Thread.interrupted()) { try { runnable.run(shardingItem); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } log.warn("任务:{}, 分片:{},被中断,结束分片任务的执行", jobName, shardingItem); shardingNode.clearShardingItemRunning(Collections.singletonList(shardingItem)); }); shardingItems.add(shardingItem); shardingItemTaskMap.put(shardingItem, futureTask); } ...... }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)