显示锁接口Loke和 AQS

显示锁接口Loke和 AQS,第1张

显示锁接口Loke和 AQS: 解决的问题

Java内置锁,使用时不需要对同步对象的监视器(Monitor)通过Java代码显示的进行抢占和释放,因为这些工作由JVM层面来完成。所以,Java的内置锁使用起来很方便。但是,Java内置锁的功能相对单一,不具备一些比较高级的锁功能:
限时抢锁:在抢锁时设置超时时长,超时则放弃。
中断抢锁:在抢锁时,外部线程给抢锁线程发出一个中断信号,就能唤起等待锁的线程,并且终止抢占过程。
多个等待队列:为锁维持多个等待队列,以便提高锁的效率。比如在生产者消费者模式实现中,生产者和消费者共用一把锁,该锁上维持两个等待队列,一个生产者队列,一个消费者队列。

Java内置锁在竞争稍微激烈的情况下,Java内置锁会膨胀为重量级锁。重量级锁, 它的线程间的调度和状态变更由 *** 作系统负责;
它基于 *** 作系统的临界区机制(每个进程中访问临界资源的那段程序称为临界区(临界资源是一次仅允许一个进程使用的共享资源)。每次只准许一个进程进入临界区,进入后不允许其他进程进入(加锁)。)阻塞锁机制(当有一个线程获取锁后,其他所有等待获取该锁的线程会进入阻塞状态);

上下文切换(CPU通过分配时间片来执行任务,当一个任务的时间片用完,就会切换到另一个任务。在切换之前会保存上一个任务的状态,当下次再切换到该任务,就会加载这个状态。任务从保存到再加载的过程);上下文切换只能发生在内核态,所以还会触发用户态与内核态切换,这个切换的过程会带来很大的性能开销,非常影响性能。
Java显示锁接口Loke就是为了解决这些Java对象的功能问题、性能问题。

Lock接口和synchronized的比较 锁的类型比较

synchronized是非公平锁,ReentrantLock 可以是公平锁也可以是非公平锁。
ReentrantLock支持非阻塞的方式获取锁,而synchronized不行。synchronized不可中断,ReentrantLock可中断。
synchronized是独占锁,Lock可以是独占锁(ReentrantLock,ReentrantReadWriteLock的写锁)也可以是共享(ReentrantReadWriteLock的读锁)。
都是可重入锁。

锁的获取与释放比较

Lock必须手动获取和释放锁,而synchronized不需要。
synchronized在发生异常的时候,会自动释放线程占有的锁,而ReentrantLock在发生异常时,如果没有通过unlock去释放锁,很有可能造成死锁,因此需要在finally块中释放锁。
Lock存在获得多个锁的方法(ReentrantReadWriteLock中的读锁获取 reentrantReadWriteLock.readLock().lock();)。

线程调度比较

synchronized: 使用Object对象本身的wait 、notify、notifyAll调度机制
Lock: 可以使用Condition中的 signal(); await()等, 进行线程之间的调度。

Lock接口中的核心方法:

lock():调用该方法,当前线程获取锁,获取到锁后返回;
lockInterruptibly():可中断地获取锁,即在锁的获取中可以中断当前线程;
tryLock():尝试非阻塞获取锁并立刻返回,如果锁没有被其他线程获取到,则返回true并获取到锁,否则返回false;
tryLock(long time, TimeUnit unit):设置过期时间尝试非阻塞获取锁,可以在获取锁的时间内被中断,过期时间内获取到锁返回true,否则返回false;
unlock():释放锁;
newCondition():当前线程获取到锁的前提下,可以将一个condition实例绑定到lock实例上,然后可以通过condition中的方法,对获得该锁线程进行线程调度。

Lock接口的实现类: ReentrantLock:

独占锁,可重入锁,公平/非公平锁,可中断锁。

简单使用案例:
public class ReentrantLockTest1 implements Runnable{
    Lock lock = new ReentrantLock();
    @Override
    public void run() {
        get();
    }
    private void get() {
        try {
            lock.lock();
            System.out.println("get 方法");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        ReentrantLockTest1 lockTest1 = new ReentrantLockTest1();
        new Thread(lockTest1).start();
    }
}
源码分析( AQS): ReentrantLock核心方法说明:
//构造器
//默认构造器创建非公平锁
public ReentrantLock();
//传入一个Boolean类型的参数,true创建一个公平锁,false非公平
public ReentrantLock(boolean fair);

