- 数据库连接池
- 关键技术点
- 项目背景
- 连接池功能点介绍
- MySQL Server参数介绍 及 涉及API
- 功能实现设计
- 连接池代码及详细介绍
- 连接池构造函数
- 获取单例
- 生产者线程
- 消费者接口
- 定时线程(回收连接)
- 压力测试
- 测试代码和配置文件:
- Connection.cpp *** 作代码
- ConnectionPool.cpp 整合代码
返回的是一个含connection的shared_ptr对象 关键技术点
MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信(mutex、条件变量)和 unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型
项目背景为了提高MySQL数据库(基于C/S设计)的访问瓶颈,除了在服务器端增加缓存服务器缓存常用的数据 之外(例如redis),还可以增加连接池,来提高MySQL Server的访问效率,在高并发情况下,大量的 TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的 性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗。
本项目是为了在C/C++项目中,提供MySQL Server的访问效率,所实现基于C++代码的数据库连接池模块。
连接池功能点介绍连接池一般包含了数据库连接所用的ip地址、port端口号、用户名和密码以及其它的性能参数,例如初 始连接量,最大连接量,最大空闲时间、连接超时时间等,该项目是基于C++语言实现的连接池,主要 也是实现以上几个所有连接池都支持的通用基础功能。
初始连接量(initSize):表示连接池事先会和MySQL Server创建initSize个数的connection连接,当 应用发起MySQL访问时,不用再创建和MySQL Server新的连接,直接从连接池中获取一个可用的连接 就可以,使用完成后,并不去释放connection,而是把当前connection再归还到连接池当中。
最大连接量(maxSize):当并发访问MySQL Server的请求增多时,初始连接量已经不够使用了,此 时会根据新的请求数量去创建更多的连接给应用去使用,但是新创建的连接数量上限是maxSize,不能 无限制的创建连接,因为每一个连接都会占用一个socket资源,一般连接池和服务器程序是部署在一台 主机上的,如果连接池占用过多的socket资源,那么服务器就不能接收太多的客户端请求了。当这些连 接使用完成后,再次归还到连接池当中来维护。
最大空闲时间(maxIdleTime):当访问MySQL的并发请求多了以后,连接池里面的连接数量会动态 增加,上限是maxSize个,当这些连接用完再次归还到连接池当中。如果在指定的maxIdleTime里面, 这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连 接量initSize个连接就可以了。
连接超时时间(connectionTimeout):当MySQL的并发请求量过大,连接池中的连接数量已经到达 maxSize了,而此时没有空闲的连接可供使用,那么此时应用从连接池获取连接无法成功,它通过阻塞 的方式获取连接的时间如果超过connectionTimeout时间,那么获取连接失败,无法访问数据库。 该项目主要实现上述的连接池四大功能,其余连接池更多的扩展功能,可以自行实现。
MySQL Server参数介绍 及 涉及APImysql函数api_MySQL API函数
MySQL 5.7 C API开发人员指南
mysql> show variables like ‘max_connections’; 该命令可以查看MySQL Server所支持的最大连接个数,超过max_connections数量的连接,MySQL Server会直接拒绝,所以在使用连接池增加连接数量的时候,MySQL Server的max_connections参数 也要适当的进行调整,以适配连接池的连接上限。
功能实现设计ConnectionPool.cpp和ConnectionPool.h:连接池代码实现
Connection.cpp和Connection.h:数据库 *** 作代码、增删改查代码实现
连接池主要包含了以下功能点:
1.连接池只需要一个实例,所以ConnectionPool以单例模式进行设计
2.从ConnectionPool中可以获取和MySQL的连接Connection
3.空闲连接Connection全部维护在一个线程安全的Connection队列中,使用线程互斥锁保证队列的线程安全
4.如果Connection队列为空,还需要再获取连接,此时需要动态创建连接,上限数量是maxSize
5.队列中空闲连接时间超过maxIdleTime的就要被释放掉,只保留初始的initSize个连接就可以了,这个功能点肯定需要放在独立的线程中去做
6.如果Connection队列为空,而此时连接的数量已达上限maxSize,那么等待connectionTimeout时间 如果还获取不到空闲的连接,那么获取连接失败,此处从Connection队列获取空闲连接,可以使用带 超时时间的mutex互斥锁来实现连接超时时间
7.用户获取的连接用shared_ptr智能指针来管理,用lambda表达式定制连接释放的功能(不真正释放连接,而是把连接归还到连接池中)
8.连接的生产和连接的消费采用生产者-消费者线程模型来设计,使用了线程间的同步通信机制条件变量和互斥锁
连接池代码及详细介绍主要功能为6个函数
获取实例、加载配置项、构造函数、
生产者线程(连接用完了就补充一点)、消费者(接口)、定时线程(回收资源)
shared_ptr 析构 自定义删除器,回线程
class ConnectionPool { public: // 获取连接池对象实例 static ConnectionPool* getConnectionPool(); // 给外部提供接口,从连接池中获取一个可用的空闲连接 shared_ptr连接池构造函数getConnection(); // 接口采用RAII,不需要用户管理资源 private: // 单例#1 构造函数私有化 ConnectionPool(); // 从配置文件中加载配置项 bool loadConfigFile(); // 运行在独立的线程中,专门负责生产新连接 void produceConnectionTask(); // 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收 void scannerConnectionTask(); string _ip; // mysql的ip地址 unsigned short _port; // mysql的端口号 3306 string _username; // mysql登录用户名 string _password; // mysql登录密码 string _dbname; // 连接的数据库名称 int _initSize; // 连接池的初始连接量 int _maxSize; // 连接池的最大连接量 int _maxIdleTime; // 连接池最大空闲时间 int _connectionTimeout; // 连接池获取连接的超时时间 queue _connectionQue; // 存储mysql连接的队列 mutex _queueMutex; // 维护连接队列的线程安全互斥锁 atomic_int _connectionCnt; // 记录连接所创建的connection连接的总数量 condition_variable cv; // 设置条件变量,用于连接生产线程和连接消费线程的通信 };
// 连接池的构造 ConnectionPool::ConnectionPool() { // 加载配置项了 if (!loadConfigFile()) { return; } // 创建初始数量的连接 for (int i = 0; i < _initSize; ++i) { Connection* p = new Connection(); p->connect(_ip, _port, _username, _password, _dbname); p->refreshAliveTime(); // 刷新一下开始空闲的起始时间 _connectionQue.push(p); _connectionCnt++; } // 启动一个新的线程,作为连接的生产者 linux thread => pthread_create thread produce(std::bind(&ConnectionPool::produceConnectionTask, this)); produce.detach(); // 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收 thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this)); scanner.detach(); }获取单例
// 线程安全的懒汉单例函数接口 调用的时候再构造 ConnectionPool* ConnectionPool::getConnectionPool() { static ConnectionPool pool; // lock和unlock return &pool; }生产者线程
// 运行在独立的线程中,专门负责生产新连接 void ConnectionPool::produceConnectionTask() { for (;;) { unique_lock消费者接口lock(_queueMutex); while (!_connectionQue.empty()) { cv.wait(lock); // 队列不空,此处生产线程进入等待状态,释放锁 } // 连接数量没有到达上限,继续创建新的连接 if (_connectionCnt < _maxSize) { Connection* p = new Connection(); p->connect(_ip, _port, _username, _password, _dbname); p->refreshAliveTime(); // 刷新一下开始空闲的起始时间 _connectionQue.push(p); _connectionCnt++; } // 通知消费者线程,可以消费连接了 cv.notify_all(); } }
// 给外部提供接口,从连接池中获取一个可用的空闲连接 shared_ptr定时线程(回收连接)ConnectionPool::getConnection() { unique_lock lock(_queueMutex); while (_connectionQue.empty()) { // sleep if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout))) { if (_connectionQue.empty()) { LOG("获取空闲连接超时了...获取连接失败!"); return nullptr; } } } shared_ptr sp(_connectionQue.front(), [&](Connection* pcon) { // 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全 *** 作 unique_lock lock(_queueMutex); pcon->refreshAliveTime(); // 刷新一下开始空闲的起始时间 _connectionQue.push(pcon); }); _connectionQue.pop(); cv.notify_all(); // 消费完连接以后,通知生产者线程检查一下,如果队列为空了,赶紧生产连接 return sp; }
// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收 void ConnectionPool::scannerConnectionTask() { for (;;) { // 通过sleep模拟定时效果 this_thread::sleep_for(chrono::seconds(_maxIdleTime)); // 扫描整个队列,释放多余的连接 unique_lock压力测试lock(_queueMutex); while (_connectionCnt > _initSize) { Connection* p = _connectionQue.front(); if (p->getAliveTime() >= (_maxIdleTime * 1000)) { _connectionQue.pop(); _connectionCnt--; delete p; // 调用~Connection()释放连接 } else { break; // 队头的连接没有超过_maxIdleTime,其它连接肯定没有 } } } }
开发平台选型
有关MySQL数据库编程、多线程编程、线程互斥和同步通信 *** 作、智能指针、设计模式、容器等等这些 技术在C++语言层面都可以直接实现,因此该项目选择直接在windows平台上进行开发,当然放在 Linux平台下用g++也可以直接编译运行。
验证数据的插入 *** 作所花费的时间,第一次测试使用普通的数据库访问 *** 作,第二次测试使用带连接池 的数据库访问 *** 作,对比两次 *** 作同样数据量所花费的时间,性能压力测试结果如下(电脑配置不太高——Intel® Core™ i5-8300H CPU @ 2.30GHz 2.30 GHz);
(不同电脑的降低比率不同)
测试代码和配置文件:- 依赖关系
#数据库连接池的配置文件 ip=127.0.0.1 port=3306 username=root password=12345678 dbname=chat initSize=10 maxSize=1024 #最大空闲时间默认单位是秒 maxIdleTime=60 #连接超时时间单位是毫秒 connectionTimeOut=100
CREATE DATAbase chart; CREATE TABLE IF NOT EXISTS `user`( `id` INT(11) UNSIGNED AUTO_INCREMENT, `name` VARCHAR(50), `age` VARCHAR(11), `sex` enum('male', 'female'), PRIMARY KEY ( `id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8;
#define _CRT_SECURE_NO_WARNINGS #include "pch.h" #includeusing namespace std; #include "Connection.h" #include "CommonConnectionPool.h" void f() { ConnectionPool *cp = ConnectionPool::getConnectionPool(); for (int i = 0; i < 5000; ++i) { char sql[1024] = { 0 }; sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')", "zhang san", 20, "male"); shared_ptr sp = cp->getConnection(); sp->update(sql); } } int main() { // Connection conn; // conn.connect("127.0.0.1", 3306, "root", "12345678", "chat"); #if 1 clock_t begin = clock(); f(); clock_t end = clock(); std::cout << (end - begin) << "ms" << endl; #endif #if 1 clock_t begin4 = clock(); thread t1(f); thread t2(f); thread t3(f); thread t4(f); t1.join(); t2.join(); t3.join(); t4.join(); clock_t end4 = clock(); std::cout << (end4 - begin4) << "ms" << endl; #endif return 0; }
Connection.cpp *** 作代码懒汉式单例类.在第一次调用的时候实例化自己 !
饿汉式单例类.在类初始化时,已经自行实例化
单例模式 Singleton 就是保证一个类只有一个实例
#include "pch.h" #include "public.h" #include "Connection.h" #includeConnectionPool.cpp 整合代码using namespace std; Connection::Connection() { // 初始化数据库连接 _conn = mysql_init(nullptr); } Connection::~Connection() { // 释放数据库连接资源 if (_conn != nullptr) mysql_close(_conn); } bool Connection::connect(string ip, unsigned short port, string username, string password, string dbname) { // 连接数据库 MYSQL* p = mysql_real_connect(_conn, ip.c_str(), username.c_str(), password.c_str(), dbname.c_str(), port, nullptr, 0); return p != nullptr; } bool Connection::update(string sql) { // 更新 *** 作 insert、delete、update if (mysql_query(_conn, sql.c_str())) { LOG("更新失败:" + sql); return false; } return true; } MYSQL_RES* Connection::query(string sql) { // 查询 *** 作 select if (mysql_query(_conn, sql.c_str())) { LOG("查询失败:" + sql); return nullptr; } return mysql_use_result(_conn); }
// Connection.h #pragma once #include#include #include #include #include #include #include #include #include using namespace std; #include "Connection.h" class ConnectionPool { public: static ConnectionPool* getConnectionPool(); shared_ptr getConnection(); private: ConnectionPool(); bool loadConfigFile(); void produceConnectionTask(); void scannerConnectionTalk(); string _ip; unsigned short __port; string _username; string _dbname; int _initsize; int _maxsize; int _maxIdleTime; int _connectionTimeout; queue _connectionQue; mutex _queueMutex; atmoix_int _connectionCnt; condition_variable cv; } // ConnectionPool.cpp 函数实现 #include "pch.h" #include "CommonConnectionPool.h" #include "public.h" ConnectionPool* ConnectionPool::getConnectionPool() { statuc ConnectionPool pool; return &pool; } bool ConnectionPool::loadConfigFile() { FILE* pf = fopen("mysql.ini", "r"); if (pf == nullptr) { LOG("mysql.ini file dose not exsit!"); return false; } while (!feof(pf)) { char line[1024] = { 0 }; fgets(line, 1024, pf); string str = line; int idx = str.find('=', 0); if (idx == -1) // 无效的配置项 { continue; } int endidx = str.find('n', idx); string key = str.substr(0, idx); string value = str.substr(idx + 1, endidx - idx - 1); if (key == "ip") { _ip = value; } else if (key == "port") { _port = atoi(value.c_str()); } else if (key == "password") { _password = value; } else if (key = "dbname") { _dbname = value; } else if (key == "initSize") { _initSize = atoi(value.c_str()); } else if (key == "maxSize") { _maxSize = atoi(value.c_str()); } else if (key == "maxIdleTime") { _maxIdleTime = atoi(value.c_str()); } else if (key == "connectionTimeOut") { _connectionTimeout = atoi(value.c_str()); } } return true; } ConnectionPool::ConnetionPool() { if (!loadConfigFile()) { return ; } for (int i = 0; i < _initSize; ++i) { Connection* p = new Connection(); p->connect(_ip, port, _username, _password, _dbname); p->refreshAliveTime(); _connectionQue.push(p); _connectionCnt++; } thread produce(std::bind(&ConnectionPool::produceConnectionTask, this)); produce.detach(); thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this)); scanner.detach(); } void ConnectionPool::produceConnectionTask() { for (;;) { unique_lock lock(_queueMutex); while (!_connectionQue.empty()) { cv.wait(lock); } if (_connection < _maxSize) { Connection* p = new Conncetion(); p->connect(_ip, _port, _username, _password, _dbname); p->refershAlibeTime(); _connectionQue,push(p); _connectionCnt++; } cv.notify_all(); } } shared_ptr ConnectionPool::getConnection() { unique_lock lock(_queueMutex); while (_connectionQue.empty()) { if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connecionTimeout))) { if (_connectionQue.empty()) { LOG("获取空闲连接超时了...获取连接失败!"); return nullptr; } } } shared_ptr sp(_conncetionQue.front(), [&](Connection* pcon) { unique_lock lock(_queMutex); pcon->refershAliveTime(); _connectionQue.push(pcon); }); _connecitonQue.pop(); cv.notify_all(); return sp; } void ConnectionPool::scannerConnectionTask() { for (;;) { this_thread::sleep_for(chrono::seconds(maxIdleTime)); unique_lock lock(_queueMutex); while (_connectionCnt > _initSize) { Connection* p = _connectionQue.front(); if (p->getAliveTime() >= (_maxIdleTime * 1000)) { _connectionQue.pop(); _conectionCnt--; delete p; } else { break; } } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)