Semaphore

Semaphore,第1张

Semaphore 示例
	public void park(){
        Semaphore semaphore = new Semaphore(2);
        for(int i=0;i<10;i++){
            Thread thread=new Thread(() -> {
                    try {
                        System.out.println("===="+Thread.currentThread().getName()+"来到停车场");
                        if(semaphore.availablePermits()==0){
                            System.out.println("车位不足,请耐心等待");
                        }
                        semaphore.acquire();//获取令牌尝试进入停车场
                        System.out.println(Thread.currentThread().getName()+"成功进入停车场");
                        Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间
                        System.out.println(Thread.currentThread().getName()+"驶出停车场");
                        semaphore.release();//释放令牌,腾出停车场车位
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            },i+"号车");
            thread.start();
        }
    }
成员变量和构造方法
public class Semaphore implements java.io.Serializable {
	//同步器
	private final Sync sync;
	//默认非公平锁,permits确定信号个数
	public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    //fair控制公平锁或非公平锁,permits确定信号个数
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}
内部类

Sync

	abstract static class Sync extends AbstractQueuedSynchronizer {
		
        Sync(int permits) {
            setState(permits); //设置信号量
        }

        final int getPermits() { //获取剩余信号量
            return getState();
        }

		
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState(); //获取剩余信号量
                int remaining = available - acquires; //除去本次,剩余可使用信号量
                if (remaining < 0 || //剩余信号量小于0,去等待队列
                    compareAndSetState(available, remaining)) //替换剩余信号量
                    return remaining;
            }
        }
		
		
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState(); //获取剩余信号量
                int next = current + releases; //剩余信号量+releases
                if (next < current) // 
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next)) //替换剩余信号量
                    return true;
            }
        }
		
		
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState(); //获取当前信号量
                int next = current - reductions; // 当前信号量减少reductions个
                if (next > current)  
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next)) //替换信号量
                    return;
            }
        }
		
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

NonfairSync

	static final class NonfairSync extends Sync {
       	//
        NonfairSync(int permits) {
            super(permits);
        }
		//上锁
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

FairSync

	static final class FairSync extends Sync {
        

        FairSync(int permits) {
            super(permits);
        }
		
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors()) //判断等待队列中是否有线程等待
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
重要方法
	
	public void acquire() throws InterruptedException {
		//实际调用AQS中acquireSharedInterruptibly(int arg)方法,该方法中调用tryAcquireShared(arg)方法获取共享锁,tryAcquireShared有公平锁和非公平锁两种实现
        sync.acquireSharedInterruptibly(1); 
    }
    
    
	public void release() {
        sync.releaseShared(1); //实际通过AQS中的tryReleaseShared(arg)方法调用子类实现的释放锁方法,释放锁成功后唤起等待队列中的线程
    }
	
	
	public int availablePermits() {
        return sync.getPermits();
    }
	
	
	public int drainPermits() {
        return sync.drainPermits();
    }
    
    
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

Semaphore常叫做信号量,用来控制访问特定资源的线程数量,通常用来限制并发数量。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5636770.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存