Druid源码阅读系列(五)

Druid源码阅读系列(五),第1张

Druid源码阅读系列(五)

今天来看看昨天剩下的那块createAndStartDestroyThread()代码。

前言

开篇点题,如果让你设计一个数据库连接回收的线程你会如何去设计?如何判断一个连接是否是需要回收,放进连接池,如何判断连接是否需要废弃,当某个时刻出现大量连接请求之后,后续需要使用的连接大幅减少,这时候对暂时不用的线程作何处理?

以上问题在createAndStartDestroyThread()方法里都有实现。

代码解读

先整体看下这块代码:

protected void createAndStartDestroyThread() {
    destroyTask = new DestroyTask();

    if (destroyScheduler != null) {
        long period = timeBetweenEvictionRunsMillis;
        if (period <= 0) {
            period = 1000;
        }
        destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                                                                      TimeUnit.MILLISECONDS);
        initedLatch.countDown();
        return;
    }

    String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
    destroyConnectionThread = new DestroyConnectionThread(threadName);
    destroyConnectionThread.start();
}

可以看出,在摧毁连接线程里同步和异步都是使用的同一个destroyTask,这和创建连接线程分两种情况是有差异的。这里的异步只是按照周期执行线程而已,这个周期是用户可以自己设置,参数为timeBetweenEvictionRunsMillis,如果不设置,默认为1000ms

其实你看了同步的代码会发现他们实现其实是一回事,所以这两种方式的唯一区别就是一个用自定义的destroyScheduler线程池去执行,另一个用守护线程去执行。

public class DestroyConnectionThread extends Thread {

    public DestroyConnectionThread(String name){
        super(name);
        this.setDaemon(true);
    }

    public void run() {
        initedLatch.countDown();

        for (;;) {
            // 从前面开始删除
            try {
                if (closed || closing) {
                    break;
                }

                if (timeBetweenEvictionRunsMillis > 0) {
                    Thread.sleep(timeBetweenEvictionRunsMillis);
                } else {
                    Thread.sleep(1000); //
                }

                if (Thread.interrupted()) {
                    break;
                }

                destroyTask.run();
            } catch (InterruptedException e) {
                break;
            }
        }
    }

}

最后会手动调用destroyTask.run()方法执行。

Thread.run()和Thread.start()区别

这里简单复习下线程实例调用start()方法和调用run()方法的区别,调用start()方法会将线程变为就绪态,但是不一定会立即执行,这个要看cpu的调度情况,而调用run()方法,其实就是调用本地方法而已,就和你调用A.run()是一个意思。它实际上和线程调用没啥关系,调用它的线程是当前线程而不是你起的那个线程,参考我下面这个demo。

public class Demo {
    public static void main(String[] args) {

        Thread t=new Thread(new T(),"myThread");
        t.run();

        t.start();
    }

    static class T implements Runnable{
        @Override
        public void run() {
            System.out.println("当前线程"+Thread.currentThread().getName());
        }
    }
}

输出结果如下:

DestroyTask线程

好了,我们继续看源码,来看看这个摧毁线程的实现。

public class DestroyTask implements Runnable {
    public DestroyTask() {

    }

    @Override
    public void run() {
        shrink(true, keepAlive);

        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }

}

从名字看知道有两块逻辑,收缩连接和移除废弃连接。

shrink方法

先看看shrink()方法,代码很长,我都给了注释,看着应该轻松点:

public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }

        //是否需要填冲连接,条件为keepAlive && poolingCount + activeCount < minIdle
        //保活开启且池中连接+活跃连接<最小链接
        boolean needFill = false;
        //需要关闭连接数
        int evictCount = 0;
        //保活连接数
        int keepAliveCount = 0;
        //我们在sql执行时可能发生一些不可恢复的异常(致命)导致连接不可用,这种连接放到连接池其实是毫无用处的,
        //类似于“脏数据”,fatalErrorCount就是记录这种连接的数据,
        //fatalErrorIncrement表示距离上次检测的增量
        int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
        //记录最近一次致命异常连接数
        fatalErrorCountLastShrink = fatalErrorCount;
        
        try {
            //初始化未成功跳过检测
            if (!inited) {
                return;
            }

            //需要检测数量=连接池持有数量-最小连接数
            final int checkCount = poolingCount - minIdle;
            final long currentTimeMillis = System.currentTimeMillis();
            //遍历整个连接池连接
            for (int i = 0; i < poolingCount; ++i) {
                DruidConnectionHolder connection = connections[i];

                //onFatalError判断条件为fatalErrorCount - fatalErrorCountLastShrink > onFatalErrorMaxActive
                //发生了连接错误或者存在错误增量并且最近连接错误时间大于连接时间创建时间
                // 证明这个连接有一定几率不可用,放进保活检查数组里进行检测 并跳过本次循环
                if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis))  {
                    //需检测的保活连接数组
                    keepAliveConnections[keepAliveCount++] = connection;
                    continue;
                }

                if (checkTime) {
                    //物理超时时间,主要用于跳过mysql 8小时自动断开连接
                    if (phyTimeoutMillis > 0) {
                        //物理连接时间
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        //超过设置超时时间则关闭
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            //需要关闭的连接
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    //当前时间-上次活跃时间=存活时间
                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                    //存活时间<最小存活时间且存活时间<保活检测时间则跳出检测
                    //这里这个判断想了很久,想通了感觉设计的真的很妙,
                    //因为connections是加锁生成的,所以放进去的一定是按顺序的,那前面的元素lastActiveTimeMillis时间一定比后面元素小,
                    //相应的前面元素的idleMillis一定是比后面元素的小的,所以只要前面的满足这个条件,后面的就不需要判断了,所以可以直接break
                    if (idleMillis < minEvictableIdleTimeMillis
                            && idleMillis < keepAliveBetweenTimeMillis
                    ) {
                        break;
                    }

                    //大于等于最小存活时间
                    if (idleMillis >= minEvictableIdleTimeMillis) {
                        //小于检查数放进关闭数据
                        if (checkTime && i < checkCount) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        } else if (idleMillis > maxEvictableIdleTimeMillis) {
                            //大于最大存活时间也扔进去
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    //开启保活且存活时间超过保活检测周期放到保活检测数据
                    if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                        keepAliveConnections[keepAliveCount++] = connection;
                    }
                } else {
                    if (i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }

            //移除数量=关闭连接数+检测保活数
            int removeCount = evictCount + keepAliveCount;
            //大于0则将从removeCount位置起将剩余的数据往前移到下标为0的位置,再将后面数组元素填充为null,其实就是删除前removecount个元素
            if (removeCount > 0) {
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            }
            //更新检查数
            keepAliveCheckCount += keepAliveCount;

            //开启keepAlive会将连接数直接填充到最小连接数
            if (keepAlive && poolingCount + activeCount < minIdle) {
                needFill = true;
            }
        } finally {
            lock.unlock();
        }

        //关闭连接
        if (evictCount > 0) {
            for (int i = 0; i < evictCount; ++i) {
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            }
            Arrays.fill(evictConnections, null);
        }

        //keepAlive检测
        if (keepAliveCount > 0) {
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                //进行有效性校验,通过则validate为true,否则会被丢弃
                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip
                }

                //未通过校验则丢弃
                boolean discard = !validate;
                if (validate) {
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    //很奇怪的如果当前连接池里面有这个连接会丢掉
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) {
                        discard = true;
                    }
                }

                //丢弃连接
                if (discard) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        // skip
                    }

                    lock.lock();
                    try {
                        discardCount++;

                        //当连接小于等于最小连接则释放空信号
                        if (activeCount + poolingCount <= minIdle) {
                            emptySignal();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        }

        //填充连接
        if (needFill) {
            lock.lock();
            try {
                int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                for (int i = 0; i < fillCount; ++i) {
                    emptySignal();
                }
            } finally {
                lock.unlock();
            }
        } else if (onFatalError || fatalErrorIncrement > 0) {
            lock.lock();
            try {
                emptySignal();
            } finally {
                lock.unlock();
            }
        }
    }

