一
hadoop.tmp.dir
file:/usr/local/hadoop/tmp
Abase for other temporary directories.
fs.defaultFS
hdfs://localhost:9000
dfs.replication
1
dfs.namenode.name.dir
file:/usr/local/hadoop/tmp/dfs/name
dfs.datanode.data.dir
file:/usr/local/hadoop/tmp/dfs/data
二
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
/**
* 过滤掉文件名满足特定条件的文件
*/
class MyPathFilter implements PathFilter {
String reg = null;
MyPathFilter(String reg) {
this.reg = reg;
}
public boolean accept(Path path) {
if (!(path.toString().matches(reg)))
return true;
return false;
}
}
/***
* 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件
*/
/***
 * 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件
 * Merges every file under {@code inputPath} (except those matching the
 * ".abc" suffix filter) into the single HDFS file {@code outputPath},
 * echoing each file's metadata and content to stdout along the way.
 */
public class MergeFile {
    Path inputPath = null;  // directory holding the files to merge
    Path outputPath = null; // destination file for the merged content

    public MergeFile(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }

    /**
     * Performs the merge.
     *
     * @throws IOException if any HDFS open/read/write fails
     */
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        // 下面过滤掉输入目录中后缀为.abc的文件
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath,
                new MyPathFilter(".*\\.abc"));
        // try-with-resources: the output stream is closed even if a read below throws
        // (the original leaked fsdos/fsdis on any IOException mid-loop).
        try (FSDataOutputStream fsdos = fsDst.create(outputPath)) {
            PrintStream ps = new PrintStream(System.out);
            // 下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
            for (FileStatus sta : sourceStatus) {
                // 下面打印后缀不为.abc的文件的路径、文件大小
                System.out.print("路径:" + sta.getPath() + " 文件大小:" + sta.getLen()
                        + " 权限:" + sta.getPermission() + " 内容:");
                try (FSDataInputStream fsdis = fsSource.open(sta.getPath())) {
                    byte[] data = new byte[1024];
                    int read;
                    while ((read = fsdis.read(data)) > 0) {
                        ps.write(data, 0, read);
                        fsdos.write(data, 0, read);
                    }
                }
            }
            // flush, not close: the original's ps.close() also closed System.out,
            // silencing any later output from the process.
            ps.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        MergeFile merge = new MergeFile(
                "hdfs://localhost:9000/user/hadoop/",
                "hdfs://localhost:9000/user/hadoop/merge.txt");
        merge.doMerge();
    }
}
三
hbase.rootdir
hdfs://localhost:9000/hbase
hbase.cluster.distributed
true
hbase.unsafe.stream.capability.enforce
false
1.import org.apache.hadoop.conf.Configuration;
2.import org.apache.hadoop.hbase.*;
3.import org.apache.hadoop.hbase.client.*;
4.import org.apache.hadoop.hbase.util.Bytes;
5.
6.import java.io.IOException;
7.public class ExampleForHBase {
8. public static Configuration configuration;
9. public static Connection connection;
10. public static Admin admin;
11. public static void main(String[] args)throws IOException{
12. init();
13. createTable("student",new String[]{"score"});
14. insertData("student","zhangsan","score","English","69");
15. insertData("student","zhangsan","score","Math","86");
16. insertData("student","zhangsan","score","Computer","77");
17. getData("student", "zhangsan", "score","English");
18. close();
19. }
20.
21. public static void init(){
22. configuration = HBaseConfiguration.create();
23. configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
24. try{
25. connection = ConnectionFactory.createConnection(configuration);
26. admin = connection.getAdmin();
27. }catch (IOException e){
28. e.printStackTrace();
29. }
30. }
31.
32. public static void close(){
33. try{
34. if(admin != null){
35. admin.close();
36. }
37. if(null != connection){
38. connection.close();
39. }
40. }catch (IOException e){
41. e.printStackTrace();
42. }
43. }
44.
45. public static void createTable(String myTableName,String[] colFamily) throws IOException {
46. TableName tableName = TableName.valueOf(myTableName);
47. if(admin.tableExists(tableName)){
48. System.out.println("talbe is exists!");
49. }else {
50. TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
51. for(String str:colFamily){
52. ColumnFamilyDescriptor family =
53.ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
54. tableDescriptor.setColumnFamily(family);
55. }
56. admin.createTable(tableDescriptor.build());
57. }
58. }
59.
60. public static void insertData(String tableName,String rowKey,String colFamily,String col,String val) throws IOException {
61. Table table = connection.getTable(TableName.valueOf(tableName));
62. Put put = new Put(rowKey.getBytes());
63. put.addColumn(colFamily.getBytes(),col.getBytes(), val.getBytes());
64. table.put(put);
65. table.close();
66. }
67.
68. public static void getData(String tableName,String rowKey,String colFamily, String col)throws IOException{
69. Table table = connection.getTable(TableName.valueOf(tableName));
70. Get get = new Get(rowKey.getBytes());
71. get.addColumn(colFamily.getBytes(),col.getBytes());
72. Result result = table.get(get);
73. System.out.println(new String(result.getValue(colFamily.getBytes(),col==null?null:col.getBytes())));
74. table.close();
75. }
76.}
四
1.import java.io.IOException;
2.import java.util.Iterator;
3.import java.util.StringTokenizer;
4.import org.apache.hadoop.conf.Configuration;
5.import org.apache.hadoop.fs.Path;
6.import org.apache.hadoop.io.IntWritable;
7.import org.apache.hadoop.io.Text;
8.import org.apache.hadoop.mapreduce.Job;
9.import org.apache.hadoop.mapreduce.Mapper;
10.import org.apache.hadoop.mapreduce.Reducer;
11.import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12.import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13.import org.apache.hadoop.util.GenericOptionsParser;
14.public class WordCount {
15. public WordCount() {
16. }
17. public static void main(String[] args) throws Exception {
18. Configuration conf = new Configuration();
19. String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
20. if(otherArgs.length < 2) {
21. System.err.println("Usage: wordcount [...] ");
22. System.exit(2);
23. }
24. Job job = Job.getInstance(conf, "word count");
25. job.setJarByClass(WordCount.class);
26. job.setMapperClass(WordCount.TokenizerMapper.class);
27. job.setCombinerClass(WordCount.IntSumReducer.class);
28. job.setReducerClass(WordCount.IntSumReducer.class);
29. job.setOutputKeyClass(Text.class);
30. job.setOutputValueClass(IntWritable.class);
31. for(int i = 0; i < otherArgs.length - 1; ++i) {
32. FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
33. }
34. FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
35. System.exit(job.waitForCompletion(true)?0:1);
36. }
37. public static class TokenizerMapper extends Mapper
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)