	/**
     * 阻塞等待获取锁;
     */
    public void lock() {
        sync.lock();
    }

    /**
     * 当前线程未被中断,则获取锁
     * 允许在等待时由其它线程调用等待线程的Thread.interrupt方法来中断等待线程的等待而直接返回.
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     *尝试申请一个锁,在成功获得锁后返回true,否则,立即返回false
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * 在一段时间内尝试申请一个锁,在成功获得锁后返回true,否则,立即返回false
     */
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放锁
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * 条件实例
     */
    public Condition newCondition() {
        return sync.newCondition();
    }

    /**
     * 获取当前线程持有此锁的次数
     */
    public int getHoldCount() {
        return sync.getHoldCount();
    }

    /**
     * 是否被当前线程持有
     */
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    /**
     * 查询此锁是否由任意线程持有
     */
    public boolean isLocked() {
        return sync.isLocked();
    }

    /**
     *如果是“公平锁”返回true,否则返回false
     */
    public final boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * 获取目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null
     */
    protected Thread getOwner() {
        return sync.getOwner();
    }

    /**
     * 查询是否有线程正在等待
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     *查询给定线程是否正在等待获取此锁。
     */
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }

    /**
     * 获取正等待获取此锁的线程数
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
   /**
     * 正等待获取此锁的线程集合
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
 /**
     *是否存在正在等待并符合相关给定条件的线程
     */
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * 正在等待并符合相关给定条件的线程数量
     */
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * 正在等待并符合相关给定条件的线程集合
     */
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
源码分析: 构造方法:

public ReentrantLock(boolean fair);构造方法返回的是,实现了AQS(AbstractQueuedSynchronizer)模板类部分方法的抽象类Sync的实现类。

sync = fair ? new FairSync() : new NonfairSync();
//static final class FairSync extends Sync
//abstract static class Sync extends AbstractQueuedSynchronizer

FairSync 和 NonfairSync都实现了 Sync抽象类lock()抽象方法,重写了AQS中的tryAcquire;两个对象实现的两个方法大致相同 。NonfairSync 使用lock()方法获取锁之前,不管同步队列的先判断当前锁是否有人占用,如果没有就直接获取。FairSync 在使用tryAcquire()方法判断是否有可用资源时,保证公平性先判段阻塞队列的情况。

lock()方法,底层是调用的是AQS中的模板方法acquire(int arg);
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
   selfInterrupt();

tryAcquire(arg) :调用子类中重写的tryAcquire判断能否获得资源
FairSync类公平方式的实现类中重写的tryAcquire方法

protected final boolean tryAcquire(int acquires) {
         final Thread current = Thread.currentThread();	
         int c = getState();		//得到当前锁的状态
         if (c == 0) {			//锁的状态为0时,表示现在锁没有被占用
           
/*hasQueuedPredecessors()判断当前的同步队列中是否存在,比当前线程等待锁更长时间的线程存在则返回false
hasQueuedPredecessors()方法中的代码:{
 头节点不等于尾节点,头节点的下一个节点为null或者头节点的下一个节点的线程不等于当前线程则返回false
 就是判断当前同步队列是不是空或者当前节点是不是要获得锁的资源
  return h != t &&  
    ((s = h.next) == null || s.thread != Thread.currentThread());}
*/
       if (!hasQueuedPredecessors() &&	
        	compareAndSetState(0, acquires)) { 	//如果当前线程是等待时长最长获得资源的线程,CAS修改state同步状态的值		
               setExclusiveOwnerThread(current); //给当前现在设置独占访问权(拿到锁了)。
               return true;
            }
       }
      else if (current == getExclusiveOwnerThread()) {	//如果当前线程等于持有锁的线程(锁的重入)
         int nextc = c + acquires;		//计数加一
         if (nextc < 0)
             	throw new Error("Maximum lock count exceeded");
             setState(nextc);			//修改同步状态的值
             return true;
         }
      return false;     //到这里说明没有拿到锁,返回false
     }

如当前环境中有多个线程竞争同步资源失败,这些线程都会被封装为node,形成一个同步队列
当tryAcquire(arg)方法不能拿到锁后,addWaiter(Node.EXCLUSIVE)为当前线程创建并招募节点。
addWaiter()代码:

