Java并发编程——线程通信Condition及其原理分析

Java并发编程——线程通信Condition及其原理分析,第1张

Java并发编程——线程通信Condition及其原理分析 线程通信 使用 volatile 关键字

基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式。

public class VolatileShareDemo {
    // 定义一个共享变量来实现通信,它需要是volatile修饰,否则线程不能及时感知
    static volatile boolean notice = false;

    public static void main(String[] args) {
        List list = new ArrayList<>();
        // 实现线程A
        Thread threadA = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                list.add("abc");
                System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (list.size() == 5)
                    notice = true;
            }
        });
        // 实现线程B
        Thread threadB = new Thread(() -> {
            while (true) {
                if (notice) {
                    System.out.println("线程B收到通知,开始执行自己的业务...");
                    break;
                }
            }
        });
        // 需要先启动线程B
        threadB.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 再启动线程A
        threadA.start();
    }
}
wait/notify

众所周知,Object类提供了线程间通信的方法:wait()notify()notifyaAl(),它们是多线程通信的基础,而这种实现方式的思想自然是线程间通信。

注意:

  • wait和 notify必须配合synchronized使用,wait方法释放锁,notify方法不释放锁

  • wait和notify调用的顺序一定要注意先后,如果先调用了notify,然后才调用wait方法的话,则调用了wait方法被阻塞的线程则不会被唤醒,会一直处于阻塞状态。

我们可以通过生产者/消费者问题来学习如何使用。

public class Producer implements Runnable {
    
    private Queue bags;
    
    private int maxSize;

    public Producer(Queue bags, int maxSize) {
        this.bags = bags;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        int i = 0;
        while (true) {
            i++;
            synchronized (bags) { //抢占锁
                if (bags.size() == maxSize) {
                    System.out.println("bags 满了");
                    try {
                        //park(); ->JVM ->Native
                        bags.wait(); //满了,阻塞当前线程并且释放Producer抢到的锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产者生产:bag" + i);
                bags.add("bag" + i); //生产bag
                bags.notify(); //表示当前已经生产了数据,提示消费者可以消费了
            } //同步代码快执行结束
        }
    }
}
public class Consumer implements Runnable {
    
    private Queue bags;
    
    private int maxSize;

    public Consumer(Queue bags, int maxSize) {
        this.bags = bags;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (bags) {
                if (bags.isEmpty()) {
                    System.out.println("bags为空");
                    try {
                        bags.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String bag = bags.remove();
                System.out.println("消费者消费:" + bag);
                bags.notify(); //这里只是唤醒Producer线程,但是Producer线程并不能马上执行。
            } //同步代码块执行结束, monitorexit指令执行完成
        }
    }
}
public class ProducerConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        Queue strings = new linkedList<>();
        Producer producer = new Producer(strings, 3);
        Consumer consumer = new Consumer(strings, 3);
        new Thread(producer).start();
        //生产者在消费者后
        Thread.sleep(100);
        new Thread(consumer).start();
    }
}

运行效果:

生产者生产:bag1
生产者生产:bag2
生产者生产:bag3
bags 满了
消费者消费:bag1
消费者消费:bag2
消费者消费:bag3
bags为空
生产者生产:bag4

从运行效果我们可以看出,消费者发出notify()唤醒通知之后,依然是走完了自己线程的业务之后,生产者才开始执行,这也正好说明了,notify()方法不释放锁,而wait()方法释放锁。

Condition

Condition接口提供了与Object阻塞(wait())与唤醒(notify()或notifyAll())相似的功能,只不过Condition接口提供了更为丰富的功能,如:限定等待时长等。Condition需要与Lock结合使用,需要通过锁对象获取Condition。

基本使用

从以下代码可以清楚了解condition如何使用,以及他的执行顺序。

public class ConditionDemoWait implements Runnable {
    
    private Lock lock;
    
    private Condition condition;

