并发编程之通信

并发编程之通信,第1张

一、线程通信方式

共享内存
基于某个条件来等待或者唤醒:Wait/Notify、join、Condition

二、Wait/Notify 2.1 生产者消费者案例

生产者

import java.util.Queue;

public class Producer implements Runnable {
    private Queue<String> bags;
    private int maxSize;

    public Producer(Queue<String> bags, int maxSize) {
        this.bags = bags;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            synchronized (bags){ //抢占锁
                if(bags.size()==maxSize){
                    System.out.println("bags 满了");
                    try {
                        //park(); ->JVM ->Native
                        bags.wait(); //满了,阻塞当前线程并且释放Producer抢到的锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产者生产:bag"+i);
                bags.add("bag"+i); //生产bag
                bags.notify(); //表示当前已经生产了数据,提示消费者可以消费了
            } //同步代码快执行结束
        }
    }
}

消费者

import java.util.Queue;

public class Consumer implements Runnable{
    private Queue<String> bags;
    private int maxSize;

    public Consumer(Queue<String> bags, int maxSize) {
        this.bags = bags;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while(true){
            synchronized (bags){
                if(bags.isEmpty()){
                    System.out.println("bags为空");
                    try {
                        bags.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String bag=bags.remove();
                System.out.println("消费者消费:"+bag);
                bags.notify(); //这里只是唤醒Producer线程,但是Producer线程并不能马上执行。
            } //同步代码块执行结束, monitorexit指令执行完成
        }
    }
}

测试

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        Queue<String> strings=new LinkedList<>();
        Producer producer=new Producer(strings,10);
        Consumer consumer=new Consumer(strings,10);
        new Thread(producer).start();
        Thread.sleep(100);
        new Thread(consumer).start();
    }
}
2.2 原理分析

三、join

join也是基于wait/notify来实现,notify是在线程销毁之后调用的。
应用案例

import java.util.concurrent.TimeUnit;

public class JoinExample extends Thread{
    private static int x=0;
    @Override
    public void run() {
        try {
            x=100;
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //当run方法执行结束,给一个notify的信号
        // lock.notify_all(thread);
    }

    public static void main(String[] args) throws InterruptedException {
        JoinExample je=new JoinExample();
        je.start();
        //TODO ......doSomething();
        //wait()
        je.join(); //等待je线程运行结束|如果没有执行结束,会阻塞main线程(那个线程调用,就阻塞那个线程)
        if(x==100){//成立
            System.out.println("main线程执行结束");
        }
    }
}

阻塞(wait)线程在join方法实现,唤醒线程在jvm实现
jvm相关源码

static void ensure_join(JavaThread* thread) {
 // We do not need to grap the Threads_lock, since we are operating on ourself.
 Handle threadObj(thread, thread->threadObj());
 assert(threadObj.not_null(), "java thread object must exist");
 ObjectLocker lock(threadObj, thread);
 // Ignore pending exception (ThreadDeath), since we are exiting anyway
 thread->clear_pending_exception();
 // Thread is exiting. So set thread_status field in java.lang.Thread class to
TERMINATED.
 java_lang_Thread::set_thread_status(threadObj(),
java_lang_Thread::TERMINATED);
 // Clear the native thread instance - this makes isAlive return false and
allows the join()
 // to complete once we've done the notify_all below
 java_lang_Thread::set_thread(threadObj(), NULL);
 lock.notify_all(thread);
 // Ignore pending exception (ThreadDeath), since we are exiting anyway
 thread->clear_pending_exception();
}
四、Condition

Condition实际上就是J.U.C版本的wait/notify。可以让线程基于某个条件去等待和唤醒。Condition是JUC中为了配合Lock而开发的。Condition相较于wait/notify使用更加灵活,wait/notify只能由synchronized锁对象来调用,而Condition可根据不同的条件来创建多个condition对象来进行await或者signal *** 作。

4.1 应用

Condition的实际应用
1、实现阻塞队列(业务组件)
2、在线程池中会用到阻塞队列
3、生产者消费者
4、流量缓冲

生产者消费者案例
ConditionDemeNotify.java

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ConditionDemeNotify implements Runnable{
    private Lock lock;
    private Condition condition;

    public ConditionDemeNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        System.out.println("begin - ConditionDemeNotify");
        lock.lock(); //synchronized(lock)
        try{
            condition.signal(); //让当前线程唤醒  Object.notify(); //因为任何对象都会有monitor
            System.out.println("end - ConditionDemeNotify");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

ConditionDemoWait.java

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ConditionDemoWait implements Runnable{
    private Lock lock;
    private Condition condition;

    public ConditionDemoWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        System.out.println("begin - ConditionDemoWait");
        lock.lock();
        try{
            condition.await(); //让当前线程阻塞,Object.wait();
            System.out.println("end - ConditionDemoWait");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

ConditionExample.java

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    public static void main(String[] args) {
        Lock lock=new ReentrantLock();
        Condition condition=lock.newCondition();
        ConditionDemoWait cd=new ConditionDemoWait(lock,condition);
        ConditionDemeNotify cdn=new ConditionDemeNotify(lock,condition);

        new Thread(cd).start();
        new Thread(cdn).start();
    }
}
4.2 设计猜想

作用
实现线程的阻塞和唤醒
前提条件
必须先要获得锁
方法
await/signal;signalAll
await -> 让线程阻塞, 并且释放锁
signal -> 唤醒阻塞的线程
队列
1、加锁的 *** 作,必然会涉及到AQS的阻塞队列。
2、await 释放锁的时候,AQS队列中不存在已经释放锁的线程,这个被释放的线程去了哪里?通过await方法释放的线程,必须要有一个地方来存储,并且还需要被阻塞,会存在一个等待队列,LockSupport.park阻塞。
3、signal 唤醒被阻塞的线程从哪里唤醒?在等待队列中,唤醒一个线程, 放哪里去? 是不是应该再放到AQS队列?

4.3 原理分析

await:可以阻塞N个线程,会释放持有的锁
signal、signalAll
主要问题:
如何让线程等待
等待队列来存储等待中的线程
唤醒等待的线程
AQS中的同步队列和Condition中的等待队列的线程的转移

4.3.1 await
  1. 释放锁
  2. 让释放锁的线程,应该被阻塞。
  3. 被阻塞之后要存储到队列中。
  4. 重新去竞争锁。->AQS的逻辑
  5. 要能够处理interupt()的中断响应。
public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  //添加到等待队列
  Node node = addConditionWaiter();
  //完整的释放锁(考虑重入问题)
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  while (!isOnSyncQueue(node)) {
    //上下文切换(程序计数器、寄存器) 用户态-内核态的转化(上下文切换)
    LockSupport.park(this); //阻塞当前线程(当其他线程调用signal()方法时,该线程会从
这个位置去执行)
    //要判断当前被阻塞的线程是否是因为interrupt()唤醒
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
 }
  //重新竞争锁,savedState表示的是被释放的锁的重入次数.
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
  Node t = lastWaiter;
  // If lastWaiter is cancelled, clean out.
  if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
 }
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  if (t == null)
    firstWaiter = node;
  else
    t.nextWaiter = node;
  lastWaiter = node;
  return node;
}
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
     }
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
   }
 } finally {
    if (failed)
      cancelAcquire(node);
 }
}
4.3.2 signal

