现在我们的项目已经实现了异步类XThread和数据库类XMysql,并且验证了用std::function和std::bind实现回调的功能,现在只需要把它们组合在一起就行。
2.代码实现 ①这里先把我的代码分享一下链接:https://pan.baidu.com/s/16fDkpC_g3QVgHTocxygDSQ
提取码:su1a
新增异步回调结果XMysqlAsynRes,新增回调操作信息XMysqlAsynData,实现异步操作虚函数virtual bool exec(),新增异步操作函数 bool exeAsynSql(const char* sql, std::function<void(XMysqlAsynRes*)> fun)。
主要原理是,主进程加载一个异步数据库,其执行流程和主进程分离开。mysql流程在exec()函数中循环,不断地去遍历队列中的数据,操作完之后就进入等待状态。主进程可以通过调用exeAsynSql()接口将操作放入队列中,等待操作结果通过回调返回。
XMysql.h和XMysql.cpp
/**********************************************************
* Author : 谢名聪
* Email : 869408604@qq.com
* Last modified : 2022-04-21 11:45
* Filename : XMysql.h
* Description : XMysql类,提供同步、异步两种操作
* *******************************************************/
#ifndef X_MYSQL
#define X_MYSQL
#include "../XThread/XThread.h"
#include "mysqlToProtobuf.h"
#include "../include/mysql/mysql.h"
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>
/// Result of a query whose rows have been copied out of the MySQL client
/// buffers into plain strings.
struct XMysqlRes
{
    /// Set to true by the query functions once the statement has executed.
    bool suc = false;
    /// One inner vector per result row; each element is one column value as text.
    std::vector<std::vector<std::string>> res;
};
struct XMysqlAsynRes
{
XMysqlAsynRes() = default;
~XMysqlAsynRes() {
//释放掉mysql的数据
if (mysqlRes) {
mysql_free_result(mysqlRes);
}
}
XMysqlRes xMysqlRes;
MYSQL_RES* mysqlRes;
template
bool getProtoRes(std::vector>& res) {
//重置遍历的行数
mysql_data_seek(mysqlRes, 0);
mysqlToProtobuf(mysqlRes, res);
}
};
struct XMysqlAsynData
{
std::function fun;
const char* sql;
};
class XMysql : public XThread
{
public:
virtual bool exec() override;
public:
//初始化数据库
bool init(const char* host, const char* port, const char* user, const char* pwd, const char* db, const uint32_t timeout = 2);
//---------------------------------------------------------
//同步 *** 作
//--------------------------------------------------------
public:
//有返回的 *** 作
XMysqlRes exeSqlStore(const char* sql);
//无返回的 *** 作
bool exeSql(const char* sql);
//定义一个模板,针对不同的protobuf结构
template
//这里必须再类内实现,在类外实现的话,进行外部链接的时候会出错
bool exeSqlProtobuf(const char* sql, std::vector>& protoRes)
{
if (!exeSql(sql)) {
std::cout << "XMysql exeSqlStore error! sql=" << sql << std::endl;
return false;
}
//转成protobuf形式
mysqlToProtobuf(mysql_store_result(&m_mysql), protoRes);
return true;
}
//校验连接状态
void check();
//连接数据库
bool connect();
//---------------------------------------------
//异步操作
//---------------------------------------------
public:
bool exeAsynSql(const char* sql, std::function fun);
private:
MYSQL m_mysql;
std::string m_host = "";
std::string m_user = "";
std::string m_pwd = "";
std::string m_db = "";
uint32_t m_port = 0;
uint32_t m_timeout = 0;
//异步处理
private:
std::queue m_sqlQueue;//需要处理的队列
};
#endif
#include "XMysql.h"
bool XMysql::init(const char* host, const char* port, const char* user, const char* pwd, const char* db, const uint32_t timeout)
{
m_host = host;
m_port = std::atoi(port);
m_user = user;
m_pwd = pwd;
m_db = db;
m_timeout = timeout;
if (connect()) {
std::cout << "XMysql connect error" << std::endl;
return false;
}
return true;
}
bool XMysql::exeSql(const char* sql)
{
if (mysql_real_query(&m_mysql,sql,strlen(sql)) != 0) {
std::cout << "XMysql exeSql error! sql=" << sql << std::endl;
return false;
}
return true;
}
XMysqlRes XMysql::exeSqlStore(const char* sql)
{
XMysqlRes res;
check();
if (!exeSql(sql)) {
std::cout << "XMysql exeSqlStore error! sql=" << sql << std::endl;
return res;
}
//执行成功
res.suc = true;
//查询结果
auto result = mysql_store_result(&m_mysql);
//行数
auto rowNum = mysql_num_rows(result);
//列数
auto fiedNum = mysql_num_fields(result);
for (int i = 0; i < rowNum; i++) {
auto row = mysql_fetch_row(result);
std::vector rows;
for (int j = 0; j < fiedNum; j++) {
rows.push_back(row[j]);
}
res.res.push_back(rows);
}
return res;
}
void XMysql::check()
{
//ping返回0时表示正常
if (mysql_ping(&m_mysql) != 0) {
std::cout << "XMysql ping == 0" << std::endl;
//关闭mysql
mysql_close(&m_mysql);
return;
}
//重新连接
connect();
}
bool XMysql::connect()
{
//初始化
if (!mysql_init(&m_mysql)) {
std::cout << "XMysql init error!" << std::endl;
return false;
}
//连接数据库
if (!mysql_real_connect(&m_mysql, m_host.c_str(), m_user.c_str(), m_pwd.c_str(), m_db.c_str(), m_port, NULL, CLIENT_MULTI_STATEMENTS)) {
std::cout << "XMysql connect error!" << std::endl;
return false;
}
//设置参数
mysql_options(&m_mysql, MYSQL_OPT_READ_TIMEOUT, &m_timeout);
mysql_options(&m_mysql, MYSQL_SET_CHARSET_NAME, "utf8");
}
bool XMysql::exeAsynSql(const char* sql, std::function fun)
{
auto info = new XMysqlAsynData;
info->sql = sql;
info->fun = fun;
m_sqlQueue.push(info);
}
bool XMysql::exec()
{
XMysqlAsynData* info = nullptr;
while (status()) {
if (m_sqlQueue.size() > 0) {
info = m_sqlQueue.front();
m_sqlQueue.pop();
XMysqlRes res;
check();
if (!exeSql(info->sql)) {
std::cout << "XMysql exeAsynSql error! sql=" << info->sql << std::endl;
continue;
}
//执行成功
res.suc = true;
//查询结果
auto result = mysql_store_result(&m_mysql);
//行数
auto rowNum = mysql_num_rows(result);
//列数
auto fiedNum = mysql_num_fields(result);
for (int i = 0; i < rowNum; i++) {
auto row = mysql_fetch_row(result);
std::vector rows;
for (int j = 0; j < fiedNum; j++) {
rows.push_back(row[j]);
}
res.res.push_back(rows);
}
XMysqlAsynRes* asynRes = new XMysqlAsynRes();
asynRes->xMysqlRes = res;
asynRes->mysqlRes = result;
mysql_free_result(result);
if (info->fun) {
info->fun(asynRes);
} else {
std::cout << "info->fun = nullptr" << std::endl;
}
}
}
return true;
}
③在进程基类加入异步数据库
异步数据库实现后,将它加入主进程类Process中:加入异步mysql类成员m_asynXMysqls,并实现初始化和调用接口;增加异步工具管理容器m_asynTools,在结束程序的时候,可以通过这个容器统一结束掉分离出去的异步工具。
Process.h和Process.cpp
/**********************************************************
* Author : 谢名聪
* Email : 869408604@qq.com
* Last modified : 2022-04-21 11:42
* Filename : Process.h
* Description : 进程的基类
* *******************************************************/
#ifndef PROCESS_H
#define PROCESS_H
#include "XInclude/XMysql/XMysql.h"
#include "XInclude/XConfig/XConfig.h"
class Process
{
public:
Process();
virtual ~Process();
//进程子类需要实现的逻辑
private:
virtual bool initProcess() = 0;
virtual bool startProcess() = 0;
virtual bool stopProcess() = 0;
//main.cpp中调用的逻辑
public:
bool init(const std::string config);
bool start();
bool stop();
bool startAsynTools();
bool stopAsynTools();
//*********************************************************
//工具类:包括config socket redis mysql timer log等
//*********************************************************
//---------------------------------------------------------
//config
//--------------------------------------------------------
private:
std::shared_ptr m_config;
public:
//获取配置
std::string getConfigValue(const std::string groupKey, const std::string key);
//-----------------------------------------------------------
//同步数据库,只 *** 作一些简单的逻辑
//-----------------------------------------------------------
private:
std::map> m_XMysqls;
public:
//增加数据库服务
bool addMysqlServer(uint32_t id, const char* host, \
const char* port, const char* user, \
const char* pwd, const char* db, \
uint32_t timeout = 2);
//无返回值
bool exeSql(uint32_t id, const char* sql);
//返回XMysql
XMysqlRes exeSqlStore(uint32_t id, const char* sql);
//返回protobuf
template
bool exeSqlProtobuf(uint32_t id, const char* sql, std::vector>& protoRes)
{
auto it = m_XMysqls.find(id);
if (it == m_XMysqls.end()) {
return false;
}
return m_XMysqls[id]->exeSqlProtobuf(sql, protoRes);
}
//------------------------------------------------------------
//异步mysql 异步类型 有一些不需要马上得到结果的 *** 作可以在这完成
//-----------------------------------------------------------
private:
std::map> m_asynXMysqls;
public:
//增加一个异步mysql服务
bool addAsynMysqlServer(uint32_t id, const char* host, \
const char* port, const char* user, \
const char* pwd, const char* db, \
uint32_t timeout = 20);
//加入一个异步 *** 作
bool exeAsynSql(uint32_t id, const char* sql, std::function func = nullptr);
public:
std::string m_name = "";
uint32_t m_type = 0;
uint32_t m_id = 0;
std::vector> m_asynTools;
};
// Pointer to the process object; defined in Process.cpp and assigned by the
// Process constructor.
extern Process* g_process;
#endif
#include "Process.h"
// Global pointer to the process object. It starts out null; the Process
// constructor stores `this` here if nothing has been registered yet.
Process* g_process = nullptr;
/// Registers this object as the global process (first instance only) and
/// creates the configuration object.
Process::Process()
{
    if (!g_process) {
        g_process = this;
    }
    // m_config is always empty at this point, so it is created
    // unconditionally. The element type is taken from the member itself so
    // this line stays in sync with the declaration.
    m_config = std::make_shared<decltype(m_config)::element_type>();
}
/// Unregisters this object from the global pointer so that g_process never
/// refers to a destroyed process.
Process::~Process()
{
    if (g_process == this) {
        g_process = nullptr;
    }
}
/// Loads the configuration, runs the subclass initialisation and starts the
/// asynchronous tools.
///
/// @param config Argument forwarded to the configuration loader.
/// @return true only when every step completed; false when loading or
///         initProcess() fails or a std::exception is thrown.
bool Process::init(const std::string config)
{
    try {
        // Load the configuration.
        if (!m_config->load(config)) {
            std::cout << "load config error" << std::endl;
            return false;
        }
        // Subclass-specific initialisation.
        if (!initProcess()) {
            std::cout << "initProcess error!" << std::endl;
            return false;
        }
        // Start the asynchronous tools registered during initProcess().
        startAsynTools();
    } catch (const std::exception& ex) {
        std::cout << "Process, init ex= " << ex.what() << std::endl;
        // An exception means initialisation did not complete.
        return false;
    }
    std::cout << "Process, init success!" << std::endl;
    return true;
}
/// Runs the subclass service logic.
///
/// @return true when startProcess() succeeded; false when it failed or a
///         std::exception was thrown.
bool Process::start()
{
    try {
        if (!startProcess()) {
            std::cout << "startProcess error!" << std::endl;
            return false;
        }
    } catch (const std::exception& ex) {
        std::cout << "PROCESS, start ex = " << ex.what() << std::endl;
        // An exception means the service did not start cleanly.
        return false;
    }
    std::cout << "PROCESS, start success!" << std::endl;
    return true;
}
/// Stops the asynchronous tools, then the subclass service logic.
///
/// @return true when both steps ran without throwing, false when a
///         std::exception was caught.
bool Process::stop()
{
    try {
        // Stop the tools first, then the service logic.
        stopAsynTools();
        stopProcess();
        std::cout << "PROCESS, " << m_name << " stopped" << std::endl;
    } catch (const std::exception& ex) {
        std::cout << "stop ex, " << ex.what() << std::endl;
        return false;
    }
    return true;
}
/// Looks up one configuration entry by forwarding to the configuration
/// object.
///
/// @param groupKey Group the entry belongs to.
/// @param key      Entry name inside the group.
std::string Process::getConfigValue(const std::string groupKey, const std::string key)
{
    std::string value = m_config->getConfigValue(groupKey, key);
    return value;
}
bool Process::addMysqlServer(uint32_t id, const char* host, \
const char* port, const char* user, \
const char* pwd, const char* db, \
uint32_t timeout)
{
auto it = m_XMysqls.find(id);
if (it != m_XMysqls.end()) {
return true;
}
auto mysql = std::make_shared ();
m_XMysqls[id] = mysql;
return mysql->init(host, port, user, pwd, db, timeout);
}
/// Runs a statement on the synchronous connection registered under `id`.
///
/// @return false when no such connection exists, otherwise the result of
///         the connection's exeSql().
bool Process::exeSql(uint32_t id, const char* sql)
{
    const auto entry = m_XMysqls.find(id);
    if (entry != m_XMysqls.end()) {
        return entry->second->exeSql(sql);
    }
    return false;
}
/// Runs a statement on the synchronous connection registered under `id`
/// and returns the rows it produced.
///
/// @return a default-constructed XMysqlRes when no such connection exists.
XMysqlRes Process::exeSqlStore(uint32_t id, const char* sql)
{
    const auto entry = m_XMysqls.find(id);
    if (entry != m_XMysqls.end()) {
        return entry->second->exeSqlStore(sql);
    }
    return XMysqlRes();
}
bool Process::addAsynMysqlServer(uint32_t id, const char* host, \
const char* port, const char* user, \
const char* pwd, const char* db, \
uint32_t timeout)
{
auto it = m_asynXMysqls.find(id);
if (it != m_asynXMysqls.end()) {
return true;
}
auto mysql = std::make_shared ();
if (!mysql->init(host, port, user, pwd, db, timeout)) {
std::cout << "init asynSql error" << std::endl;
return false;
}
std::cout << "addAsynMysqlServer" << std::endl;
m_asynXMysqls[id] = mysql;
m_asynTools.push_back(mysql);
return true;
}
bool Process::exeAsynSql(uint32_t id, const char* sql, std::function func)
{
auto it = m_asynXMysqls.find(id);
if (it == m_asynXMysqls.end()) {
std::cout << "not find m_asynXMsql id = " << id << std::endl;
return false;
}
std::cout << "exeAsynSql " << std::endl;
return m_asynXMysqls[id]->exeAsynSql(sql, func);
}
/// Calls run() on every registered asynchronous tool, in registration
/// order.
///
/// @return always true.
bool Process::startAsynTools()
{
    for (auto it = m_asynTools.begin(); it != m_asynTools.end(); ++it) {
        (*it)->run();
    }
    return true;
}
/// Calls stop() on every registered asynchronous tool, in registration
/// order.
///
/// @return always true.
bool Process::stopAsynTools()
{
    for (auto it = m_asynTools.begin(); it != m_asynTools.end(); ++it) {
        (*it)->stop();
    }
    return true;
}
④写一个测试看看操作结果
在XProcess中,增加回调函数void testAsynSql(XMysqlAsynRes* res), 在initProcess()中初始化异步数据库,在startProcess()中调用异步数据库。
XProcess.cpp
#include "XProcess.h"
#include "../../proto/src/User.pb.h"
#include
#include
// The XProcess instance of this program, defined at namespace scope.
XProcess xprocess;
bool XProcess::initProcess()
{
//加载数据库
std::string host = getConfigValue("mysql", "host");
std::string port = getConfigValue("mysql", "port");
std::string user = getConfigValue("mysql", "user");
std::string pwd = getConfigValue("mysql", "pwd");
std::string db = getConfigValue("mysql", "db");
std::string sqlType = getConfigValue("mysql", "type");
uint32_t type = std::atoi(sqlType.c_str());
if (!addMysqlServer(type, host.c_str(), port.c_str(), user.c_str(), pwd.c_str(), db.c_str())) {
std::cout << "addMysqlServer error" << std::endl;
return false;
}
//加载异步数据库
if (!addAsynMysqlServer(type, host.c_str(), port.c_str(), user.c_str(), pwd.c_str(), db.c_str())) {
std::cout << "addAsynMysqlServer error" << std::endl;
return false;
}
//加载服务名字和类型
std::string serverName = getConfigValue("server", "name");
std::string serverType = getConfigValue("server", "type");
std::string serverId = getConfigValue("server", "id");
m_name = serverName;
m_type = std::atoi(serverType.c_str());
m_id = std::atoi(serverId.c_str());
return true;
}
/// Test loop: until m_stop is set, queues an asynchronous query whose
/// result is delivered to testAsynSql(), then sleeps.
///
/// @return true once the loop has ended.
bool XProcess::startProcess()
{
    // The asynchronous queue stores only the pointer to the statement text,
    // so the text needs static storage duration: a loop-local std::string
    // could be destroyed before the request is executed.
    static const char* const kSql = "select * from user_data;";

    while (m_stop == false) {
        std::cout << "exeAsynSql():" << kSql << std::endl;
        // NOTE(review): the connection id is hard-coded to 1, while
        // initProcess() registers the connection under the configured
        // "mysql"/"type" value — confirm that they match.
        exeAsynSql(1, kSql, std::bind(&XProcess::testAsynSql, this, std::placeholders::_1));
        // POSIX sleep() takes seconds.
        sleep(100);
    }
    return true;
}
/// Logs the shutdown and raises the stop flag that startProcess() polls.
///
/// @return always true.
bool XProcess::stopProcess()
{
    std::cout << "stop " << m_name << std::endl;

    const bool stopRequested = true;
    m_stop = stopRequested;
    return true;
}
/// Callback for the test query: prints the rows as text, then converts the
/// same result set to protobuf messages and prints each id and nickname.
///
/// @param res Result of the asynchronous query; nothing is printed beyond a
///            diagnostic when it is null.
void XProcess::testAsynSql(XMysqlAsynRes* res)
{
    if (!res) {
        std::cout << "xmysqlAsyRes == nullptr" << std::endl;
        // Nothing to print; continuing would dereference the null pointer.
        return;
    }

    std::cout << "XMysqlRes ------------" << std::endl;
    const auto& xRes = res->xMysqlRes;
    for (const auto& row : xRes.res) {
        for (const auto& col : row) {
            std::cout << col << " ";
        }
        std::cout << std::endl;
    }

    std::cout << "protoRes------------------- " << std::endl;
    // NOTE(review): the element type was lost in the original listing;
    // `User` is assumed from the included User.pb.h — confirm the message
    // name and namespace.
    std::vector<std::shared_ptr<User>> users;
    res->getProtoRes(users);
    for (const auto& u : users) {
        std::cout << u->id() << u->nickname() << std::endl;
    }
}
⑤看一下测试结果
3.小结
①统一管理这些异步工具
在关闭和运行这些工具的时候,需要统一处理,这样可以让流程更清晰,如果主进程关闭,工具没有关闭,那整个进程就会陷入死锁状态。
②数据库操作的坑
mysql提供访问数据库结果的接口是mysql_fetch_row(MYSQL_RES*),这个函数会按顺序从头到尾逐行访问结果集;如果已经访问到结尾,又需要重新遍历,就要先重置访问的光标位置,这里要用到mysql_data_seek(MYSQL_RES* result, my_ulonglong offset)函数,它会把结果集的遍历位置移动到指定的行(传0即回到第一行)。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)