1. 背景 🔗
1.1 现象 🔗
2021.12.16 凌晨,我们的应用数据库因故发生了主备切换,之后某个 Pod 就持续报错 GetConnectionTimeoutException
,并且该 Pod 的进程一直挂起,无法正常提供服务。
不过比较奇怪的是,当时连接同一数据库的其他 Pod 虽然也有几条报错(主要是数据库连接超时的报错),但很快其他 Pod 都恢复了正常,就只有这一个 Pod 一直没有恢复。
1.2 报错信息 🔗
异常 Pod 的部分报错信息如下:
[2021-12-16 02:10:06.617] [ERROR] [thread-30849] com.alibaba.druid.pool.DruidPooledStatement errorCheck:367 - CommunicationsException, druid version 1.2.5, jdbcUrl : jdbc:mysql://xxx:3306/xxx?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true, testWhileIdle true, idle millis 1785, minIdle 5, poolingCount 0, timeBetweenEvictionRunsMillis 300000, lastValidIdleMillis 1785, driver com.mysql.cj.jdbc.Driver, exceptionSorter com.alibaba.druid.pool.vendor.MySqlExceptionSorter
......
2021-12-16 02:10:06.617] [ERROR] thread-30849] Caused by: com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 100000, active 399, maxActive 100, creating 0
at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1745)
at com.alibaba.druid.pool.DruidDataSource.getConnectionDirect(DruidDataSource.java:1415)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1395)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1385)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:100)
at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:158)
at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:116)
at org.springframework.orm.ibatis.SqlMapClientTemplate.execute(SqlMapClientTemplate.java:182)
... 26 common frames omitted
从报错信息来看, GetConnectionTimeoutException 即获取连接超时。但数据库和网络都是正常的,为什么获取数据库连接超时呢?
此外错误栈中的 active
居然比 maxActive
大,最大连接数 maxActive
是 100,为什么活跃连接数能达到 399?
在 Druid 的 issues 中, GetConnectionTimeoutException
也是一个非常常见的问题,那这个异常到底是怎么产生的呢,又该如何避免呢?
接下来就让我们带着这些疑问,通过阅读 Druid 源码进行深入的分析。
2.1.3 Druid 配置 🔗
在进行问题分析前,先看一下当前的 Druid 配置。Druid 的版本为 1.2.5,因此在接下来的内容中,我都将根据 1.2.5 版本的源码进行分析。Druid 的具体配置如下:
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<property name="url" value="${jdbc_url}" />
<property name="username" value="${jdbc_user}" />
<property name="password" value="${jdbc_password}" />
<property name="maxActive" value="100"/>
<property name="initialSize" value="1"/>
<property name="maxWait" value="100000"/>
<property name="minIdle" value="1"/>
<property name="timeBetweenEvictionRunsMillis" value="300000"/>
<property name="validationQuery" value="SELECT 1"/>
<property name="testWhileIdle" value="true"/>
<property name="removeAbandoned" value="false"/>
<property name="removeAbandonedTimeout" value="600"/>
</bean>
2. 关键变量 🔗
在错误栈中,我们看到了 pollingCount
、active
、maxActive
等变量,那这些变量到底是什么含义呢?
这一章节就先来了解一下。同时了解了这些变量的含义,也更利于后续阅读源码分析问题。
2.1 配置项 🔗
这里列出一些本文会涉的及关键配置,更多配置可参考 DruidDataSource配置属性列表 。
2.1.1 minIdle 🔗
连接池中至少需要保持的连接数。
2.1.2 maxActive 🔗
连接池中最大连接数。
2.1.3 maxWait 🔗
单次获取连接的最长等待时间。
2.2 DruidDataSource 🔗
DruidDataSource 是数据源,每个数据源都有一个对应的 DruidDataSource 实例。
2.2.1 activeCount 🔗
当前活跃连接数。每当成功获取到一个连接后,activeCount 都会加一,同时也会将 holder
的 active
属性设置为 true。
private DruidPooledConnection getConnectionInternal(long maxWait) {
// ...
if (holder != null) {
if (holder.discard) {
continue;
}
activeCount++;
holder.active = true;
}
}
2.2.2 connections 🔗
当前所数据源拥有的连接池。其类型是 DruidConnectionHolder[]
。
2.2.3 poolingCount 🔗
连接池中的的连接数。
每当放入连接到连接池中,就会将 poolingCount 加一,例如:
private boolean put(DruidConnectionHolder holder, long createTaskId) {
// ...
connections[poolingCount] = holder;
incrementPoolingCount();
// ...
}
private final void incrementPoolingCount() {
poolingCount++;
}
每当将连接从连接池中取出,就会将 poolingCount 减一,例如:
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
// ...
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
// ...
return last;
}
private final void decrementPoolingCount() {
poolingCount--;
}
2.3 DruidConnectionHolder 🔗
数据库连接以及相关属性。
2.3.1 conn 🔗
数据库连接,类型为 Connection。
2.3.2 active 🔗
用来标记连接是否活跃。
2.3.3 lastActiveTimeMillis 🔗
连接上次活跃时间。
2.3.4 lastKeepTimeMillis 🔗
连接上次保持活跃的时间。每次通过 keepAlive 保持连接活跃,就会更新此时间。
2. 源码分析 🔗
接下来,就让我们从错误栈开始深入源码,一层一层寻找问题的根源。
2.2.1 异常是如何抛出的? 🔗
首先 GetConnectionTimeoutException
是由 getConnectionInternal
方法抛出的,该方法的主要作用就是获取数据库连接,当前没有获取到连接也就是 holder
为 null
的时候,就会抛出异常,同时打印出当前 active
和 maxActive
的值。
if (holder == null) {
// ...
StringBuilder buf = new StringBuilder(128);
buf.append("wait millis ")//
.append(waitNanos / (1000 * 1000))//
.append(", active ").append(activeCount)//
.append(", maxActive ").append(maxActive)//
.append(", creating ").append(creatingCount)//
;
// ...
String errorMessage = buf.toString();
if (this.createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder
是 DruidConnectionHolder
的实例。那为什么 holder
为 null
呢?接下来就让我们再看一下 holder
是如何创建的。
2.2 连接是如何创建的? 🔗
接下来就看 holder
是如何创建的。
在 getConnectionInternal
中, holder
的创建是在 for 循环中进行的,一旦创建成功则退出循环。
在创建 holder
时,有同步和异步两种方式。
2.2.1 同步创建连接 🔗
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
如果需要同步创建连接,需要同时满足以下几个条件:
- 不配置创建连接的线程池,即
createScheduler != null
- 目前连接池中没有可用连接,即
poolingCount == 0
- 目前活跃的连接数小于最大连接数,即
activeCount < maxActive
- 目前没有正在同步创建的连接,也就是统一时刻只能存在一个线程在同步创建连接,即
creatingCountUpdater.get(this) == 0
满足上述条件后,createDirect
会被设置为 true
,然后在下一次循环中就会同步创建连接。也正因为有上述条件在,所以只有很少的线程会进入到同步创建连接的流程中。
private DruidPooledConnection getConnectionInternal(long maxWait) {
// ...
if (createDirect) {
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
// ...
boolean discard = false;
lock.lock();
try {
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
}
}
}
2.2.2 异步创建连接 🔗
如果没有同步创建连接,则接下来会通过 pollLast(nanos)
或 takeLast()
异步创建连接。
nanos
是根据 maxWait
计算出来的,如果设置了 maxWait
就会调用 pollLast(nanos)
,否则调用 takeLast()
。
private DruidPooledConnection getConnectionInternal(long maxWait) {
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
// ...
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
}
2.2.3 pollLast
和 takeLast
🔗
在 pollLast
方法中,如果当前连接池中的连接数 poolingCount 为 0,则调用 emptySignal
使用信号量 empty
,并创建异步线程去创建连接。
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (;;) {
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
// ...
}
}
}
然后,pollLast
自己再使用 notEmpty
信号量进入等待:
estimate = notEmpty.awaitNanos(estimate); // signal by
// recycle or
// creator
本次等待的最长时间 estimate
就是根据 maxWait
计算出来的。
在有连接被回收,会新创建出来后,就会被唤醒。如果唤醒后本线程没有成功创建连接,并且目前总的等待时间还没到maxWait
,就会再次进入 await。
如果等待时间到了 maxWait
,但依旧没有成功创建连接,即 poolingCount 为 0,则会返回 null。
if (poolingCount == 0) {
if (estimate > 0) {
continue;
}
return null;
}
通过信号创建的连接会被放入到连接池 connections 中,然后 pollLast(nanos)
收到创建成功的信号后,就可以从连接池中取出连接并返回了:
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
// ...
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
// ...
return last;
}
无参的 takeLast()
就简单很多了,它的流程和 pollLast(nanos)
几乎一致,唯一不同的是它没有等待超时时间,只有等待回收线程,会创建线程唤醒后,才会继续执行。
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
try {
notEmpty.await(); // signal by recycle or creator
}
}
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
2.2.4 emptySignal 🔗
使用信号异步创建连接的 emptySignal()
方法代码如下:
private void emptySignal() {
if (createScheduler == null) {
empty.signal();
return;
}
if (createTaskCount >= maxCreateTaskCount) {
return;
}
if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}
submitCreateTask(false);
}
如果设置的创建线程池 createScheduler 为 null,则会发送一个信号去创建连接。否则创建一个 CreateConnectionTask
来创建连接。
这里发出的创建信号会被 CreateConnectionThread
接收,CreateConnectionThread
会在数据源初始化的时候创建。
public void init() throws SQLException {
// ...
createAndStartCreatorThread();
createAndStartDestroyThread();
}
收到信号后,会判断当前活跃连接数以及连接池中的连接数是否大于 maxActive,如果大于则不创建,避免创建出超过 maxActive 数量的连接;如果小于,则创建了物理连接,然后在调用 DruidDataSource 的 put 方法将连接放入连接池中。
public void run() {
for (;;) {
// ...
try {
boolean emptyWait = true;
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await();
}
// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
}
}
// ...
PhysicalConnectionInfo connection = null;
try {
connection = createPhysicalConnection();
}
// ...
boolean result = put(connection);
}
}
CreateConnectionTask
也是类似的逻辑。
2.2.5 DruidDataSource#put 🔗
在 put 方法中,首先会判断当前连接数是否大于最大连接数,如果小于,则会将连接放入到 connections
中,然后将 poolingCount 的值加一。
private boolean put(DruidConnectionHolder holder, long createTaskId) {
if (poolingCount >= maxActive) {
if (createScheduler != null) {
clearCreateTask(createTaskId);
}
return false;
}
// ...
connections[poolingCount] = holder;
incrementPoolingCount();
//...
}
2.2.6 小结 🔗
以上便是便是大致的连接创建流程。
总的来说就是,应用需要执行 SQL 的时候,就通过 getConnectionInternal
获取连接,其中创建连接分为同步和异步,异步则通过信号量创建连接。成功获取连接后还会将 activeCount
加一。
结合 1.2.1 中的异常信息,报错时 poolingCount
为 0, 所以会调用 emptySignal()
创建连接。但activeCount + poolingCount + createTaskCount
显然是大于 maxActive
的,因此 pollLast
最终会等待 maxWait
然后超时。所以最终 getConnectionInternal
抛出了 GetConnectionTimeoutException
异常。
所以接下来问题的关键就是,为什么 activeCount 会大于 maxActive?
2.3 连接是如何释放的? 🔗
要解答这个问题,我们就需要先知道 activeCount 什么时候减少。
前面已经知道了 activeCount 会在获取到连接后增加,那相应的,activeCount 也会在释放连接时减少。所以接下来就详细了解连接时如何释放的。
2.3.1 应用主动关闭连接 🔗
提到连接释放,首先想到的应该是我们在应用中主动调用 connection.close()
方法来释放连接。
那 connection.close()
会立即释放物理连接吗?显然是不会的。
public void close() throws SQLException {
// ...
recycle();
}
public void recycle() throws SQLException {
// ...
if (!this.abandoned) {
DruidAbstractDataSource dataSource = holder.getDataSource();
dataSource.recycle(this);
}
}
connection.close()
会最终会调用 dataSource.recycle(DruidPooledConnection pooledConnection)
释放连接。 那 dataSource.recycle
又是怎么实现的呢?
2.3.2 recycle 🔗
在 recyle 中会有一些列的连接状态判断,如物理连接是否已关闭等,然后决定是否立即释放连接。不过大部分情况下会运行到下面的逻辑,即如果连接状态是活跃的,则将 activeCount 减一,并将状态再设置为 false。最后再调用 putLast
。
/**
* 回收连接
*/
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
// ...
if (holder.active) {
activeCount--;
holder.active = false;
}
result = putLast(holder, currentTimeMillis);
// ...
}
在 putLast 中则会更新连接的上次活跃时间,然后将连接再度放回到连接池中,并将 poolingcount
加一。
boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
if (poolingCount >= maxActive || e.discard) {
return false;
}
e.lastActiveTimeMillis = lastActiveTimeMillis;
connections[poolingCount] = e;
incrementPoolingCount();
// ...
return true;
}
也就是说,通过 .close
方法主动释放连接,不会真的释放,而是将连接还回到连接池中。
2.3.3 连接释放 🔗
既然前面的 recycle
不会关闭连接,但连接池也不可能无限大,那连接到底是如何回收的呢?这就要提到 Druid 的连接释放任务了。
通 CreateConnectionThread
一样, Druid 在初始化数据源时会创建释放连接的线程 DestroyThread
,然后创建释放任务 DestroyTask。
当应用设置的释放线程池 destroyScheduler 不为空时,就会在 destroyScheduler 中运行 DestroyTask;否则会创建一个消费连接的线程池,然后在该线程池中运行释放连接的任务。与 createScheduler 类似,大部分情况下我们不会设置 destroyScheduler。
protected void createAndStartDestroyThread() {
destroyTask = new DestroyTask();
if (destroyScheduler != null) {
// ...
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
TimeUnit.MILLISECONDS);
return;
}
// ...
destroyConnectionThread = new DestroyConnectionThread(threadName);
destroyConnectionThread.start();
}
DestroyConnectionThread 本质上就是一个死循环,在循环中每隔 timeBetweenEvictionRunsMillis
执行一次释放连接的任务。
public class DestroyConnectionThread extends Thread {
// ...
public void run() {
for (;;) {
// 从前面开始删除
try {
// ...
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
}
释放连接的关键逻辑就在 DestroyTask 中。
2.3.4 DestroyTask 🔗
在 DestroyTask 中,主要是通过 shrink
方法来释放连接。
public class DestroyTask implements Runnable {
public DestroyTask() {
}
@Override
public void run() {
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
所以接下来的关键就是 shrink(boolean checkTime, boolean keepAlive)