Node node = new Node(Thread.currentThread(), mode); //将当前线程封装为一个节点
Node pred = tail;  //tail同步队列的尾节点
if (pred != null) {  // 尾结点本身就是空,说明此时同步队列为空
   //将尾部节点更新为这个新节点	 双端队列
   node.prev = pred;
   if (compareAndSetTail(pred, node)) {  //CAS成功说明节点入队成功,设置尾结点成功
      pred.next = node;
      return node;
   }
}
//如果直接入队失败,就采用自旋方式 enq(node);加入结点直至入队成功,返回该结点.
enq(node);//将节点插入队列,必要时初始化。

/*
enq()代码:
for (;;) {  //自旋
  Node t = tail;  //尾节点
  if (t == null) {  //如队尾为空,即队列为空,队列进行初始化
  //创建虚拟队列头节点
     if (compareAndSetHead(new Node()))	  //将头节点设置为空节点
        tail = head;	 //当只有一个空节点时,头节点与尾节点相等,将头节点赋值给尾节点
  } else {   //如队列尾结点不为空
  	//将node节点入队,并将尾节点设置为node
     node.prev = t;
     if (compareAndSetTail(t, node)) {
       	t.next = node;
       	return t;
     }
}
*/
return node;

final boolean acquireQueued(final Node node, int arg);
当线程包装成node进入同步队列后,会在同步队列中等待同步资源的释放。
当线程争取到同步资源 的使用权后,线程中同步队列中出队。

final boolean acquireQueued(final Node node, int arg) {
//在这个方法中,node已经加入了同步队列,它在这里尝试在同步队列中自旋以获取同步资源.
        boolean failed = true;     //标记是否成功获取到资源
        try {
            boolean interrupted = false;   //标记线程是否被 中断
            for (;;) {		//自旋
                final Node p = node.predecessor();	//获得传入节点的前驱节点
                if (p == head && tryAcquire(arg)) {	//如果当前节点的前驱节点节点等于头节点,且可以获得资源
                    setHead(node);	//将当前节点设置为头节点
                    p.next = null;  //设置原头结点的后继节点为null,帮助jvm回收垃圾.
                    failed = false;   //标记已经成功获取同步资源
                    return interrupted;
                }

                //当前节点不是头节或者获取资源失败,
                //(shouldParkAfterFailedAcquire(p, node)线程获取资源失败后,判断是否阻塞线程.
                	//false 再次进入自旋查找前驱并获取锁的过程.
                	//true 执行parkAndCheckInterrupt() 阻塞当前线程,当线程被其他线程唤醒时,返回线程的中断状态,并清除中断标识
                		//为true ,并将interrupted中断标记设置为true,回到acquire()方法中,
                			//就会执行selfInterrupt()方法。 完成当前线程的中断信号发出.
                		//为false 再次进入自旋查找前驱并获取锁
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
                  
      //      -----shouldParkAfterFailedAcquire(Node pred, Node node) 线程获取资源失败后,判断是否阻塞线程.
			/*  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
			 int ws = pred.waitStatus; //得到前驱节点的同步状态
        	if (ws == Node.SIGNAL)		//等于阻塞状态
            return true;	//返回ture说明要阻塞当前线程,等待前驱节点释放资源,唤醒后继节点
            
            //如果前驱结点的线程取消等待资源(原因可能是超时或中断),则跳过所有被取消的前驱结点.
            //将最近一个状态小于0的结点设置为node结点的前驱结点.
        if (ws > 0) {		//前驱状态大于0,说明state是CANCELLED,线程是已取消的
   
        //将同步队列中已经标记已取消的节点全部剔除
            do {			
                node.prev = pred = pred.prev;	//从同步队列中去掉pred节点,(将后继节点的前驱节点改为这个节点的前驱节点)
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {  //同步状态不大于0,状态是0初始态或 propagate态(在共享模式同步状态可以进行传播),则通过CAS将pred的状态改为SIGNAL阻塞状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;  
		*/

	//	-----parkAndCheckInterrupt() 阻塞当前线程,并判断线程的中断状态
		/*	parkAndCheckInterrupt() 
	  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);	//阻塞当前线程
        
       // 调用 unpark()或者interrupt()唤醒线程后
        return Thread.interrupted(); //测试当前线程是否已被中断。 并清除中断标识.
    	}
		*/
        	 }
        } finally {
        	//如failed为true,说明线程不能成功获取资源。
            //如果node的前驱结点 ( final  Node p = node.predecessor();  )为空,代码抛出异常
            //这说明node为头结点,node无前驱结点。
            if (failed)
                cancelAcquire(node); //取消对资源的获取.

      //  ------------cancelAcquire()取消对资源的获取.
            /*
   	 private void cancelAcquire(Node node) {
        if (node == null)	//节点为空之间返回
            return;
        node.thread = null;  //节点加入队列时的线程
        Node pred = node.prev;
        while (pred.waitStatus > 0)  //将同步队列中已经标记已取消的节点全部剔除
            node.prev = pred = pred.prev;
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;  //将节点同步标记改为已取消的
      
        //如果节点是尾节点,就将尾节点设置为它的前驱节点
        if (node == tail && compareAndSetTail(node, pred)) {
        	//回收当前节点,将它前驱节点的下一节点的指向设为null
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            //头节点不是这个节点的前驱;
            //节点前驱的状态是阻塞的或者,状态是初始化或可传播的且状态改为阻塞状态成功;
            //节点加入队列时的线程不为空
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;		
                //节点的后继节点不为空,且状态是初始化或可传播的;
                if (next != null && next.waitStatus <= 0)
                	//移除这个节点
                    compareAndSetNext(pred, predNext, next);
            } else {
            //如果节点的后继节点存在,则唤醒它。LockSupport.unpark(s.thread);
                unparkSuccessor(node);       //如果节点的后继节点存在,则唤醒它。
            }
            node.next = node; // help GC
        }
    }
            */
        }
    }
