tomcat使用AbstractQueuedSynchronizer进行限流分析

tomcat使用AbstractQueuedSynchronizer进行限流分析,第1张

tomcat使用AbstractQueuedSynchronizer进行限流分析

tomcat源码系列导航栏

    tomcat源码分析环境搭建

    tomcat启动过程-load初始化

    tomcat启动过程-start启动

    tomcat使用AbstractQueuedSynchronizer进行限流分析

目录

代码块一acceptor.run()

代码块二countUpOrAwaitConnection()

代码块三 countUpOrAwait

代码块四 tryAcquireShared

代码块五 doAcquireSharedInterruptibly


前言

前一篇文章,分析了tomcat的启动过程,tomcat的start启动过程,一步一步的启动,主线程创建accpetor线程,accpetor线程阻塞的监听8080端口的请求进来,如果有请求进来,然后就把它放入一个事件列表中,又继续监听8080端口。主线程创建的poller线程就去轮训事件列表,如果有事件进来,那就交给线程池去处理。我们这一篇文章来分析tomcat的限流。

我们先看一下上一篇文章的代码块五acceptor.run()方法

代码块一acceptor.run()
  //后台线程监听传入的TCP/IP连接并将其传递给适当的处理器。
        @Override
        public void run() {
            //Thread.currentThread().setName("我是响应用户网页请求的线程");
            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    //代码一这里就是进行tomcat的限流 *** 作
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        //分析一 通过nio的方式获取客户端的请求
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // We didn't get a socket
                        countDownConnection();
                        if (running) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        // setSocketOptions() will hand the socket off to
                        // an appropriate processor if successful
                        //分析二 处理这个请求
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }


      我们看的这个方法countUpOrAwaitConnection(),接着进去。

代码块二countUpOrAwaitConnection()
    protected void countUpOrAwaitConnection() throws InterruptedException {
       //这个最大连接数,我们在tomcat启动过程中默认设置的是10000
       //可以看到如果我们修改最大连接数为-1,那就代码不进行限流 *** 作 
       if (maxConnections==-1) {
            return;
        }
        //这里获取一个限流器,也是在tomcat启动过程中默认创建的限流器
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) {
            //进行限流 *** 作
            latch.countUpOrAwait();
        }
    }

        我们可以看到这里获得了一个限流器。LimitLatch 这个限流器调用Latch的countUpOrAwait进行限流

代码块三 countUpOrAwait
    
    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        
        sync.acquireSharedInterruptibly(1);
    }

        可以看到这个方法就是调用了一个sync.acquireSharedInterruptibly(1);就进行了限流 *** 作。那么它的原理是什么。首先我们看一下sync是一个什么东西。

//这是一个限流器
public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);
    
    //我们可以发现Sync是一个继承自AQS的内部类
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }
}

        我们可以看到原来Sync其实是LimitLatch的内部类,并且它还继承自AQS,我们知道AQS定义了一套多线程访问共享资源的同步器框架。其实原理就是它定义了一个双向链表和节点构成的队列。我们如果获取锁成功,则往下 *** 作。如果获取锁失败(说明锁被别人拿去了,排队等待),则将线程创建成一个节点放入到哪个双向链表中去进行等待。

        放入到队列中的线程。底层使用LockSupport.park(this)来阻塞线程,LockSupport是使用UNSAFE这个类来使 *** 作系统进行线程的阻塞。

        如果你不了解AQS这里可以参考一篇文章        Java并发:AbstractQueuedSynchronizer详解(独占模式)

        接下来的部分需要先了解AQS才能理解更加深刻

        我们接着往下看

sync.acquireSharedInterruptibly(1)。这个方法做了什么
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

接着进去看

代码块四 tryAcquireShared
 //尝试去 *** 作 返回-1代表需要进行限流了。然后1代表不需要限流。
        @Override
        protected int tryAcquireShared(int ignored) {
            //count是统计的客户端的请求数,每一次请求进来,count的原子的增加1
            long newCount = count.incrementAndGet();
            //如果请求数已经大于了我们设置的最大连接数10000
            if (!released && newCount > limit) {
                // Limit exceeded
                //我们将count减少1
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }
代码块五 doAcquireSharedInterruptibly
//以下这个方法就是,当我们尝试去获取锁失败后,
//我们会创建一个节点,然后将该节点设置进一个由双向链表组成的队列中去。
//然后再根据节点的状态看是否需要进行线程阻塞
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

        我们可以看到以上这个方法就是,当我们尝试去获取锁失败后,我们会创建一个节点,然后将该节点设置进一个由双向链表组成的队列中去。然后再根据节点的状态看是否需要进行线程阻塞。这样我们就实现了限流 *** 作。

        最后总结一下AbstractQueuedSynchronizer 这个类的设计方式,和具体的实现类是怎么个 *** 作方式。首先AQS这个类提供了必须要子类实现以下四个方法,

作用分别是tryAcquire 、获取 独享锁,tryRelease、释放独享锁,tryAcquireShared获取共享锁,tryReleaseShared释放共享锁。两两配对使用的。

         具体怎样才算获取锁成功,这个需要我们自己业务确定。比如本篇文章,LimitLatch使用到的获取共享锁,只有我们的请求连接数小于10000。我们就认为这个动作就是获取锁成功。如果获取锁失败,那么就将当前线程放入队列,然后阻塞。

        再比如ReentrantLock这个类是一个独享锁。它定义了一个state整数字段。0表示没有线程之前获取过锁,那么我们线程去 *** 作的时候,将state由0修改为1,那么我们就认为获取锁成功。如果state是1或者大于1的整数。并且之前不是当前线程把它获取的。那么当前线程就插入队列中去阻塞。代表获取锁失败。

        诸如此类,我们以此类推Semaphore、CountDownLatch、ReentrantReadWriteLock都是类似的使用。

 
 
 
					
										


					

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5595892.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存