请问mysql数据库是不能多线程写入吗

请问mysql数据库是不能多线程写入吗,第1张

在MySQL 80 之前, 我们假设一下有一条烂SQL,

mysqlselect from t1 order by rand() ;

以多个线程在跑,导致CPU被跑满了,其他的请求只能被阻塞进不来。那这种情况怎么办?

大概有以下几种解决办法:

设置max_execution_time 来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL 掉。有些必须要执行很长时间的也会被误杀。

自己写个脚本检测这类语句,比如order by rand(), 超过一定时间用Kill query thread_id 给杀掉。

那能不能不要杀掉而让他正常运行,但是又不影响其他的请求呢?

那mysql 80 引入的资源组(resource group,后面简写微RG)可以基本上解决这类问题。

比如我可以用 RG 来在SQL层面给他限制在特定的一个CPU核上,这样我就不管他,让他继续运行,如果有新的此类语句,让他排队好了。

为什么说基本呢?目前只能绑定CPU资源,其他的暂时不行。

那我来演示下如何使用RG。

创建一个资源组user_ytt 这里解释下各个参数的含义,

type = user 表示这是一个用户态线程,也就是前台的请求线程。如果type=system,表示后台线程,用来限制mysql自己的线程,比如Innodb purge thread,innodb read thread等等。

vcpu 代表cpu的逻辑核数,这里0-1代表前两个核被绑定到这个RG。可以用lscpu,top等列出自己的CPU相关信息。

thread_priority 设置优先级。user 级优先级设置大于0。

mysqlmysql> create resource group user_ytt type = user  vcpu = 0-1 thread_priority=19 enable;Query OK, 0 rows affected (003 sec)

RG相关信息可以从 information_schemaresource_groups 系统表里检索。

mysqlmysql> select from information_schemaresource_groups;+---------------------+---------------------+------------------------+----------+-----------------+| RESOURCE_GROUP_NAME | RESOURCE_GROUP_TYPE | RESOURCE_GROUP_ENABLED | VCPU_IDS | THREAD_PRIORITY |+---------------------+---------------------+------------------------+----------+-----------------+| USR_default         | USER                |                      1 | 0-3      |               0 || SYS_default         | SYSTEM              |                      1 | 0-3      |               0 || user_ytt            | USER                |                      1 | 0-1      |              19 |+---------------------+---------------------+------------------------+----------+-----------------+3 rows in set (000 sec)

我们来给语句select guid from t1 group by left(guid,8) order by rand() 赋予RG user_ytt。

mysql> show processlist;+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| Id  | User            | Host      | db   | Command | Time  | State                  | Info                                                      |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+|   4 | event_scheduler | localhost | NULL | Daemon  | 10179 | Waiting on empty queue | NULL                                                      || 240 | root            | localhost | ytt  | Query   |   101 | Creating sort index    | select guid from t1 group by left(guid,8) order by rand() || 245 | root            | localhost | ytt  | Query   |     0 | starting               | show processlist                                          |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+3 rows in set (000 sec)

找到连接240对应的thread_id。

mysqlmysql> select thread_id from performance_schemathreads where processlist_id = 240;+-----------+| thread_id |+-----------+|       278 |+-----------+1 row in set (000 sec)

给这个线程278赋予RG user_ytt。没报错就算成功了。

mysqlmysql> set resource group user_ytt for 278;Query OK, 0 rows affected (000 sec)

当然这个是在运维层面来做的,我们也可以在开发层面结合 MYSQL HINT 来单独给这个语句赋予RG。比如:

mysqlmysql> select /+ resource_group(user_ytt) /guid from t1 group by left(guid,8) order by rand()8388602 rows in set (4 min 4609 sec)

RG的限制:

Linux 平台上需要开启 CAPSYSNICE 特性。比如我机器上用systemd 给mysql 服务加上

systemctl edit mysql@80 [Service]AmbientCapabilities=CAP_SYS_NICE

mysql 线程池开启后RG失效。

freebsd,solaris 平台thread_priority 失效。

目前只能绑定CPU,不能绑定其他资源。

你的提问就有问题

当你的程序不管是不是多线程的

获得到一个数据连接是 数据库会把这个连接标记为繁忙 当其他程序访问时它会返回另外空闲的连接

连接个数是有限的 如果一直不释放连接 数据库就会告诉你连接已经使用完了

这里和线程安全有何关系呢? 线程安全和数据库 *** 作没有直接关系

多线程连接数据库的连接池类:

