java 多线程存储数据库

java 多线程存储数据库,第1张

以mysql为数据库写的一个粗陋的demo,你参考一下,希望不会因为代码过多被百度吞了——

import javasqlConnection;

import javasqlDriverManager;

import javasqlPreparedStatement;

import javasqlResultSet;

import javasqlSQLException;

import javautilArrayList;

import javautilList;

public class Test {

       

    public static void main(String[] args) {

        allotThread();

    }

       

    /

      将100条数据分成10份并启动10个线程分别 *** 作

     /

    public static void allotThread() {

        List<String[]> datas = buildDatas();

        for (int i=0; i<100; i+=10) {

            List<String[]> tenDatas = datassubList(i, i + 10);

            insertData(tenDatas);

        }

    }

       

    /

      创建100条模拟数据

      @return

     /

    public static List<String[]> buildDatas() {

        List<String[]> datas = new ArrayList<String[]>();

        for (int i=0; i<100; i++) {

            String[] data = {"id " + i, "name " + i};

            datasadd(data);

        }

        return datas;

    }

       

    /

      启动线程进行数据插入 *** 作

      @param tenDatas

     /

    public static void insertData(final List<String[]> tenDatas) {

        new Thread(new Runnable() {

            public void run() {

                String sql = "insert into testtable (id, name) values (, )";

                Connection conn = null;

                PreparedStatement pstmt = null;

                try {

                    conn = getConnection();

                    connsetAutoCommit(false);

                    pstmt = getPstmt(conn, sql);

                    for (String[] data : tenDatas) {

                        pstmtsetString(1, data[0]);

                        pstmtsetString(2, data[1]);

                        pstmtaddBatch();

                    }

                    pstmtexecuteBatch();

                    conncommit();

                    connsetAutoCommit(true);

                } catch (SQLException e) {

                    eprintStackTrace();

                    rollback(conn);

                } catch (ClassNotFoundException e) {

                    eprintStackTrace();

                } finally {

                    close(pstmt);

                    close(conn);

                }

            }

        })start();

    }

       

    public static Connection getConnection() throws SQLException, ClassNotFoundException {

        ClassforName("commysqljdbcDriver");

        String dbUrl = "jdbc:mysql://localhost/testuseUnicode=true&characterEncoding=UTF-8";

        Connection conn = DriverManagergetConnection(dbUrl, "root", "tooeasy");

        return conn;

    }

       

    public static PreparedStatement getPstmt(Connection conn, String sql) throws SQLException, ClassNotFoundException {

        PreparedStatement pstmt = connprepareStatement(sql);

        return pstmt;

    }

       

    public static void rollback(Connection conn) {

        try {

            if (null != conn) {

                connrollback();

            }

        } catch (SQLException e) {

            eprintStackTrace();

        }

    }

       

    public static void close(Connection conn) {

        try {

            if (null != conn) {

                connclose();

            }

        } catch (SQLException e) {

            eprintStackTrace();

        }

    }

       

    public static void close(PreparedStatement pstmt) {

        try {

            if (null != pstmt) {

                pstmtclose();

            }

        } catch (SQLException e) {

            eprintStackTrace();

        }

    }

       

    public static void close(ResultSet rs) {

        try {

            if (null != rs) {

                rsclose();

            }

        } catch (SQLException e) {

            eprintStackTrace();

        }

    }

}

#include <QCoreApplication>

#include "threadh"

#include <QVector>

#include <QDebug>

int main(int argc, char argv[])

{

QCoreApplication a(argc, argv);

QVector<Thread> vector;

Thread thread;

//创建多个线程,并start

for(int i=0;i<10;i++){

thread=new Thread;

vectorappend(thread);

thread->set(i);

thread->start();

}

//等待所有线程执行完,然后删除线程

foreach(thread,vector){

thread->wait();

}

foreach(thread,vector){

delete thread;

}

return aexec();

}

-------------------------------------------------------------------------------------------------

#include "threadh"

Thread::Thread(QObject parent) : QThread(parent)

{

}

void Thread::run()

{

begin();

}

//为每个线程创建一个连接名

void Thread::set(int a)

{

connectionName=QString::number(a);

}

void Thread::connectionDatabase(QString dbName)

{

QSqlDatabasedb=QSqlDatabase::addDatabase("QMYSQL",connectionName);

dbsetHostName("localhost");

dbsetDatabaseName(dbName);

dbsetUserName("root");

dbsetPassword("");

if(!dbopen())

qDebug()<<"db open fail";

}

void Thread::begin()

{

QString dbName="learnsql";

connectionDatabase(dbName);

QSqlDatabase db=QSqlDatabase::database(connectionName);

dbtransaction(); //开启事物

QSqlQuery query(db);

//向表student中插入10000条数据

for(int i=1;i<=10000;i++){

queryexec("insert into student values(1)");

}

dbcommit(); //提交事物

}

在MySQL 80 之前, 我们假设一下有一条烂SQL,

mysqlselect from t1 order by rand() ;

