Using Java and MapReduce, this job segments movie-review text with jieba and counts word frequencies, which are then used to build a word cloud of the review content. It runs on a Hadoop cluster.
The code is as follows:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.huaban.analysis.jieba.JiebaSegmenter;

public class emotion_analysis_wordcount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: emotion_analysis_wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Emotion Analysis");
        job.setJarByClass(emotion_analysis_wordcount.class);
        job.setMapperClass(TokenizerMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Input: the movie-review CSV on HDFS; output path for the word counts
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/movie/test2.csv"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output2"));

        // Delete the output path if it already exists so the job can be rerun
        FileSystem fileSystem = FileSystem.get(conf);
        Path outputPath = new Path("/user/hadoop/output2");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Run the job
        job.waitForCompletion(true);
    }

    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Sentiment-lexicon lists carried over from the emotion-analysis version of this job (unused here)
        public static List<String> most = new ArrayList<>();
        public static List<String> very = new ArrayList<>();
        public static List<String> more = new ArrayList<>();
        public static List<String> ish = new ArrayList<>();
        public static List<String> insufficiently = new ArrayList<>();
        public static List<String> over = new ArrayList<>();
        public static List<String> negative_words = new ArrayList<>();
        public static List<String> postive_words = new ArrayList<>();

        public static List<String> stop_words = new ArrayList<>();

        // Load the Chinese/English stop-word list from the local file system of the node
        public static void read() throws IOException {
            String filepath = "/home/hadoop/Chinese_English_stopwords.txt";
            File file = new File(filepath);
            try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
                String temp;
                while ((temp = bufferedReader.readLine()) != null) {
                    stop_words.add(temp.replace(" ", ""));
                }
            }
        }

        // Segment a comment with jieba and remove all stop words
        public static List<String> withoutstopwords(String oldstring) throws IOException {
            JiebaSegmenter segmenter = new JiebaSegmenter();
            List<String> termlist = segmenter.sentenceProcess(oldstring);
            termlist.removeAll(stop_words);
            return termlist;
        }

        // True when the string consists only of digits
        public static boolean isNumeric(String str) {
            return Pattern.matches("[0-9]*", str);
        }

        // True when every character falls in the CJK Unified Ideographs range
        public static boolean checkname(String name) {
            for (int i = 0; i < name.length(); i++) {
                int n = name.charAt(i);
                if (!(19968 <= n && n < 40869)) {
                    return false;
                }
            }
            return true;
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            read();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(",");
            if (words.length - 1 <= 8) {
                return;
            }
            // The first 8 columns are metadata; the remaining columns make up the comment text
            String[] pre = Arrays.copyOfRange(words, 0, 8);
            String[] comment_lines = Arrays.copyOfRange(words, 8, words.length - 1);
            String commentString = "";
            for (String comment : comment_lines) {
                commentString += comment;
            }
            // Skip rows (e.g. the CSV header) whose first column is not a numeric id
            if (!isNumeric(pre[0])) {
                return;
            }
            List<String> comment = withoutstopwords(commentString);
            for (String g : comment) {
                if (!checkname(g.replace(" ", ""))) {
                    return;
                }
                // Emit key "column2,word" (column 2 identifies the movie) with count 1
                context.write(new Text(pre[1] + "," + g), new IntWritable(1));
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        public static Configuration configuration;
        public static Connection connection;
        public static Admin admin;
        public static Table table;

        // Write one cell into the HBase table
        public static void insertData(String rowKey, String colFamily, String col, String val) throws IOException {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val));
            table.put(put);
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "slave1");
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            TableName tableName = TableName.valueOf("movie2");
            String[] colFamily = {"information"};
            // Recreate the table if it already exists
            if (admin.tableExists(tableName)) {
                System.out.println("Table exists, deleting it");
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
            for (String str : colFamily) {
                ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
                tableDescriptor.setColumnFamily(family);
            }
            admin.createTable(tableDescriptor.build());
            table = connection.getTable(tableName);
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Only keep words that appear at least 5 times
            if (sum < 5) {
                return;
            }
            insertData(key.toString(), "information", "number", "" + sum);
            context.write(key, new DoubleWritable(sum));
        }
    }
}