public static class ConnectionPool

{

private static object locker = new object();

private static Dictionary<string, SqlConnection> Connections = null;

public static SqlConnection GetConnection<T>() where T : class, new()

{

string databaseName = NACommonExtensionsGetDatabaseName<T>();

if (stringIsNullOrEmpty(databaseName))

return null;

if (Connections == null)

{

lock (locker)

{

Connections = new Dictionary<string, SqlConnection>();

}

}

string connKey = FindFreeSqlConnection(databaseName);

if (connKey != null)

return Connections[connKey];

else

{

string strconn = NACommonExtensionsGetConnectionString<T>();

int poolSize = NACommonExtensionsGetConnectionPoolSize<T>();

lock (locker)

{

for (int i = 0; i < poolSize; ++i)

{

SqlConnection conn = new SqlConnection(strconn);

connOpen();

ConnectionsAdd(databaseName + "_" + iToString(), conn);

connClose();

}

}

return Connections[FindFreeSqlConnection(databaseName)];

}

}

private static string FindFreeSqlConnection(string databaseName)

{

IEnumerable<string> connKeys = ConnectionsKeysWhere(item => itemStartsWith(databaseName));

if (connKeys != null && connKeysCount() > 0)

{

foreach (string key in connKeys)

{

if (Connections[key]State == ConnectionStateClosed)

return key;

}

}

return null;

}

}

附加上其中用到的三个方法:

internal static int GetConnectionPoolSize<T>() where T : class, new()

{

string database = GetDatabaseName<T>();

string[] poolSizeArray = ConfigurationManagerAppSettings["ConnectionsPoolSize"]Split('|');

if (poolSizeArray != null)

{

foreach (string sizeItem in poolSizeArray)

{

string[] sizeItemArray = sizeItemSplit(':');

if (database == sizeItemArray[0])

return intParse(sizeItemArray[1]);

}

}

return 50;

}

public static string GetConnectionString<T>() where T : class, new()

{

string tableName = GetTableName<T>();

string[] databaseArray = ConfigurationManagerAppSettings["DatabaseArray"]Split('|');

if (databaseArray != null)

{

foreach (string database in databaseArray)

{

string tableNameList = ConfigurationManagerAppSettings[database];

string[] tables = ConfigurationManagerAppSettings[database]Split('|');

if (tables != null && tablesLength > 0)

if (tablesContains(tableName))

return ConfigurationManagerConnectionStrings[database]ConnectionString;

}

}

return stringEmpty;

}

public static string GetDatabaseName<T>() where T : class, new()

{

string tableName = GetTableName<T>();

string[] databaseArray = ConfigurationManagerAppSettings["DatabaseArray"]Split('|');

if (databaseArray != null)

{

foreach (string database in databaseArray)

{

string tableNameList = ConfigurationManagerAppSettings[database];

string[] tables = ConfigurationManagerAppSettings[database]Split('|');

if (tables != null && tablesLength > 0)

if (tablesContains(tableName))

return database;

}

}

return stringEmpty;

}

写线程是不能并发的(无意义,且易死锁),可以考虑设置双队列。

并发线程将接收的数据插入队列,然后用一个线程不断的处理将另一个队列的数据写入磁盘,写入工作的线程处理完队列A后和并发接收线程切换一下,写队列B的数据,并发接收线程向A中写数据,这是简单的,也可以设置三个队列切换。甚至更多。看你了。

//将数据库中的数据条数分段

 public void division(){

  //获取要导入的总的数据条数

  String sql3="SELECT  count()  FROM [CMD][dbo][mycopy1]";

  try {

   pss=consprepareStatement(sql3);

   rss=pssexecuteQuery();

   

   while(rssnext()){

   Systemoutprintln("总记录条数:"+rssgetInt(1));

   sum=rssgetInt(1);

   }

   //每30000条记录作为一个分割点

   if(sum>=30000){

    n=sum/30000;

    residue=sum%30000;

   }else{

    residue=sum;

   }

   

   Systemoutprintln(n+"  "+residue);

   

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   eprintStackTrace();

  }

  

 }

线程类