要把被阻塞的线程,先唤醒(signal、signalAll)
把等待队列中被唤醒的线程转移到AQS队列中

public final void signal() {
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  Node first = firstWaiter; //得到当前的等待队列
  if (first != null)
    doSignal(first);
}

//唤醒等待队列中的一个线程
private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    first.nextWaiter = null;
 } while (!transferForSignal(first) &&
      (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
  /*
    * If cannot change waitStatus, the node has been cancelled.
    */
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  /*
    * Splice onto queue and try to set waitStatus of predecessor to
    * indicate that thread is (probably) waiting. If cancelled or
    * attempt to set waitStatus fails, wake up to resync (in which
    * case the waitStatus can be transiently and harmlessly wrong).
    */
  //这里是把当前从等待队列中头部节点的保存到AQS队列
  Node p = enq(node);
  int ws = p.waitStatus;
  //
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread); // 唤醒.
  return true;
}
4.3.3 图解condition原理


4.4 阻塞队列

队列是一种只允许在一端进行删除 *** 作,在另一端进行插入 *** 作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。那么阻塞队列,实际上是在队列的基础上增加了两个 *** 作。

  • 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。
  • 支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。
4.4.1 阻塞队列中的方法
  • 添加元素
    针对队列满了之后的不同的处理策略
    add -> 如果队列满了,抛出异常
    offer -> true/false , 添加成功返回true,否则返回false
    put -> 如果队列满了,则一直阻塞
    offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。
  • 移除元素
    element-> 队列为空,抛异常
    peek -> true/false,移除成功返回true,否则返回false
    take -> 一直阻塞
    poll(timeout) -> 如果超时了,还没有元素,则返回null
    dequeue -> LIFO , FIFO的队列.