    public ConditionDemoWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        System.out.println("begin - ConditionDemoWait");
        lock.lock();
        try {
            condition.await(); //让当前线程阻塞,Object.wait();
            System.out.println("end - ConditionDemoWait");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public class ConditionDemoNotify implements Runnable {

    private Lock lock;

    private Condition condition;

    public ConditionDemoNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        System.out.println("begin - ConditionDemoNotify");
        lock.lock(); //synchronized(lock)
        try {
            condition.signal(); //让当前线程唤醒  Object.notify(); //因为任何对象都会有monitor
            System.out.println("end - ConditionDemoNotify");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public class ConditionExample {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        
        ConditionDemoWait cd = new ConditionDemoWait(lock, condition);
        ConditionDemoNotify cdn = new ConditionDemoNotify(lock, condition);

        new Thread(cd).start();
        new Thread(cdn).start();
    }
}

运行效果:

begin - ConditionDemoWait
begin - ConditionDemoNotify
    
end - ConditionDemoNotify
end - ConditionDemoWait
源码分析

上述示例中的Condition对象是调用了Lock#newCondition()方法,源码如下:

public class ReentrantLock implements Lock, java.io.Serializable {
	...
	public Condition newCondition() {
        return sync.newCondition();
    }
	
	abstract static class Sync extends AbstractQueuedSynchronizer {
		...
		final ConditionObject newCondition() {
            return new ConditionObject();
        }
		...
	}
	...
}

上述的ConditionObject定义在AQS中,如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	...
	public class ConditionObject implements Condition, java.io.Serializable {
		...
	}
	...
}

首先来分析下Condition#await()方法

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}


private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

根据AQS队列的特性,若有多个线程执行lock#lock()方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:

假设当前是线程A获取到锁,其他线程执行lock#lock()方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行Condition#await()方法,则会将当前线程添加至Condition队列中,如下:

然后在调用fullyRelease()方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:

在调用isonSyncQueue()方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()进行阻塞。

假设当前线程A是生产者线程,调用await()方法后,会释放锁,并且将当前线程加入到Condition队列中。此时,消费者能获取到锁资源,然后继续执行。

假设线程B是消费者线程,当添加一个元素后会调用condition#signal()方法,定义如下:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

执行signal()方法,会将Condition队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:

至此,完成了Condition队列转换为同步队列的过程。后续流程基本就是重复以上 *** 作。

实际应用

虽然我们在日常开发中可能很少用到Condition,但是他还是具有实际用处的。

  • 实现阻塞队列
  • 在线程池中会用到阻塞队列
  • 生产者消费者
  • 流量缓冲等
阻塞队列

队列是一种只允许在一端进行删除 *** 作,在另一端进行插入 *** 作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。

那么阻塞队列,实际上是在队列的基础上增加了两个 *** 作。

  • 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。

  • 支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。

阻塞队列中的方法
  • 添加元素

    针对队列满了之后的不同的处理策略

    • add -> 如果队列满了,抛出异常
    • offer -> true/false , 添加成功返回true,否则返回false
    • put -> 如果队列满了,则一直阻塞
    • offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。
  • 移除元素

    • element-> 队列为空,抛异常

    • peek -> true/false , 移除成功返回true,否则返回false

    • take -> 一直阻塞

    • poll(timeout) -> 如果超时了,还没有元素,则返回null

JUC中的阻塞队列
  • ArrayBlockingQueue 基于数组结构

  • linkedBlockingQueue 基于链表结构

  • PriorityBlcokingQueue 基于优先级队列

  • DelayQueue 允许延时执行的队列

  • SynchronousQueue 没有任何存储结构的的队列

延时队列的实现
//首先需要实现Delayed接口
public class DelayQueueExampleTask implements Delayed {
    private String orderId;

    private long start = System.currentTimeMillis();

    private long time; //

    public DelayQueueExampleTask(String orderId, long time) {
        this.orderId = orderId;
        this.time = time;
    }

