手写简易数据库连接池

简要说明

任务

用超时等待模式构造一个简单的数据库连接池,模拟从连接池中获取、使用、释放连接的过程。

手段

  1. 将客户端获取连接的过程设定为超时等待的模式,即在1000ms内如果无法获取到可用连接,将返回null给客户端;
  2. 设定连接池的大小为10,通过调节客户端连接的线程数模拟无法获取连接的场景;

代码实现

ConnectionPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import java.sql.Connection;
import java.util.LinkedList;

/**
* <p>定义连接池,用构造函数初始化连接的最大上限,用一个双向队列维护连接,
* 调用方需要先调用fetConnection(long)方法指定在多少ms内超时获取连接,
* 再调用releaseConnection(Connection)方法将连接放回线程池。
* <p>
* Created by hank on 8/5/19
*/
public class ConnectionPool {
// 定义一个双向队列,用来维护连接
private LinkedList<Connection> pool = new LinkedList<>();

// 构造函数初始化连接上限initialSize
public ConnectionPool(int initialSize) {
// initialSize小于0没有意义
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
// 从尾部依次放入连接池中
pool.addLast(ConnectionDriver.createConnection());
}
}
}

// 将连接返回线程池
public void releaseConnection(Connection connection) {
if (connection != null) {
// 锁定信号量pool
synchronized (pool) {
// 连接释放后需要进行通知,这样其他待连接的线程才能知道线程池中已经空闲了一个连接
pool.addLast(connection);
pool.notifyAll();
}
}
}

// 在指定时间内为获取到连接就返回null
public Connection fetchConnection(long mills) throws InterruptedException {
// 锁定信号量pool
synchronized (pool) {
// 未设置>0的等待时间,需要一直等待连接
if (mills <= 0) {
while (pool.isEmpty()) {// 线程池没有空闲连接,需要等待
pool.wait();
}
// 当有空闲连接时,返回空闲连接
return pool.removeFirst();
} else {
// 超时的时间刻度
long future = System.currentTimeMillis() + mills;
// 剩余超时时间
long remaining = mills;
// 当没有空闲连接而且还没有超时
while (pool.isEmpty() && remaining > 0) {
// 继续等待remaining个时长
pool.wait(remaining);
// 更新剩余等待时长为0
remaining = future - System.currentTimeMillis();
}
// 定义返回变量
Connection result = null;

// 两种情况
// 1. pool不为空直接下来执行,此时remaining>0
// 2. 经过remaining时长的等待,pool已经不为空了
if (!pool.isEmpty()) {// 返回一个连接
result = pool.removeFirst();
}
// 经过等待后pool依然为空,就返回null
return result;
}
}
}
}
ConnectionDriver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

/**
* <p>由于java.sql.Connection是一个接口,最终得依靠数据库驱动(jar包)的方式实现。
* 简单起见,这里通过动态代理构造一个Connection,该Connection的代理实现为commit()方法调用休眠100ms。
* <p>
* Created by hank on 8/5/19
*/
public class ConnectionDriver {

/*InvocationHandler是一个代理接口,每一个代理实例都关联了一个调用处理器(invocation handler)。
当方法被代理实例调用时(invoked),方法的调用(invocation)将被编码和发送到方法的调用处理器。*/
static class ConnectionHandler implements InvocationHandler {

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 对commit()方法进行操作
if (method.getName().equals("commit")) {
// 休眠100毫秒
TimeUnit.MILLISECONDS.sleep(100);
}
return null;
}
}

