Hbase 客户端批量写入数据

Hbase 客户端批量写入数据,第1张

org.apache.hadoop.hbase.client.BufferedMutator主要用来对HBase的单个表进行 *** 作。它和Put类的作用差不多,但是主要用来实现批量的异步写 *** 作。

可以从Connection的实例中获取BufferedMutator的实例。在使用完成后需要调用close()方法关闭连接。对BufferedMutator进行配置需要通过BufferedMutatorParams完成。

BufferedMutator接收发送来的Put数据后,会根据某些因素(比如接收的Put数据的总量)启发式地执行Batch Put *** 作,且会异步的提交Batch Put请求。

BufferedMutator也可以用在一些特殊的情况上。多线程作业的每个线程将会拥有一个独立的BufferedMutator对象。

一个独立的BufferedMutator也可以用在大容量的在线系统上来执行批量Put *** 作,但是这时需要注意一些极端情况比如JVM异常或机器故障,此时有可能造成数据丢失。

官方源码路径: https://github.com/apache/hbase/blob/master/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java

#include <QCoreApplication>

#include "thread.h"

#include <QVector>

#include <QDebug>

int main(int argc, char *argv[])

{

QCoreApplication a(argc, argv)

QVector<Thread*>vector

Thread *thread

//创建多个线程,并start

for(int i=0i<10i++){

thread=new Thread

vector.append(thread)

thread->set(i)

thread->start()

}

//等待所有线程执行完,然后删除线程

foreach(thread,vector){

thread->wait()

}

foreach(thread,vector){

delete thread

}

return a.exec()

}

-------------------------------------------------------------------------------------------------

#include "thread.h"

Thread::Thread(QObject *parent) : QThread(parent)

{

}

void Thread::run()

{

begin()

}

//为每个线程创建一个连接名

void Thread::set(int a)

{

connectionName=QString::number(a)

}

void Thread::connectionDatabase(QString dbName)

{

QSqlDatabasedb=QSqlDatabase::addDatabase("QMYSQL",connectionName)

db.setHostName("localhost")

db.setDatabaseName(dbName)

db.setUserName("root")

db.setPassword("")

if(!db.open())

qDebug()<<"db open fail"

}

void Thread::begin()

{

QString dbName="learnsql"

connectionDatabase(dbName)

QSqlDatabase db=QSqlDatabase::database(connectionName)

db.transaction()//开启事物

QSqlQuery query(db)

//向表student中插入10000条数据

for(int i=1i<=10000i++){

query.exec("insert into student values(1)")

}

db.commit()//提交事物

}

在MySQL 8.0 之前, 我们假设一下有一条烂SQL,

mysqlselect * from t1 order by rand()

以多个线程在跑,导致CPU被跑满了,其他的请求只能被阻塞进不来。那这种情况怎么办?

大概有以下几种解决办法:

设置max_execution_time 来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL 掉。有些必须要执行很长时间的也会被误杀。

自己写个脚本检测这类语句,比如order by rand(), 超过一定时间用Kill query thread_id 给杀掉。

那能不能不要杀掉而让他正常运行,但是又不影响其他的请求呢?

那mysql 8.0 引入的资源组(resource group,后面简写微RG)可以基本上解决这类问题。

比如我可以用 RG 来在SQL层面给他限制在特定的一个CPU核上,这样我就不管他,让他继续运行,如果有新的此类语句,让他排队好了。

为什么说基本呢?目前只能绑定CPU资源,其他的暂时不行。

那我来演示下如何使用RG。

创建一个资源组user_ytt. 这里解释下各个参数的含义,

type = user 表示这是一个用户态线程,也就是前台的请求线程。如果type=system,表示后台线程,用来限制mysql自己的线程,比如Innodb purge thread,innodb read thread等等。

vcpu 代表cpu的逻辑核数,这里0-1代表前两个核被绑定到这个RG。可以用lscpu,top等列出自己的CPU相关信息。

thread_priority 设置优先级。user 级优先级设置大于0。

mysqlmysql>create resource group user_ytt type = user  vcpu = 0-1 thread_priority=19 enableQuery OK, 0 rows affected (0.03 sec)

RG相关信息可以从 information_schema.resource_groups 系统表里检索。

mysqlmysql>select * from information_schema.resource_groups+---------------------+---------------------+------------------------+----------+-----------------+| RESOURCE_GROUP_NAME | RESOURCE_GROUP_TYPE | RESOURCE_GROUP_ENABLED | VCPU_IDS | THREAD_PRIORITY |+---------------------+---------------------+------------------------+----------+-----------------+| USR_default         | USER                |                      1 | 0-3      |               0 || SYS_default         | SYSTEM              |                      1 | 0-3      |               0 || user_ytt            | USER                |                      1 | 0-1      |              19 |+---------------------+---------------------+------------------------+----------+-----------------+3 rows in set (0.00 sec)

我们来给语句select guid from t1 group by left(guid,8) order by rand() 赋予RG user_ytt。

mysql>show processlist+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| Id  | User            | Host      | db   | Command | Time  | State                  | Info                                                      |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+|   4 | event_scheduler | localhost | NULL | Daemon  | 10179 | Waiting on empty queue | NULL                                                      || 240 | root            | localhost | ytt  | Query   |   101 | Creating sort index    | select guid from t1 group by left(guid,8) order by rand() || 245 | root            | localhost | ytt  | Query   |     0 | starting               | show processlist                                          |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+3 rows in set (0.00 sec)

找到连接240对应的thread_id。

mysqlmysql>select thread_id from performance_schema.threads where processlist_id = 240+-----------+| thread_id |+-----------+|       278 |+-----------+1 row in set (0.00 sec)

给这个线程278赋予RG user_ytt。没报错就算成功了。

mysqlmysql>set resource group user_ytt for 278Query OK, 0 rows affected (0.00 sec)

当然这个是在运维层面来做的,我们也可以在开发层面结合 MYSQL HINT 来单独给这个语句赋予RG。比如:

mysqlmysql>select /*+ resource_group(user_ytt) */guid from t1 group by left(guid,8) order by rand()....8388602 rows in set (4 min 46.09 sec)

RG的限制:

Linux 平台上需要开启 CAPSYSNICE 特性。比如我机器上用systemd 给mysql 服务加上

systemctl edit mysql@80 [Service]AmbientCapabilities=CAP_SYS_NICE

mysql 线程池开启后RG失效。

freebsd,solaris 平台thread_priority 失效。

目前只能绑定CPU,不能绑定其他资源。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/10835640.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-11
下一篇 2023-05-11

发表评论

登录后才能评论

评论列表(0条)

保存