基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式。
public class VolatileShareDemo { // 定义一个共享变量来实现通信,它需要是volatile修饰,否则线程不能及时感知 static volatile boolean notice = false; public static void main(String[] args) { Listwait/notifylist = new ArrayList<>(); // 实现线程A Thread threadA = new Thread(() -> { for (int i = 1; i <= 10; i++) { list.add("abc"); System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if (list.size() == 5) notice = true; } }); // 实现线程B Thread threadB = new Thread(() -> { while (true) { if (notice) { System.out.println("线程B收到通知,开始执行自己的业务..."); break; } } }); // 需要先启动线程B threadB.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 再启动线程A threadA.start(); } }
众所周知,Object类提供了线程间通信的方法:wait()、notify()、notifyaAl(),它们是多线程通信的基础,而这种实现方式的思想自然是线程间通信。
注意:
wait和 notify必须配合synchronized使用,wait方法释放锁,notify方法不释放锁
wait和notify调用的顺序一定要注意先后,如果先调用了notify,然后才调用wait方法的话,则调用了wait方法被阻塞的线程则不会被唤醒,会一直处于阻塞状态。
我们可以通过生产者/消费者问题来学习如何使用。
public class Producer implements Runnable { private Queuebags; private int maxSize; public Producer(Queue bags, int maxSize) { this.bags = bags; this.maxSize = maxSize; } @Override public void run() { int i = 0; while (true) { i++; synchronized (bags) { //抢占锁 if (bags.size() == maxSize) { System.out.println("bags 满了"); try { //park(); ->JVM ->Native bags.wait(); //满了,阻塞当前线程并且释放Producer抢到的锁 } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产者生产:bag" + i); bags.add("bag" + i); //生产bag bags.notify(); //表示当前已经生产了数据,提示消费者可以消费了 } //同步代码快执行结束 } } }
public class Consumer implements Runnable { private Queuebags; private int maxSize; public Consumer(Queue bags, int maxSize) { this.bags = bags; this.maxSize = maxSize; } @Override public void run() { while (true) { synchronized (bags) { if (bags.isEmpty()) { System.out.println("bags为空"); try { bags.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String bag = bags.remove(); System.out.println("消费者消费:" + bag); bags.notify(); //这里只是唤醒Producer线程,但是Producer线程并不能马上执行。 } //同步代码块执行结束, monitorexit指令执行完成 } } }
public class ProducerConsumerExample { public static void main(String[] args) throws InterruptedException { Queuestrings = new linkedList<>(); Producer producer = new Producer(strings, 3); Consumer consumer = new Consumer(strings, 3); new Thread(producer).start(); //生产者在消费者后 Thread.sleep(100); new Thread(consumer).start(); } }
运行效果:
生产者生产:bag1 生产者生产:bag2 生产者生产:bag3 bags 满了 消费者消费:bag1 消费者消费:bag2 消费者消费:bag3 bags为空 生产者生产:bag4
从运行效果我们可以看出,消费者发出notify()唤醒通知之后,依然是走完了自己线程的业务之后,生产者才开始执行,这也正好说明了,notify()方法不释放锁,而wait()方法释放锁。
ConditionCondition接口提供了与Object阻塞(wait())与唤醒(notify()或notifyAll())相似的功能,只不过Condition接口提供了更为丰富的功能,如:限定等待时长等。Condition需要与Lock结合使用,需要通过锁对象获取Condition。
基本使用从以下代码可以清楚了解condition如何使用,以及他的执行顺序。
public class ConditionDemoWait implements Runnable { private Lock lock; private Condition condition; public ConditionDemoWait(Lock lock, Condition condition) { this.lock = lock; this.condition = condition; } @Override public void run() { System.out.println("begin - ConditionDemoWait"); lock.lock(); try { condition.await(); //让当前线程阻塞,Object.wait(); System.out.println("end - ConditionDemoWait"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
public class ConditionDemoNotify implements Runnable { private Lock lock; private Condition condition; public ConditionDemoNotify(Lock lock, Condition condition) { this.lock = lock; this.condition = condition; } @Override public void run() { System.out.println("begin - ConditionDemoNotify"); lock.lock(); //synchronized(lock) try { condition.signal(); //让当前线程唤醒 Object.notify(); //因为任何对象都会有monitor System.out.println("end - ConditionDemoNotify"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
public class ConditionExample { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); ConditionDemoWait cd = new ConditionDemoWait(lock, condition); ConditionDemoNotify cdn = new ConditionDemoNotify(lock, condition); new Thread(cd).start(); new Thread(cdn).start(); } }
运行效果:
begin - ConditionDemoWait begin - ConditionDemoNotify end - ConditionDemoNotify end - ConditionDemoWait源码分析
上述示例中的Condition对象是调用了Lock#newCondition()方法,源码如下:
public class ReentrantLock implements Lock, java.io.Serializable { ... public Condition newCondition() { return sync.newCondition(); } abstract static class Sync extends AbstractQueuedSynchronizer { ... final ConditionObject newCondition() { return new ConditionObject(); } ... } ... }
上述的ConditionObject定义在AQS中,如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... public class ConditionObject implements Condition, java.io.Serializable { ... } ... }
首先来分析下Condition#await()方法
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
根据AQS队列的特性,若有多个线程执行lock#lock()方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:
假设当前是线程A获取到锁,其他线程执行lock#lock()方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行Condition#await()方法,则会将当前线程添加至Condition队列中,如下:
然后在调用fullyRelease()方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:
在调用isonSyncQueue()方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()进行阻塞。
假设当前线程A是生产者线程,调用await()方法后,会释放锁,并且将当前线程加入到Condition队列中。此时,消费者能获取到锁资源,然后继续执行。
假设线程B是消费者线程,当添加一个元素后会调用condition#signal()方法,定义如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
执行signal()方法,会将Condition队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:
至此,完成了Condition队列转换为同步队列的过程。后续流程基本就是重复以上 *** 作。
实际应用虽然我们在日常开发中可能很少用到Condition,但是他还是具有实际用处的。
- 实现阻塞队列
- 在线程池中会用到阻塞队列
- 生产者消费者
- 流量缓冲等
队列是一种只允许在一端进行删除 *** 作,在另一端进行插入 *** 作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。
那么阻塞队列,实际上是在队列的基础上增加了两个 *** 作。
-
支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。
-
支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。
-
添加元素
针对队列满了之后的不同的处理策略
- add -> 如果队列满了,抛出异常
- offer -> true/false , 添加成功返回true,否则返回false
- put -> 如果队列满了,则一直阻塞
- offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。
-
移除元素
-
element-> 队列为空,抛异常
-
peek -> true/false , 移除成功返回true,否则返回false
-
take -> 一直阻塞
-
poll(timeout) -> 如果超时了,还没有元素,则返回null
-
-
ArrayBlockingQueue 基于数组结构
-
linkedBlockingQueue 基于链表结构
-
PriorityBlcokingQueue 基于优先级队列
-
DelayQueue 允许延时执行的队列
-
SynchronousQueue 没有任何存储结构的的队列
//首先需要实现Delayed接口 public class DelayQueueExampleTask implements Delayed { private String orderId; private long start = System.currentTimeMillis(); private long time; // public DelayQueueExampleTask(String orderId, long time) { this.orderId = orderId; this.time = time; } //计算延迟时间 @Override public long getDelay(TimeUnit unit) { return unit.convert((start + time) - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } //比较 @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "DelayQueueExampleTask{" + "orderId='" + orderId + ''' + ", start=" + start + ", time=" + time + '}'; } }
public class DelayQueueMain { private static DelayQueuedelayQueue = new DelayQueue(); public static void main(String[] args) { delayQueue.offer(new DelayQueueExampleTask("1001", 1000)); delayQueue.offer(new DelayQueueExampleTask("1002", 5000)); delayQueue.offer(new DelayQueueExampleTask("1003", 3000)); delayQueue.offer(new DelayQueueExampleTask("1004", 6000)); delayQueue.offer(new DelayQueueExampleTask("1005", 2000)); delayQueue.offer(new DelayQueueExampleTask("1006", 8000)); delayQueue.offer(new DelayQueueExampleTask("1007", 3000)); while (true) { try { DelayQueueExampleTask task = delayQueue.take(); System.out.println(task); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行效果:
DelayQueueExampleTask{orderId='1001', start=1636615343142, time=1000} DelayQueueExampleTask{orderId='1005', start=1636615343142, time=2000} DelayQueueExampleTask{orderId='1007', start=1636615343142, time=3000} DelayQueueExampleTask{orderId='1003', start=1636615343142, time=3000} DelayQueueExampleTask{orderId='1002', start=1636615343142, time=5000} DelayQueueExampleTask{orderId='1004', start=1636615343142, time=6000} DelayQueueExampleTask{orderId='1006', start=1636615343142, time=8000}通过condition 实现阻塞队列
我们可以通过以下代码初步实现阻塞队列的效果。
public class ConditionBlockedQueueExample { //表示阻塞队列中的容器(正常应该是一个queue) private Listitems; //元素个数(表示已经添加的元素个数) private volatile int size; //数组的容量 private volatile int count; private Lock lock = new ReentrantLock(); //让take方法阻塞 ->wait/notify private final Condition notEmpty = lock.newCondition(); //放add方法阻塞 private final Condition notFull = lock.newCondition(); public ConditionBlockedQueueExample(int count) { this.count = count; items = new ArrayList<>(count); //写死了 } //添加一个元素,并且阻塞添加 public void put(String item) throws InterruptedException { lock.lock(); try { if (size >= count) { System.out.println("队列满了,需要先等一会"); notFull.await(); } ++size; //增加元素个数 items.add(item); notEmpty.signal(); } finally { lock.unlock(); } } public String take() throws InterruptedException { lock.lock(); try { if (size == 0) { System.out.println("阻塞队列空了,先等一会"); notEmpty.await(); } --size; String item = items.remove(0); notFull.signal(); return item; } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { ConditionBlockedQueueExample cbqe = new ConditionBlockedQueueExample(10); //生产者线程 Thread t1 = new Thread(() -> { Random random = new Random(); for (int i = 0; i < 1000; i++) { String item = "item-" + i; try { cbqe.put(item); //如果队列满了,put会阻塞 System.out.println("生产一个元素:" + item); Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); Thread.sleep(100); Thread t2 = new Thread(() -> { Random random = new Random(); for (; ; ) { try { String item = cbqe.take(); System.out.println("消费者线程消费一个元素:" + item); Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }); t2.start(); } }
运行效果:
..... 生产一个元素:item-5 消费者线程消费一个元素:item-5 生产一个元素:item-6 消费者线程消费一个元素:item-6 生产一个元素:item-7 消费者线程消费一个元素:item-7 生产一个元素:item-8 消费者线程消费一个元素:item-8 生产一个元素:item-9 消费者线程消费一个元素:item-9 生产一个元素:item-10 消费者线程消费一个元素:item-10 阻塞队列空了,先等一会 生产一个元素:item-11 消费者线程消费一个元素:item-11 阻塞队列空了,先等一会 生产一个元素:item-12 消费者线程消费一个元素:item-12 阻塞队列空了,先等一会 生产一个元素:item-13 消费者线程消费一个元素:item-13
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)