spark1.2.1实现读取hbase的数据后怎么实现实时查询

spark1.2.1实现读取hbase的数据后怎么实现实时查询,第1张

 调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:

1 JavaRDD<Integer> myRDD = scparallelize(ArraysasList(1,2,3));

Scala版本如下:

1 val myRDD= scparallelize(List(1,2,3))

这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。

python访问hbase需要额外的库,一般用thrift。使用thrift调用hbase,由于篇幅限制在这里不能说的很详细。

请百度Phthon thrift 或 python hbase 自行查阅相关资料。

下面是一个例子仅供参考

# coding:utf-8

from thrift import Thrift

from thrifttransport import TSocket

from thrifttransport import TTransport

from thriftprotocol import TBinaryProtocol

from hbase import Hbase

from hbasettypes import 

import csv

def client_conn():

    transport = TSocketTSocket('hostname,like:localhost', port)

    transport = TTransportTBufferedTransport(transport)

    protocol = TBinaryProtocolTBinaryProtocol(transport)

    client = HbaseClient(protocol)

    transportopen()

    return client

if __name__ == "__main__":

    client = client_conn()

    result = clientgetRow("table name","row name")

    data_simple =[]

    for k, v in result[0]columnsitems(): #keys()

        data_simpleappend((vtimestamp, vvalue))

    writerwriterows(data)

    csvfileclose()

    csvfile_simple = open("data_xy_simplecsv", "wb")

    writer_simple = csvwriter(csvfile_simple)

    writer_simplewriterow(["timestamp", "value"])

    writer_simplewriterows(data_simple)

    csvfile_simpleclose()

hbase13

HTable 是我们对数据读取, *** 作的入口, implements HTableInterface, RegionLocator

内部构造

有一个检查 的动作待详细查看

关于BufferedMutator, 是用来缓存客户端的 *** 作的, hbase 将客户端的DML抽象成了 Mutation , 子类有: Append, Delete, Increment, Put *** 作

put方法将Put对象包装成Mutation,交给BufferedMutator, 到达设置的大小限制,或者主动调用flush *** 作, 会触发 backgroundFlushCommits(boolean synchronous) *** 作, 然后Mutation由 AsyncProcess 提交,详细查看 BufferedMutatorImpl 类

由 AscncProcess 提交后, (注释:Action类是将行与对应 *** 作结合的类), 由connection去寻找每一行对应的region位置, 包装action, server, region等信息添加到 MutiAction 中去, 这个类持有按照region分组的actions,

然后会对每个action都创建 SingleServerRequestRunnable (rpc caller 和rpc callable, caller call callable), 交给线程池去运行

删除 *** 作很简单: 创建 RegionServerCallable , 然后rpc工厂类创建rpc caller来调用它

get和scan都是继承了Query

get很简单:首先检查,这个get是否只是检查数据存在否, 并且检查是否指定了一致性等级(默认 (ConsistencySTRONG) ), 之后创建rpc请求Request, 如果 不是强一致性ConsistencyTIMELINE , 则调用 RpcRetryingCallerWithReadReplicas , 它可以从replica上读取, 返回的数据被标记为stale(读 *** 作是通过 ConsistencyTIMELINE ,然后读RPC将会首先发送到主region服务器上,在短时间内(hbaseclientprimaryCallTimeoutget默认为10ms),如果主region没有响应RPC会被发送到从region。 之后结果会从第一个完成RPC的返回。如果响应是来自主region副本,我们就会知道数据是最新的,ResultisStale() API是检查过期数据,如果结果是 从region返回,那么ResultisStale()为true,然后用户就可以检查关于过期数据可能的原因。)

当replica_id=0的regin不可以时候, 给所有的replica region发送请求,获取第一个从这些replica返回的数据, 客户端可以 ResultisStale()检查是否是来自副本的数据

Scan 类可以设置一系列的属性, startkey,endkey, 过滤器, 版本,缓存,最大取回大小等等, 但是获取数据是由 getScanner(Scan)返回的 ResultScanner *** 作的

返回的 ResultScanner 有small, Reversed,big和纯client 的不同,

什么是small scan

见 >

hbase每次处理数据不需要实时的调用数据。根据查询相关公开信息显示,在HBase中,客户端在访问数据时不需要每次实时调用数据,HBase使用一个称为RegionServer的组件来缓存和管理数据,客户端可以通过与RegionServer进行交互来获取数据,当客户端首次请求数据时,RegionServer会将数据加载到内存中,并在内存中保持一段时间以提高下一次访问的性能。

以上就是关于spark1.2.1实现读取hbase的数据后怎么实现实时查询全部的内容,包括:spark1.2.1实现读取hbase的数据后怎么实现实时查询、如何在python中访问hbase的数据、hbase(一) : HTable等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9331252.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-27
下一篇 2023-04-27

发表评论

登录后才能评论

评论列表(0条)

保存