共享内存
基于某个条件来等待或者唤醒:Wait/Notify、join、Condition
生产者
import java.util.Queue;
public class Producer implements Runnable {
private Queue<String> bags;
private int maxSize;
public Producer(Queue<String> bags, int maxSize) {
this.bags = bags;
this.maxSize = maxSize;
}
@Override
public void run() {
int i=0;
while(true){
i++;
synchronized (bags){ //抢占锁
if(bags.size()==maxSize){
System.out.println("bags 满了");
try {
//park(); ->JVM ->Native
bags.wait(); //满了,阻塞当前线程并且释放Producer抢到的锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者生产:bag"+i);
bags.add("bag"+i); //生产bag
bags.notify(); //表示当前已经生产了数据,提示消费者可以消费了
} //同步代码快执行结束
}
}
}
消费者
import java.util.Queue;
public class Consumer implements Runnable{
private Queue<String> bags;
private int maxSize;
public Consumer(Queue<String> bags, int maxSize) {
this.bags = bags;
this.maxSize = maxSize;
}
@Override
public void run() {
while(true){
synchronized (bags){
if(bags.isEmpty()){
System.out.println("bags为空");
try {
bags.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String bag=bags.remove();
System.out.println("消费者消费:"+bag);
bags.notify(); //这里只是唤醒Producer线程,但是Producer线程并不能马上执行。
} //同步代码块执行结束, monitorexit指令执行完成
}
}
}
测试
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
Queue<String> strings=new LinkedList<>();
Producer producer=new Producer(strings,10);
Consumer consumer=new Consumer(strings,10);
new Thread(producer).start();
Thread.sleep(100);
new Thread(consumer).start();
}
}
2.2 原理分析
三、join
join也是基于wait/notify来实现,notify是在线程销毁之后调用的。
应用案例
import java.util.concurrent.TimeUnit;
public class JoinExample extends Thread{
private static int x=0;
@Override
public void run() {
try {
x=100;
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//当run方法执行结束,给一个notify的信号
// lock.notify_all(thread);
}
public static void main(String[] args) throws InterruptedException {
JoinExample je=new JoinExample();
je.start();
//TODO ......doSomething();
//wait()
je.join(); //等待je线程运行结束|如果没有执行结束,会阻塞main线程(那个线程调用,就阻塞那个线程)
if(x==100){//成立
System.out.println("main线程执行结束");
}
}
}
阻塞(wait)线程在join方法实现,唤醒线程在jvm实现
jvm相关源码
static void ensure_join(JavaThread* thread) {
// We do not need to grap the Threads_lock, since we are operating on ourself.
Handle threadObj(thread, thread->threadObj());
assert(threadObj.not_null(), "java thread object must exist");
ObjectLocker lock(threadObj, thread);
// Ignore pending exception (ThreadDeath), since we are exiting anyway
thread->clear_pending_exception();
// Thread is exiting. So set thread_status field in java.lang.Thread class to
TERMINATED.
java_lang_Thread::set_thread_status(threadObj(),
java_lang_Thread::TERMINATED);
// Clear the native thread instance - this makes isAlive return false and
allows the join()
// to complete once we've done the notify_all below
java_lang_Thread::set_thread(threadObj(), NULL);
lock.notify_all(thread);
// Ignore pending exception (ThreadDeath), since we are exiting anyway
thread->clear_pending_exception();
}
四、Condition
Condition实际上就是J.U.C版本的wait/notify。可以让线程基于某个条件去等待和唤醒。Condition是JUC中为了配合Lock而开发的。
4.1 应用案例ConditionDemeNotify.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ConditionDemeNotify implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemeNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemeNotify");
lock.lock(); //synchronized(lock)
try{
condition.signal(); //让当前线程唤醒 Object.notify(); //因为任何对象都会有monitor
System.out.println("end - ConditionDemeNotify");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
ConditionDemoWait.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ConditionDemoWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
lock.lock();
try{
condition.await(); //让当前线程阻塞,Object.wait();
System.out.println("end - ConditionDemoWait");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
ConditionExample.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
public static void main(String[] args) {
Lock lock=new ReentrantLock();
Condition condition=lock.newCondition();
ConditionDemoWait cd=new ConditionDemoWait(lock,condition);
ConditionDemeNotify cdn=new ConditionDemeNotify(lock,condition);
new Thread(cd).start();
new Thread(cdn).start();
}
}
4.2 设计猜想
作用
实现线程的阻塞和唤醒
前提条件
必须先要获得锁
方法
await/signal;signalAll
await -> 让线程阻塞, 并且释放锁
signal -> 唤醒阻塞的线程
队列
1、加锁的 *** 作,必然会涉及到AQS的阻塞队列。
2、await 释放锁的时候,AQS队列中不存在已经释放锁的线程,这个被释放的线程去了哪里?通过await方法释放的线程,必须要有一个地方来存储,并且还需要被阻塞,会存在一个等待队列,LockSupport.park阻塞。
3、signal 唤醒被阻塞的线程从哪里唤醒?在等待队列中,唤醒一个线程, 放哪里去? 是不是应该再放到AQS队列?
await:可以阻塞N个线程,会释放持有的锁
signal、signalAll
主要问题:
如何让线程等待
等待队列来存储等待中的线程
唤醒等待的线程
AQS中的同步队列和Condition中的等待队列的线程的转移
- 释放锁
- 让释放锁的线程,应该被阻塞。
- 被阻塞之后要存储到队列中。
- 重新去竞争锁。->AQS的逻辑
- 要能够处理interupt()的中断响应。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到等待队列
Node node = addConditionWaiter();
//完整的释放锁(考虑重入问题)
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//上下文切换(程序计数器、寄存器) 用户态-内核态的转化(上下文切换)
LockSupport.park(this); //阻塞当前线程(当其他线程调用signal()方法时,该线程会从
这个位置去执行)
//要判断当前被阻塞的线程是否是因为interrupt()唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//重新竞争锁,savedState表示的是被释放的锁的重入次数.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4.3.2 signal
要把被阻塞的线程,先唤醒(signal、signalAll)
把等待队列中被唤醒的线程转移到AQS队列中
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; //得到当前的等待队列
if (first != null)
doSignal(first);
}
//唤醒等待队列中的一个线程
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//这里是把当前从等待队列中头部节点的保存到AQS队列
Node p = enq(node);
int ws = p.waitStatus;
//
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒.
return true;
}
4.3.3 图解condition原理
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)