package com.qin.operadb
import java.io.DataInput
import java.io.DataOutput
import java.io.IOException
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.lib.db.DBWritable
/***
* 封装数据库实体信息
* 的记录
*
* 搜索大数据技术交流群:376932160
*
* **/
public class PersonRecoder implements Writable,DBWritable {
public String name//对应数据库中的name字段
public int age//对应数据库中的age字段
@Override
public void readFields(ResultSet result) throws SQLException {
this.id=result.getInt(1)
this.name=result.getString(2)
this.age=result.getInt(3)
}
@Override
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, id)
stmt.setString(2, name)
stmt.setInt(3, age)
}
@Override
public void readFields(DataInput arg0) throws IOException {
// TODO Auto-generated method stub
this.id=arg0.readInt()
this.name=Text.readString(arg0)
this.age=arg0.readInt()
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(id)
Text.writeString(out, this.name)
out.writeInt(this.age)
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "id: "+id+" 年龄: "+age+" 名字:"+name
}
}
</pre>
MR类的定义代码,注意是一个Map Only作业:
<pre name="code" class="java">package com.qin.operadb
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.lib.IdentityReducer
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public class ReadMapDB {
/**
* Map作业读取数据记录数
*
* **/
private static class DBMap extends Mapper&ltLongWritable, PersonRecoder , LongWritable, Text&gt{
@Override
protected void map(LongWritable key, PersonRecoder value,Context context)
throws IOException, InterruptedException {
context.write(new LongWritable(value.id), new Text(value.toString()))
}
}
public static void main(String[] args)throws Exception {
JobConf conf=new JobConf(ReadMapDB.class)
//Configuration conf=new Configuration()
// conf.set("mapred.job.tracker","192.168.75.130:9001")
//读取person中的数据字段
// conf.setJar("tt.jar")
//注意这行代码放在最前面,进行初始化,否则会报
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.211.36:3306/test", "root", "qin")
/**要读取的字段信息**/
String fileds[]=new String[]{"id","name","age"}
/**Job任务**/
Job job=new Job(conf, "readDB")
System.out.println("模式: "+conf.get("mapred.job.tracker"))
/**设置数据库输入格式的一些信息**/
DBInputFormat.setInput(job, PersonRecoder.class, "person", null, "id", fileds)
/***设置输入格式*/
job.setInputFormatClass(DBInputFormat.class)
job.setOutputKeyClass(LongWritable.class)
job.setOutputValueClass(Text.class)
job.setMapperClass(DBMap.class)
String path="hdfs://192.168.75.130:9000/root/outputdb"
FileSystem fs=FileSystem.get(conf)
Path p=new Path(path)
if(fs.exists(p)){
fs.delete(p, true)
System.out.println("输出路径存在,已删除!")
}
FileOutputFormat.setOutputPath(job,p )
System.exit(job.waitForCompletion(true) ? 0 : 1)
}
}
hadoop不支持数据的随机读写是因为其效率低下。Hadoop并非不支持随机读写,它也支持。但效率低下,并且存在很多问题需要开发人员在自己的程序解决。hadoop设计的初衷是大规模数据的计算和olap分析,应用场景区别与数据库,因此在HDFS设计时候就侧重在一次写入多次读取。hadoop写入出现重复数据hadoop写入出现重复数据的原因有很多,主要有以下几点:
1、数据源中存在重复数据:如果数据源中存在重复数据,那么hadoop在读取数据时就会出现重复数据。
2、文件写入失败:如果在写入文件的过程中出现了失败,那么文件中的数据可能会重复。
3、程序错误:如果程序出现了错误,那么可能会出现重复的数据。
4、数据库中的重复:如果数据库中的数据有重复,那么hadoop读取数据时也会出现重复的数据。
解决重复数据的方法:
1、检查数据源,确保数据源中没有重复的数据;
2、在写入文件之前,先检查文件是否存在,如果存在,则先删除文件;
3、修改程序,检查程序是否出现了错误;
4、检查数据库中的数据,确保没有重复的数据。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)