Java并发知识之ReentrantLock

Java并发知识之ReentrantLock,第1张

Java并发知识之ReentrantLock

​并发编程是Java中非常关键的基本功,在Java中并发编程包括的知识点如下图,非常之多,锁是保证并发的一种手段之一,本文将围绕Aqs,ReentrantLock,ReentrantReadWriteLock的实现来分析锁设计的思想。

Jdk为什么需要设计Lock?

使用了Sychronized之后,发现Sychronized提供的锁功能存在一些短板,比如不能取消,不可以等待,不可以中断,而Lock就弥补了Sychronized的这些缺陷,下面是区别,所以Lock有他存在的必要性。

 Lock与Sychronized区别、特性

功能方面

1、Synchronized Jvm关键字实现 互斥锁实现。monitorenter和monitorexit

2、Synchronized 使用范围包括 *** 作方法,对象,同步代码块

3、Synchornized 可重入 不可中断 不可等待取消 非公平,而Lock是可重入,可中断,可等待取消,支持公平/非公平(默认)

4、Synchronized在线程发生异常时会自动释放锁,因此不会发生异常死锁。Lock异常时不会自动释放锁,所以需要在finally中实现释放锁。

5、Lock是可以中断锁,Synchronized是非中断锁,必须等待线程执行完成释放锁。

6、Lock可以使用读锁提高多线程读效率,如ReentrantLock。

7、在Lock中可以使用多组Condition实现多组线程多条件通信,而Sychronized使用wait/notify只能针对一组线程通信。

在实现方面

1、都使用了cas+volatile+自旋。

2、排队抢锁的线程,sychronized使用了两个队列,而reentrantlock只使用了一个队列。

3、sychronized有自适应自旋,而Lock没有,Lock只有自旋和park.

在分析ReentrantLock之前,有必要对aqs进行了解,因为ReentrantLock是基于aqs的框架实现。

aqs(AbstractQueuedSynchronizer)基础组件提供的能力

1、内部维护了Node类型的双向链表,用来存储等待线程的,以及等待线程的状态,提供线程排队基础。

`Node 有 5 中状态,分别是:`

 CANCELLED(1): 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点,其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化

SIGNAL(-1): 只要前置节点释放锁,就会通知标识为SIGNAL状态的后续节点的线程抢占锁

 CONDITION(-2):和 Condition 有关系

PROPAGATE(-3):共享模式下,PROPAGATE状态的线程处于可运行状态

0:初始状态

2、一个volatile int state字段,用来表示锁的状态和锁的重入次数 0无锁,>0加锁,提供了get/set/compareAndSwapInt方法来 *** 作state状态。

3、等待线程入队。

4、唤醒后继节点。

5、加锁算法定义。

6、释放独占锁,共享锁算法定义,提供子类重写方法,实现不同的同步组件

7、提供了一些监控锁的方法,等待线程队列长度,排队中的线程,

8、condition辅助类

aqs是对锁的通用功能进行抽象,子类通过继承aqs实现同步的功能。其中ReentrantLock就是基于aqs实现的可重入独占锁。

ReentrantLock类结构图如下:

支持公平锁和非公平锁。默认是非公平锁

非公平锁设计思想

1、一请求就尝试获取

2、获取锁失败,继续尝试一定次数来获取锁

3、获取锁成功,通过cas实现state字段值原子自增,exclusiveOwnerThread设置成当前线程。

4、如果已经获取到了锁,state自增1,实现可重入特性,当前线程对应的节点设置成头节点。

5、如果尝试获取锁失败,将线程加入队列,并阻塞当前线程

如下是加锁成功的代码。

非公平锁获取锁分四个阶段,详细流程如下

第一阶段,快速占用锁阶段

这个阶段就是一调用lock方法,就直接通过cas设置一次state字段加锁,不管有前面没有线程获取锁,这就是体现了`非公平`的地方。

抢锁入口:

