上一篇我们说到了SpsclinkedArrayQueue,它是一个单生产者单消费者的队列。在实际的开发过程中不可能都是单生产者,很多时候都是在高并发情况下需要引入队列来进行削峰。在Reactor3中MpsclinkedQueue就是这样一个多生产者单消费者的队列。
成员变量//当前生产节点 private volatile linkedQueueNode构造方法producerNode; //生产节点伴随的并发控制成员变量 private final static AtomicReferenceFieldUpdater PRODUCER_NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(MpsclinkedQueue.class, linkedQueueNode.class, "producerNode"); //当前消费节点 private volatile linkedQueueNode consumerNode; //消费节点伴随的并发控制成员变量 private final static AtomicReferenceFieldUpdater CONSUMER_NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(MpsclinkedQueue.class, linkedQueueNode.class, "consumerNode");
public MpsclinkedQueue() { //初始化时使用一个空节点作为头结点 linkedQueueNodenode = new linkedQueueNode<>(); //这里使用lazySet只保证了前面代码的有序性,后面的可见性没有保证,也就是只有storestore没有storeload CONSUMER_NODE_UPDATER.lazySet(this, node); //下面的代码会在最后加上storeLoad保证了整个结构的内存可见性 PRODUCER_NODE_UPDATER.getAndSet(this, node);// this ensures correct construction: // StoreLoad }
这里使用lazySet的性能优化技巧在上一篇SpsclinkedArrayQueue没有提到,使用lazySet并不能保证修改变量后其他线程能马上看到最新值,所以在使用lazySet提升性能时,要清楚的知道自己再做什么。如果没有把握最好还是在保证程序正确的前提下在去优化性能。
上面的构造函数最后使用CAS会在最后加上StoreLoad内存屏障,将所有值都刷到主存,保证其他线程的可见。在Volatile变量写的前后分别加上StoreStore,StoreLoad是一种保守的策略,有些CPU会根据程序做一些优化。比如在这个例子中,中间如果将lazySet修改为CAS,有些CPU也是可以将这个CAS的效果优化为lazySet的效果,去掉后面的StoreLoad内存屏障。
public final boolean offer(final E e) { Objects.requireNonNull(e, "The offered value 'e' must be non-null"); //创建新的节点 final linkedQueueNode消费nextNode = new linkedQueueNode<>(e); //这里使用CAS来控制并发,每次只有一个线程能抢占到节点 final linkedQueueNode prevProducerNode = PRODUCER_NODE_UPDATER.getAndSet(this, nextNode); // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed // and completes the store in prev.next. //抢占成功后就可以偷懒使用lazySet慢悠悠的将值存放到这个节点,其他线程不会立马看到这个新的节点, //不过不会对结果有影响,订阅者会循环的拉取消息,过很小的一段时间就可以看到这个节点了 prevProducerNode.soNext(nextNode); // StoreStore return true; } public void soNext(linkedQueueNode n) { NEXT_UPDATER.lazySet(this, n); }
public E poll() { linkedQueueNode总结currConsumerNode = consumerNode; // don't load twice, it's alright linkedQueueNode nextNode = currConsumerNode.lvNext(); if (nextNode != null) { // we have to null out the value because we are going to hang on to the node //获取下一个节点的值,并将他的值设置为null,因为这个节点即将成为新的头结点, //由于只是单消费者的原因,所以不会有并发安全的问题 final E nextValue = nextNode.getAndNullValue(); // Fix up the next ref of currConsumerNode to prevent promoted nodes from keeping new ones alive. // We use a reference to self instead of null because null is already a meaningful value (the next of // producer node is null). //将当前节点的下个节点的指针指向自己,不使用null的原因是null表示当前节点的下个节点可能是null. currConsumerNode.soNext(currConsumerNode); CONSUMER_NODE_UPDATeR.lazySet(this, nextNode); // currConsumerNode is now no longer referenced and can be collected return nextValue; } else if (currConsumerNode != producerNode) { //如果当前头节点的下一个节点为空,但是当前的头结点有和当前的生产节点是同一个节点, // 则说明当前头结点的数据不是最新的,下面通过一个循环来等待本地缓存刷新 while ((nextNode = currConsumerNode.lvNext()) == null) { } //下面的逻辑就和上面一致了 // got the next node... // we have to null out the value because we are going to hang on to the node final E nextValue = nextNode.getAndNullValue(); // Fix up the next ref of currConsumerNode to prevent promoted nodes from keeping new ones alive. // We use a reference to self instead of null because null is already a meaningful value (the next of // producer node is null). currConsumerNode.soNext(currConsumerNode); CONSUMER_NODE_UPDATER.lazySet(this, nextNode); // currConsumerNode is now no longer referenced and can be collected return nextValue; } return null; }
MpsclinkedQueue和SpsclinkedArrayQueue一样也是参考了JCTools,都是无锁队列。都再恰当的地方使用了lazySet来优化性能同时保证程序的正确性。当然,MpsclinkedQueue的单消费者的特性也不是由它自己来保证的。下一篇我们再来Reactor3如何使用这两个队列。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)