spark1.2.1实现读取hbase的数据后怎么实现实时查询

spark1.2.1实现读取hbase的数据后怎么实现实时查询,第1张

WordCountHbaseReaderMapper类继承了TableMapper抽象类,TableMapper类专门用于完成MapReduce中Map过程与Hbase表之间的 *** 作。此时的map(ImmutableBytesWritablekey,Resultvalue,Contextcontext)方法,第一个参数key为Hbase表的rowkey主键,第二个参数value为key主键对应的记录集合,此处的map核心实现是遍历key主键对应的记录集合value,将其组合成一条记录通过contentxwrite(key,value)填充到键值对中。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReaderjavapublicstaticclassWordCountHbaseReaderMapperextendsTableMapper{@Overrideprotectedvoidmap(ImmutableBytesWritablekey,Resultvalue,Contextcontext)throwsIOException,InterruptedException{StringBuffersb=newStringBuffer("");for(Entryentry:valuegetFamilyMap("content"getBytes())entrySet()){Stringstr=newString(entrygetValue());//将字节数组转换为String类型if(str!=null){sbappend(newString(entrygetKey()));sbappend(":");sbappend(str);}contextwrite(newText(keyget()),newText(newString(sb)));}}}3、Reducer函数实现此处的WordCountHbaseReaderReduce实现了直接输出Map输出的键值对,没有对其做任何处理。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReaderjavapublicstaticclassWordCountHbaseReaderReduceextendsReducer{privateTextresult=newText();@Overrideprotectedvoidreduce(Textkey,Iterablevalues,Contextcontext)throwsIOException,InterruptedException{for(Textval:values){resultset(val);contextwrite(key,result);}}}4、驱动函数实现与WordCount的驱动类不同,在Job配置的时候没有配置jobsetMapperClass(),而是用以下方法执行Mapper类:TableMapReduceUtilinitTableMapperJob(tablename,scan,WordCountHbaseReaderMapperclass,Textclass,Textclass,job);该方法指明了在执行job的Map过程时,数据输入源是hbase的tablename表,通过扫描读入对象scan对表进行全表扫描,为Map过程提供数据源输入,通过WordCountHbaseReaderMapperclass执行Map过程,Map过程的输出key/value类型是Textclass与Textclass,最后一个参数是作业对象。特别注意:这里声明的是一个最简单的扫描读入对象scan,进行表扫描读取数据,其中scan可以配置参数,这里为了例子简单不再详述,用户可自行尝试。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReaderjavapublicstaticvoidmain(String[]args)throwsException{Stringtablename="wordcount";Configurationconf=HBaseConfigurationcreate();confset("hbasezookeeperquorum","Master");String[]otherArgs=newGenericOptionsParser(conf,args)getRemainingArgs();if(otherArgslength!=1){Systemerrprintln("Usage:WordCountHbaseReader");Systemexit(2);}Jobjob=newJob(conf,"WordCountHbaseReader");jobsetJarByClass(WordCountHbaseReaderclass);//设置任务数据的输出路径;FileOutputFormatsetOutputPath(job,newPath(otherArgs[0]));jobsetReducerClass(WordCountHbaseReaderReduceclass);Scanscan=newScan();TableMapReduceUtilinitTableMapperJob(tablename,scan,WordCountHbaseReaderMapperclass,Textclass,Textclass,job);//调用jobwaitForCompletion(true)执行任务,执行成功后退出;Systemexit(jobwaitForCompletion(true)0:1);}5、部署运行1)启动Hadoop集群和Hbase服务[hadoop@K-Master~]$start-dfssh#启动hadoopHDFS文件管理系统[hadoop@K-Master~]$start-mapredsh#启动hadoopMapReduce分布式计算服务[hadoop@K-Master~]$start-hbasesh#启动Hbase[hadoop@K-Master~]$jps#查看进程22003HMaster10611SecondaryNameNode22226Jps21938HQuorumPeer10709JobTracker22154HRegionServer20277Main10432NameNode

"在古代,下列哪个姓氏可以不使用“免贵”的谦称

只有南张北孔可以不用免贵,不然其他都要。南张是因为张玉皇,既玉皇大帝,玉帝是民间信仰中的最高神明,主宰一切,因此玉帝的姓氏可以不用免贵北孔是因为孔圣人,孔子的儒家文化对中华历史文化产生了非常深远的影响,孔子也由人到圣进行了转化,因此孔圣人的姓氏也可以不用免贵。除了这两个姓氏,其他姓氏的回答的时候都要说,免贵姓。

mapreduce可以不使用hbase吗

1HBase如果加了列限定,如果该列不存在时返回的结果为empty

看下面的代码:

1

2

Get get = new Get(BytestoBytes("100"));

getaddColumn(BytestoBytes("info"), BytestoBytes("name"));

这里加入了列限定,也就是只返回列族info下面的name字段。但是如果name字段根本不存在,返回的Result在调用resultisEmpty()时则返回为true,也就是说就算其他字段存在,也什么都没返回来,包括rowkey也没有返回来。当然,如果是限定多个列,只要一个列存在就可以正常返回。所以需要注意。

2HBase在scan时指定的StartRow里面不能加-

看下面的代码:

1

2

3

4

Scan scan = new Scan();

scansetStartRow(BytestoBytes("3136947-"));

scansetSRow(BytestoBytes("3136947-" + 1));

我的本意是查询rowkey以 3136947- 开头的行,但是因为我的里面有一个-(“杠”),所以什么都没返回,去掉-后正常。这说明这里是不能使用-,-也并不是转义字符,转义后也还是scan不出来的。不知道其他字符是不是也不行,没有测试。 所以需要注意。

3HBase在scan时过滤掉指定列不存在的记录

如果想返回某个字段必须存在的行,不存在该字段的记录过滤掉不返回,方法如下:

1

2

3

4

5

6

7

8

9

10

11

12

Scan scan = new Scan();

scansetStartRow(BytestoBytes("3136947"));

scansetSRow(BytestoBytes("3136947" + 1));

scanaddColumn(BytestoBytes("info"),

BytestoBytes("name"));

SingleColumnValueFilter filter = new SingleColumnValueFilter(BytestoBytes("info"),

BytestoBytes("name"),

CompareFilterCompareOpNOT_EQUAL, BytestoBytes("0"));

filtersetFilterIfMissing(true);

scansetFilter(filter);

注意:如果是判断某个列是否存在,必须在addColumn里面加上该列,也就是必须返回的字段里面必须包含该列,否则也不会返回,因为在处理的时候是调用addColumn然后才会调用过滤器。

这里的过滤器里面指定该列的字段值必须不等于0(当然,如果你的name里有等于0的当然不能使用0),并且设置setFilterIfMissing为true,也就是设置为如果该列不存在就过滤掉这条数据,默认为false。

4利用MapReduce导出hbase数据

如果hbase作为数据的输出,job设置如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

Configuration conf = HBaseConfigurationcreate();

Scan scan = new Scan();

scansetStartRow(BytestoBytes("3136947"));

scansetSRow(BytestoBytes("3136947" + 1));

scanaddColumn(BytestoBytes("info"), BytestoBytes("name"));

scanaddFamily(UserStoreHelperFAMILY_INFO);

scanaddColumn(UserStoreHelperFAMILY_INFO, UserStoreHelperUSER_ID);

scanaddColumn(UserStoreHelperFAMILY_INFO, UserStoreHelperFRIENDS);

scanaddColumn(UserStoreHelperFAMILY_INFO, UserStoreHelperLEVEL_CODE);

final Job job = new Job(conf, "exportHBaseUser");

jobsetJarByClass(TestJobCreatorclass);

jobsetOutputFormatClass(TextOutputFormatclass);

FileOutputFormatsetOutputPath(job, new Path("test1"));

jobsetReducerClass(HbaseExportReduceclass);

jobsetPartitionerClass(UserPartitionerclass);

jobsetNumReduceTasks(14);

TableMapReduceUtilinitTableMapperJob(BytestoBytes("usertable"),

scan,

TestMapperclass,

Textclass,

NullWritableclass,

job);

在initTableMapperJob里面设置的map必须继承apachehadoophbasemapreduceTableMapper,并且最后两个设置的参数是自己定义的map的输出时的key和value的类型。

5利用mapReduce插入数据到HBase

如果hbase作为数据的输入。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

final Configuration conf = HBaseConfigurationcreate();

final Job job = new Job(conf, "Sync-To-HBase");

jobsetJarByClass(PostStoreExportHBaseJobCreatorclass);

我这里是以mongodb为输入

jobsetInputFormatClass(MongoInputFormatclass);

TableMapReduceUtilinitTableReducerJob("usertable", null, job);

把数据转换为hbase表格式的map

jobsetMapperClass(TestMapperclass);

直接入hbase库不需要reduce

jobsetNumReduceTasks(0);

这里map的输出必须是key为ImmutableBytesWritable,value为 Put

facetime mac端可以不使用wifi吗

必须是WIFI环境

做app可以不使用模板吗?

可以你可以自己创建自己的模板也可以使用自带的模板,都可以的

当然可以拉,app开发分为两种,一种是直接套用模仿来开发的,这种开发的好处就是开发时间短,开发费用低,不过弊端就是不能拓展功能;另一种就是定制开发类型的,这种app定制开发类型的好处就是可以按照自己的需求来定制开发,想要什么功能就开发什么功能,局限性小,不过相对模版app来说,价格就贵上很多。目前市场上比较好的app定制开发公司,如优百通、启汇网络等。

ubuntu可以不使用sudo命令吗

本来就不用啊。但是一些命令只有root才能执行的必须用它取得临时root权限

xp中病毒很多,因为别人只要获得一个用户权限就是超级用户就可以对你电脑做什么都可以。win7稍微又加入了一些linux里面的权限的概念。在ubuntu下,如果你不是root,那么,你在执行一些命令做一些 *** 作时是不允许的,可以在命令前面加上sudo rm -r

xbox one 使用skype可以不使用耳机么?

如果你xbox one有摄像头,是不需要耳机的,摄像头就有麦克风功能和音箱功能

"可用可不用"的英文怎么说如,"可以使用do,可以不使用do。"

这个动词可用可不用。

The verb may be expressed or understood

它是可用可不用的,有时用了分号会使句子更容易理解。

It's optional, but may on oasion make the sentence more understandable

PS可参考以上例句

Linux怎么可以不使用right ctrl

如果是Oracle VM VirtualBox那就是一种窗口模式,如果你用这种模式就不用每次去按热键RightCtrl了,而且一但设定除非你改回来,不然即使你关闭了系统下次再次进入系统还是有效的。

如何使用scala+spark读写Hbase

软件版本如下:

scala2118

spark210

hbase120

公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。

接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时 *** 作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的 *** 作api,势必速度回慢上许多。

关于批量 *** 作Hbase,一般我们都会用MapReduce来 *** 作,这样可以大大加快处理效率,原来也写过MR *** 作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。

整个流程如下:

(1)全量读取hbase表的数据

(2)做一系列的ETL

(3)把全量数据再写回hbase

核心代码如下:

//获取conf

val conf=HBaseConfigurationcreate() //设置读取的表

confset(TableInputFormatINPUT_TABLE,tableName) //设置写入的表

confset(TableOutputFormatOUTPUT_TABLE,tableName)//创建sparkConf

val sparkConf=new SparkConf() //设置spark的任务名

sparkConfsetAppName("read and write for hbase ") //创建spark上下文

val sc=new SparkContext(sparkConf)

//为job指定输出格式和输出表名

val newAPIJobConfiguration1 = JobgetInstance(conf)

newAPIJobConfiguration1getConfiguration()set(TableOutputFormatOUTPUT_TABLE, tableName)

newAPIJobConfiguration1setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

//全量读取hbase表

val rdd=scnewAPIHadoopRDD(conf,classOf[TableInputFormat]

,classOf[ImmutableBytesWritable]

,classOf[Result]

)

//过滤空数据,然后对每一个记录做更新,并转换成写入的格式

val final_rdd= rddfilter(checkNotEmptyKs)map(forDatas)

//转换后的结果,再次做过滤

val save_rdd=final_rddfilter(checkNull)

//最终在写回hbase表

save_rddsaveAsNewAPIHadoopDataset(newAPIJobConfiguration1getConfiguration)

scstop()

从上面的代码可以看出来,使用spark+scala *** 作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:

第一个:checkNotEmptyKs

作用:过滤掉空列簇的数据

def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f_2 val rowkey=BytestoString(rgetRow) val map:scalacollectionmutableMap[Array[Byte],Array[Byte]]= rgetFamilyMap(BytestoBytes("ks"))asScala if(mapisEmpty) false else true

}

第二个:forDatas

作用:读取每一条数据,做update后,在转化成写入 *** 作

def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f_2 //获取Result

val put:Put=new Put(rgetRow) //声明put

val ks=BytestoBytes("ks") //读取指定列簇

val map:scalacollectionmutableMap[Array[Byte],Array[Byte]]= rgetFamilyMap(ks)asScala

mapforeach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化

val kid= BytestoString(kv_1)//知识点id

var value=BytestoString(kv_2)//知识点的value值

value="修改后的value"

putaddColumn(ks,kv_1,BytestoBytes(value)) //放入put对象

}

) if(putisEmpty) null else (new ImmutableBytesWritable(),put)

}

