分片任务组件

分片任务组件,第1张

分片任务组件

目录

背景

代码地址

方案灵感来源

Zookeeper结构设计

流程图

接入方式

说明


背景

将一个大任务划分成几个小任务,由几个实例分别运行这些小任务,可以提高任务的执行效率。

举例:

clock.ip.fdd闹钟服务,需要将闹钟数据从mysql表读取到内存中,多个实例的情况下,是用分布式锁来解决读取数据的冲突。

这样是能解决问题,但是读取数据到内存中就变成一个串行的 *** 作,当某一刻积累了大量的闹钟数据,可能会出现加载不及时导致闹钟延迟的情况。

如果把存储闹钟的表拆分成8个表,如果2个实例,则每个实例负责读取4个表的数据;如果4个实例,则每个实例负责读取2个表的数据。

多个实例之间是并行的,就能提高加载任务的执行效率。

目前组件处于测试阶段~~~

代码地址

sharding-job: 将一个大任务划分成几个小任务,由几个实例分别运行这些小任务,可以提高任务的执行效率。

方案灵感来源

灵感来源ElasticJob-Lite——无中心的分布式定时任务调度

官方文档:ElasticJob-Lite :: ElasticJob

其实直接使用ElasticJob-Lite,在闹钟服务,是能满足需求的,因为闹钟服务的场景是定时执行,完全符合ElasticJob-Lite的场景。

但是,ElasticJob-Lite中存在servers目录,每增加一个ip,就会在servers目录下增加一个持久节点。

而我们的服务是部署在docker上,每次启动,ip地址都会发生变化,这就会导致servers的子节点一直增加,不合适。

Zookeeper结构设计

流程图

接入方式

添加maven依赖

pom.xml



    com.zidongxiangxi

    sharding-job-spring-boot-starter

    1.0-SNAPSHOT

添加配置

application.yml

sharding-job:
  application-name: bcc.bp.fdd
  zookeeper:
    address: zk1.esf.fdd:2181,zk2.esf.fdd:2181,zk3.esf.fdd:2181 

创建分片任务执行类

ShardingJobTest

package com.zidongxiangxi.bp.bcc.server;



import com.zidongxiangxi.sharding.job.core.executor.runnable.ShardingJobRunnable;

import com.zidongxiangxi.sharding.job.starter.annotation.ShardingJobRunner;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;





@Component

@Slf4j

@ShardingJobRunner(jobName = "test", shardingItems = {"table1", "table2", "table3", "table4"}, override = true)

public class ShardingJobTest implements ShardingJobRunnable {

    @Override

    public void run(String shardingItem) throws InterruptedException {

        log.warn("执行分片:" + shardingItem);

        Thread.sleep(30000L);

    }

}

说明
@ShardingJobRunner注解有三个属性:

jobName:任务名,同个服务里任务名相同,任务是同一个任务,启动会失败

shardingItems:分片项,例如上面举例的闹钟服务,分片项就看是表名

override:启动的时候,是否覆盖配置,默认为true。减少或增加分片项的时候,必须指定为true

加了@ShardingJobRunner注解的类,必须实现ShardingJobRunnable接口。目前任务的执行是交给ThreadPoolShardingJobExecutor类来运行,每个分片任务,会启动一个线程独立运行,该独立线程会不停调用ShardingJobRunnable.run方法,直到被中断

ThreadPoolShardingJobExecutor

@Slf4j

public class ThreadPoolShardingJobExecutor implements ShardingJobExecutor {

    ......

    @Override

    public synchronized void execute(String shardingItem) {

        if (shutdown) {

            log.warn("任务【{}】执行器已经关闭", jobName);

            return;

        }

        if (shardingItems.contains(shardingItem)) {

            log.warn("任务【{}】的分片【{}】已经存在", jobName, shardingItem);

            return;

        }

        FutureTask futureTask = (FutureTask)threadPool.submit(() -> {

            shardingNode.setShardingItemRunning(shardingItem);

            while (!Thread.interrupted()) {

                try {

                    runnable.run(shardingItem);

                } catch (InterruptedException e) {

                    Thread.currentThread().interrupt();

                }

            }

            log.warn("任务:{}, 分片:{},被中断,结束分片任务的执行", jobName, shardingItem);

            shardingNode.clearShardingItemRunning(Collections.singletonList(shardingItem));

        });

        shardingItems.add(shardingItem);

        shardingItemTaskMap.put(shardingItem, futureTask);

    }

    ......

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5681367.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存