Countdownlatch执行任务+Future

Countdownlatch执行任务+Future,第1张

目录

概念:

例子:线程池执行任务,任务执行完后,主线程再执行

案例:模拟王者荣耀玩家准备加载后,游戏开始 

案例:模拟分布式获取接口数据(三种->串行and并行and利用Future封装各个接口数据,最后在主线程get) 

1.利用线程池中线程处理任务封装->Future中,最后主线程get数据

2.串行执行各个任务 

 3.并行执行

CycliBarrier



概念:

CountDownLatch是一个同步工具类,也就是说它运行一个或多个线程一直等待;这种等待类似于我们之前学的join,与之不同的是:join:可以具体到具体线程,只要该线程执行完毕,后面代码就结束等待;CountDownLatch:可以一直等待;

比如:主线程需要所有组件加载完毕才会启动,类似于boot的启动;

例子:线程池执行任务,任务执行完后,主线程再执行

思路:利用CountDownLatch,每一个线程执行完一个任务,CountDownLatch-1,==0时,CountDownLatch.await()后面的代码才能执行; 

package com.example.juc.CountDownLatch;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

import static java.lang.Thread.sleep;

/**
 * @author diao 2022/5/1
 */
@Slf4j(topic = "c.Test02")
public class Test02 {
    public static void main(String[] args) throws InterruptedException {
        //1.定义一个线程池
         ThreadPoolExecutor threadPool=new ThreadPoolExecutor(3, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r,"Task_pool_"+r.hashCode());
                    }
                },new ThreadPoolExecutor.DiscardOldestPolicy());

         //2.CountDownLatch
        CountDownLatch latch = new CountDownLatch(3);

        //3.任务处理
        threadPool.submit(()->{
           log.debug("开始...");
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            latch.countDown();
            log.debug("结束..{}",latch.getCount());
        });

        threadPool.submit(()->{
           log.debug("开始...");
            try {
                sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("结束...{}",latch.getCount());
        });

        threadPool.submit(()->{
            log.debug("开始...");
            try {
                sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("结束...{}",latch.getCount());
        });

        latch.await();
        log.debug("终于轮到我了");
    }
}


案例:模拟王者荣耀玩家准备加载后,游戏开始 

思路:首先定义一个线程池与CountDownLatch,然后实现一个数组里面封装游戏玩家,一个玩家相当于一个任务然后进行遍历,每个任务进行sumit执行——>(里面模拟%加载 *** 作,延迟我们用sleep模拟),一个用户任务执行完后,CountDownLatch-1 ,当减为0,游戏开始;

package com.example.juc.CountDownLatch;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @author diao 2022/5/1
 */
@Slf4j(topic = "c.GameTest")
public class GameTest {
    public static void main(String[] args) throws InterruptedException {
         //1.定义一个线程池
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 50, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue(10),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "Task_pool_" + r.hashCode());
                    }
                }, new ThreadPoolExecutor.DiscardOldestPolicy());

        //2.设置CountDownLatch
        CountDownLatch latch = new CountDownLatch(10);

        //3.任务处理
        Random r=new Random();
        String[] gamer = new String[10];//数组里面放的是玩家状态

        for(int j=0;j<10;j++){
            /**
             *因为j是lambda外的变量,所以不能写入lambda中,得用中间常量代替
             */
            int temp=j;
            //4.提交任务
            pool.submit(()->{
                for (int i = 0; i <=100; i++) {
                    try {
                        Thread.sleep(r.nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    gamer[temp]=i+"%";
                    System.out.print("\r"+Arrays.toString(gamer));
                }
                /**
                 * 一个用户执行完,CountDownLatch-1
                 */
                latch.countDown();
            });
        }

        /**
         * 主线程等待,当用户全部准备完毕,CountDownLatch为0,主线程执行
         */
        latch.await();
        System.out.println("\n游戏开始");
        pool.shutdown();
    }
}


案例:模拟分布式获取接口数据(三种->串行and并行and利用Future封装各个接口数据,最后在主线程get) 
package com.example.juc.CountDownLatch;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;

import java.util.Map;
import java.util.concurrent.*;

/**
 * @author diao 2022/5/2
 */
