Java基础学习之并发篇:SynchronousQueue和LinkedTransferQueue

Java基础学习之并发篇:SynchronousQueue和LinkedTransferQueue,第1张

Java基础学习之并发篇:SynchronousQueue和LinkedTransferQueue 学习目标

在学习了阻塞队列ArrayBlockingQueue、linkedBlockingQueue和linkedBlockingDeque我们对Java的阻塞队列有了一定的认识,有了生产消费者模型基本的概念。但是此时若有这样的场景:

我们想建立一个以生产和消费者为主体的队列,当生产者生产没有被消费时阻塞,消费者消费没有生产数据时也阻塞消费线程。保证生产和消费能够组成CP才能成功

那么本节我们便来学习SynchronousQueue和linkedTransferQueue达此目的。


SynchronousQueue

我们知道传统的生产消费模型是这样的:

有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞
而另一种反向压力的实现是:消费者唤醒生产者


其中著名的场景便是线程池

不像ArrayBlockingQueue、linkedBlockingDeque之类的阻塞队列依赖AQS实现并发 *** 作,SynchronousQueue直接使用CAS实现线程的安全访问,且SynchronousQueue内部没有容器,例如之前的队列或数组存储数据。
我们可以来看看SynchronousQueue的具体实现:

public class SynchronousQueue extends AbstractQueue
    implements BlockingQueue, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

   static final int NCPUS = Runtime.getRuntime().availableProcessors();
   static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    
    static final int maxUntimedSpins = maxTimedSpins * 16;

    
    static final long spinForTimeoutThreshold = 1000L;
    
    static final class TransferStack extends Transferer{}
     
    static final class TransferQueue extends Transferer {}
    private transient volatile Transferer transferer;
   }
     
  • NCPUS :CPU核数
  • maxTimedSpins :限时等待阻塞前自旋的次数,单核为0,多核32
  • maxUntimedSpins :不限时等待阻塞前自旋的次数,maxTimedSpins * 16
  • spinForTimeoutThreshold :纳秒数,这个为了更快的自旋而非park时间。初略估计足够了,默认100
    SynchronousQueue的内部实现了两个类,一个是TransferStack类,使用LIFO顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用FIFI顺序存储元素,这个类用于公平模式。

队列和栈都继承了抽象类Transferer,这个类只定义了一个方法transfer,此方法可以既可以执行put也可以执行take *** 作。这两个 *** 作被统一到了一个方法中,因为在dual数据结构中,put和take *** 作是对称的,发送或者消费线程会阻塞,只有有一对消费和发送线程匹配上,才同时退出,所以这种阻塞队列可以理解为"配对"队列。

transfer方法

  1. 如果队列为空或者头节点模式和自己的模式相同,尝试将自己增加到队列的等待者中,等待被满足或者被取消
  2. 如果队列包含了在等待的节点,并且本次调用是与之模式匹配的调用,尝试通过CAS修改等待节点item字段然后将其出队或出栈

在构造函数中可以通过传boolean值构建是否公平模式,默认是非公平模式。

public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }
public SynchronousQueue() {
        this(false);
    }    

公平模式:
公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。

例如在队列中已有两个put *** 作处于阻塞中,当再次有tack *** 作时,这时和put2匹配成功,将put1唤醒,是的,根据公平策略,先进先出。put1线程被唤醒消费,实现了线程间的一对一通信。

非公平模式:
与公平模式相比,大体相同,底层的实现使用的是TransferStack。总结而言,就是先进后出,来了一个线程take1,执行了take *** 作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程


linkedTransferQueue

我们知道 SynchronousQueue内部无法存储元素,当要添加元素的时候,需要阻塞,不够完美,linkedBolckingQueue 则内部使用了一些锁,性能不高,所以linkedTransferQueue出现了,它可以算是 linkedBolckingQueue 和 SynchronousQueue 的结合体,实现了接口TransferQueue

public class linkedTransferQueue extends AbstractQueue
    implements TransferQueue, java.io.Serializable {
        
    transient volatile Node head;

    
    private transient volatile Node tail;
    }

linkedTransferQueue相比SynchronousQueue 多了指向队列 的指针

public interface TransferQueue extends BlockingQueue {
    // 如果可能,立即将元素转移给等待的消费者。 
    // 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则返回 false。
    boolean tryTransfer(E e);

    // 将元素转移给消费者,如果需要的话等待。 
    // 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则等待直到元素由消费者接收。
    void transfer(E e) throws InterruptedException;

    // 上面方法的基础上设置超时时间
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    // 如果至少有一位消费者在等待,则返回 true
    boolean hasWaitingConsumer();

    // 返回等待消费者人数的估计值
    int getWaitingConsumerCount();
}

从put举例

public void put(E e) {
     xfer(e, true, ASYNC, 0);
}

xfer方法

private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
        // 从  head 开始
        for (Node h = head, p = h; p != null;) { // find & match first node
            // head 的类型。
            boolean isData = p.isData;
            // head 的数据
            Object item = p.item;
            // item != null 有 2 种情况,一是 put  *** 作, 二是 take 的 itme 被修改了(匹配成功)
            // (itme != null) == isData 要么表示 p 是一个 put  *** 作, 要么表示 p 是一个还没匹配成功的 take  *** 作
            if (item != p && (item != null) == isData) { 
                // 如果当前 *** 作和 head  *** 作相同,就没有匹配上,结束循环,进入下面的 if 块。
                if (isData == haveData)   // can't match
                    break;
                // 如果 *** 作不同,匹配成功, 尝试替换 item 成功,
                if (p.casItem(item, e)) { // match
                    // 更新 head
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 唤醒原 head 线程.
                    LockSupport.unpark(p.waiter);
                    return linkedTransferQueue.cast(item);
                }
            }
            // 找下一个
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        // 如果这个 *** 作不是立刻就返回的类型    
        if (how != NOW) {                 // No matches available
            // 且是第一次进入这里
            if (s == null)
                // 创建一个 node
                s = new Node(e, haveData);
            // 尝试将 node 追加对队列尾部,并返回他的上一个节点。
            Node pred = tryAppend(s, haveData);
            // 如果返回的是 null, 表示不能追加到 tail 节点,因为 tail 节点的模式和当前模式相反.
            if (pred == null)
                // 重来
                continue retry;           // lost race vs opposite mode
            // 如果不是异步 *** 作(即立刻返回结果)
            if (how != ASYNC)
                // 阻塞等待匹配值
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

xfer逻辑就是:
找到 head 节点,如果 head 节点是匹配的 *** 作,就直接赋值,如果不是,添加到队列中
相比较 SynchronousQueue 多了一个可以存储的队列,相比较 linkedBlockingQueue 多了直接传递元素,少了用锁来同步。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5596083.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存