强调一下下面这块代码设计的真的很妙,因为connections是加锁生成的,所以放进去的一定是按顺序的,那前面的元素lastActiveTimeMillis时间一定比后面元素小,相应的前面元素的idleMillis一定是比后面元素的小的,所以只要前面的满足这个条件,后面的就不需要判断了,所以可以直接break.

if (idleMillis < minEvictableIdleTimeMillis
        && idleMillis < keepAliveBetweenTimeMillis
) {
    break;
}
removeAbandoned方法

当设置了removeAbandoned属性时,会开启强制关闭连接模式,用于强制kill掉一些查询时间较长可能被锁住的查询。通过设置removeAbandonedTimeoutMillis参数限定遗弃时间。这个是直接在activeConnections里面去移除,这个是十分危险的,可能导致不必要的连接创建,官方都建议不要用在生产。参看如下代码:

protected void initCheck() throws SQLException {
 
    、、、、、、、、、、、、、、、、、、、、、、、、、、、、

    if (removeAbandoned) {
        LOG.warn("removeAbandoned is true, not use in production.");
    }
}

removeAbandoned代码相对shrink很简单,可以看着我的注释过一下:

public int removeAbandoned() {
    int removeCount = 0;

    long currrentNanos = System.nanoTime();

    List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();

    activeConnectionLock.lock();
    try {
        Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();

        //遍历活跃连接
        for (; iter.hasNext();) {
            DruidPooledConnection pooledConnection = iter.next();

            //如果在执行跳过
            if (pooledConnection.isRunning()) {
                continue;
            }

            long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);

            //存活时间大于设定时间,丢弃连接
            if (timeMillis >= removeAbandonedTimeoutMillis) {
                iter.remove();
                pooledConnection.setTraceEnable(false);
                abandonedList.add(pooledConnection);
            }
        }
    } finally {
        activeConnectionLock.unlock();
    }

    //开始丢弃连接
    if (abandonedList.size() > 0) {
        for (DruidPooledConnection pooledConnection : abandonedList) {
            final ReentrantLock lock = pooledConnection.lock;
            lock.lock();
            try {
                //连接不可用跳过
                if (pooledConnection.isDisable()) {
                    continue;
                }
            } finally {
                lock.unlock();
            }

            JdbcUtils.close(pooledConnection);
            //就行标记和数值记录
            pooledConnection.abandond();
            removeAbandonedCount++;
            removeCount++;

            //记录日志
            if (isLogAbandoned()) {
                StringBuilder buf = new StringBuilder();
                buf.append("abandon connection, owner thread: ");
                buf.append(pooledConnection.getOwnerThread().getName());
                buf.append(", connected at : ");
                buf.append(pooledConnection.getConnectedTimeMillis());
                buf.append(", open stackTrace\n");

                StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
                for (int i = 0; i < trace.length; i++) {
                    buf.append("\tat ");
                    buf.append(trace[i].toString());
                    buf.append("\n");
                }

                buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState()
                           + ", current stackTrace\n");
                trace = pooledConnection.getOwnerThread().getStackTrace();
                for (int i = 0; i < trace.length; i++) {
                    buf.append("\tat ");
                    buf.append(trace[i].toString());
                    buf.append("\n");
                }

                LOG.error(buf.toString());
            }
        }
    }

    return removeCount;
}

强烈建议不要在生产用removeAbandoned,否则你并发比较高的情况下会给服务器造成很大压力的。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/921325.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存