public MyThread(int start,int end) {

  thisend=end; 

     thisstart=start;

  Systemoutprintln("处理掉余数");

    try {

    

         Systemoutprintln("--------"+ThreadcurrentThread()getName()+"------------");

    ClassforName(SQLSERVERDRIVER);

    Systemoutprintln("加载sqlserver驱动");

    cons = DriverManagergetConnection(CONTENTS,UNS,UPS);

    stas = conscreateStatement();

    Systemoutprintln("连接SQLServer数据库成功!!");

    

    Systemoutprintln("加载mysql驱动");

    ClassforName(MYSQLDRIVER);

    con = DriverManagergetConnection(CONTENT,UN,UP);

    sta = concreateStatement();

    // 关闭事务自动提交

    consetAutoCommit(false);

    Systemoutprintln("连接mysql数据库成功!!");

    

   } catch (Exception e) {

    eprintStackTrace(); 

   }

  // TODO Auto-generated constructor stub

 }

 

 

 public ArrayList<Member> getAll(){

  Member member;

  String sql1="select  from (select row_number() over (order by pmcode) as rowNum," +

    " from [CMD][dbo][mycopy1]) as t where rowNum between "+start+" and "+end;

  try {

   Systemoutprintln("正在获取数据");

   allmembers=new ArrayList();

   rss=stasexecuteQuery(sql1);

   while(rssnext()){

    member=new Member();

    membersetAddress1(rssgetString("address1"));

    membersetBnpoints(rssgetString("bnpoints"));

    membersetDbno(rssgetString("dbno"));

    membersetExpiry(rssgetString("expiry"));

    membersetHispoints(rssgetString("hispoints"));

    membersetKypoints(rssgetString("kypoints"));

    membersetLevels(rssgetString("levels"));

    membersetNames(rssgetString("names"));

    membersetPmcode(rssgetString("pmcode"));

    membersetRemark(rssgetString("remark"));

    membersetSex(rssgetString("sex"));

    membersetTelephone(rssgetString("telephone"));

    membersetWxno(rssgetString("wxno"));

    membersetPmdate(rssgetString("pmdate"));

    allmembersadd(member);

   // Systemoutprintln(membergetNames());

   }

   Systemoutprintln("成功获取sqlserver数据库数据!");

   return allmembers;

   

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   Systemoutprintln("获取sqlserver数据库数据发送异常!");

   eprintStackTrace();

  }

  try {

   rssclose();

   stasclose();

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   eprintStackTrace();

  }

  return null;

 }

 

 public void inputAll(ArrayList<Member> allmembers){

  Systemoutprintln("开始向mysql中写入");

  String sql2="insert into testmycopy2 values (,,,,,,,,,,,,,)";

  try {

   ps=conprepareStatement(sql2);

   Systemoutprintln("-------------------------等待写入数据条数: "+allmemberssize());

   for(int i=0;i<allmemberssize();i++){

    pssetString(1, allmembersget(i)getPmcode());

    pssetString(2, allmembersget(i)getNames());

    //Systemoutprintln(allmembersget(i)getNames());

    pssetString(3, allmembersget(i)getSex());

    pssetString(4, allmembersget(i)getTelephone());

    pssetString(5, allmembersget(i)getAddress1());

    pssetString(6, allmembersget(i)getPmdate());

    pssetString(7, allmembersget(i)getExpiry());

    pssetString(8, allmembersget(i)getLevels());

    pssetString(9, allmembersget(i)getDbno());

    pssetString(10, allmembersget(i)getHispoints());

    pssetString(11, allmembersget(i)getBnpoints());

    pssetString(12, allmembersget(i)getKypoints());

    pssetString(13, allmembersget(i)getWxno());

    pssetString(14, allmembersget(i)getRemark());

    //插入命令列表

    //psaddBatch();

    psexecuteUpdate();

   }

   //psexecuteBatch();

   concommit();

   

   psclose();

   conclose();

   thisflag=false;

   Systemoutprintln(ThreadcurrentThread()getName()+"--->OK");

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   Systemoutprintln("向mysql中更新数据时发生异常!");

   eprintStackTrace(); 

  }

 }

 @Override

 public void run() {

  // TODO Auto-generated method stub

  while(true&&flag){

   thisinputAll(getAll());

  }

 }

oracle数据库中,每个客户端进程(userprocess)都会有一个服务端进程()与之对应连接,称为一个session。

每个都会有独立的PGA,所有共享SGA资源。

以上就是关于请问mysql数据库是不能多线程写入吗全部的内容,包括:请问mysql数据库是不能多线程写入吗、请教多线程数据库程序如何保证线程安全、多线程 连接数据库,C#多线程写数据库等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9807213.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-02
下一篇 2023-05-02

发表评论

登录后才能评论

评论列表(0条)

保存