在MySQL 80 之前, 我们假设一下有一条烂SQL,
mysqlselect from t1 order by rand() ;
以多个线程在跑,导致CPU被跑满了,其他的请求只能被阻塞进不来。那这种情况怎么办?
大概有以下几种解决办法:
设置max_execution_time 来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL 掉。有些必须要执行很长时间的也会被误杀。
自己写个脚本检测这类语句,比如order by rand(), 超过一定时间用Kill query thread_id 给杀掉。
那能不能不要杀掉而让他正常运行,但是又不影响其他的请求呢?
那mysql 80 引入的资源组(resource group,后面简写微RG)可以基本上解决这类问题。
比如我可以用 RG 来在SQL层面给他限制在特定的一个CPU核上,这样我就不管他,让他继续运行,如果有新的此类语句,让他排队好了。
为什么说基本呢?目前只能绑定CPU资源,其他的暂时不行。
那我来演示下如何使用RG。
创建一个资源组user_ytt 这里解释下各个参数的含义,
type = user 表示这是一个用户态线程,也就是前台的请求线程。如果type=system,表示后台线程,用来限制mysql自己的线程,比如Innodb purge thread,innodb read thread等等。
vcpu 代表cpu的逻辑核数,这里0-1代表前两个核被绑定到这个RG。可以用lscpu,top等列出自己的CPU相关信息。
thread_priority 设置优先级。user 级优先级设置大于0。
mysqlmysql> create resource group user_ytt type = user vcpu = 0-1 thread_priority=19 enable;Query OK, 0 rows affected (003 sec)
RG相关信息可以从 information_schemaresource_groups 系统表里检索。
mysqlmysql> select from information_schemaresource_groups;+---------------------+---------------------+------------------------+----------+-----------------+| RESOURCE_GROUP_NAME | RESOURCE_GROUP_TYPE | RESOURCE_GROUP_ENABLED | VCPU_IDS | THREAD_PRIORITY |+---------------------+---------------------+------------------------+----------+-----------------+| USR_default | USER | 1 | 0-3 | 0 || SYS_default | SYSTEM | 1 | 0-3 | 0 || user_ytt | USER | 1 | 0-1 | 19 |+---------------------+---------------------+------------------------+----------+-----------------+3 rows in set (000 sec)
我们来给语句select guid from t1 group by left(guid,8) order by rand() 赋予RG user_ytt。
mysql> show processlist;+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| Id | User | Host | db | Command | Time | State | Info |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| 4 | event_scheduler | localhost | NULL | Daemon | 10179 | Waiting on empty queue | NULL || 240 | root | localhost | ytt | Query | 101 | Creating sort index | select guid from t1 group by left(guid,8) order by rand() || 245 | root | localhost | ytt | Query | 0 | starting | show processlist |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+3 rows in set (000 sec)
找到连接240对应的thread_id。
mysqlmysql> select thread_id from performance_schemathreads where processlist_id = 240;+-----------+| thread_id |+-----------+| 278 |+-----------+1 row in set (000 sec)
给这个线程278赋予RG user_ytt。没报错就算成功了。
mysqlmysql> set resource group user_ytt for 278;Query OK, 0 rows affected (000 sec)
当然这个是在运维层面来做的,我们也可以在开发层面结合 MYSQL HINT 来单独给这个语句赋予RG。比如:
mysqlmysql> select /+ resource_group(user_ytt) /guid from t1 group by left(guid,8) order by rand()8388602 rows in set (4 min 4609 sec)
RG的限制:
Linux 平台上需要开启 CAPSYSNICE 特性。比如我机器上用systemd 给mysql 服务加上
systemctl edit mysql@80 [Service]AmbientCapabilities=CAP_SYS_NICE
mysql 线程池开启后RG失效。
freebsd,solaris 平台thread_priority 失效。
目前只能绑定CPU,不能绑定其他资源。
你的提问就有问题
当你的程序不管是不是多线程的
获得到一个数据库连接是 数据库会把这个连接标记为繁忙 当其他程序访问时它会返回另外空闲的连接
连接个数是有限的 如果一直不释放连接 数据库就会告诉你连接已经使用完了
这里和线程安全有何关系呢? 线程安全和数据库 *** 作没有直接关系
多线程连接数据库的连接池类:
public static class ConnectionPool
{
private static object locker = new object();
private static Dictionary<string, SqlConnection> Connections = null;
public static SqlConnection GetConnection<T>() where T : class, new()
{
string databaseName = NACommonExtensionsGetDatabaseName<T>();
if (stringIsNullOrEmpty(databaseName))
return null;
if (Connections == null)
{
lock (locker)
{
Connections = new Dictionary<string, SqlConnection>();
}
}
string connKey = FindFreeSqlConnection(databaseName);
if (connKey != null)
return Connections[connKey];
else
{
string strconn = NACommonExtensionsGetConnectionString<T>();
int poolSize = NACommonExtensionsGetConnectionPoolSize<T>();
lock (locker)
{
for (int i = 0; i < poolSize; ++i)
{
SqlConnection conn = new SqlConnection(strconn);
connOpen();
ConnectionsAdd(databaseName + "_" + iToString(), conn);
connClose();
}
}
return Connections[FindFreeSqlConnection(databaseName)];
}
}
private static string FindFreeSqlConnection(string databaseName)
{
IEnumerable<string> connKeys = ConnectionsKeysWhere(item => itemStartsWith(databaseName));
if (connKeys != null && connKeysCount() > 0)
{
foreach (string key in connKeys)
{
if (Connections[key]State == ConnectionStateClosed)
return key;
}
}
return null;
}
}
附加上其中用到的三个方法:
internal static int GetConnectionPoolSize<T>() where T : class, new()
{
string database = GetDatabaseName<T>();
string[] poolSizeArray = ConfigurationManagerAppSettings["ConnectionsPoolSize"]Split('|');
if (poolSizeArray != null)
{
foreach (string sizeItem in poolSizeArray)
{
string[] sizeItemArray = sizeItemSplit(':');
if (database == sizeItemArray[0])
return intParse(sizeItemArray[1]);
}
}
return 50;
}
public static string GetConnectionString<T>() where T : class, new()
{
string tableName = GetTableName<T>();
string[] databaseArray = ConfigurationManagerAppSettings["DatabaseArray"]Split('|');
if (databaseArray != null)
{
foreach (string database in databaseArray)
{
string tableNameList = ConfigurationManagerAppSettings[database];
string[] tables = ConfigurationManagerAppSettings[database]Split('|');
if (tables != null && tablesLength > 0)
if (tablesContains(tableName))
return ConfigurationManagerConnectionStrings[database]ConnectionString;
}
}
return stringEmpty;
}
public static string GetDatabaseName<T>() where T : class, new()
{
string tableName = GetTableName<T>();
string[] databaseArray = ConfigurationManagerAppSettings["DatabaseArray"]Split('|');
if (databaseArray != null)
{
foreach (string database in databaseArray)
{
string tableNameList = ConfigurationManagerAppSettings[database];
string[] tables = ConfigurationManagerAppSettings[database]Split('|');
if (tables != null && tablesLength > 0)
if (tablesContains(tableName))
return database;
}
}
return stringEmpty;
}
写线程是不能并发的(无意义,且易死锁),可以考虑设置双队列。
并发线程将接收的数据插入队列,然后用一个线程不断的处理将另一个队列的数据写入磁盘,写入工作的线程处理完队列A后和并发接收线程切换一下,写队列B的数据,并发接收线程向A中写数据,这是简单的,也可以设置三个队列切换。甚至更多。看你了。
//将数据库中的数据条数分段
public void division(){
//获取要导入的总的数据条数
String sql3="SELECT count() FROM [CMD][dbo][mycopy1]";
try {
pss=consprepareStatement(sql3);
rss=pssexecuteQuery();
while(rssnext()){
Systemoutprintln("总记录条数:"+rssgetInt(1));
sum=rssgetInt(1);
}
//每30000条记录作为一个分割点
if(sum>=30000){
n=sum/30000;
residue=sum%30000;
}else{
residue=sum;
}
Systemoutprintln(n+" "+residue);
} catch (SQLException e) {
// TODO Auto-generated catch block
eprintStackTrace();
}
}
线程类
public MyThread(int start,int end) {
thisend=end;
thisstart=start;
Systemoutprintln("处理掉余数");
try {
Systemoutprintln("--------"+ThreadcurrentThread()getName()+"------------");
ClassforName(SQLSERVERDRIVER);
Systemoutprintln("加载sqlserver驱动");
cons = DriverManagergetConnection(CONTENTS,UNS,UPS);
stas = conscreateStatement();
Systemoutprintln("连接SQLServer数据库成功!!");
Systemoutprintln("加载mysql驱动");
ClassforName(MYSQLDRIVER);
con = DriverManagergetConnection(CONTENT,UN,UP);
sta = concreateStatement();
// 关闭事务自动提交
consetAutoCommit(false);
Systemoutprintln("连接mysql数据库成功!!");
} catch (Exception e) {
eprintStackTrace();
}
// TODO Auto-generated constructor stub
}
public ArrayList<Member> getAll(){
Member member;
String sql1="select from (select row_number() over (order by pmcode) as rowNum," +
" from [CMD][dbo][mycopy1]) as t where rowNum between "+start+" and "+end;
try {
Systemoutprintln("正在获取数据");
allmembers=new ArrayList();
rss=stasexecuteQuery(sql1);
while(rssnext()){
member=new Member();
membersetAddress1(rssgetString("address1"));
membersetBnpoints(rssgetString("bnpoints"));
membersetDbno(rssgetString("dbno"));
membersetExpiry(rssgetString("expiry"));
membersetHispoints(rssgetString("hispoints"));
membersetKypoints(rssgetString("kypoints"));
membersetLevels(rssgetString("levels"));
membersetNames(rssgetString("names"));
membersetPmcode(rssgetString("pmcode"));
membersetRemark(rssgetString("remark"));
membersetSex(rssgetString("sex"));
membersetTelephone(rssgetString("telephone"));
membersetWxno(rssgetString("wxno"));
membersetPmdate(rssgetString("pmdate"));
allmembersadd(member);
// Systemoutprintln(membergetNames());
}
Systemoutprintln("成功获取sqlserver数据库数据!");
return allmembers;
} catch (SQLException e) {
// TODO Auto-generated catch block
Systemoutprintln("获取sqlserver数据库数据发送异常!");
eprintStackTrace();
}
try {
rssclose();
stasclose();
} catch (SQLException e) {
// TODO Auto-generated catch block
eprintStackTrace();
}
return null;
}
public void inputAll(ArrayList<Member> allmembers){
Systemoutprintln("开始向mysql中写入");
String sql2="insert into testmycopy2 values (,,,,,,,,,,,,,)";
try {
ps=conprepareStatement(sql2);
Systemoutprintln("-------------------------等待写入数据条数: "+allmemberssize());
for(int i=0;i<allmemberssize();i++){
pssetString(1, allmembersget(i)getPmcode());
pssetString(2, allmembersget(i)getNames());
//Systemoutprintln(allmembersget(i)getNames());
pssetString(3, allmembersget(i)getSex());
pssetString(4, allmembersget(i)getTelephone());
pssetString(5, allmembersget(i)getAddress1());
pssetString(6, allmembersget(i)getPmdate());
pssetString(7, allmembersget(i)getExpiry());
pssetString(8, allmembersget(i)getLevels());
pssetString(9, allmembersget(i)getDbno());
pssetString(10, allmembersget(i)getHispoints());
pssetString(11, allmembersget(i)getBnpoints());
pssetString(12, allmembersget(i)getKypoints());
pssetString(13, allmembersget(i)getWxno());
pssetString(14, allmembersget(i)getRemark());
//插入命令列表
//psaddBatch();
psexecuteUpdate();
}
//psexecuteBatch();
concommit();
psclose();
conclose();
thisflag=false;
Systemoutprintln(ThreadcurrentThread()getName()+"--->OK");
} catch (SQLException e) {
// TODO Auto-generated catch block
Systemoutprintln("向mysql中更新数据时发生异常!");
eprintStackTrace();
}
}
@Override
public void run() {
// TODO Auto-generated method stub
while(true&&flag){
thisinputAll(getAll());
}
}
oracle数据库中,每个客户端进程(userprocess)都会有一个服务端进程()与之对应连接,称为一个session。
每个都会有独立的PGA,所有共享SGA资源。
以上就是关于请问mysql数据库是不能多线程写入吗全部的内容,包括:请问mysql数据库是不能多线程写入吗、请教多线程数据库程序如何保证线程安全、多线程 连接数据库,C#多线程写数据库等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)