目录
概念:
例子:线程池执行任务,任务执行完后,主线程再执行
案例:模拟王者荣耀玩家准备加载后,游戏开始
案例:模拟分布式获取接口数据(三种->串行and并行and利用Future封装各个接口数据,最后在主线程get)
1.利用线程池中线程处理任务封装->Future中,最后主线程get数据
2.串行执行各个任务
3.并行执行
CycliBarrier
概念:
CountDownLatch是一个同步工具类,也就是说它运行一个或多个线程一直等待;这种等待类似于我们之前学的join,与之不同的是:join:可以具体到具体线程,只要该线程执行完毕,后面代码就结束等待;CountDownLatch:可以一直等待;
比如:主线程需要所有组件加载完毕才会启动,类似于boot的启动;
例子:线程池执行任务,任务执行完后,主线程再执行
思路:利用CountDownLatch,每一个线程执行完一个任务,CountDownLatch-1,==0时,CountDownLatch.await()后面的代码才能执行;
package com.example.juc.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import static java.lang.Thread.sleep;
/**
* @author diao 2022/5/1
*/
@Slf4j(topic = "c.Test02")
public class Test02 {
public static void main(String[] args) throws InterruptedException {
//1.定义一个线程池
ThreadPoolExecutor threadPool=new ThreadPoolExecutor(3, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"Task_pool_"+r.hashCode());
}
},new ThreadPoolExecutor.DiscardOldestPolicy());
//2.CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
//3.任务处理
threadPool.submit(()->{
log.debug("开始...");
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("结束..{}",latch.getCount());
});
threadPool.submit(()->{
log.debug("开始...");
try {
sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("结束...{}",latch.getCount());
});
threadPool.submit(()->{
log.debug("开始...");
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("结束...{}",latch.getCount());
});
latch.await();
log.debug("终于轮到我了");
}
}
案例:模拟王者荣耀玩家准备加载后,游戏开始
思路:首先定义一个线程池与CountDownLatch,然后实现一个数组里面封装游戏玩家,一个玩家相当于一个任务然后进行遍历,每个任务进行sumit执行——>(里面模拟%加载 *** 作,延迟我们用sleep模拟),一个用户任务执行完后,CountDownLatch-1 ,当减为0,游戏开始;
package com.example.juc.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
/**
* @author diao 2022/5/1
*/
@Slf4j(topic = "c.GameTest")
public class GameTest {
public static void main(String[] args) throws InterruptedException {
//1.定义一个线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 50, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue(10),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Task_pool_" + r.hashCode());
}
}, new ThreadPoolExecutor.DiscardOldestPolicy());
//2.设置CountDownLatch
CountDownLatch latch = new CountDownLatch(10);
//3.任务处理
Random r=new Random();
String[] gamer = new String[10];//数组里面放的是玩家状态
for(int j=0;j<10;j++){
/**
*因为j是lambda外的变量,所以不能写入lambda中,得用中间常量代替
*/
int temp=j;
//4.提交任务
pool.submit(()->{
for (int i = 0; i <=100; i++) {
try {
Thread.sleep(r.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
gamer[temp]=i+"%";
System.out.print("\r"+Arrays.toString(gamer));
}
/**
* 一个用户执行完,CountDownLatch-1
*/
latch.countDown();
});
}
/**
* 主线程等待,当用户全部准备完毕,CountDownLatch为0,主线程执行
*/
latch.await();
System.out.println("\n游戏开始");
pool.shutdown();
}
}
案例:模拟分布式获取接口数据(三种->串行and并行and利用Future封装各个接口数据,最后在主线程get)
package com.example.juc.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author diao 2022/5/2
*/
//其他线程执行完,主线程才能运行,但是主线程没有拿到其他线程的结果(要用future)
@Slf4j(topic = "c.CountDownLatchTest")
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// test();
// test2();
test3();
}
/**
* 串行执行
*/
private static void test(){
RestTemplate restTemplate = new RestTemplate();
log.debug("开始...");
Map response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
log.debug("结束 order:{}",response);
Map response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
log.debug("结束 product:{}",response1);
}
/**
* 并行执行,时间为 *** 作最长的时间
*/
private static void test2() throws InterruptedException {
RestTemplate restTemplate = new RestTemplate();
log.debug("开始...");
//1.先定义一个线程池
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(3, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"Task_pool_"+r.hashCode());
}
},new ThreadPoolExecutor.DiscardOldestPolicy());
//定义CountDownLatch
CountDownLatch latch = new CountDownLatch(2);
//2.线程池执行任务,保证任务被并行
poolExecutor.submit(()->{
Map response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
log.debug("结束 order:{}",response);
latch.countDown();
});
poolExecutor.submit(()->{
Map response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
log.debug("结束 product:{}",response1);
latch.countDown();
});
latch.await();
log.debug("执行完毕...");
poolExecutor.shutdown();
}
private static void test3() throws ExecutionException, InterruptedException {
RestTemplate restTemplate = new RestTemplate();
log.debug("开始...");
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3, 5, 100, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Thread_Task" + r.hashCode());
}
}, new ThreadPoolExecutor.DiscardOldestPolicy());
Future
1.利用线程池中线程处理任务封装->Future中,最后主线程get数据
结合CountDownLatch思想:
我们可以再在各个任务里面加CountDownLatch-1的 *** 作,只有CountDownLatch数量减到0,主线程才能执行,也就是说才能获取数据,保证获取数据的 *** 作在最后面;
2.串行执行各个任务没有用线程池,接口被挨个执行;
花了3s
3.并行执行耗时2s,也就是任务执行最久的任务就是总共的耗时长,因为咱有多个线程可以同时执行不同任务,不过具体可能不同时,多少会有差异,除非多核下;
CycliBarrier
对比与CountDownLatch,除了设计初始值,进行减数之外,还能重置初始值,达到一个重用的效果;
如果说CountDownLatch想进行重置,需要重新构造对象,CycliBarrier满足了需求;
package com.example.juc.CountDownLatch.CycliBarrier;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.Thread.sleep;
/**
* @author diao 2022/5/2
*/
//CyclicBarrier能够重复使用,线程数与要跟任务数保持一致;
@Slf4j(topic = "c.TestCycliBarrier01")
public class TestCycliBarrier01 {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2,()->{
log.debug("全部执行完毕...");
});
/**
* 线程池执行线程任务
* 线程数与任务数要一致
* 如果是三个线程的话,他就会一下执行三个任务,aba,那么CyclicBarrier被减的就是两个a进行的
*/
for (int i = 0; i < 3; i++) {
service.submit(()->{
log.debug("任务开始...");
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//阻塞住了,2-1,需要等待下面那个任务将CyclicBarrier减为0,才能继续运行
barrier.await();
log.debug("继续运行...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(()->{
log.debug("任务开始...");
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
barrier.await();
log.debug("继续运行...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}
记住像这种循环执行任务,需要注意线程数与任务数大小,如果线程数>task,会把下一次循环的任务执行;那么CyclicBarrier的修改就会有问题;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)