static final class NonfairSync extends Sync {​        private static final long serialVersionUID = 7316153563782823691L;                final void lock() {        //通过cas快速加锁            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else            //进入正常加锁流程                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }

第二阶段,正常的加锁阶段

* 这个阶段和第一阶段类似,也是使用cas尝试加一次锁,多了重入锁的判断,如果当前锁已经被当前线程占有了,state自增,体现了`重入锁`特性。

final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();​            int c = getState();            if (c == 0) {            //尝试快速加一次锁                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }​            //重入锁判断和获取            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }

 第三阶段,当前线程加入队列,排队

这个阶段,是线程入队列,之前尝试两次都没抢到锁,需要排队抢锁

//线程加入等待队列private Node addWaiter(Node mode) {        Node node = new Node(Thread.currentThread(), mode);        // Try the fast path of enq; backup to full enq on failure        Node pred = tail;        //尝试快速加入队列尾部        if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        //正常入队列        enq(node);        return node;    }        private Node enq(final Node node) {        for (;;) {            Node t = tail;            if (t == null) {             // Must initialize 初始化队头(空节点)                if (compareAndSetHead(new Node()))                    tail = head;            } else {            //加入队尾 首先新节点 前置节点执行老尾节点                node.prev = t;                //新的尾节点 设置成 新节点                if (compareAndSetTail(t, node)) {                //老尾节点 指向 新节点                    t.next = node;                    return t;                }            }        }    }

第四阶段,自旋抢站锁阶段

自旋,根据上一个节点状态来判断是否可以抢占锁,还是park阻塞

   final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            //自旋            for (;;) {                final Node p = node.predecessor();                //如果前驱节点是头节点,则具备抢锁资格                if (p == head && tryAcquire(arg)) {                    //抢锁成功了,自己作为头节点                    setHead(node);                    //帮助老的头节点回收                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                //是否需要park阻塞,前一个节点状态是SIGNAL就要park,                 if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt()) //park                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

如果达到了park条件,则可能进入park,

private final boolean parkAndCheckInterrupt() {//调用unsafe类的park方法 阻塞在这里,等待获取锁的线程释放锁并唤醒 我 然后继续执行        LockSupport.park(this);        return Thread.interrupted();    }

公平/非公平锁释放锁流程

非公平锁的释放锁设计思想

    unlock方法调用一次将state减少1

    如果state变成了0,则exclusiveOwnerThread设置成null

    释放锁成功后,需要将等待获取锁的线程唤醒

protected final boolean tryRelease(int releases) {            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }

如果头节点状态不是0,表示需要唤醒头节点的下一个节点

public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            //唤醒下一个节点            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }        private void unparkSuccessor(Node node) {                int ws = node.waitStatus;        if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);        Node s = node.next;        //下一个节点是空的,`从尾节点往前找`,最接近head的状态小于0的线程节点        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        //指向unpark        if (s != null)            LockSupport.unpark(s.thread);    }    

上面为啥head.next是空的,还有从尾巴节点遍历找需要唤醒的节点由于入队 *** 作,先构建尾节点,再将老尾节点指向新节点,next链比prev链延迟构建

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //和尾巴节点 node.prev = t; if (compareAndSetTail(t, node)) { //最后一步才和尾节点关联 t.next = node; return t; } } }}

以上分析的是非公平锁,公平锁与非公平锁释放锁流程一致,

公平锁和非公平锁加锁流程有什么不同?

final void lock() {            acquire(1);        }

少了非公平锁的快速抢占锁阶段,在正常获取锁之前,加入hasQueuedPredecessors判断,是否有前驱节点,其它步骤一样,

经过排队抢锁。

protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    }

//查询是否有任何线程在等待获取比当前线程更长的时间

```java

public final boolean hasQueuedPredecessors() {

        

        Node t = tail; 

        Node h = head;

        Node s;

        return h != t &&

            ((s = h.next) == null || s.thread != Thread.currentThread());

    }

```

Condition的设计思想

类似于Object的wait方法,notify方法,实现线程之间的通信。

在生产者消费者模型的例子里:

生产者线程发现消息队列满了,则调用await方法阻塞,

生产者线程如果生产消息成功,则调用signal方法通知消费者线程进行消费

消费者线程发现消息队列空了,则调用await方法阻塞,

消费者线程消费消息成功,则调用signal方法通知生产者进行生产

ConditionObject 设计了两个节点 头节点 尾节点 维护了一个单向链表,维护等待条件的线程链路。

    public class ConditionObject implements Condition {                private transient Node firstWaiter;                private transient Node lastWaiter;  }      

Condition的await方法设计思想

当前线程需要通过LockSupport.park方法阻塞,释放cpu资源,并且通过调用release方法释放锁,让其他线程可以获取到锁,并且把当前线程存起来,让其他线程唤醒,在阻塞的方法后面,如果被唤醒,需要重新获取锁资源

        public final void await() throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            //当前线程加入等待队列            Node node = addConditionWaiter();            //释放锁资源,让其他线程有机会获得锁            int savedState = fullyRelease(node);            int interruptMode = 0;            //阻塞            while (!isonSyncQueue(node)) {                LockSupport.park(this);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }            //阻塞被唤醒后再次获取锁            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            if (node.nextWaiter != null) // clean up if cancelled                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);        }

Conditon的signal方法设计思想

需要将第一个等待需要唤醒的线程进行唤醒,并且加入同步队列

​        public final void signal() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();//获得第一个等待被唤醒的线程节点            Node first = firstWaiter;            if (first != null)//执行唤醒 *** 作                doSignal(first);        }          private void doSignal(Node first) {            do {                if ( (firstWaiter = first.nextWaiter) == null)                    lastWaiter = null;                first.nextWaiter = null;            } while (!transferForSignal(first) &&                     (first = firstWaiter) != null);        }                final boolean transferForSignal(Node node) {        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))            return false;        //将需要被唤醒的节点加入同步队列        Node p = enq(node);        int ws = p.waitStatus;        //实现唤醒 *** 作        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))            LockSupport.unpark(node.thread);        return true;    }        

针对中断获取锁,直接抛出中断一次

public final void acquireInterruptibly(int arg)            throws InterruptedException {            //中断状态抛异常        if (Thread.interrupted())            throw new InterruptedException();        if (!tryAcquire(arg))            doAcquireInterruptibly(arg);    }     private void doAcquireInterruptibly(int arg)        throws InterruptedException {        final Node node = addWaiter(Node.EXCLUSIVE);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return;                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    //中断状态抛异常                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }     取消获取锁的线程private void cancelAcquire(Node node) {        // Ignore if node doesn't exist        if (node == null)            return;        node.thread = null;        // Skip cancelled predecessors        Node pred = node.prev;        while (pred.waitStatus > 0)            node.prev = pred = pred.prev;        Node predNext = pred.next;        // 设置为取消状态        node.waitStatus = Node.CANCELLED;        // 节点在尾处 直接去除 置null        if (node == tail && compareAndSetTail(node, pred)) {            compareAndSetNext(pred, predNext, null);        } else {            // 节点去除            int ws;            if (pred != head &&                ((ws = pred.waitStatus) == Node.SIGNAL ||                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&                pred.thread != null) {                Node next = node.next;                if (next != null && next.waitStatus <= 0)                    compareAndSetNext(pred, predNext, next);            } else {            //唤醒下一个节点                unparkSuccessor(node);            }            node.next = node; // help GC        }    }      

最后再来一波总结:

Lock和Sychronized在非公平锁的加锁解锁的实现上差异?

1、快速加锁阶段,lock通过cas,设置state变量,再设置独占线程

而sychronized是通过cas设置对象头上线程id.(偏向锁阶段)

2、lock通过 cas一次设置state变量 而sychronized设置锁记录指针到对象头(轻量级别锁)

3、lock 先排队,通过自旋 + cas + park 设置state字段,再设置线程对象。而sychronized通过 自旋 + cas + 排队 + park 设置线程id(重量级锁),lock中只有一个队列用户抢锁排队,而sychronized设计了两个队列,2q算法,减少对一个队列的并发处理。

4、wait/notify和signal/await的设计思想类型,都使用了一个队列存储等待的线程。结合加解锁api来实现线程通信。

ReentrantReadWriteLock可重入读写锁的实现解析

基于ReentrantLock主要改造点

1、读写锁的标志,通过32位整形变量,高16位存读锁标志,低16位存写锁标志。

2、ReentrantReadWriteLock有读和写两把锁,写锁和ReentrantLock实现方式类似,因为他们都是独占锁,而读锁,ReentrantReadWriteLock使用了ThreadLocal对不同线程获取读锁的次数进行隔离,加锁 *** 作,是否锁 *** 作互不影响。

写锁获取源码,和ReentrantLock流程类似

protected final boolean tryAcquire(int acquires) {​            Thread current = Thread.currentThread();            int c = getState();            int w = exclusiveCount(c);            //锁状态不为0,已经加了读锁或写锁            if (c != 0) {                //写锁为0或者线程不是当前线程,其它线程获取了读锁,则本次获取写锁失败                if (w == 0 || current != getExclusiveOwnerThread())                    return false;                //锁次数是否会超过上限                    if (w + exclusiveCount(acquires) > MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                    //已经获取了写锁,重入次数自增                // Reentrant acquire                setState(c + acquires);                return true;            }            //写锁是否要阻塞,如果是非公平不需要阻塞,公平要阻塞            if (writerShouldBlock() ||                !compareAndSetState(c, c + acquires))                return false;            setExclusiveOwnerThread(current);            return true;        }​

//与运算 

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//     0000 0000 0000 0001 左移16位,再减1//0001 0000 0000 0000 0000//0000 1111 1111 1111 1111 ==写锁的标记//计算写锁的数量static int exclusiveCount(int c) {    return c & EXCLUSIVE_MASK; }

写锁释放

和Reentrantlock的释放逻辑一致

修改状态,唤醒下一个线程

读锁获取

//最后一个线程 获取读锁 的计数器private transient HoldCounter cachedHoldCounter;​protected final int tryAcquireShared(int unused) {                       Thread current = Thread.currentThread();            int c = getState();            //写锁被其它线程占有了,失败            if (exclusiveCount(c) != 0 &&                getExclusiveOwnerThread() != current)                return -1;            int r = sharedCount(c);            //是否要阻塞            if (!readerShouldBlock() &&                r < MAX_COUNT &&                compareAndSetState(c, c + SHARED_UNIT)) {                //第一个读锁线程,专有字段进行记录                if (r == 0) {                    firstReader = current;                    firstReaderHoldCount = 1;                } else if (firstReader == current) {                //第一个获取读锁线程,重入                    firstReaderHoldCount++;                } else {                //不是第一个获取读锁线程,不是当前线程                    HoldCounter rh = cachedHoldCounter;                    if (rh == null || rh.tid != getThreadId(current))                        cachedHoldCounter = rh = readHolds.get();                    else if (rh.count == 0)                        readHolds.set(rh);                    rh.count++;                }                return 1;            }            return fullTryAcquireShared(current);        }

### 读锁释放

protected final boolean tryReleaseShared(int unused) {            Thread current = Thread.currentThread();            //对获取第1个的线程进行判断,释放锁,缓存第一个获取锁的线程和计数,加快释放锁的速度            if (firstReader == current) {                // assert firstReaderHoldCount > 0;                if (firstReaderHoldCount == 1)                    firstReader = null;                else                    firstReaderHoldCount--;            } else {            //最后一个获取锁的计数器,缓存,也是为了最后一个获取锁的线程进行快速释放锁                HoldCounter rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current))                    rh = readHolds.get();                int count = rh.count;                if (count <= 1) {                    readHolds.remove();                    if (count <= 0)                        throw unmatchedUnlockException();                }                --rh.count;            }            //释放锁,最后需要修改state状态            for (;;) {                int c = getState();                int nextc = c - SHARED_UNIT;                if (compareAndSetState(c, nextc))                    // Releasing the read lock has no effect on readers,                    // but it may allow waiting writers to proceed if                    // both read and write locks are now free.                    return nextc == 0;            }        }

本文分析了ReentrantLock的设计思想,包括非公平和公平获取锁,释放锁,还有Condition的实现,最后和ReentrantLock的设计进行了分析对比,希望可以加深对锁设计的理解。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5710381.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存