以多个线程在跑,导致CPU被跑满了,其他的请求只能被阻塞进不来。那这种情况怎么办?

大概有以下几种解决办法:

设置max_execution_time 来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL 掉。有些必须要执行很长时间的也会被误杀。

自己写个脚本检测这类语句,比如order by rand(), 超过一定时间用Kill query thread_id 给杀掉。

那能不能不要杀掉而让他正常运行,但是又不影响其他的请求呢?

那mysql 80 引入的资源组(resource group,后面简写微RG)可以基本上解决这类问题。

比如我可以用 RG 来在SQL层面给他限制在特定的一个CPU核上,这样我就不管他,让他继续运行,如果有新的此类语句,让他排队好了。

为什么说基本呢?目前只能绑定CPU资源,其他的暂时不行。

那我来演示下如何使用RG。

创建一个资源组user_ytt 这里解释下各个参数的含义,

type = user 表示这是一个用户态线程,也就是前台的请求线程。如果type=system,表示后台线程,用来限制mysql自己的线程,比如Innodb purge thread,innodb read thread等等。

vcpu 代表cpu的逻辑核数,这里0-1代表前两个核被绑定到这个RG。可以用lscpu,top等列出自己的CPU相关信息。

thread_priority 设置优先级。user 级优先级设置大于0。

mysqlmysql> create resource group user_ytt type = user  vcpu = 0-1 thread_priority=19 enable;Query OK, 0 rows affected (003 sec)

RG相关信息可以从 information_schemaresource_groups 系统表里检索。

mysqlmysql> select from information_schemaresource_groups;+---------------------+---------------------+------------------------+----------+-----------------+| RESOURCE_GROUP_NAME | RESOURCE_GROUP_TYPE | RESOURCE_GROUP_ENABLED | VCPU_IDS | THREAD_PRIORITY |+---------------------+---------------------+------------------------+----------+-----------------+| USR_default         | USER                |                      1 | 0-3      |               0 || SYS_default         | SYSTEM              |                      1 | 0-3      |               0 || user_ytt            | USER                |                      1 | 0-1      |              19 |+---------------------+---------------------+------------------------+----------+-----------------+3 rows in set (000 sec)

我们来给语句select guid from t1 group by left(guid,8) order by rand() 赋予RG user_ytt。

mysql> show processlist;+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| Id  | User            | Host      | db   | Command | Time  | State                  | Info                                                      |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+|   4 | event_scheduler | localhost | NULL | Daemon  | 10179 | Waiting on empty queue | NULL                                                      || 240 | root            | localhost | ytt  | Query   |   101 | Creating sort index    | select guid from t1 group by left(guid,8) order by rand() || 245 | root            | localhost | ytt  | Query   |     0 | starting               | show processlist                                          |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+3 rows in set (000 sec)

找到连接240对应的thread_id。

mysqlmysql> select thread_id from performance_schemathreads where processlist_id = 240;+-----------+| thread_id |+-----------+|       278 |+-----------+1 row in set (000 sec)

给这个线程278赋予RG user_ytt。没报错就算成功了。

mysqlmysql> set resource group user_ytt for 278;Query OK, 0 rows affected (000 sec)

当然这个是在运维层面来做的,我们也可以在开发层面结合 MYSQL HINT 来单独给这个语句赋予RG。比如:

mysqlmysql> select /+ resource_group(user_ytt) /guid from t1 group by left(guid,8) order by rand()8388602 rows in set (4 min 4609 sec)

RG的限制:

Linux 平台上需要开启 CAPSYSNICE 特性。比如我机器上用systemd 给mysql 服务加上

systemctl edit mysql@80 [Service]AmbientCapabilities=CAP_SYS_NICE

mysql 线程池开启后RG失效。

freebsd,solaris 平台thread_priority 失效。

目前只能绑定CPU,不能绑定其他资源。

多线程 *** 作,请确保每个线程 *** 作的SQL语句中的表是相对独立的。 不然,你需要安排线程间的顺序,也就是lock代码段。 同一时间,两个线程一起跑同一句SQL,而且还 *** 作同一张表,那么,肯定就会有问题了。

一般这种是因为超出数据库最大链接上限。再建立链接,不管缓存多少,会自动队列消息等待。Timeout时间内没有链接取消无法获得链接权限。可以将自己的数据库链接个数设置大一些。

选择成品数据库,要看之后的应用结构的才能确定用哪个快

如果仅仅是要写入时快,不考虑查询情况那当然直接把C/C++的数据结构给保存了最快

比如保存一个struct Record,或class Record,限定好成员大小后,直接内存到磁盘的写盘

这样写最快,而读取只能顺序读取

另json等是交换格式不是存储格式更不能当数据库用哇

以上就是关于java 多线程存储数据库全部的内容,包括:java 多线程存储数据库、请教如何进行多线程连接数据库并写入数据、MySQL数据库是一个多用户,多线程的关系数据库管理系统,其主要技术都包括哪些等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/10158705.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存