    //计算延迟时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start + time) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    //比较
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "DelayQueueExampleTask{" +
                "orderId='" + orderId + ''' +
                ", start=" + start +
                ", time=" + time +
                '}';
    }
}
public class DelayQueueMain {

    private static DelayQueue delayQueue = new DelayQueue();

    public static void main(String[] args) {
        delayQueue.offer(new DelayQueueExampleTask("1001", 1000));
        delayQueue.offer(new DelayQueueExampleTask("1002", 5000));
        delayQueue.offer(new DelayQueueExampleTask("1003", 3000));
        delayQueue.offer(new DelayQueueExampleTask("1004", 6000));
        delayQueue.offer(new DelayQueueExampleTask("1005", 2000));
        delayQueue.offer(new DelayQueueExampleTask("1006", 8000));
        delayQueue.offer(new DelayQueueExampleTask("1007", 3000));
        while (true) {
            try {
                DelayQueueExampleTask task = delayQueue.take();
                System.out.println(task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

运行效果:

DelayQueueExampleTask{orderId='1001', start=1636615343142, time=1000}
DelayQueueExampleTask{orderId='1005', start=1636615343142, time=2000}
DelayQueueExampleTask{orderId='1007', start=1636615343142, time=3000}
DelayQueueExampleTask{orderId='1003', start=1636615343142, time=3000}
DelayQueueExampleTask{orderId='1002', start=1636615343142, time=5000}
DelayQueueExampleTask{orderId='1004', start=1636615343142, time=6000}
DelayQueueExampleTask{orderId='1006', start=1636615343142, time=8000}
通过condition 实现阻塞队列

我们可以通过以下代码初步实现阻塞队列的效果。

public class ConditionBlockedQueueExample {

    //表示阻塞队列中的容器(正常应该是一个queue)
    private List items;

    //元素个数(表示已经添加的元素个数)
    private volatile int size;

    //数组的容量
    private volatile int count;

    private Lock lock = new ReentrantLock();

    //让take方法阻塞 ->wait/notify
    private final Condition notEmpty = lock.newCondition();

    //放add方法阻塞
    private final Condition notFull = lock.newCondition();

    public ConditionBlockedQueueExample(int count) {
        this.count = count;
        items = new ArrayList<>(count); //写死了
    }

    //添加一个元素,并且阻塞添加
    public void put(String item) throws InterruptedException {
        lock.lock();
        try {
            if (size >= count) {
                System.out.println("队列满了,需要先等一会");
                notFull.await();
            }
            ++size; //增加元素个数
            items.add(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public String take() throws InterruptedException {
        lock.lock();
        try {
            if (size == 0) {
                System.out.println("阻塞队列空了,先等一会");
                notEmpty.await();
            }
            --size;
            String item = items.remove(0);
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBlockedQueueExample cbqe = new ConditionBlockedQueueExample(10);
        //生产者线程
        Thread t1 = new Thread(() -> {
            Random random = new Random();
            for (int i = 0; i < 1000; i++) {
                String item = "item-" + i;
                try {
                    cbqe.put(item); //如果队列满了,put会阻塞
                    System.out.println("生产一个元素:" + item);
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread.sleep(100);
        Thread t2 = new Thread(() -> {
            Random random = new Random();
            for (; ; ) {
                try {
                    String item = cbqe.take();
                    System.out.println("消费者线程消费一个元素:" + item);
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }

}

运行效果:

.....
    
生产一个元素:item-5
消费者线程消费一个元素:item-5
生产一个元素:item-6
消费者线程消费一个元素:item-6
生产一个元素:item-7
消费者线程消费一个元素:item-7
生产一个元素:item-8
消费者线程消费一个元素:item-8
生产一个元素:item-9
消费者线程消费一个元素:item-9
生产一个元素:item-10
消费者线程消费一个元素:item-10
阻塞队列空了,先等一会
生产一个元素:item-11
消费者线程消费一个元素:item-11
阻塞队列空了,先等一会
生产一个元素:item-12
消费者线程消费一个元素:item-12
阻塞队列空了,先等一会
生产一个元素:item-13
消费者线程消费一个元素:item-13

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5438093.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存