4.4.2 J.U.C 中的阻塞队列

ArrayBlockingQueue 基于数组结构
LinkedBlockingQueue 基于链表结构
PriorityBlcokingQueue 基于优先级队列
DelayQueue 允许延时执行的队列
SynchronousQueue 没有任何存储结构的的队列

/*可缓存的线程池。
可以处理非常大请求的任务。 1000个任务过来,那么线程池需要分配1000个线程来执行。*/
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}
4.4.3 DelayQueue应用
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExampleTask implements Delayed {
    private String orderId;
    private long start=System.currentTimeMillis();
    private long time; //

    public DelayQueueExampleTask(String orderId, long time){
        this.orderId=orderId;
        this.time=time;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start+time)-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "DelayQueueExampleTask{" +
                "orderId='" + orderId + '\'' +
                ", start=" + start +
                ", time=" + time +
                '}';
    }
}
import java.util.concurrent.DelayQueue;

public class DelayQueueMain {

    private static DelayQueue<DelayQueueExampleTask> delayQueue=new DelayQueue();
    public static void main(String[] args) {
        delayQueue.offer(new DelayQueueExampleTask("1001",1000));
        delayQueue.offer(new DelayQueueExampleTask("1002",5000));
        delayQueue.offer(new DelayQueueExampleTask("1003",3000));
        delayQueue.offer(new DelayQueueExampleTask("1004",6000));
        delayQueue.offer(new DelayQueueExampleTask("1005",2000));
        delayQueue.offer(new DelayQueueExampleTask("1006",8000));
        delayQueue.offer(new DelayQueueExampleTask("1007",3000));
        while(true){
            try {
                DelayQueueExampleTask task=delayQueue.take();
                System.out.println(task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}
4.4.4 模拟阻塞队列使用condition
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockedQueueExample {

    //表示阻塞队列中的容器
    private List<String> items;
    //元素个数(表示已经添加的元素个数)
    private volatile int size;
    //数组的容量
    private volatile int count;
    private Lock lock=new ReentrantLock();
    //让take方法阻塞 ->wait/notify
    private final Condition notEmpty=lock.newCondition();
    //放add方法阻塞
    private final Condition notFull=lock.newCondition();

    public ConditionBlockedQueueExample(int count){
        this.count=count;
        items=new ArrayList<>(count); //写死了
    }

    //添加一个元素,并且阻塞添加
    public void put(String item) throws InterruptedException {
        lock.lock();
        try{
            if(size>=count){
                System.out.println("队列满了,需要先等一会");
                notFull.await();
            }
            ++size; //增加元素个数
            items.add(item);
            notEmpty.signal();
        }finally {
            lock.unlock();
        }
    }
    public String take() throws InterruptedException {
        lock.lock();
        try{
            if(size==0){
                System.out.println("阻塞队列空了,先等一会");
                notEmpty.await();
            }
            --size;
            String item=items.remove(0);
            notFull.signal();
            return item;
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBlockedQueueExample cbqe=new ConditionBlockedQueueExample(10);
        //生产者线程
        Thread t1=new Thread(()->{
            Random random=new Random();
            for (int i = 0; i < 1000; i++) {
                String item="item-"+i;
                try {
                    cbqe.put(item); //如果队列满了,put会阻塞
                    System.out.println("生产一个元素:"+item);
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread.sleep(100);
        Thread t2=new Thread(()->{
            Random random=new Random();
            for (;;) {
                try {
                    String item=cbqe.take();
                    System.out.println("消费者线程消费一个元素:"+item);
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/790655.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存