聊一下常见的并发编程的一些场景

聊一下常见的并发编程的一些场景,第1张

工作中,合理的使用并发编程将大大提高代码运行效率,下面聊一下工作中常见的几种并发编程场景

1、定时任务

这个肯定大家是经常用到的,除了spring提供的@Schedule注解能够建立一个定时任务,简单new一个线程也可以实现

public static void test() {
    new Thread(() -> {
        while (true) {
            try {
               // 业务代码
                Thread.sleep( 60 * 5 * 1000);
            } catch (Exception e) {
                log.error(e);
            }
        }
    }).start();
}

通过这种方式,也可以实现一个每隔五分钟执行一次的定时任务,一些简单的任务可以满足。

2、日志入库

每个项目都会有将运行日志保存到es或者数据库的需求,除了使用消息队列(kafka)等,也可以使用并发编程来实现
首先新建一个阻塞队列,来保存生产的日志消息

@Component
public class LogQueue {
    private static final int QUEUE_MAX_SIZE    = 1000;

    private BlockingQueue queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);

    //投递消息
    public boolean push(Object obj) {
        return this.queue.add(obj);
    } 

    //消费消息
    public Object poll() {
        Object obj = null;
        try {
            obj = this.queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return obj ;
    }
}

然后,就可以创建一个线程去异步 消费队列中的日志消息

@Service
public class Consumer {
    @Autowired
    private LogQueue queue;

    @PostConstruct
    public voit init {
       new Thread(() -> {
          while (true) {
              Object obj = queue.take();
              //写入数据库/es
          }
        }).start();
    }
}

为了提升性能,也可以使用线程池来处理业务逻辑。

3、并行stream流

stream流相信大家都经常用,当数据量比较多,处理的业务比较复杂时,可以使用并行stream流parallelStream

4、接口优化

大多数情况下,一个接口的内部还需要通过feign接口调用别的服务,获取数据,然后整合再返回给前端,如果串行执行这些请求,则接口的响应速度就会大大延长,这时候可以并行执行feign接口请求,等数据都返回后,返回给前端。

public XX getXX(Long id) throws InterruptedException, ExecutionException {
    final XX X= new XX();
    CompletableFuture X1 = CompletableFuture.supplyAsync(() -> {
        getX1(id, X);
        return Boolean.TRUE;
    }, executor);

    CompletableFuture X2 = CompletableFuture.supplyAsync(() -> {
        getX2(id, X);
        return Boolean.TRUE;
    }, executor);

    CompletableFuture X3 = CompletableFuture.supplyAsync(() -> {
        getX3(id, X);
        return Boolean.TRUE;
    }, executor);
    CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();

    X1.get();
    X2.get();
    X3.get();
    return userInfo;
}

另外需要注意的是,在多线程异步请求时,要使用线程池,避免高并发场景下创建线程过多的问题。

5、模拟高并发

可以使用countdownlatch来写一个模拟高并发的压测程序

public static void concurrenceTest() {
// 模拟高并发
    final AtomicInteger atomicInteger = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1000); // 相当于计数器,当所有都准备好了,再一起执行,模仿多并发,保证并发量
    final CountDownLatch countDownLatch2 = new CountDownLatch(1000); // 保证所有线程执行完了再打印atomicInteger的值
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        for (int i = 0; i < 1000; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        countDownLatch.await(); //一直阻塞当前线程,直到计时器的值为0,保证同时并发
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(),e);
                    }
                    //每个线程增加1000次,每次加1
                    for (int j = 0; j < 1000; j++) {
                        atomicInteger.incrementAndGet();
                    }
                    countDownLatch2.countDown();
                }
            }); // submit 
            countDownLatch.countDown();
        }

        countDownLatch2.await();// 保证所有线程执行完
        executorService.shutdown();
    } catch (Exception e){
        log.error(e.getMessage(),e);
    }
}

6、处理积压的mq消息

首先我们需要定义一个线程池,以避免线程的重复创建和销毁,代码就省略了,消费消息的代码,以kafka距离
消费者部分:

@Service
public class ConsumerService {
    @Autowired
    private Executor messageExecutor;
    
    @KafkaListener(id="test",topics={"topic-test"})
    public void listen(String message){
     // 往线程池提交任务
        messageExecutor.submit(new MyWork(message);
    }
}

消息的业务处理部分,runnable接口实现

public class MyWork implements Runnable {
    private String message;
    // 通过构造方法传参
    public MyWork(String message) {
       this.message = message;
    }

    @Override
    public void run() {
        System.out.println(message);
    }
}
7、统计数量

其实这里就是使用并发包里面的原子 *** 作类,atomicInteger,atomiclong之类的,使用普通的count++,在多线程的情况下,会有线程安全问题,而原子 *** 作类院里就是自旋锁 + cas;自旋锁其实就是一个死循环,不停的尝试去获取锁,而CAS是比较和交换的意思,它的实现逻辑是:将内存位置处的旧值与预期值进行比较,若相等,则将内存位置处的值替换为新值。若不相等,则不做任何 *** 作。

8、执行延迟定时任务

其实就是使用ScheduledExecutorService,这是一种能够设置执行频率,以及延迟时间的线程池,比单独使用一个线程在性能上又有提高,解决了Timer单线程执行,多个任务之间会互相影响的问题;具体使用的方法有

1schedule(Runnable command,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕。
2schedule(Callablecallable,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕,并且可以获取执行结果。
3、scheduleAtFixedRate,表示以固定频率执行的任务,如果当前任务耗时较多,超过定时周期period,则当前任务结束后会立即执行。
4、scheduleWithFixedDelay,表示以固定延时执行任务,延时是相对当前任务结束为起点计算开始时间。

以上是简单总结的一些场景,当然工作中不止有这些,就简单写到这里了

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/917474.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存