1) Semaphore,俗称信号量 基于AbstractQueuedSynchronizer实现!AQS 2) Semaphore管理着一组许可permit,许可的初始数量通过构造函数设定。 3) 默认使用非公平的方式 sync = new NonfairSync(permits); 使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数 4) 当线程要访问共享资源时,需要先通过acquire()方法获取许可。获取到之后许可就被当前线程占用了,在归还许可之前其他线程不能获取这个许可。 调用acquire()方法时,如果没有许可可用了,就将线程阻塞,等待有许可被归还了再执行。 当执行完业务功能后,需要通过release()方法将许可证归还,以便其他线程能够获得许可证继续执行。 如果初始化了一个许可为1的Semaphore,那么就相当于一个不可重入的互斥锁(Mutex)。
举个生活中的小栗子:
我们假设停车场仅有3个停车位,停车位就是有限的共享资源,许可数为3。一开始停车场没有车辆所有车位全部空着,然后先后到来三辆车,停车场车位够,安排进去停车。之后来的车必须在外面候着,直到停车场有空车位。当停车场有车开出去,里面有空位了,则安排一辆车进去(至于是哪辆要看选择的机制是公平还是非公平)。
从程序角度看,停车场就相当于有限的公共资源,许可数为3,车辆就相当于线程。当来一辆车时,许可数就会减1,当停车场没有车位了(许可数为0),其他来的车辆需要在外面等候着。如果有一辆车开出停车场,许可数+1,然后放进来一辆
2.代码举例实现package com.concurrent;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//Semaphore管理着一组许可permit,许可的初始数量通过构造函数设定。
/**
* Semaphore的主要方法摘要:
*
* void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
*
* void release():释放一个许可,将其返回给信号量。
*
* int availablePermits():返回此信号量中当前可用的许可数。
*
* boolean hasQueuedThreads():查询是否有线程正在等待获取。
*/
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t 抢到了车位");
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"\t 停车3s后离开");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
代码结果输出:
1 抢到了车位
3 抢到了车位
2 抢到了车位
1 停车3s后离开
4 抢到了车位
3 停车3s后离开
2 停车3s后离开
5 抢到了车位
6 抢到了车位
5 停车3s后离开
6 停车3s后离开
4 停车3s后离开
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。假如有多个线程读取数据后,需要将数据保存在数据库中,而可用的最大数据库连接只有10个,这时候就需要使用Semaphore来控制能够并发访问到数据库连接资源的线程个数最多只有10个。在限制资源使用的应用场景下,Semaphore是特别合适的。
3.源码分析 acquire()acquire()方法就是获取许可,获取到许可就可以继续执行访问共享资源,获取不到就阻塞等待其他线程归还许可。
AQS.state用来记录可用的许可数量,每获取一个许可state减1。
/*** 获取许可的方法其实就是获取锁的方法 */
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 真正获取锁的方法,由Semaphore.NonfairSync实现 doAcquireSharedInterruptibly(arg); // 获取锁失败,当前线程阻塞并进入AQS同步队列}/** * Semaphore.NonfairSync实现的获取锁的方法 *
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
** * 每获取一个许可,将state-1,state表示剩余的许可数 * 如果许可已经用完,返回remaining<0,表示获取不到锁/许可,线程阻塞 * 如果还有许可,返回remaining>=0,表示获取到锁/许可,线程继续执行
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
4.源码分析 release()
release()方法归还许可,其实就是将AQS.state加1。归还成功,唤醒AQS队列中等锁的线程,从被阻塞的位置开始执行。
public final boolean releaseShared(int arg)
{
if (tryReleaseShared(arg)) {
// 释放锁,由Semaphore.Sync实现
doReleaseShared();
// 释放锁成功,唤醒AQS队列中等锁的线程
return true;
}
return false;
}
/** * 每归还一个许可将state加1 */
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;// 每归还一个许可将state加1
// if (next < current) // overflow throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
5.小总结
信号量Semaphore用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定资源,比如数据库连接等。
Semaphore在构造时设置一个许可数量,这个许可数量用AQS.state来记录。
acquire()方法就是获取许可,只有获取到许可才可以继续执行访问共享资源,获取到许可之后AQS.state减1,以记录当前可用的许可数量;如果获取不到许可,线程就阻塞等待其他线程归还许可。
release()方法将许可归还,AQS.state加1;归还之后,唤醒AQS队列中阻塞的线程获取许可。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)