请教如何在hadoop获取数据

请教如何在hadoop获取数据,第1张

我们的一些应用程序中,常常避免不了要与数据进行交互,而在我们的hadoop中,有时候也需要和数据库进行交互,比如说,数据分析的结果存入数据库,

或者是,读取数据库的信息写入HDFS上,不过直接使用MapReduce *** 作数据库,这种情况在现实开发还是比较少,一般我们会采用Sqoop来进行数

据的迁入,迁出,使用Hive分析数据集,大多数情况下,直接使用Hadoop访问关系型数据库,可能产生比较大的数据访问压力,尤其是在数据库还是单机

的情况下,情况可能更加糟糕,在集群的模式下压力会相对少一些。

那么,今天散仙就来看下,如何直接使用Hadoop1.2.0的MR来读写 *** 作数据库,hadoop的API提供了DBOutputFormat和

DBInputFormat这两个类,来进行与数据库交互,除此之外,我们还需要定义一个类似JAVA

Bean的实体类,来与数据库的每行记录进行对应,通常这个类要实现Writable和DBWritable接口,来重写里面的4个方法以对应获取每行记

MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。

现在MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。

使用该插件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:

1. 下载HadoopMR的插件

下载插件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。

2. 准备好HadoopMR的程序jar包

编译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:

package com.aliyun.odps.mapred.example.hadoop

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.Path

import org.apache.hadoop.io.IntWritable

import org.apache.hadoop.io.Text

import org.apache.hadoop.mapreduce.Job

import org.apache.hadoop.mapreduce.Mapper

import org.apache.hadoop.mapreduce.Reducer

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

import java.io.IOException

import java.util.StringTokenizer

public class WordCount {

public static class TokenizerMapper

extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1)

private Text word = new Text()

public void map(Object key, Text value, Context context

) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString())

while (itr.hasMoreTokens()) {

word.set(itr.nextToken())

context.write(word, one)

}

}

}

public static class IntSumReducer

extends Reducer<Text,IntWritable,Text,IntWritable>{

private IntWritable result = new IntWritable()

public void reduce(Text key, Iterable<IntWritable>values,

Context context

) throws IOException, InterruptedException {

int sum = 0

for (IntWritable val : values) {

sum += val.get()

}

result.set(sum)

context.write(key, result)

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration()

Job job = Job.getInstance(conf, "word count")

job.setJarByClass(WordCount.class)

job.setMapperClass(TokenizerMapper.class)

job.setCombinerClass(IntSumReducer.class)

job.setReducerClass(IntSumReducer.class)

job.setOutputKeyClass(Text.class)

job.setOutputValueClass(IntWritable.class)

FileInputFormat.addInputPath(job, new Path(args[0]))

FileOutputFormat.setOutputPath(job, new Path(args[1]))

System.exit(job.waitForCompletion(true) ? 0 : 1)

}

}

3. 测试数据准备

创建输入表和输出表

create table if not exists wc_in(line string)

create table if not exists wc_out(key string, cnt bigint)

通过tunnel将数据导入输入表中

待导入文本文件data.txt的数据内容如下:

hello maxcompute

hello mapreduce

例如可以通过如下命令将data.txt的数据导入wc_in中,

tunnel upload data.txt wc_in

4. 准备好表与hdfs文件路径的映射关系配置

配置文件命名为:wordcount-table-res.conf

{

"file:/foo": {

"resolver": {

"resolver": "c.TextFileResolver",

"properties": {

"text.resolver.columns.combine.enable": "true",

"text.resolver.seperator": "\t"

}

},

"tableInfos": [

{

"tblName": "wc_in",

"partSpec": {},

"label": "__default__"

}

],

"matchMode": "exact"

},

"file:/bar": {

"resolver": {

"resolver": "openmr.resolver.BinaryFileResolver",

"properties": {

"binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",

"binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"

}

},

"tableInfos": [

{

"tblName": "wc_out",

"partSpec": {},

"label": "__default__"

}

],

"matchMode": "fuzzy"

}

}

MR是一个多义词,所代表的意思分别是:

1、MR指的是英文缩写:

MR,作为英文缩写和简写有多种含义,可表示MR = Match Record 赛会纪录,MR = memory read,存储器读出。另在移动通信领域还可表示测量报告,另有香港新晋乐MR,在hadoop中代表Map Reduce的简写。

2、MR指的是介导现实:

介导现实由智能硬件之父多伦多大学教授SteveMann提出的介导现实,全称Mediated Reality,简称MR。

3、MR指的是边际收益:

边际收益是指增加一单位产品的销售所增加的收益,即最后一单位产品的售出所取得的收益。它可以是正值或负值。边际收益是厂商分析中的重要概念。

4、MR指的是移动通信:

MR是指信息在业务信道上每480ms发送一次数据,这些数据可用于网络评估和优化。

5、MR指的是混合现实:

混合现实是一组技术组合,不仅提供新的观看方法,还提供新的输入方法,而且所有方法相互结合,从而推动创新 。输入和输出的结合对中小型企业而言是关键的差异化优势。

参考资料来源:百度百科—MR

参考资料来源:百度百科—MR

参考资料来源:百度百科—MR

参考资料来源:百度百科—MR

参考资料来源:百度百科—MR


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9972839.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-03
下一篇 2023-05-03

发表评论

登录后才能评论

评论列表(0条)

保存