你的job类并没有设置outputformat,如需要输出到数据库,需要特定的输出如下:
job.setJarByClass(TextCheckerJob.class)job.setMapperClass(TextMapper.class)
job.setReducerClass(TextReduce.class)
job.setInputFormatClass(TextInputFormat.class)
job.setOutputFormatClass(MysqlDBOutputFormat.class)
job.setMapOutputKeyClass(Text.class)
job.setMapOutputValueClass(Text.class)
job.setOutputKeyClass(Text.class)
job.setOutputValueClass(Text.class)
job.setNumReduceTasks(1)
MysqlDBOutputFormat.setOutput(job, "data_chck_result", new String[] { "tblName", "colName", "wrongValue", "count" })
同时,job初始化之前,你需要把连接数据库的信息写入conf中,如下
conf.set("mapreduce.jdbc.driver.class", clazz)conf.set("mapreduce.jdbc.username", username)
conf.set("mapreduce.jdbc.password", password)
conf.set("mapreduce.jdbc.url", url)
上面的MysqlDBOutputFormat类是我自己重写了,你可以直接使用DBOutputFormat这个类
我们的一些应用程序中,常常避免不了要与数据库进行交互,而在我们的hadoop中,有时候也需要和数据库进行交互,比如说,数据分析的结果存入数据库,或者是,读取数据库的信息写入HDFS上,不过直接使用MapReduce *** 作数据库,这种情况在现实开发还是比较少,一般我们会采用Sqoop来进行数
据的迁入,迁出,使用Hive分析数据集,大多数情况下,直接使用Hadoop访问关系型数据库,可能产生比较大的数据访问压力,尤其是在数据库还是单机
的情况下,情况可能更加糟糕,在集群的模式下压力会相对少一些。
那么,今天散仙就来看下,如何直接使用Hadoop1.2.0的MR来读写 *** 作数据库,hadoop的API提供了DBOutputFormat和
DBInputFormat这两个类,来进行与数据库交互,除此之外,我们还需要定义一个类似JAVA
Bean的实体类,来与数据库的每行记录进行对应,通常这个类要实现Writable和DBWritable接口,来重写里面的4个方法以对应获取每行记
首先在本地机器上安装并设置MongoDB服务。从Mongo网站上下载MongoDB,解压到本地目录,比如C:>Mongo
在上一个文件夹内创建数据目录。比如:C:\Mongo\Data
如果数据文件存放在其他地方,那么在用mongod.exe命令启动MongoDB时,需要在命令行加参数—-dbpath
启动服务
MongoDB提供了两种方式:mongod.exe以后台进程启动;mongo.exe启动命令行界面,可做管理 *** 作。这两个可执行文件都位于Mongo\bin目录下;
进入Mongo安装目录的bin目录下,比如:C:>cd Mongo\bin
有两种启动方式,如下:
mongod.exe –dbpath C:\Mongo\data
或者
mongod.exe –config mongodb.config
mongodb.config是Mongo\bin目录下的配置文件,需要在此配置文件中指定数据目录(比如,dbpath= C:\Mongo\Data)的位置。
连接到MongoDB,到这一步,mongo后台服务已经启动,可以通过http://localhost:27017查看。 MongoDB启动运行后,我们接下来看它的聚合函数。
实现聚合函数
在关系数据库中,我们可以在数值型字段上执行包含预定义聚合函数的SQL语句,比如,SUM()、COUNT()、MAX()和MIN()。但是在MongoDB中,需要通过MapReduce功能来实现聚合以及批处理,它跟SQL里用来实现聚合的GROUP BY从句比较类似。下一节将描述关系数据库中SQL方式实现的聚合和相应的通过MongoDB提供的MapReduce实现的聚合。
为了讨论这个主题,我们考虑如下所示的Sales表,它以MongoDB中的反范式形式呈现。
Sales表
#
列名
数据类型
1
OrderId
INTEGER
2
OrderDate
STRING
3
Quantity
INTEGER
4
SalesAmt
DOUBLE
5
Profit
DOUBLE
6
CustomerName
STRING
7
City
STRING
8
State
STRING
9
ZipCode
STRING
10
Region
STRING
11
ProductId
INTEGER
12
ProductCategory
STRING
13
ProductSubCategory
STRING
14
ProductName
STRING
15
ShipDate
STRING
基于SQL和MapReduce的实现
我们提供了一个查询的样例集,这些查询使用聚合函数、过滤条件和分组从句,及其等效的MapReduce实现,即MongoDB实现SQL中GROUP BY的等效方式。在MongoDB存储的文档上执行聚合 *** 作非常有用,这种方式的一个限制是聚合函数(比如,SUM、AVG、MIN、MAX)需要通过mapper和reducer函数来定制化实现。
MongoDB没有原生态的用户自定义函数(UDFs)支持。但是它允许使用db.system.js.save命令来创建并保存JavaScript函数,JavaScript函数可以在MapReduce中复用。下表是一些常用的聚合函数的实现。稍后,我们会讨论这些函数在MapReduce任务中的使用。
聚合函数
Javascript 函数
SUM
db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
var total = 0
for(var i = 0i <values.lengthi++)
total += values[i]
return total
}})
AVERAGE
db.system.js.save( { _id : "Avg" ,
value : function(key,values)
{
var total = Sum(key,values)
var mean = total/values.length
return mean
}})
MAX
db.system.js.save( { _id : "Max" ,
value : function(key,values)
{
var maxValue=values[0]
for(var i=1i
MIN
db.system.js.save( { _id : "Min" ,
value : function(key,values)
{
var minValue=values[0]
for(var i=1i
VARIANCE
db.system.js.save( { _id : "Variance" ,
value : function(key,values)
{
var squared_Diff = 0
var mean = Avg(key,values)
for(var i = 0i <values.lengthi++)
{
var deviation = values[i] - mean
squared_Diff += deviation * deviation
}
var variance = squared_Diff/(values.length)
return variance
}})
STD DEVIATION
db.system.js.save( { _id : "Standard_Deviation"
, value : function(key,values)
{
var variance = Variance(key,values)
return Math.sqrt(variance)
}})
SQL和MapReduce脚本在四种不同的用例场景中实现聚合函数的代码片段如下表所示。
1.各地区的平均订单量
下面的查询是用来获取不同地区的平均订单量。
SQL Query
MapReduce Functions
SELECT
db.sales.runCommand(
{
mapreduce : "sales" ,
City,
State,
Region,
map:function()
{ // emit function handles the group by
emit( {
// Key
city:this.City,
state:this.State,
region:this.Region},
// Values
this.Quantity)
},
AVG(Quantity)
reduce:function(key,values)
{
var result = Avg(key, values)
return result
}
FROM sales
GROUP BY City, State, Region
// Group By is handled by the emit(keys, values)
line in the map() function above
out : { inline : 1 } })
2.产品的分类销售总额
下面的查询是用来获取产品的分类销售额,根据产品类别的层级分组。在下面例子中,不同的产品类别作为个体维度,它们也可以被称为更复杂的基于层次的维度。
SQL 查询
MapReduce 函数
SELECT
db.sales.runCommand(
{
mapreduce : "sales" ,
ProductCategory, ProductSubCategory, ProductName,
map:function()
{
emit(
// Key
{key0:this.ProductCategory,
key1:this.ProductSubCategory,
key2:this.ProductName},
// Values
this.SalesAmt)
},
SUM(SalesAmt)
reduce:function(key,values)
{
var result = Sum(key, values)
return result
}
FROM sales
GROUP BY ProductCategory, ProductSubCategory, ProductName
// Group By is handled by the emit(keys, values)
line in the map() function above
out : { inline : 1 } })
3. 一种产品的最大利润
下面的查询是用来获取一个给定产品基于过滤条件的最大利润。
SQL查询
MapReduce 函数
SELECT
db.sales.runCommand(
{
mapreduce : "sales" ,
ProductId, ProductName,
map:function()
{
if(this.ProductId==1)
emit( {
key0:this.ProductId,
key1:this.ProductName},
this.Profit)
},
MAX(SalesAmt)
reduce:function(key,values)
{
var maxValue=Max(key,values)
return maxValue
}
FROM sales
WHERE ProductId=’1’
// WHERE condition implementation is provided in
map() function
GROUP BY ProductId, ProductName
// Group By is handled by the emit(keys, values)
line in the map() function above
out : { inline : 1 } })
4. 总量、总销售额、平均利润
这个场景的需求是计算订单的总数、总销售额和平均利润,订单ID在1到10之间,发货时间在2011年的1月1日到12月31日之间。下面的查询是用来执行多个聚合,比如,在指定年份以及指定的不同区域和产品类别范围里订单的总数、总销售额和平均利润。
SQL 查询
MapReduce 函数
SELECT
db.sales.runCommand(
{ mapreduce : "sales" ,
Region,
ProductCategory,
ProductId,
map:function()
{
emit( {
// Keys
region:this.Region,
productCategory:this.ProductCategory,
productid:this.ProductId},
// Values
{quantSum:this.Quantity,
salesSum:this.SalesAmt,
avgProfit:this.Profit} )
}
Sum(Quantity),
Sum(Sales),
Avg(Profit)
reduce:function(key,values)
{
var result=
{quantSum:0,salesSum:0,avgProfit:0}
var count = 0
values.forEach(function(value)
{
// Calculation of Sum(Quantity)
result.quantSum += values[i].quantSum
// Calculation of Sum(Sales)
result.salesSum += values[i].salesSum
result.avgProfit += values[i].avgProfit
count++
}
// Calculation of Avg(Profit)
result.avgProfit = result.avgProfit / count
return result
},
FROM Sales
WHERE
Orderid between 1 and 10 AND
Shipdate BETWEEN ‘01/01/2011’ and
‘12/31/2011’
query : {
"OrderId" : { "$gt" : 1 },
"OrderId" : { "$lt" : 10 },
"ShipDate" : { "$gt" : "01/01/2011" },
"ShipDate" : { "$lt" : "31/12/2011" },
},
GROUP BY
Region, ProductCategory, ProductId
// Group By is handled by the emit(keys, values)
line in the map() function above
LIMIT 3
limit : 3,
out : { inline : 1 } })
既然我们已经看了在不同业务场景下的聚合函数的代码示例,接下来我们准备来测试这些函数。
测试聚合函数
MongoDB的MapReduce功能通过数据库命令来调用。Map和Reduce函数在前面章节里已经使用JavaScript实现。下面是执行MapReduce函数的语法。
db.runCommand(
{ mapreduce : <collection>,
map : <mapfunction>,
reduce : <reducefunction>
[, query : <query filter object>]
[, sort : <sorts the input objects using this key. Useful for
optimization, like sorting by the emit key for fewer reduces>]
[, limit : <number of objects to return from collection>]
[, out : <see output options below>]
[, keeptemp: <true|false>]
[, finalize : <finalizefunction>]
[, scope : <object where fields go into javascript global scope >]
[, jsMode : true]
[, verbose : true]
}
)
Where the Output Options include:
{ replace : "collectionName" }
{ merge : "collectionName"
{ reduce : "collectionName" }
{ inline : 1}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)