// 创建Connection的代理,在commit时休眠100毫秒
public static final Connection createConnection() {
ClassLoader loader = ConnectionDriver.class.getClassLoader();
// Class<?>[]
Class<?>[] interfaces = new Class<?>[]{Connection.class};
InvocationHandler h = new ConnectionHandler();

/* 为指定的接口返回代理类的实例,该实例将使用指定的调用处理器调用方法,有3个参数分别如下
loader是定义代理类的class loader
interfaces是一个代理类实现的列表
h是调用处理器,来发送方法调用*/
return (Connection) Proxy.newProxyInstance(loader, interfaces, h);
}
}
ConnectionPoolTest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>测试简易数据库连接池的工作情况,模拟客户端ConnectionRunner获取、使用、释放连接过程。
* Created by hank on 8/5/19
*/
public class ConnectionPoolTest {
// 定义线程池最大连接个数
static ConnectionPool pool = new ConnectionPool(10);

/* 保证所有客户端ConnectionRunner能够同时开始,来竞争线程池资源
CountDownLatch构造函数接收int类型的参数作为计数器,
想等待N个点完成(countDown()方法可以用在任何地方,所以N个点即可指N个线程,也可以是N个执行步骤。
用在多个线程时,只需把CountDownLatch的引用传递到线程即可),就传入N。
当调用countDown()方法是,N会减1;await()方法会阻塞当前线程,直到N变为0*/
static CountDownLatch start = new CountDownLatch(1);

// main线程将等待所有ConnectionRunner结束后,才继续执行
static CountDownLatch end;

public static void main(String[] args) throws InterruptedException {
// 线程数量
int threadCount = 20;
end = new CountDownLatch(threadCount);

// count为进行模拟操作的次数,总获取次数=threadCount*count;
int count = 20;

// 获取到连接的次数
AtomicInteger got = new AtomicInteger();
// 未获取到连接的次数
AtomicInteger notGot = new AtomicInteger();
// 依次创建threadCount个连接
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
// 执行start()方法,但是run里面还是在start.await(),都还在阻塞中,等待计时器清零再同步开始
thread.start();
}
// start 计数器变为0 创建线程工作完成
start.countDown();


// 阻塞当前线程,转而执行上面创建的10线程,等到计数器为0后,再打印下面三条信息
end.await();

System.out.println("总获取次数:" + threadCount * count);
System.out.println("获取到次数:" + got);
System.out.println("未获取到次数:" + notGot);

}

private static class ConnectionRunner implements Runnable {
// 对应的成员变量
int count;
AtomicInteger got;
AtomicInteger notGot;

// 构造函数
public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}

// 实现接口方法
@Override
public void run() {
try {
// 阻塞当前线程,都在等待start的计数器,等清零0一块儿开始
start.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
while (count > 0) {
try {
// 从线程池中获取连接,如果1000ms内无法获取到,就返回null
// 分别统计连接获取的数量got和未获取到数量notGot
Connection connection = pool.fetchConnection(1000);
if (connection != null) {
try {
// 模拟创建声明
connection.createStatement();
// 模拟提交,实际为通过代理休眠了100ms
connection.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
// 完成后释放连接
pool.releaseConnection(connection);
// 将获取到连接次数变量+1
got.incrementAndGet();
}
} else if (connection == null) {
// 未获取到连接,nogGot+1
notGot.incrementAndGet();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 模拟操作次数-1
count--;
}

}
// 模拟完成
end.countDown();
}
}
}

测试

一次运行结果展示:

1
2
3
4
5
总获取次数:200
获取到次数:200
未获取到次数:0

Process finished with exit code 0

当前代码实现的是10个线程同时运行获取连接池(10个连接不变)的情况,现可通过设定不同线程数量来观察获取连接的情况。具体线程数量总获取次数的设定情况,以及测试结果得到的获取到的次数未获取到的次数未获取到的比例均参见下表:
说明:本机测试环境CPU:i5-3210M,内存:8G,不同环境可能测试结果会有偏差
线程数量与连接获取的关系

线程数量 总获取次数 获取到次数 未获取到次数 未获取到比例
10 200 200 0 0.00%
20 400 388 12 3.00%
30 600 556 44 7.30%
40 800 694 106 13.25%
50 1000 820 180 18.00%

分析
从上表可以看出随着获取连接线程的逐步增多,在保持线程池内10个连接的情况下,未获取到连接次数的比例也从0%到7.3%再到18%逐步增多。

码哥 wechat
欢迎关注个人订阅号:「码上行动GO」