调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:
1 JavaRDD<Integer> myRDD = scparallelize(ArraysasList(1,2,3));
Scala版本如下:
1 val myRDD= scparallelize(List(1,2,3))
这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。
python访问hbase需要额外的库,一般用thrift。使用thrift调用hbase,由于篇幅限制在这里不能说的很详细。
请百度Phthon thrift 或 python hbase 自行查阅相关资料。
下面是一个例子仅供参考
# coding:utf-8from thrift import Thrift
from thrifttransport import TSocket
from thrifttransport import TTransport
from thriftprotocol import TBinaryProtocol
from hbase import Hbase
from hbasettypes import
import csv
def client_conn():
transport = TSocketTSocket('hostname,like:localhost', port)
transport = TTransportTBufferedTransport(transport)
protocol = TBinaryProtocolTBinaryProtocol(transport)
client = HbaseClient(protocol)
transportopen()
return client
if __name__ == "__main__":
client = client_conn()
result = clientgetRow("table name","row name")
data_simple =[]
for k, v in result[0]columnsitems(): #keys()
data_simpleappend((vtimestamp, vvalue))
writerwriterows(data)
csvfileclose()
csvfile_simple = open("data_xy_simplecsv", "wb")
writer_simple = csvwriter(csvfile_simple)
writer_simplewriterow(["timestamp", "value"])
writer_simplewriterows(data_simple)
csvfile_simpleclose()
hbase13
HTable 是我们对数据读取, *** 作的入口, implements HTableInterface, RegionLocator
内部构造
有一个检查 的动作待详细查看
关于BufferedMutator, 是用来缓存客户端的 *** 作的, hbase 将客户端的DML抽象成了 Mutation , 子类有: Append, Delete, Increment, Put *** 作
put方法将Put对象包装成Mutation,交给BufferedMutator, 到达设置的大小限制,或者主动调用flush *** 作, 会触发 backgroundFlushCommits(boolean synchronous) *** 作, 然后Mutation由 AsyncProcess 提交,详细查看 BufferedMutatorImpl 类
由 AscncProcess 提交后, (注释:Action类是将行与对应 *** 作结合的类), 由connection去寻找每一行对应的region位置, 包装action, server, region等信息添加到 MutiAction 中去, 这个类持有按照region分组的actions,
然后会对每个action都创建 SingleServerRequestRunnable (rpc caller 和rpc callable, caller call callable), 交给线程池去运行
删除 *** 作很简单: 创建 RegionServerCallable , 然后rpc工厂类创建rpc caller来调用它
get和scan都是继承了Query
get很简单:首先检查,这个get是否只是检查数据存在否, 并且检查是否指定了一致性等级(默认 (ConsistencySTRONG) ), 之后创建rpc请求Request, 如果 不是强一致性ConsistencyTIMELINE , 则调用 RpcRetryingCallerWithReadReplicas , 它可以从replica上读取, 返回的数据被标记为stale(读 *** 作是通过 ConsistencyTIMELINE ,然后读RPC将会首先发送到主region服务器上,在短时间内(hbaseclientprimaryCallTimeoutget默认为10ms),如果主region没有响应RPC会被发送到从region。 之后结果会从第一个完成RPC的返回。如果响应是来自主region副本,我们就会知道数据是最新的,ResultisStale() API是检查过期数据,如果结果是 从region返回,那么ResultisStale()为true,然后用户就可以检查关于过期数据可能的原因。)
当replica_id=0的regin不可以时候, 给所有的replica region发送请求,获取第一个从这些replica返回的数据, 客户端可以 ResultisStale()检查是否是来自副本的数据
Scan 类可以设置一系列的属性, startkey,endkey, 过滤器, 版本,缓存,最大取回大小等等, 但是获取数据是由 getScanner(Scan)返回的 ResultScanner *** 作的
返回的 ResultScanner 有small, Reversed,big和纯client 的不同,
什么是small scan
见 >
hbase每次处理数据不需要实时的调用数据。根据查询相关公开信息显示,在HBase中,客户端在访问数据时不需要每次实时调用数据,HBase使用一个称为RegionServer的组件来缓存和管理数据,客户端可以通过与RegionServer进行交互来获取数据,当客户端首次请求数据时,RegionServer会将数据加载到内存中,并在内存中保持一段时间以提高下一次访问的性能。
以上就是关于spark1.2.1实现读取hbase的数据后怎么实现实时查询全部的内容,包括:spark1.2.1实现读取hbase的数据后怎么实现实时查询、如何在python中访问hbase的数据、hbase(一) : HTable等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)