Fork/Join框架是Java 7提供的一个用于并行执行任务的框架, 核心思想就是把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果,其实现思想与MapReduce有异曲同工之妙。
Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:
Fork/Join框架使用一个巧妙的算法来平衡线程的负载,称为工作窃取(work-stealing)算法。工作窃取的运行流程图如下:
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
ForkJoinTask : 基本任务,使用fork、join框架必须创建的对象,提供fork,join *** 作,常用的三个子类
- RecursiveAction : 无结果返回的任务
- RecursiveTask : 有返回结果的任务
- CountedCompleter:无返回值任务,完成任务后可以触发回调。
ForkJoinTask提供了两个重要的方法:
- fork : 让task异步执行
- join : 让task同步执行,可以获取返回值
ForkJoinPool : 专门用来运行 ForkJoinTask 的线程池,(在实际使用中,也可以接收
Runnable/Callable任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask类型的任务)
ForkJoinTask 在不显式使用 ForkJoinPool.execute/invoke/submit() 方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行
ForkJoinExample.java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinExample {
//java8 parallStream
//针对一个数字,做计算。
private static final Integer MAX=200;
static class CalcForJoinTask extends RecursiveTask<Integer> {
private Integer startValue; //子任务的开始计算的值
private Integer endValue; //子任务结束计算的值
public CalcForJoinTask(Integer startValue, Integer endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
protected Integer compute() {
//如果当前的数据区间已经小于MAX了,那么接下来的计算不需要做拆分
if(endValue-startValue<MAX){
System.out.println("开始计算:startValue:"+startValue+" ; endValue:"+endValue);
Integer totalValue=0;
for(int i=this.startValue;i<=this.endValue;i++){
totalValue+=i;
}
return totalValue;
}
CalcForJoinTask subTask=new CalcForJoinTask(startValue,(startValue+endValue)/2);
subTask.fork();
CalcForJoinTask calcForJoinTask=new CalcForJoinTask((startValue+endValue)/2+1,endValue);
calcForJoinTask.fork();
return subTask.join()+calcForJoinTask.join();
}
}
public static void main(String[] args) {
CalcForJoinTask calcForJoinTask=new CalcForJoinTask(1,10000);
ForkJoinPool pool=new ForkJoinPool();
ForkJoinTask<Integer> taskFuture=pool.submit(calcForJoinTask);
try {
Integer result=taskFuture.get();
System.out.println("result:"+result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
2.2 业务应用
业务背景:对商品信息、商品评论信息、商品销量数据、商家信息进行聚合查询。
2.2.1 项目搭建创建maven项目
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>spring-boot-fork-join</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-fork-join</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2.2 创建实体类
Item.java
/**
* 商品信息
**/
public class Item {
private String productName;
private int num;
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}
Comment.java
/**
商品评论信息
**/
public class Comment {
private String name;
private String content;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
Seller.java
/**
* 销售信息
**/
public class Seller {
private int totalNum;
private int sellerNum;
public int getTotalNum() {
return totalNum;
}
public void setTotalNum(int totalNum) {
this.totalNum = totalNum;
}
public int getSellerNum() {
return sellerNum;
}
public void setSellerNum(int sellerNum) {
this.sellerNum = sellerNum;
}
}
Shop.java
/**
* 商家信息
**/
public class Shop {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
Context.java
/**
* 聚合信息
**/
public class Context {
private Item item; //商品
private Comment comment; //评论
private Seller seller; //销售信息
private Shop shop; //店铺信息
public Item getItem() {
return item;
}
public void setItem(Item item) {
this.item = item;
}
public Comment getComment() {
return comment;
}
public void setComment(Comment comment) {
this.comment = comment;
}
public Seller getSeller() {
return seller;
}
public void setSeller(Seller seller) {
this.seller = seller;
}
public Shop getShop() {
return shop;
}
public void setShop(Shop shop) {
this.shop = shop;
}
@Override
public String toString() {
return "Context{" +
"item=" + item +
", comment=" + comment +
", seller=" + seller +
", shop=" + shop +
'}';
}
}
2.2.3 创建基础接口和类
ILoadDataProcessor.java
public interface ILoadDataProcessor {
/**
* 加载对应的数据
* @param context
*/
void load(Context context);
}
AbstractLoadDataProcessor.java
import java.util.concurrent.RecursiveAction;
/**
数据加载抽象类
**/
public abstract class AbstractLoadDataProcessor extends RecursiveAction implements ILoadDataProcessor{
protected Context context;
@Override
protected void compute() {
load(context); //调用子类的具体实现
}
public Context getContext() {
this.join(); //得到一个聚合的结果
return context;
}
public void setContext(Context context) {
this.context = context;
}
}
2.2.4 创建服务类
CommentService.java
import org.springframework.stereotype.Service;
/**
评论服务类
**/
@Service
public class CommentService extends AbstractLoadDataProcessor{
@Override
public void load(Context context) {
//RPC.
Comment comment=new Comment();
comment.setName("XIWANG");
comment.setContent("商品质量很好");
context.setComment(comment);
}
}
ItemService.java
import org.springframework.stereotype.Service;
/**
商品服务类
**/
@Service
public class ItemService extends AbstractLoadDataProcessor{
@Override
public void load(Context context) {
Item item=new Item();
item.setNum(100);
item.setProductName("键盘");
context.setItem(item);
}
}
SellerService.java
import org.springframework.stereotype.Service;
/**
销量服务类
**/
@Service
public class SellerService extends AbstractLoadDataProcessor{
@Override
public void load(Context context) {
Seller seller=new Seller();
seller.setSellerNum(100);
seller.setTotalNum(1000);
context.setSeller(seller);
}
}
ShopService.java
import org.springframework.stereotype.Service;
/**
商家服务类
**/
@Service
public class ShopService extends AbstractLoadDataProcessor{
@Override
public void load(Context context) {
Shop shop=new Shop();
shop.setName("令狐冲小店");
context.setShop(shop);
}
}
2.2.5 创建聚合任务类
ComplexTradeTaskService.java
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
/**
销量和商家聚合任务类
**/
@Service
public class ComplexTradeTaskService extends AbstractLoadDataProcessor implements ApplicationContextAware {
ApplicationContext applicationContext;
private List<AbstractLoadDataProcessor> taskDataProcessors=new ArrayList<>();
@Override
public void load(Context context) {
taskDataProcessors.forEach(abstractLoadDataProcessor->{
abstractLoadDataProcessor.setContext(this.context);
abstractLoadDataProcessor.fork();//创建一个fork task
});
}
@Override
public Context getContext() {
this.taskDataProcessors.forEach(ForkJoinTask::join);
return super.getContext();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext=applicationContext;
taskDataProcessors.add(applicationContext.getBean(SellerService.class));
taskDataProcessors.add(applicationContext.getBean(ShopService.class));
}
}
ItemTaskForkJoinDataProcessor.java
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
/**
* 全局聚合任务
**/
@Service
public class ItemTaskForkJoinDataProcessor extends AbstractLoadDataProcessor implements ApplicationContextAware {
ApplicationContext applicationContext;
private List<AbstractLoadDataProcessor> taskDataProcessors=new ArrayList<>();
@Override
public void load(Context context) {
taskDataProcessors.forEach(abstractLoadDataProcessor->{
abstractLoadDataProcessor.setContext(this.context);
abstractLoadDataProcessor.fork();//创建一个fork task
});
}
@Override
public Context getContext() {
//ForkJoinTask::join java8方法引用
// * 构造引用
// * 静态方法引用
// * 实例方法引用
this.taskDataProcessors.forEach(ForkJoinTask::join);
return super.getContext();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext=applicationContext;
taskDataProcessors.add(applicationContext.getBean(CommentService.class));
taskDataProcessors.add(applicationContext.getBean(ItemService.class));
taskDataProcessors.add(applicationContext.getBean(ComplexTradeTaskService.class));
}
}
2.2.6 创建控制器类和启动类
IndexController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ForkJoinPool;
@RestController
public class IndexController {
@Autowired
ItemTaskForkJoinDataProcessor itemTaskForkJoinDataProcessor;
@GetMapping("/say")
public Context index(){
Context context=new Context();
itemTaskForkJoinDataProcessor.setContext(context);
ForkJoinPool forkJoinPool=new ForkJoinPool();
forkJoinPool.submit(itemTaskForkJoinDataProcessor);
return itemTaskForkJoinDataProcessor.getContext();
}
}
SpringBootForkJoinApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootForkJoinApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootForkJoinApplication.class, args);
}
}
2.2.7 测试
启动项目后浏览器访问http://localhost:8080/say
输出结果:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)