public void park(){ Semaphore semaphore = new Semaphore(2); for(int i=0;i<10;i++){ Thread thread=new Thread(() -> { try { System.out.println("===="+Thread.currentThread().getName()+"来到停车场"); if(semaphore.availablePermits()==0){ System.out.println("车位不足,请耐心等待"); } semaphore.acquire();//获取令牌尝试进入停车场 System.out.println(Thread.currentThread().getName()+"成功进入停车场"); Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间 System.out.println(Thread.currentThread().getName()+"驶出停车场"); semaphore.release();//释放令牌,腾出停车场车位 } catch (InterruptedException e) { e.printStackTrace(); } },i+"号车"); thread.start(); } }成员变量和构造方法
public class Semaphore implements java.io.Serializable { //同步器 private final Sync sync; //默认非公平锁,permits确定信号个数 public Semaphore(int permits) { sync = new NonfairSync(permits); } //fair控制公平锁或非公平锁,permits确定信号个数 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } }内部类
Sync
abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); //设置信号量 } final int getPermits() { //获取剩余信号量 return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //获取剩余信号量 int remaining = available - acquires; //除去本次,剩余可使用信号量 if (remaining < 0 || //剩余信号量小于0,去等待队列 compareAndSetState(available, remaining)) //替换剩余信号量 return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); //获取剩余信号量 int next = current + releases; //剩余信号量+releases if (next < current) // throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) //替换剩余信号量 return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); //获取当前信号量 int next = current - reductions; // 当前信号量减少reductions个 if (next > current) throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) //替换信号量 return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
NonfairSync
static final class NonfairSync extends Sync { // NonfairSync(int permits) { super(permits); } //上锁 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
FairSync
static final class FairSync extends Sync { FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) //判断等待队列中是否有线程等待 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }重要方法
public void acquire() throws InterruptedException { //实际调用AQS中acquireSharedInterruptibly(int arg)方法,该方法中调用tryAcquireShared(arg)方法获取共享锁,tryAcquireShared有公平锁和非公平锁两种实现 sync.acquireSharedInterruptibly(1); } public void release() { sync.releaseShared(1); //实际通过AQS中的tryReleaseShared(arg)方法调用子类实现的释放锁方法,释放锁成功后唤起等待队列中的线程 } public int availablePermits() { return sync.getPermits(); } public int drainPermits() { return sync.drainPermits(); } protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
Semaphore常叫做信号量,用来控制访问特定资源的线程数量,通常用来限制并发数量。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)