第三个:checkNull 作用:过滤最终结果里面的null数据

def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true

}

上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark *** 作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:

本文中的数据平台已迭代三个版本,从头开始遇到很多常见的难题,终于有片段时间整理一些已完善的文档,在此分享以供所需朋友的。实现参考,少走些弯路,在此篇幅中偏重于ES的优化,目前生产已存储百亿数据,性能良好,关于HBase,Hadoop的设计优化估计有很多文章可以参考,不再赘述。

项目背景:在一业务系统中,部分表每天的数据量过亿,已按天分表,但业务上受限于按天查询,并且DB中只能保留3个月的数据(硬件高配),分库代价较高。

改进版本目标:

谈到优化必须能了解组件的基本原理,才容易找到瓶颈所在,以免走多种弯路,先从ES的基础结构说起(如下图):

一些基本概念:

ES依赖一个重要的组件Lucene,关于数据结构的优化通常来说是对Lucene的优化,它是集群的一个存储于检索工作单元,结构如下图:

在Lucene中,分为索引(录入)与检索(查询)两部分,索引部分包含 分词器、过滤器、字符映射器 等,检索部分包含 查询解析器 等。一个Lucene索引包含多个segments,一个segment包含多个文档,每个文档包含多个字段,每个字段经过分词后形成一个或多个term。

