2011年,企业应用软件专家Martin Fowler在自己的网站上写了一篇LMAX架构的文章。在文章中他介绍了LMAX是一种新型零售金融交易平台,它能以很低的延迟产生大量的交易。这个LMAX系统是建立在JVM平台上,其核心是一个业务逻辑处理器。能够在一个线程里每秒处理六百万订单。
通过上面的介绍,相比大家对这个东西都有一个大大的问号吧,它能处理这么高的并发,那么它到底是采用了什么样的机制呢?或者有什么好的优化手段呢?
其实我们也不难猜测,一个线程里既然能处理六百万订单,那么很显然它的业务逻辑一定是在内存中处理的,因为如果要走IO的话,那性能肯定没有那么高,当然无论是互联网任何一个高性能的体现,无非就是走内存。 那么可能有的小伙伴会说缓存也可以啊,但是缓存性能肯定也是没有内存高的。
事实上Disruptor也使用了一个事件驱动的方式,这也是他的一个核心编程模型。
真正要想使用Disruptor,需要以下步骤:
- 建立一个工厂Event类,用于创建Event类实例对象。
- 需要一个事件监听类,用于处理数据(Event类)
- 实例化Disruptor实例,配置一系列参数,编写Disruptor核心组件
- 编写生产者组件,向Disruptor容器中去投递数据
让我们开始coding吧~
1.添加依赖首先创建一个Maven项目并添加disruptor依赖:
2.建立Event类与Event工厂com.lmax disruptor3.4.2
@Data @AllArgsConstructor @NoArgsConstructor public class TestEvent { private long value; }
public class TestEventFactory implements EventFactory3.创建事件监听类{ @Override public TestEvent newInstance() { return new TestEvent();//这个方法就是为了返回空的数据对象 } }
public class TestEventHandler implements EventHandler4.创建生产者{ @Override public void onEvent(TestEvent testEvent, long l, boolean b) throws Exception { System.out.println("消费者:"+testEvent.getValue()); } }
public class TestEventProducer { private RingBuffer5.创建Main方法,启动测试ringBuffer; public TestEventProducer(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data){ //1.在生产者发送消息的时候,首先需要在我们的ringBuffer里面获取一个可用的序号 long sequence = ringBuffer.next(); try{ //2.根据这个序号,找到具体的"TestEvent"元素 注意:此时获取的event对象是一个没有被赋值的空对象。 TestEvent event = ringBuffer.get(sequence); //3.进行实际的赋值 *** 作 event.setValue(data.getLong(0)); }finally { //4.提交发布 *** 作 ringBuffer.publish(sequence); } } }
public class Main { public static void main(String[] args) { //参数准备工作 TestEventFactory testEventFactory = new TestEventFactory(); int ringBufferSize = 1024 * 1024; ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //1.实例化disruptor对象 Disruptordisruptor = new Disruptor<>( testEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //2.添加消费者的监听 disruptor.handleEventsWith(new TestEventHandler()); //3.启动disruptor disruptor.start(); //4.获取实际存储数据的容器:RingBuffer RingBuffer ringBuffer = disruptor.getRingBuffer(); TestEventProducer producer = new TestEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 100; i++) { bb.putLong(0,i); producer.sendData(bb); } disruptor.shutdown(); executor.shutdown(); } }
通过上面的实验,我们可以看到我们编写的代码是可以运行成功的,我们的producer可以正常发送数据,同样我们的消费者也能正常监听并消费。
Disruptor核心原理我们已经从上面的代码演示中了解到,其实Disruptor就是从生产者去同步消息,然后扔到一个RingBuffer容器里面,消费者进行监听,然后去获取具体的数据。
那么我们不难看出,RingBuffer是Disruptor的一个核心组件,生产者想RingBuffer写元素,消费者从RingBuffer中消费元素。这个也是RingBuffer给我们带来的最初的印象。
什么是RingBuffer那么RingBuffer到底是什么呢?
- 其实正如它的名字一样,它是一个环(首尾相接的环)
- 它用做在不同上下文(线程)间传递数据的buffer。
- RingBuffer拥有一个序号,这个序号指向数组中下一个可用元素。
由此我们可以知道,这个RingBuffer不仅是一个首尾相接的环,它的底层还是一个数组的结构,我们来看一下这个图:
这个图里就是一个环,把它分成了若干个小份,并且他还是有相应的序号的,我们图中一共10个位置。
假设RingBuffer里有三个的元素后,有一个生产者它想去把Event对象再扔到这个RingBuffer里,它第一步就是需要调用next获取下一个可用序号,
假设我们容器中有三个元素,那么如果向第四个位置填充的时候,它首先通过next方法获取下一个可用序号,即序号4,然后我们需要通过get方法得到第四个位置的对象,
只不过这时的对象,它只是给new出来了,不过没有赋值,然后我们给赋值就好了。
这就是整体RingBuffer的概念。
随着不停的填充这个buffer,这个序号会一直增长,那么RingBuffer是如何找到对一个元素的?
- 要想找到当前序号指向的元素,可以通过mod *** 作:sequence mod array .length = array.index(取模 *** 作)
这个也是RingBuffer设计的巧妙之处,如下图,假设数组长度还是10,那么如果生产的消息超过了10个,如下图:
如果要找到12 这个位置的元素,就可以使用 12 % 10 = 2 就可以快速定位到元素的位置。
事实上,上图RingBuffer只有10个槽完全是个意外。如果槽的个数是2的N次方更有利于基于二进制的计算机进行计算。
通过上述例子,我们可以知道,RingBuffer是一个基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口。
什么是SequenceSequence的作用就是通过顺序递增的序号,管理进行交换的数据(事件)。
它对数据的处理过程总是沿着序号逐个递增处理。
一个Sequence用于标识某个特定的事件处理者(RingBuffer/Producer/Consumer)的处理进度。
而Sequencer才是Disruptor的核心,它包含了Sequence。
这个接口有两个实现类:
- SingleProducerSequencer
- MultiProducerSequencer
他们主要是实现生产者和消费者之间快速、正确的传递数据的并发算法。
当我们有单个生产者的时候是使用SingleProducerSequencer,当多个生产者的话,就用MultiProducerSequencer。
Sequence BarrierBarrier的意思就是栅栏,序号栅栏这是什么概念呢?
这是我们生产和消费平衡关系的一个核心的类。
用于保持RingBuffer的Main Published Sequence(Producer)和Consumer之间的平衡关系;Sequence Barrier还自定义了决定Consumer是否还有可处理的事件的逻辑。
这个就是我们上文快速入门案例中代码里提到的等待策略,它的主要目的是为了决定消费者将如何等待生产者将Event置入Disruptor。
其主要的策略有:
- BlockingWaitStrategy (阻塞的方式)
- SleepingWaitStrategy(休眠的方式)
- YieldingWaitStrategy(线程之间相互切换竞争的方式)
这个是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现。
我们进入到源码里,可以看到,它首先定义了一个ReentrantLock的可重入锁,并且调用了newCondition,创建了一个Condition。Condition就是实现对线程唤醒或阻塞的工作。类似于wait和notify。
首先,它是实现了WaitStrategy,重写了waitFor和signalAllWhenBlocking方法。
这段代码也比较简单,我们大体看一下吧:
首先就是说判断是否满足条件,如果满足,就加锁,然后通过一个whild循环,这里意思就是说是否有可用的sequence,如果没有可用的,就用condition.await 一直等待,如果有可用的:
就在这里唤醒阻塞的线程。
这里注意一点,其实我们这里所说的WaitStrategy其实都是针对与消费者。
这里我们就先简单了解一下就好了,后面我们再详细探讨。
它的表现跟BlockingWaitStrategy差不多,对CPU消耗也类似,但其对生产者线程的影响最小,适用于异步日志类似的场景。
这个其实也是实现了WaitStrategy,我们也大体看一下源码:
可以看到这里其实也是用了一个循环,不过这个类除了实现了waitFor和signalAllWhenBlocking方法之外,它还多了一个方法:
我们可以看出,这种方式是采用了无锁的方式,因为它没有使用JDK中任何的锁,那它是怎么做到的呢?
它首先就是一个while循环,然后通过applyWaitMethod对counter进行一个++ --的 *** 作,由于我们这里是刚接触,就不详细解释了,总之就是做一个空运转。
这种策略的性能是最好的,适用于低延迟的系统,在要求极高性能且事件处理线程数小于CPU核心数的场景中,推荐此策略;例如CPU开启超线程特性。
它有一个SPIN_TRIES一个默认值,并且也实现了waitFor和signalAllWhenBlocking方法,你会发现,只要是无锁的方式,这里的:
都是一个空实现。
这个方法就就比上一个实现的更简单了,只要counter==0 就不断让线程进行yield,else的话就对counter–。实现了多线程不断进行交替执行。这样他的线程利用率是最高的,因此性能也是最好的。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)