多线程学习(五)

多线程学习(五),第1张

线程学习(五)

一、AQS
全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,特点是:用state属性表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
1、getState-获取state状态;
2、setState-设置state状态;
3、compareAndSetState-cas机制设置state状态;
4、独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源;
5、提供了基于FIFO的等待队列,类似于Monitor的EntryList;
6、条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet;

子类主要实现这样一些方法(默认抛出UnsupportedOperationException)
	tryAcquire
	tryRelease
	tryAcquireShared
	tryReleaseShared
	isHeldExclusively

基于AQS实现不可重入锁:

package com.concurrent.test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class TestAqs {

    public static void main(String[] args) {
        MyLock lock = new MyLock();

        new Thread(() -> {
            lock.lock();
            try{
                System.out.println("locking...");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("unlocking...");
                lock.unlock();
            }
        }, "t1").start();

        new Thread(() -> {
            lock.lock();
            try{
                System.out.println("locking...");
            }finally {
                System.out.println("unlocking...");
                lock.unlock();
            }
        }, "t2").start();
    }

}

// 自定义锁(不可重入锁)
class MyLock implements Lock {

    // 独占锁 同步器类
    class Mysync extends AbstractQueuedSynchronizer{

        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0, 1)){
                // 加上了锁, 并设置owner为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 注意二者顺序,volatile后变量增加写屏障,使它之后的值看到前面的最新值
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override // 是否持有独占锁
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition(){
            return new ConditionObject();
        }
    }

    private Mysync sync = new Mysync();

    @Override // 加锁(不成功会进入等待队列)
    public void lock() {
        sync.acquire(1);
    }

    @Override // 加锁,可打断
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override // 尝试加锁(一次)
    public boolean tryLock() {

        return sync.tryAcquire(1);
    }

    @Override // 尝试加锁,带超时
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override // 解锁
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

二、ReentrantLock原理


加锁解锁流程
默认非公平实现

public ReentrantLock() {
        sync = new NonfairSync();
}

NonfairSync继承自AQS
非公平锁加锁解锁

final void lock() {
            if (compareAndSetState(0, 1))
            	// 直接竞争锁成功(一次加锁成功)
                setExclusiveOwnerThread(Thread.currentThread());
            else
            	// 加锁失败,走acquire方法
                acquire(1);
        }
        
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	// 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 若前驱节点为head节点则处于第二位,则再次尝试是否能够获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

--------------------------------------------------------
// 释放锁
// ReentrantLock.unlock
public void unlock() {
        sync.release(1);
    }

// aqs
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

// ReentrantLock
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

没有竞争时

第一个竞争出现时

Thread-1 执行了
1、cas尝试将state由0改为1,结果失败;
2、进入tryAcquire逻辑,这时state已经是1,结果仍然失败;
3、接下来进入addWaiter逻辑,构造Node队列:

  • 图中黄色三角表示该Node的waitStatus状态,其中0为默认正常状态;
  • Node的创建是懒惰的;
  • 其中第一个Node称为Dummy(哑元)或哨兵,用来占位,并不关联线程;
    当前线程进入acquireQueued逻辑
    1、acquireQueued会在一个死循环中不断尝试获取锁,失败后进入park阻塞;
    2、如果自己是紧邻着head(排第二位),那么再次tryAcquire尝试获取锁,当然这时state仍为1,失败;
    3、进入shouldParkAfterFailedAcquire逻辑,将前驱node,即head的waitStatus改为-1,这次返回false;

    4、shouldParkAfterFailedAcquire执行完毕回到acquireQueued,再次tryAcquire尝试获取锁,当然这时state仍为1,失败;
    5、当再次进入shouldParkAfterFailedAcquire时,这时因为其前驱node的waitStatus已经是-1,这次返回true;
    6、进入parkAndCheckInterrupt,Thread-1 park(灰色表示)

    再次有多个线程经历上述过程竞争失败,变成这个样子

    Thread-0释放锁,进入tryRelease流程,如果成功
  • 设置exclusiveOwnerThread为null;
  • state = 0;

    当前队列不为null,并且head的waitStatus=-1,进入unparkSuccessor流程:
    1、 找到队列中离head最近的一个Node(没取消的),unpark恢复其运行,本例中即为Thread-1;
    2、回到Thread-1的acquireQueued流程;

    如果加锁成功(没有竞争),会设置
  • exclusiveOwnerThread 为Thread-1,state = 1;
  • head指向刚刚Thread-1所在的Node,该Node清空Thread;
  • 原本的head因为从链表断开,而可被垃圾回收;
    如果这时候有其他线程来竞争(非公平的体现),例如这时有Thread-4来了

    如果不巧又被Thread-4占了先
  • Thread-4被设置为exclusiveOwnerThread,state = 1;
  • Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞;