通过Luke工具查看ES的lucene文件如下,主要增加了_id 和 _source字段:

Lucene 索引文件结构主要的分为:词典、倒排表、正向文件、DocValues等,如下图:

Lucene 随机三次磁盘读取比较耗时。其中fdt文件保存数据值损耗空间大,tim和doc则需要SSD存储提高随机读写性能。

另外一个比较消耗性能的是打分流程,不需要则可屏蔽。

关于DocValues:

倒排索引解决从词快速检索到相应文档ID, 但如果需要对结果进行排序、分组、聚合等 *** 作的时候则需要根据文档ID快速找到对应的值。

通过倒排索引代价缺很高:需迭代索引里的每个词项并收集文档的列里面 token。这很慢而且难以扩展:随着词项和文档的数量增加,执行时间也会增加。Solr docs对此的解释如下:

在lucene 40版本前通过FieldCache,原理是通过按列逆转倒排表将(field value ->doc)映射变成(doc -> field value)映射,问题为逐步构建时间长并且消耗大量内存,容易造成OOM。

DocValues是一种列存储结构,能快速通过文档ID找到相关需要排序的字段。在ES中,默认开启所有(除了标记需analyzed的字符串字段)字段的doc values,如果不需要对此字段做任何排序等工作,则可关闭以减少资源消耗。

