多线程与高并发

多线程与高并发,第1张

目录
  • JMH工具
  • Disruptor框架

JMH工具

JMH即Java Microbenchmark Harness,是Java用来做基准测试的一个工具,该工具由OpenJDK提供并维护,测试结果可信度高。JVM下的性能测试工具。

使用步骤:

  1. 引入Maven依赖

    <dependency>
        <groupId>org.openjdk.jmh</groupId>
        <artifactId>jmh-core</artifactId>
        <version>1.23</version>
    </dependency>
    
    <dependency>
        <groupId>org.openjdk.jmh</groupId>
        <artifactId>jmh-generator-annprocess</artifactId>
        <version>1.23</version>
    </dependency>
    
  2. 安装idea插件,并开启注解设置

  3. 代码使用

    public class JMHTest {
    
        @BenchmarkMode(Mode.All) //假设测试平均耗时,可以指定测试维度为Mode.AverageTime。
        @Benchmark //声明一个public方法为基准测试方法
        @Fork(1) //指定fork出多少个子进程来执行同一基准测试方法。假设我们不需要多个进程,可以使用@Fork指定为进程数为1。
        @Threads(1) //指定使用多少个线程来执行基准测试方法
        @OutputTimeUnit(TimeUnit.MILLISECONDS) // 指定输出的耗时时长的单位,默认是秒
        @Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) //预热
        @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
        //假设@Measurement指定iterations为100,time为10s,则: 每个线程实际执行基准测试方法的次数等于time除以基准测试方法单次执行的耗时,假设基准测试方法执行耗时为1s,那么一次测量最多只执行10(time为10s / 方法执行耗时1s)次基准测试方法,而iterations为100指的是测试100次(不是执行100次基准测试方法)
        public void getJson() {
            foreach();
        }
    
    
        private void foreach() {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 10000; i++) {
                list.add(i);
            }
            list.forEach(v -> fun(v));
        }
    
        private void parallel() {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 10000; i++) {
                list.add(i);
            }
            list.parallelStream().forEach(v -> fun(v));
        }
    
        private int fun(int num) {
            for (int i = 0; i < 100; i++) {
                for (int j = 0; j < 1000; j++) {
                    if (i == j) {
                        num++;
                    }
                }
            }
            return num;
        }
    }
    
    
    
    

    (如果只使用单个线程,一次测量只会执行一次基准测试方法,如果使用10个线程,一次测量就能执行10次基准测试方法。)

    参考:https://blog.csdn.net/ZYC88888/article/details/113741316

Disruptor框架

RingBuffer:环形队列,采用数组实现,没有首尾指针

特点:性能最快的内存队列,无锁,高并发,队列如果满了会直接覆盖旧数据降低GC频率,实现了生产者消费者模式(观察者模式)

背景:目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团点评技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。本文从实战角度剖析了Disruptor的实现原理。
Log4j 2相对于Log4j 1最大的优势在于多线程并发场景下性能更优。该特性源自于Log4j 2的异步模式采用了Disruptor来处理。

需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。

代码使用:使用Disruptor比使用ArrayBlockingQueue略微复杂,为方便读者上手,增加代码样例。
代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。
以下代码基于3.3.4版本的Disruptor包。


package com.meituan.Disruptor;
 
/**
 * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
 */
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
 
import java.util.concurrent.ThreadFactory;
 
 
public class DisruptorMain
{
    public static void main(String[] args) throws Exception
    {
        // 队列中的元素
        class Element {
 
            private int value;
 
            public int get(){
                return value;
            }
 
            public void set(int value){
                this.value= value;
            }
 
        }
 
        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };
 
        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };
 
        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>(){
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch)
            {
                System.out.println("Element: " + element.get());
            }
        };
 
        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();
 
        // 指定RingBuffer的大小
        int bufferSize = 16;
 
        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
 
        // 设置EventHandler
        disruptor.handleEventsWith(handler);
 
        // 启动disruptor的线程
        disruptor.start();
 
        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
 
        for (int l = 0; true; l++)
        {
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();  
            try
            {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence); 
                // 设置该位置元素的值
                event.set(l); 
            }
            finally
            {
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}

注意:多个消费者时,每个消费者都会消费一遍一样的消息。

参考:https://blog.csdn.net/ZYC88888/article/details/80006226?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165164896316782184651479%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=165164896316782184651479&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_ecpm_v1~rank_v31_ecpm-2-80006226.nonecase&utm_term=disruptor&spm=1018.2226.3001.4450

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/870673.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存