lockInterruptibly(); 方法底层调用的是AQS中的acquireInterruptibly(1)方法;
if (Thread.interrupted()) //判断线程中断状态
      throw new InterruptedException();
if (!tryAcquire(arg))
	//以独占可中断模式获取。大致与lock()方法调用相同。
      doAcquireInterruptibly(arg);
unlock();底层调用的是AQS的release(1);模板方法
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head; //获得头节点
            if (h != null && h.waitStatus != 0) //头节点不为null且状态不为0即初始状态
                unparkSuccessor(h);	//唤醒同步队列的后继节点
                /*
     private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)  //结点状态小于0,则肯定不是cancled态
            compareAndSetWaitStatus(node, ws, 0);   //cas将结点状态设为0
        Node s = node.next; //取Node的后继结点.
        if (s == null || s.waitStatus > 0) {	//后继节点为null或者状态为cancled,取消,说明后继结点对应的线程取消对资源的等待
            s = null;	//将后继节点赋为null
          
        	 //从同步队列队尾结点开始向前遍历,找到同步队列中node结点后第一个待唤醒的结点,如遍历到的结点t非空且不等于当前node,则校验此结点t的状态.
            //简单来说就是  寻找到下一个非取消态的结点 s
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        	 //唤醒结点t对应的线程.
            LockSupport.unpark(s.thread);
    }
                */
            return true;
        }
        return false;
    }

子类重写的tryRelease()方法

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;  
            //如果当前线程不是已经获得资源的线程则报错
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            //标识释放状态
            boolean free = false;
            //如果同步状态减少releases后是初始状态则说明资源释放成功
            if (c == 0) {
                free = true;
                //将以获得资源的线程赋为null
                setExclusiveOwnerThread(null);
            }
            //设置同步状态
            setState(c);
            return free;
        }
原理分析:

ReentrantLock方法实际调用的大多数都是AQS中的模板方法。AQS通过模板方法模式固定了程序的执行顺序,子类通过重写AQS中的
isHeldExclusively() //该线程是否正在独占资源。只有用到condition才需要去实现它;
tryAcquire(int) //独占方式。尝试获取资源,功返回true,失败则返回false;
tryRelease(int) /独占方式。尝试释放资源,功贝则返回true,失败则返回falseo;
tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源;
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败返回false;
等方法来确定资源的获取的逻辑等。
AQS实际是通过维系一个CLH队列(FIFO的双向队列)来实现线程的等待,线程唤醒和资源的分配机制。
每有一个线程请求共享资源时,判段共享资源是否空闲,如果空闲就将当前请求线程设置为工作线程,并将共享资源的状态(State)设置为锁定(通过CAS设置state值)(公平/非公平)。
如果共享资源被占用,就将这个线程包装成一个节点加入到CLH队列中,在acquireQueued方法中不断去尝试得到资源,并在方法中判断当前线程是否中断和将已经取消的线程移出队列。
当工作线程运行结束后,工作线程会释放共享资源并将同步状态的赋为0,并唤醒同步队列中的第一个待唤醒的线程。

