在学习了阻塞队列ArrayBlockingQueue、linkedBlockingQueue和linkedBlockingDeque我们对Java的阻塞队列有了一定的认识,有了生产消费者模型基本的概念。但是此时若有这样的场景:
我们想建立一个以生产和消费者为主体的队列,当生产者生产没有被消费时阻塞,消费者消费没有生产数据时也阻塞消费线程。保证生产和消费能够组成CP才能成功
那么本节我们便来学习SynchronousQueue和linkedTransferQueue达此目的。
SynchronousQueue
我们知道传统的生产消费模型是这样的:
有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞
而另一种反向压力的实现是:消费者唤醒生产者
其中著名的场景便是线程池
不像ArrayBlockingQueue、linkedBlockingDeque之类的阻塞队列依赖AQS实现并发 *** 作,SynchronousQueue直接使用CAS实现线程的安全访问,且SynchronousQueue内部没有容器,例如之前的队列或数组存储数据。
我们可以来看看SynchronousQueue的具体实现:
public class SynchronousQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; static final int NCPUS = Runtime.getRuntime().availableProcessors(); static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; static final int maxUntimedSpins = maxTimedSpins * 16; static final long spinForTimeoutThreshold = 1000L; static final class TransferStack extends Transferer {} static final class TransferQueue extends Transferer {} private transient volatile Transferer transferer; }
- NCPUS :CPU核数
- maxTimedSpins :限时等待阻塞前自旋的次数,单核为0,多核32
- maxUntimedSpins :不限时等待阻塞前自旋的次数,maxTimedSpins * 16
- spinForTimeoutThreshold :纳秒数,这个为了更快的自旋而非park时间。初略估计足够了,默认100
SynchronousQueue的内部实现了两个类,一个是TransferStack类,使用LIFO顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用FIFI顺序存储元素,这个类用于公平模式。
队列和栈都继承了抽象类Transferer,这个类只定义了一个方法transfer,此方法可以既可以执行put也可以执行take *** 作。这两个 *** 作被统一到了一个方法中,因为在dual数据结构中,put和take *** 作是对称的,发送或者消费线程会阻塞,只有有一对消费和发送线程匹配上,才同时退出,所以这种阻塞队列可以理解为"配对"队列。
transfer方法
- 如果队列为空或者头节点模式和自己的模式相同,尝试将自己增加到队列的等待者中,等待被满足或者被取消
- 如果队列包含了在等待的节点,并且本次调用是与之模式匹配的调用,尝试通过CAS修改等待节点item字段然后将其出队或出栈
在构造函数中可以通过传boolean值构建是否公平模式,默认是非公平模式。
public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack (); } public SynchronousQueue() { this(false); }
公平模式:
公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。
例如在队列中已有两个put *** 作处于阻塞中,当再次有tack *** 作时,这时和put2匹配成功,将put1唤醒,是的,根据公平策略,先进先出。put1线程被唤醒消费,实现了线程间的一对一通信。
非公平模式:
与公平模式相比,大体相同,底层的实现使用的是TransferStack。总结而言,就是先进后出,来了一个线程take1,执行了take *** 作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程
linkedTransferQueue
我们知道 SynchronousQueue内部无法存储元素,当要添加元素的时候,需要阻塞,不够完美,linkedBolckingQueue 则内部使用了一些锁,性能不高,所以linkedTransferQueue出现了,它可以算是 linkedBolckingQueue 和 SynchronousQueue 的结合体,实现了接口TransferQueue
public class linkedTransferQueueextends AbstractQueue implements TransferQueue , java.io.Serializable { transient volatile Node head; private transient volatile Node tail; }
linkedTransferQueue相比SynchronousQueue 多了指向队列 的指针
public interface TransferQueueextends BlockingQueue { // 如果可能,立即将元素转移给等待的消费者。 // 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则返回 false。 boolean tryTransfer(E e); // 将元素转移给消费者,如果需要的话等待。 // 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则等待直到元素由消费者接收。 void transfer(E e) throws InterruptedException; // 上面方法的基础上设置超时时间 boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 如果至少有一位消费者在等待,则返回 true boolean hasWaitingConsumer(); // 返回等待消费者人数的估计值 int getWaitingConsumerCount(); }
从put举例
public void put(E e) { xfer(e, true, ASYNC, 0); }
xfer方法
private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 从 head 开始 for (Node h = head, p = h; p != null;) { // find & match first node // head 的类型。 boolean isData = p.isData; // head 的数据 Object item = p.item; // item != null 有 2 种情况,一是 put *** 作, 二是 take 的 itme 被修改了(匹配成功) // (itme != null) == isData 要么表示 p 是一个 put *** 作, 要么表示 p 是一个还没匹配成功的 take *** 作 if (item != p && (item != null) == isData) { // 如果当前 *** 作和 head *** 作相同,就没有匹配上,结束循环,进入下面的 if 块。 if (isData == haveData) // can't match break; // 如果 *** 作不同,匹配成功, 尝试替换 item 成功, if (p.casItem(item, e)) { // match // 更新 head for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 唤醒原 head 线程. LockSupport.unpark(p.waiter); return linkedTransferQueue.cast(item); } } // 找下一个 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 如果这个 *** 作不是立刻就返回的类型 if (how != NOW) { // No matches available // 且是第一次进入这里 if (s == null) // 创建一个 node s = new Node(e, haveData); // 尝试将 node 追加对队列尾部,并返回他的上一个节点。 Node pred = tryAppend(s, haveData); // 如果返回的是 null, 表示不能追加到 tail 节点,因为 tail 节点的模式和当前模式相反. if (pred == null) // 重来 continue retry; // lost race vs opposite mode // 如果不是异步 *** 作(即立刻返回结果) if (how != ASYNC) // 阻塞等待匹配值 return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
xfer逻辑就是:
找到 head 节点,如果 head 节点是匹配的 *** 作,就直接赋值,如果不是,添加到队列中
相比较 SynchronousQueue 多了一个可以存储的队列,相比较 linkedBlockingQueue 多了直接传递元素,少了用锁来同步。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)