ES中一个索引由一个或多个lucene索引构成,一个lucene索引由一个或多个segment构成,其中segment是最小的检索域。

数据具体被存储到哪个分片上:shard = hash(routing) % number_of_primary_shards

默认情况下 routing参数是文档ID (murmurhash3),可通过 URL中的 _routing 参数指定数据分布在同一个分片中,index和search的时候都需要一致才能找到数据,如果能明确根据_routing进行数据分区,则可减少分片的检索工作,以提高性能。

在我们的案例中,查询字段都是固定的,不提供全文检索功能,这也是几十亿数据能秒级返回的一个大前提:

1、ES仅提供字段的检索,仅存储HBase的Rowkey不存储实际数据。

2、实际数据存储在HBase中,通过Rowkey查询,如下图。

3、提高索引与检索的性能建议,可参考官方文档(如 >

hbase java是什么,让我们一起了解一下?

HBase是一个分布式的、面向列的开源数据库,具有高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。

如何使用JAVA语言 *** 作Hbase、整合Hbase?

可分为五步骤:

步骤1:新创建一个Java Project 。

步骤2:导入JAR包,在工程根目录下新建一个“lib”文件夹,将官方文档中的lib目录下的jar全部导入。

步骤3:修改开发机的hosts文件,在文件莫为增加一行虚拟机IP的映射信息。

步骤4:修改虚拟机的配置文件,修改虚拟机的设备名称,名称需要与之前两个配置文件的映射名称一致。

步骤5:实现查询、新建、删除等。

案例代码展示如下:

package hbase; import javaioIOException; import javautilArrayList; import javautilList; import orgapachehadoopconfConfiguration; import orgapachehadoophbaseCell; import orgapachehadoophbaseHBaseConfiguration; import orgapachehadoophbaseHColumnDescriptor; import orgapachehadoophbaseHTableDescriptor; import orgapachehadoophbaseTableName; import orgapachehadoophbaseclientAdmin; import orgapachehadoophbaseclientConnection; import orgapachehadoophbaseclientConnectionFactory; import orgapachehadoophbaseclientDelete; import orgapachehadoophbaseclientGet; import orgapachehadoophbaseclientPut; import orgapachehadoophbaseclientResult; import orgapachehadoophbaseclientResultScanner; import orgapachehadoophbaseclientScan; import orgapachehadoophbaseclientTable; import orgapachehadoophbaseexceptionsDeserializationException; import orgapachehadoophbasefilterFilter; import orgapachehadoophbasefilterSingleColumnValueFilter; import orgapachehadoophbasefilterCompareFilterCompareOp; import orgapachehadoophbaseutilBytes; import orgjunitBefore; import orgjunitTest; public class HBaseDemo { // 与HBase数据库的连接对象 Connection connection; // 数据库元数据 *** 作对象 Admin admin; @Before public void setUp() throws Exception { // 取得一个数据库连接的配置参数对象 Configuration conf = HBaseConfigurationcreate(); // 设置连接参数:HBase数据库所在的主机IP confset("hbasezookeeperquorum", "19216813713"); // 设置连接参数:HBase数据库使用的端口 confset("hbasezookeeperpropertyclientPort", "2181"); // 取得一个数据库连接对象 connection = ConnectionFactorycreateConnection(conf); // 取得一个数据库元数据 *** 作对象 admin = connectiongetAdmin(); } /      创建表     / public void createTable() throws IOException{ Systemoutprintln("---------------创建表 START-----------------"); // 数据表表名 String tableNameString = "t_book"; // 新建一个数据表表名对象 TableName tableName = TableNamevalueOf(tableNameString); // 如果需要新建的表已经存在 if(admintableExists(tableName)){ Systemoutprintln("表已经存在!"); } // 如果需要新建的表不存在 else{ // 数据表描述对象 HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); // 列族描述对象 HColumnDescriptor family= new HColumnDescriptor("base");; // 在数据表中新建一个列族 hTableDescriptoraddFamily(family); // 新建数据表 admincreateTable(hTableDescriptor); } Systemoutprintln("---------------创建表 END-----------------"); } /      查询整表数据     / @Test public void queryTable() throws IOException{ Systemoutprintln("---------------查询整表数据 START-----------------"); // 取得数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 取得表中所有数据 ResultScanner scanner = tablegetScanner(new Scan()); // 循环输出表中的数据 for (Result result : scanner) { byte[] row = resultgetRow(); Systemoutprintln("row key is:" + new String(row)); List  listCells = resultlistCells(); for (Cell cell : listCells) { byte[] familyArray = cellgetFamilyArray(); byte[] qualifierArray = cellgetQualifierArray(); byte[] valueArray = cellgetValueArray(); Systemoutprintln("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } } Systemoutprintln("---------------查询整表数据 END-----------------"); } /      按行键查询表数据     / @Test public void queryTableByRowKey() throws IOException{ Systemoutprintln("---------------按行键查询表数据 START-----------------"); // 取得数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 新建一个查询对象作为查询条件 Get get = new Get("row8"getBytes()); // 按行键查询数据 Result result = tableget(get); byte[] row = resultgetRow(); Systemoutprintln("row key is:" + new String(row)); List  listCells = resultlistCells(); for (Cell cell : listCells) { byte[] familyArray = cellgetFamilyArray(); byte[] qualifierArray = cellgetQualifierArray(); byte[] valueArray = cellgetValueArray(); Systemoutprintln("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } Systemoutprintln("---------------按行键查询表数据 END-----------------"); } /      按条件查询表数据     / @Test public void queryTableByCondition() throws IOException{ Systemoutprintln("---------------按条件查询表数据 START-----------------"); // 取得数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 创建一个查询过滤器 Filter filter = new SingleColumnValueFilter(BytestoBytes("base"), BytestoBytes("name"), CompareOpEQUAL, BytestoBytes("bookName6")); // 创建一个数据表扫描器 Scan scan = new Scan(); // 将查询过滤器加入到数据表扫描器对象 scansetFilter(filter); // 执行查询 *** 作,并取得查询结果 ResultScanner scanner = tablegetScanner(scan); // 循环输出查询结果 for (Result result : scanner) { byte[] row = resultgetRow(); Systemoutprintln("row key is:" + new String(row)); List  listCells = resultlistCells(); for (Cell cell : listCells) { byte[] familyArray = cellgetFamilyArray(); byte[] qualifierArray = cellgetQualifierArray(); byte[] valueArray = cellgetValueArray(); Systemoutprintln("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } } Systemoutprintln("---------------按条件查询表数据 END-----------------"); } /      清空表     / @Test public void truncateTable() throws IOException{ Systemoutprintln("---------------清空表 START-----------------"); // 取得目标数据表的表名对象 TableName tableName = TableNamevalueOf("t_book"); // 设置表状态为无效 admindisableTable(tableName); // 清空指定表的数据 admintruncateTable(tableName, true); Systemoutprintln("---------------清空表 End-----------------"); } /      删除表     / @Test public void deleteTable() throws IOException{ Systemoutprintln("---------------删除表 START-----------------"); // 设置表状态为无效 admindisableTable(TableNamevalueOf("t_book")); // 删除指定的数据表 admindeleteTable(TableNamevalueOf("t_book")); Systemoutprintln("---------------删除表 End-----------------"); } /      删除行     / @Test public void deleteByRowKey() throws IOException{ Systemoutprintln("---------------删除行 START-----------------"); // 取得待 *** 作的数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 创建删除条件对象 Delete delete = new Delete(BytestoBytes("row2")); // 执行删除 *** 作 tabledelete(delete); Systemoutprintln("---------------删除行 End-----------------"); } /      删除行(按条件)     / @Test public void deleteByCondition() throws IOException, DeserializationException{ Systemoutprintln("---------------删除行(按条件) START-----------------"); // 步骤1:调用queryTableByCondition()方法取得需要删除的数据列表 // 步骤2:循环步骤1的查询结果,对每个结果调用deleteByRowKey()方法 Systemoutprintln("---------------删除行(按条件) End-----------------"); } /      新建列族     / @Test public void addColumnFamily() throws IOException{ Systemoutprintln("---------------新建列族 START-----------------"); // 取得目标数据表的表名对象 TableName tableName = TableNamevalueOf("t_book"); // 创建列族对象 HColumnDescriptor columnDescriptor = new HColumnDescriptor("more"); // 将新创建的列族添加到指定的数据表 adminaddColumn(tableName, columnDescriptor); Systemoutprintln("---------------新建列族 END-----------------"); } /      删除列族     / @Test public void deleteColumnFamily() throws IOException{ Systemoutprintln("---------------删除列族 START-----------------"); // 取得目标数据表的表名对象 TableName tableName = TableNamevalueOf("t_book"); // 删除指定数据表中的指定列族 admindeleteColumn(tableName, "more"getBytes()); Systemoutprintln("---------------删除列族 END-----------------"); } /      插入数据     / @Test public void insert() throws IOException{ Systemoutprintln("---------------插入数据 START-----------------"); // 取得一个数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 需要插入数据库的数据集合 List  putList = new ArrayList (); Put put; // 生成数据集合 for(int i = 0; i 

package Common;

import javaioFileInputStream;

import javaioFileNotFoundException;

import javaioFileOutputStream;

import javaioIOException;

import javautilArrayList;

import javautilHashMap;

import javautilList;

import javautilMap;

import orgapachepoihssfusermodelHSSFWorkbook;

import orgapachepoipoifsfilesystemPOIFSFileSystem;

import orgapachepoissusermodel;

/

@author LFF

@version 05 Excel文件 *** 作帮助类

/

public class ExcelPOIHelper {

// D盘建一个空的workbookxls文件

public static void Create(String path, String name) {

Workbook wb = new HSSFWorkbook();

FileOutputStream fileOut;

try {

fileOut = new FileOutputStream("D:/workbookxls");

wbwrite(fileOut);

fileOutclose();

} catch (FileNotFoundException e) {

eprintStackTrace();

} catch (IOException e) {

eprintStackTrace();

}

}

/

取出Excel所有工作簿名

@param fullPath

Excel文件完整地址("D:/workbookxls")

@return 工作簿名列表

/

public static List<String> GetSheets(String fullPath) {

List<String> result = new ArrayList<String>();

try {

FileInputStream file = new FileInputStream(fullPath);

POIFSFileSystem ts = new POIFSFileSystem(file);

Workbook workbook = new HSSFWorkbook(ts);

for (int i = 0; i < workbookgetNumberOfSheets(); i++) {

String sheetName = workbookgetSheetName(i);

resultadd(i, sheetName);

}

fileclose();

} catch (FileNotFoundException e) {

eprintStackTrace();

} catch (IOException e) {

eprintStackTrace();

}

return result;

}

/

取工作簿中所有的行

@param fullPath

Excel文件完整地址("D:/workbookxls")

@param sheetName

工作簿名

@return 键值对:<RowKey,<ColumnName, Value>>

/

public static Map<String, List<Map<String, String>>> GetRows(

String fullPath, String sheetName) {

Map<String, List<Map<String, String>>> resultRow = new HashMap<String, List<Map<String, String>>>();

List<Map<String, String>> resultCells;

Map<String, String> resultCell;

try {

FileInputStream file = new FileInputStream(fullPath);

POIFSFileSystem ts = new POIFSFileSystem(file);

Workbook workbook = new HSSFWorkbook(ts);

Sheet sheet = workbookgetSheet(sheetName);

int rowCounts = sheetgetPhysicalNumberOfRows();// 行数

int columnCounts = sheetgetRow(0)getPhysicalNumberOfCells(); // 列数

for (int i = 1; i < rowCounts; i++) {

Row row = sheetgetRow(i);// 循环取第一行之后的每一行

rowgetCell(0)setCellType(CellCELL_TYPE_STRING);

resultCells = new ArrayList<Map<String, String>>();

resultCell = new HashMap<String, String>();

String rowKey = rowgetCell(0)toString();

for (int j = 1; j < columnCounts; j++) {

Cell cell = rowgetCell(j);// 循环取第一列之后的每一列

if (null != cell) {

cellsetCellType(CellCELL_TYPE_STRING);

String columnName = sheetgetRow(0)getCell(j)

toString();

String cellValue = celltoString();

resultCellput(columnName, cellValue);

}

}

resultCellsadd(resultCell);

resultRowput(rowKey, resultCells);

}

fileclose();

} catch (FileNotFoundException e) {

eprintStackTrace();

} catch (IOException e) {

eprintStackTrace();

}

return resultRow;

}

}

以上就是关于spark1.2.1实现读取hbase的数据后怎么实现实时查询全部的内容,包括:spark1.2.1实现读取hbase的数据后怎么实现实时查询、"在古代,下列哪个姓氏可以不使用“免贵”的谦称、如何使用scala+spark读写hbase等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9266730.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-26
下一篇 2023-04-26

发表评论

登录后才能评论

评论列表(0条)

保存