ReentrantReadWriteLock:

读写锁,一般在读多写少情况下使用。有两把锁一把读锁,一把写锁,写锁是独占式的,读锁是共享式的,读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读状态,也就是获取到读锁的次数,低16位表示获取到写线程的可重入次数。在有线程持有读锁的情况下没有线程可以获得写锁,在有线程持有写锁的情况下只有持有写锁的那个线程可以获取读锁。

原理分析参考:https://cloud.tencent.com/developer/article/1469555

简单案例
public class ReentrantReadWriteLockeTest {
    private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    //让线程同步去争抢锁
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    private static int i = 100;
    public static void main(String[] args) {
    //利用线程池和CyclicBarrier 保证线程是去同步争抢锁的
        executor.execute(() -> {
            write(Thread.currentThread());
        });
        executor.execute(() -> {
            read(Thread.currentThread());
        });
        executor.execute(() -> {
            read(Thread.currentThread());
        });
        executor.shutdown();
    }

    private static void write(Thread thread) {
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        //获取写锁
        reentrantReadWriteLock.writeLock().lock();
        //获取读锁,说明获得写锁后同线程还可以获得读锁
        reentrantReadWriteLock.readLock().lock();
        try {
            System.out.println("写线程开始运行" + thread.getName() + " : i=" + i);
            Thread.sleep(100);
            System.out.println(thread.getName() + "运行完");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放写锁
            reentrantReadWriteLock.writeLock().unlock();
        }
    }
    private static void read(Thread thread) {
        try {
            //线程阻塞, 让线程同步争抢锁
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        //获得读锁
        reentrantReadWriteLock.readLock().lock();
       //获取写锁, 不注释,线程获得读锁后阻塞
       //reentrantReadWriteLock.writeLock().lock();
        try {
            System.out.println("读线程开始运行" + thread.getName() + " : i=" + i);
            Thread.sleep(100);
            System.out.println(thread.getName() + "运行完");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放锁
            reentrantReadWriteLock.readLock().unlock();
        }
    }

}

结果,说明获得写锁和获得读锁的线程不能同时运行,获得读锁的线程可以同时运行

写线程开始运行pool-1-thread-1 : i=100
pool-1-thread-1运行完 : i=1
读线程开始运行pool-1-thread-2 : i=1
读线程开始运行pool-1-thread-3 : i=1
pool-1-thread-2运行完
pool-1-thread-3运行完
Condition接口

用Lock和Condition可以实现类似于,synchronized和wait()、wait(long timeout)、notify()以及notifyAll()方法配合实现的等待\通知模式。

以前的方式只能有一个等待队列,在实际应用时可能需要多个,比如读和写。为了这个灵活性,lock将同步互斥控制和等待队列分离开来,互斥保证在某个时刻只有一个线程访问临界区(lock自己完成),等待队列负责保存被阻塞的线程(condition完成)。
学习文章:https://www.jianshu.com/p/c7af7f3fa135

简单案例
/**
 * 利用lock和Condition来替换synchronized,实现生产者,消费者模式
 *
 * @author shkstart
 * @create 2022-01-11 17:56
 */
public class producerConsumer {
    public static void main(String[] args) {
        AppleBox appleBox = new AppleBox();
        Producer producer = new Producer(appleBox);
        Consumer consumer = new Consumer(appleBox);
        Consumer consumer2 = new Consumer(appleBox);
        Thread p = new Thread(producer, "生产者");
        Thread c1 = new Thread(consumer, "消费者1");
        Thread c2 = new Thread(consumer2, "消费者2");
        p.start();
        c1.start();
        c2.start();
    }
}
class Apple {
    int id;
    public Apple(int id) {
        this.id = id;
    }
}
class AppleBox {
    AtomicInteger index = new AtomicInteger(0);
    AtomicReferenceArray<Apple> apples = new AtomicReferenceArray<>(new Apple[5]);
//    AtomicReference apples = new AtomicReference<>();
    //创建锁对象 ,替换synchronized
    private final ReentrantLock lock = new ReentrantLock();
    //生产条件
    private final Condition produceCondition = lock.newCondition();
    //消费条件
    private final Condition consumeCondition = lock.newCondition();
    // public synchronized boolean deposite(Apple apple) {
    public boolean deposite(Apple apple) {
        lock.lock();
        try {
            while (index.get() == apples.length()) {
                //生产线程进入等待
                produceCondition.await();
                return false;
            }
            apples.set( index.getAndIncrement(),apple);
            //this.apples[index.incrementAndGet()] = apple;
            System.out.println(Thread.currentThread().getName() + "生产了: " + index.get());
            //唤醒消费线程
            consumeCondition.signal();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return false;
    }
    public Apple withdraw() throws Exception {
        lock.lock();
        try {
            while (index.get() == 0) {
                System.out.println(Thread.currentThread().getName() + "消费缺货" );
                consumeCondition.await();
            }
            Apple a = apples.get(index.decrementAndGet());
            System.out.println(Thread.currentThread().getName() + "消费了: " + a.id);
            return a;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return null;
    }
}
class Producer implements Runnable {
    AppleBox appleBox;
    public Producer(AppleBox appleBox) {
        this.appleBox = appleBox;
    }
    @Override
    public void run() {
        while (true) {
            appleBox.deposite(new Apple(appleBox.index.get() + 1));
            try {
                Thread.sleep((int) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
class Consumer implements Runnable {
    AppleBox appleBox;
    public Consumer(AppleBox appleBox) {
        this.appleBox = appleBox;
    }
    @Override
    public void run() {
        while (true) {
            try {
                appleBox.withdraw();
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep((int) (Math.random() * 3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
自定义实现 CountdownLatch的自定义实现(共享方式获资源)

重写tryAcquireShared(),tryReleaseShared()

/**
 * 自定义实现 CountdownLatch
 */
public class MyCountDownLatch {

    //实现AQS
    private static final class MySync extends AbstractQueuedSynchronizer {

        public MySync(int count) {
            //初始化设置AQS中的state状态值
            setState(count);
        }

        public int getCount(){
            //获得AQS中的state状态值
            return getState();
        }
        /**
         * 共享锁实现 tryAcquireShared(int) 和 tryReleaseShared(int)
         */
        @Override
        /**
         * 模板方法acquireShared 调用tryAcquireShared
         * 开始时AQS中的State被初始化了一个值N,此时所有线程不能获得锁
         * 返回-1表示当前线程加入到阻塞队列
         * 返回1 解锁释放
         *
         * 返回一个负数
         *  public final void acquireShared(int arg) {
         *         if (tryAcquireShared(arg) < 0)
         *             doAcquireShared(arg);
         *     }
         *  非公平的
         */
        protected int tryAcquireShared(int arg) {
            return (getState() == 0)? 1 : -1;
        }
        @Override
        /**
         * 考虑多线程的共享 *** 作
         * 以自旋+CAS *** 作防止资源争抢
         */
        protected boolean tryReleaseShared(int arg) {
            while (true){
                int c = getState();
                if (c == 0){ //说明锁已经释放
                    return false;
                }
                int nextc = c - 1;
                //调用CAS设置state值
                if (compareAndSetState(c,nextc)){
                    //为0说明释放锁成功
                    return nextc == 0;
                }
            }
        }
    }
    private final MySync sync;
    public MyCountDownLatch(int count) {
        this.sync = new MySync(count);
    }
    public void await(){
        //模板方法,组装流程,调用 子类中实现的tryAcquireShared
        sync.acquireShared(1);
    }
    public void countDown(){
        //模板方法,组装流程,调用 子类中实现的tryReleaseShared
        sync.releaseShared(1);
    }
    public int getCount(){
        return sync.getCount();
    }
}
自定义ReentrantLocak ( 独占锁,非可重入)

重写isHeldExclusively(),tryAcquire(),tryRelease()

public class MyReentrantLock implements Lock {
    private final Sync sync = new Sync();
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        /**
         * 判断锁是否被占用
         */
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        @Override
        /*
         * 独占锁的方法,一次只能一个线程持有锁,只有state=0表示没有线程有锁才能CAS成功
         */
        protected boolean tryAcquire(int arg) {
            //state状态值不是0就是1,所以是非重入锁  CAS *** 作
            if (compareAndSetState(0, 1)) {
                //设置当前线程为执行线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        /**
         * 释放锁
         */
        protected boolean tryRelease(int arg) {
            if (getState() == 0) { //没有线程执行有锁
                throw new UnsupportedOperationException();
            }
            //将执行线程置为空
            setExclusiveOwnerThread(null);
            //同步状态设置为01
            setState(0);
            return true;
        }
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    /**
     * 调用AQS提供的模板方法提供的模板方法
     */
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1); //响应中断式直接获得锁
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/798548.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存