Hadoop



    
core-site.xml:

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
        <description>A base for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>








    
hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    </property>
</configuration>
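To confirm that these two files take effect, a small client can connect to HDFS and check the default filesystem URI and the replication factor of a newly written file. This is only a minimal sketch, assuming the pseudo-distributed HDFS above has been formatted and started on localhost:9000; the class name and the test path /tmp/replication-check.txt are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckHdfsConfig {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");   // same value as core-site.xml
        FileSystem fs = FileSystem.get(conf);
        System.out.println("Default filesystem: " + fs.getUri());

        // Write a small test file and read back its replication factor (expected: 1, per dfs.replication)
        Path testFile = new Path("/tmp/replication-check.txt");  // hypothetical test path
        try (FSDataOutputStream out = fs.create(testFile, true)) {
            out.writeUTF("hello hdfs");
        }
        FileStatus status = fs.getFileStatus(testFile);
        System.out.println("Replication of " + testFile + ": " + status.getReplication());
        fs.delete(testFile, false);   // clean up the test file
        fs.close();
    }
}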
    

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
 
/**
 * Filter out files whose names match a given regular expression.
 */
class MyPathFilter implements PathFilter {
     String reg = null; 
     MyPathFilter(String reg) {
          this.reg = reg;
     }
     public boolean accept(Path path) {
         // Accept only paths that do NOT match the regular expression
         return !path.toString().matches(reg);
     }
}
/**
 * Merge files in HDFS using FSDataOutputStream and FSDataInputStream.
 */
public class MergeFile {
    Path inputPath = null;  // directory containing the files to merge
    Path outputPath = null; // path of the merged output file
    public MergeFile(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://localhost:9000");
          conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        // Filter out files in the input directory whose names end with .abc
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath,
                new MyPathFilter(".*\\.abc"));
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        PrintStream ps = new PrintStream(System.out);
        // Read each of the remaining files and append its contents to the same output file
        for (FileStatus sta : sourceStatus) {
            // Print the path, size, permission, and contents of each file whose name does not end with .abc
            System.out.print("Path: " + sta.getPath() + "    Size: " + sta.getLen()
                    + "   Permission: " + sta.getPermission() + "   Content: ");
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            byte[] data = new byte[1024];
            int read = -1;
 
            while ((read = fsdis.read(data)) > 0) {
                ps.write(data, 0, read);
                fsdos.write(data, 0, read);
            }
            fsdis.close();          
        }
        ps.close();
        fsdos.close();
    }
    public static void main(String[] args) throws IOException {
        MergeFile merge = new MergeFile(
                "hdfs://localhost:9000/user/hadoop/",
                "hdfs://localhost:9000/user/hadoop/merge.txt");
        merge.doMerge();
    }
}
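A side note on the copy loop in doMerge(): Hadoop's org.apache.hadoop.io.IOUtils.copyBytes performs the same buffered byte copy. The sketch below is not part of the original program; it is an illustrative variant under the same assumptions (pseudo-distributed HDFS on localhost:9000, the MyPathFilter class from above), and the class name MergeFileWithIOUtils and the output name merge2.txt are made up.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

public class MergeFileWithIOUtils {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Path inputDir = new Path("hdfs://localhost:9000/user/hadoop/");
        Path outputFile = new Path("hdfs://localhost:9000/user/hadoop/merge2.txt"); // illustrative output name
        FileSystem fs = FileSystem.get(URI.create(inputDir.toString()), conf);

        try (FSDataOutputStream out = fs.create(outputFile)) {
            // Skip *.abc files, exactly as in MergeFile above
            for (FileStatus sta : fs.listStatus(inputDir, new MyPathFilter(".*\\.abc"))) {
                if (sta.isFile()) {
                    try (FSDataInputStream in = fs.open(sta.getPath())) {
                        IOUtils.copyBytes(in, out, 4096, false); // buffered copy; keep 'out' open for the next file
                    }
                }
            }
        }
        fs.close();
    }
}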


        
hbase-site.xml:

<configuration>
        <property>
                <name>hbase.rootdir</name>
                <value>hdfs://localhost:9000/hbase</value>
        </property>
        <property>
                <name>hbase.cluster.distributed</name>
                <value>true</value>
        </property>
        <property>
                <name>hbase.unsafe.stream.capability.enforce</name>
                <value>false</value>
        </property>
</configuration>
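The hbase.rootdir value points HBase at the same hdfs://localhost:9000 NameNode configured earlier, so all HBase data is stored in HDFS; hbase.cluster.distributed=true runs it in (pseudo-)distributed mode, and hbase.unsafe.stream.capability.enforce=false disables the hflush/hsync stream-capability check, a setting commonly used in single-node tutorial setups.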
    
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class ExampleForHBase {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;

    public static void main(String[] args) throws IOException {
        init();
        createTable("student", new String[]{"score"});
        insertData("student", "zhangsan", "score", "English", "69");
        insertData("student", "zhangsan", "score", "Math", "86");
        insertData("student", "zhangsan", "score", "Computer", "77");
        getData("student", "zhangsan", "score", "English");
        close();
    }

    public static void init() {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void close() {
        try {
            if (admin != null) {
                admin.close();
            }
            if (null != connection) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void createTable(String myTableName, String[] colFamily) throws IOException {
        TableName tableName = TableName.valueOf(myTableName);
        if (admin.tableExists(tableName)) {
            System.out.println("Table already exists!");
        } else {
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
            for (String str : colFamily) {
                ColumnFamilyDescriptor family =
                        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
                tableDescriptor.setColumnFamily(family);
            }
            admin.createTable(tableDescriptor.build());
        }
    }

    public static void insertData(String tableName, String rowKey, String colFamily, String col, String val) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(rowKey.getBytes());
        put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());
        table.put(put);
        table.close();
    }

    public static void getData(String tableName, String rowKey, String colFamily, String col) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(rowKey.getBytes());
        get.addColumn(colFamily.getBytes(), col.getBytes());
        Result result = table.get(get);
        System.out.println(new String(result.getValue(colFamily.getBytes(), col == null ? null : col.getBytes())));
        table.close();
    }
}
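The example above only reads back a single cell with Get. As a complementary sketch (not in the original post), the method below scans every row of the student table through the same connection; it uses only standard HBase 2.x client calls (Scan, ResultScanner, CellUtil), and the class and method names are illustrative.

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class ScanStudent {
    // Print every cell of every row in the given table as "row family:qualifier = value"
    public static void scanTable(Connection connection, String tableName) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        try (ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println(new String(CellUtil.cloneRow(cell)) + " "
                            + new String(CellUtil.cloneFamily(cell)) + ":"
                            + new String(CellUtil.cloneQualifier(cell)) + " = "
                            + new String(CellUtil.cloneValue(cell)));
                }
            }
        }
        table.close();
    }
}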

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // All arguments except the last are input paths; the last argument is the output path
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split each input line into tokens and emit (token, 1)
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all counts emitted for the same word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}
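A design point worth noting in the driver: IntSumReducer is registered both as the combiner and as the reducer. Reusing the reducer as a combiner is safe here because summing counts is associative and commutative, so map-side partial sums produce the same final totals while cutting down the amount of data shuffled to the reducers.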

Source: http://outofmemory.cn/langs/916177.html