//其他线程执行完,主线程才能运行,但是主线程没有拿到其他线程的结果(要用future)
@Slf4j(topic = "c.CountDownLatchTest")
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
//        test();
//        test2();
        test3();
    }

    /**
     * 串行执行
     */
    private static void test(){
        RestTemplate restTemplate = new RestTemplate();
        log.debug("开始...");

        Map response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
        log.debug("结束 order:{}",response);

        Map response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
        log.debug("结束 product:{}",response1);

    }

    /**
     * 并行执行,时间为 *** 作最长的时间
     */
    private static void test2() throws InterruptedException {
        RestTemplate restTemplate = new RestTemplate();
        log.debug("开始...");

        //1.先定义一个线程池
        ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(3, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r,"Task_pool_"+r.hashCode());
                    }
                },new ThreadPoolExecutor.DiscardOldestPolicy());

        //定义CountDownLatch
        CountDownLatch latch = new CountDownLatch(2);

        //2.线程池执行任务,保证任务被并行
        poolExecutor.submit(()->{
            Map response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
            log.debug("结束 order:{}",response);
            latch.countDown();
        });

        poolExecutor.submit(()->{
            Map response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
            log.debug("结束 product:{}",response1);
            latch.countDown();
        });

        latch.await();
        log.debug("执行完毕...");
        poolExecutor.shutdown();
    }

    private static void test3() throws ExecutionException, InterruptedException {
        RestTemplate restTemplate = new RestTemplate();
        log.debug("开始...");
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3, 5, 100, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "Thread_Task" + r.hashCode());
                    }
                }, new ThreadPoolExecutor.DiscardOldestPolicy());

        Future> data = poolExecutor.submit(() -> {
            Map response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
            log.debug("order数据传输中...");
            return response;
        });

        Future> data2 = poolExecutor.submit(() -> {
            Map response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
            log.debug("product数据传输中...");
            return response1;
        });

        //主线程拿其他线程的数据(数据封装到future中)
        log.debug("开始获取数据...");
        System.out.println(data.get());
        System.out.println(data2.get());
    }
}
1.利用线程池中线程处理任务封装->Future中,最后主线程get数据

结合CountDownLatch思想:

我们可以再在各个任务里面加CountDownLatch-1的 *** 作,只有CountDownLatch数量减到0,主线程才能执行,也就是说才能获取数据,保证获取数据的 *** 作在最后面;

2.串行执行各个任务 

没有用线程池,接口被挨个执行;

花了3s

 3.并行执行

耗时2s,也就是任务执行最久的任务就是总共的耗时长,因为咱有多个线程可以同时执行不同任务,不过具体可能不同时,多少会有差异,除非多核下;


CycliBarrier

对比与CountDownLatch,除了设计初始值,进行减数之外,还能重置初始值,达到一个重用的效果;

如果说CountDownLatch想进行重置,需要重新构造对象,CycliBarrier满足了需求;

package com.example.juc.CountDownLatch.CycliBarrier;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.Thread.sleep;

/**
 * @author diao 2022/5/2
 */
//CyclicBarrier能够重复使用,线程数与要跟任务数保持一致;
@Slf4j(topic = "c.TestCycliBarrier01")
public class TestCycliBarrier01 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(2);
        CyclicBarrier barrier = new CyclicBarrier(2,()->{
            log.debug("全部执行完毕...");
        });

        /**
         * 线程池执行线程任务
         * 线程数与任务数要一致
         * 如果是三个线程的话,他就会一下执行三个任务,aba,那么CyclicBarrier被减的就是两个a进行的
         */
        for (int i = 0; i < 3; i++) {
            service.submit(()->{
                log.debug("任务开始...");
                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                try {
                    //阻塞住了,2-1,需要等待下面那个任务将CyclicBarrier减为0,才能继续运行
                    barrier.await();
                    log.debug("继续运行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });

            service.submit(()->{
                log.debug("任务开始...");
                try {
                    sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    barrier.await();
                    log.debug("继续运行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
       
        service.shutdown();

    }
}

记住像这种循环执行任务,需要注意线程数与任务数大小,如果线程数>task,会把下一次循环的任务执行;那么CyclicBarrier的修改就会有问题;

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/796216.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存