可重入原理

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁,线程还是当前线程,表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
            	// state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 支持锁重入,只有state减为0,才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

可打断原理
不可打断模式:
在此模式下,即使它被打断,仍会驻留在AQS队列中,等获得锁后方能继续运行(是继续运行!只是打断标记被设置为true)。

private final boolean parkAndCheckInterrupt() {
		// 如果打断标记已经是true, 则park会失效
        LockSupport.park(this);
        // interrupted会清楚打断标记(这里保证一个线程可以多次park在这里)
        return Thread.interrupted();
    }

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 还是需要获得锁后,才能返回打断状态(因此,在没获取到锁之前,打断是无效的,会多次进入parkAndCheckInterrupt中park)
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果是因为interrupt被唤醒, 返回打断状态为true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 重新产生一次中断
            selfInterrupt();
    }

可打断模式:

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果没有获得到锁, 进入(一)
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

// (一) 可打断的获取锁流程
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 在 park 过程中如果被 interrupt 会进入此
                    // 这时候抛出异常,而不会再次进入for(;;)
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

因此,不可打断只重置打断标记,没获取到锁钱,会再次进入死循环park,所以打断无效,而可打断锁通过抛出异常,不用再进入死循环,从而实现可打断。

公平锁实现原理
非公平锁实现

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 如果还没有获得锁
            if (c == 0) {
            	// 尝试用cas获得,这里体现了非公平性:不去检查AQS队列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁,线程还是当前线程,表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
            	// state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 获取失败,返回调用处
            return false;
        }

公平锁实现

// 与非公平锁主要区别在于tryAcquire方法的实现
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            	// 先检查AQS队列中是否有前驱节点,没有才去竞争
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

// 队列中是否有节点
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // h != t 时表示队列中有Node
        return h != t &&
        	// (s = h.next) == null表示队列中还没有老二
            ((s = h.next) == null || 
            // 或者队列中老二线程不是此线程
            s.thread != Thread.currentThread());
    }

条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject
await流程
开始Thread-0持有锁,调用await,进入ConditionObject 的 addConditionWaiter流程,创建新的Node状态为 -2 (Node.CONDITION),关联Thread-0,加入等待队列尾部

接下来进入AQS的 fullyRelease 流程,释放同步器上的锁

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

park 阻塞 Thread-0

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 进入ConditionObject 的 addConditionWaiter流程,创建新的Node状态为 -2 (Node.CONDITION),关联Thread-0,加入等待队列尾部
            Node node = addConditionWaiter();
            // 释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
            	// 当前线程进入park, 等待被唤醒
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 可重入锁一次释放
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

signal流程
假设 Thread-1 要来唤醒 Thread-0

进入 ConditionObject 的 doSignal流程,取得等待队列中第一个 Node,即 Thread-0 所在Node

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的waitStatus 改为0,Thread-3 的waitStatus 改为-1

Thread-1 释放锁,进入 unlock 流程

public final void signal() {
			// 当前线程是否持有锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 取conditionObject中第一个节点
            Node first = firstWaiter;
            if (first != null)
            	// 执行doSignal方法
                doSignal(first);
        }

 private void doSignal(Node first) {
            do {
            	// 若队列中只有一个节点,将尾节点也置为空
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            // 节点转移成功,则退出, 转移失败且队列不为空, Node后移, 继续尝试
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

final boolean transferForSignal(Node node) {
        // 将当前节点的状态由 -2 改为 0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // 将当前节点加入 aqs 队列尾部, 并返回当前节点的前驱节点
        Node p = enq(node);
        int ws = p.waitStatus;
        // 若前驱节点waitStatus > 0 或 前驱节点waitStatus修改为-1成功, 则返回成功
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        	// 否则, 进入unpark流程
            LockSupport.unpark(node.thread);
        return true;
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5638378.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存