tomcat源码系列导航栏
tomcat源码分析环境搭建
tomcat启动过程-load初始化
tomcat启动过程-start启动
tomcat使用AbstractQueuedSynchronizer进行限流分析
目录
代码块一acceptor.run()
代码块二countUpOrAwaitConnection()
代码块三 countUpOrAwait
前言
前一篇文章,分析了tomcat的启动过程,tomcat的start启动过程,一步一步的启动,主线程创建accpetor线程,accpetor线程阻塞的监听8080端口的请求进来,如果有请求进来,然后就把它放入一个事件列表中,又继续监听8080端口。主线程创建的poller线程就去轮训事件列表,如果有事件进来,那就交给线程池去处理。我们这一篇文章来分析tomcat的限流。
我们先看一下上一篇文章的代码块五acceptor.run()方法
代码块一acceptor.run()//后台线程监听传入的TCP/IP连接并将其传递给适当的处理器。 @Override public void run() { //Thread.currentThread().setName("我是响应用户网页请求的线程"); int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait //代码一这里就是进行tomcat的限流 *** 作 countUpOrAwaitConnection(); SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket //分析一 通过nio的方式获取客户端的请求 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful //分析二 处理这个请求 if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }
我们看的这个方法countUpOrAwaitConnection(),接着进去。
protected void countUpOrAwaitConnection() throws InterruptedException { //这个最大连接数,我们在tomcat启动过程中默认设置的是10000 //可以看到如果我们修改最大连接数为-1,那就代码不进行限流 *** 作 if (maxConnections==-1) { return; } //这里获取一个限流器,也是在tomcat启动过程中默认创建的限流器 LimitLatch latch = connectionLimitLatch; if (latch!=null) { //进行限流 *** 作 latch.countUpOrAwait(); } }
我们可以看到这里获得了一个限流器。LimitLatch 这个限流器调用Latch的countUpOrAwait进行限流
代码块三 countUpOrAwaitpublic void countUpOrAwait() throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount()); } sync.acquireSharedInterruptibly(1); }
可以看到这个方法就是调用了一个sync.acquireSharedInterruptibly(1);就进行了限流 *** 作。那么它的原理是什么。首先我们看一下sync是一个什么东西。
//这是一个限流器 public class LimitLatch { private static final Log log = LogFactory.getLog(LimitLatch.class); //我们可以发现Sync是一个继承自AQS的内部类 private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } }
我们可以看到原来Sync其实是LimitLatch的内部类,并且它还继承自AQS,我们知道AQS定义了一套多线程访问共享资源的同步器框架。其实原理就是它定义了一个双向链表和节点构成的队列。我们如果获取锁成功,则往下 *** 作。如果获取锁失败(说明锁被别人拿去了,排队等待),则将线程创建成一个节点放入到哪个双向链表中去进行等待。
放入到队列中的线程。底层使用LockSupport.park(this)来阻塞线程,LockSupport是使用UNSAFE这个类来使 *** 作系统进行线程的阻塞。
如果你不了解AQS这里可以参考一篇文章 Java并发:AbstractQueuedSynchronizer详解(独占模式)
接下来的部分需要先了解AQS才能理解更加深刻
我们接着往下看
sync.acquireSharedInterruptibly(1)。这个方法做了什么
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
接着进去看
代码块四 tryAcquireShared//尝试去 *** 作 返回-1代表需要进行限流了。然后1代表不需要限流。 @Override protected int tryAcquireShared(int ignored) { //count是统计的客户端的请求数,每一次请求进来,count的原子的增加1 long newCount = count.incrementAndGet(); //如果请求数已经大于了我们设置的最大连接数10000 if (!released && newCount > limit) { // Limit exceeded //我们将count减少1 count.decrementAndGet(); return -1; } else { return 1; } }代码块五 doAcquireSharedInterruptibly
//以下这个方法就是,当我们尝试去获取锁失败后, //我们会创建一个节点,然后将该节点设置进一个由双向链表组成的队列中去。 //然后再根据节点的状态看是否需要进行线程阻塞 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
我们可以看到以上这个方法就是,当我们尝试去获取锁失败后,我们会创建一个节点,然后将该节点设置进一个由双向链表组成的队列中去。然后再根据节点的状态看是否需要进行线程阻塞。这样我们就实现了限流 *** 作。
最后总结一下AbstractQueuedSynchronizer 这个类的设计方式,和具体的实现类是怎么个 *** 作方式。首先AQS这个类提供了必须要子类实现以下四个方法,
作用分别是tryAcquire 、获取 独享锁,tryRelease、释放独享锁,tryAcquireShared获取共享锁,tryReleaseShared释放共享锁。两两配对使用的。
具体怎样才算获取锁成功,这个需要我们自己业务确定。比如本篇文章,LimitLatch使用到的获取共享锁,只有我们的请求连接数小于10000。我们就认为这个动作就是获取锁成功。如果获取锁失败,那么就将当前线程放入队列,然后阻塞。
再比如ReentrantLock这个类是一个独享锁。它定义了一个state整数字段。0表示没有线程之前获取过锁,那么我们线程去 *** 作的时候,将state由0修改为1,那么我们就认为获取锁成功。如果state是1或者大于1的整数。并且之前不是当前线程把它获取的。那么当前线程就插入队列中去阻塞。代表获取锁失败。
诸如此类,我们以此类推Semaphore、CountDownLatch、ReentrantReadWriteLock都是类似的使用。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)