Sentiment analysis of movie reviews implemented with MapReduce, running on a distributed Hadoop cluster. The mapper segments each review with Jieba, scores it against degree-adverb and positive/negative sentiment dictionaries, writes the per-review score and level into an HBase table, and emits the level as part of the map key so the reducer can count how many reviews fall into each level.
The code is as follows:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.huaban.analysis.jieba.JiebaSegmenter;

public class emotion_analysis_level {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: emotion_analysis_level <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Emotion Analysis");
        job.setJarByClass(emotion_analysis_level.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // input and output paths are hard-coded here
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/movie/test2.csv"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output"));
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        Path outputPath = new Path("/user/hadoop/output");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        // run the job
        job.waitForCompletion(true);
    }

    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // degree-adverb lists, from strongest to weakest
        public static List<String> most = new ArrayList<>();
        public static List<String> very = new ArrayList<>();
        public static List<String> more = new ArrayList<>();
        public static List<String> ish = new ArrayList<>();
        public static List<String> insufficiently = new ArrayList<>();
        public static List<String> over = new ArrayList<>();
        // sentiment dictionaries and stop words
        public static List<String> negative_words = new ArrayList<>();
        public static List<String> postive_words = new ArrayList<>();
        public static List<String> stop_words = new ArrayList<>();

        public TokenizerMapper() throws IOException {
        }

        // score a segmented comment: each sentiment word contributes +1/-1,
        // multiplied by the weight of the degree adverb (if any) directly before it
        public static double score(List<String> words) {
            double most_score = 8;
            double very_score = 6;
            double more_score = 4;
            double ish_score = 0.6;
            double insufficiently_score = -1.5;
            double over_score = 2;
            double postive_score = 1;
            double negative_score = -1;
            double no_attitide_score = 0;
            List<Double> PS = new ArrayList<>();   // positive contributions
            List<Double> NS = new ArrayList<>();   // negative contributions
            List<Double> NAS = new ArrayList<>();  // neutral words (not used in the final score)
            for (int i = 0; i < words.size(); i++) {
                if (negative_words.contains(words.get(i))) {
                    if (i == 0) {
                        NS.add(negative_score);
                    } else {
                        if (most.contains(words.get(i - 1)))
                            NS.add(most_score * negative_score);
                        else if (very.contains(words.get(i - 1)))
                            NS.add(very_score * negative_score);
                        else if (more.contains(words.get(i - 1)))
                            NS.add(more_score * negative_score);
                        else if (ish.contains(words.get(i - 1)))
                            NS.add(ish_score * negative_score);
                        else if (insufficiently.contains(words.get(i - 1)))
                            NS.add(insufficiently_score * negative_score);
                        else if (over.contains(words.get(i - 1)))
                            NS.add(over_score * negative_score);
                        else
                            NS.add(negative_score);
                    }
                } else if (postive_words.contains(words.get(i))) {
                    if (i == 0) {
                        PS.add(postive_score);
                    } else {
                        if (most.contains(words.get(i - 1)))
                            PS.add(most_score * postive_score);
                        else if (very.contains(words.get(i - 1)))
                            PS.add(very_score * postive_score);
                        else if (more.contains(words.get(i - 1)))
                            PS.add(more_score * postive_score);
                        else if (ish.contains(words.get(i - 1)))
                            PS.add(ish_score * postive_score);
                        else if (insufficiently.contains(words.get(i - 1)))
                            PS.add(insufficiently_score * postive_score);
                        else if (over.contains(words.get(i - 1)))
                            PS.add(over_score * postive_score);
                        else
                            PS.add(postive_score);
                    }
                } else {
                    NAS.add(no_attitide_score);
                }
            }
            double NS_sum = NS.stream().reduce(Double::sum).orElse(0.0);
            double PS_sum = PS.stream().reduce(Double::sum).orElse(0.0);
            return NS_sum + PS_sum;
        }

        // map the raw score to a discrete level: one level per 5 points, sign preserved
        public static int fenfen(double b) {
            int a = (int) b;
            int level = 0;
            if (a < 0) {
                level = a / 5 - 1;
            } else if (a > 0) {
                level = a / 5 + 1;
            }
            return level;
        }

        // true if str consists only of digits (e.g. a numeric id column)
        public static boolean isNumeric(String str) {
            Pattern pattern = Pattern.compile("[0-9]*");
            Matcher isNum = pattern.matcher(str);
            return isNum.matches();
        }

        // load degree adverbs, sentiment dictionaries and stop words from the local file system
        public static void read() throws IOException {
            String chinese_degree_path = "/home/hadoop/word/chengdu.txt";
            BufferedReader degreefile = new BufferedReader(
                    new InputStreamReader(new FileInputStream(chinese_degree_path), "UTF-8"));
            String temp = null;
            most = new ArrayList<>();
            very = new ArrayList<>();
            more = new ArrayList<>();
            ish = new ArrayList<>();
            insufficiently = new ArrayList<>();
            over = new ArrayList<>();
            List<List<String>> eList = new ArrayList<>();
            eList.add(most);
            eList.add(very);
            eList.add(more);
            eList.add(ish);
            eList.add(insufficiently);
            eList.add(over);
            int i = 0;
            // the degree file contains the six lists, separated by marker lines containing "12345";
            // on each marker, switch to the next list and skip the line right after the marker
            while ((temp = degreefile.readLine()) != null) {
                if (temp.contains("12345")) {
                    i = i + 1;
                    temp = degreefile.readLine();
                    continue;
                }
                eList.get(i).add(temp);
            }
            String negative_comments_path = "/home/hadoop/word/fumianpingjia.txt";
            String negative_emotion_path = "/home/hadoop/word/fumianqinggan.txt";
            String postive_comments_path = "/home/hadoop/word/zhengmianpingjia.txt";
            String postive_emotion_path = "/home/hadoop/word/zhengmianqinggan.txt";
            BufferedReader negative_comments_file = new BufferedReader(
                    new InputStreamReader(new FileInputStream(negative_comments_path), "UTF-8"));
            BufferedReader negative_emotion_file = new BufferedReader(
                    new InputStreamReader(new FileInputStream(negative_emotion_path), "UTF-8"));
            BufferedReader postive_comments_file = new BufferedReader(
                    new InputStreamReader(new FileInputStream(postive_comments_path), "UTF-8"));
            BufferedReader postive_emotion_file = new BufferedReader(
                    new InputStreamReader(new FileInputStream(postive_emotion_path), "UTF-8"));
            while ((temp = negative_comments_file.readLine()) != null) {
                negative_words.add(temp.replace(" ", ""));
            }
            while ((temp = negative_emotion_file.readLine()) != null) {
                negative_words.add(temp.replace(" ", ""));
            }
            while ((temp = postive_comments_file.readLine()) != null) {
                postive_words.add(temp.replace(" ", ""));
            }
            while ((temp = postive_emotion_file.readLine()) != null) {
                postive_words.add(temp.replace(" ", ""));
            }
            String filepath = "/home/hadoop/Chinese_English_stopwords.txt";
            File file = new File(filepath);
            BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
            while ((temp = bufferedReader.readLine()) != null) {
                stop_words.add(temp.replace(" ", ""));
            }
        }

        // segment a comment with Jieba and drop stop words
        public static List<String> withoutstopwords(String oldstring) throws IOException {
            JiebaSegmenter segmenter = new JiebaSegmenter();
            List<String> termlist = segmenter.sentenceProcess(oldstring);
            termlist.removeAll(stop_words);
            return termlist;
        }

        public static Configuration configuration;
        public static Connection connection;
        public static Admin admin;
        public static Table table;

        // write one cell into the HBase "movie" table
        public static void insertData(String rowKey, String colFamily, String col, String val) throws IOException {
            Put put = new Put(rowKey.getBytes());
            put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());
            table.put(put);
        }

        // load the dictionaries and (re)create the HBase table before the map phase
        public void setup() throws IOException, InterruptedException {
            System.out.println("setup");
            read();
            configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "slave1");
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            TableName tableName = TableName.valueOf("movie");
            String[] colFamily = {"information"};
            if (admin.tableExists(tableName)) {
                System.out.println("Table already exists, dropping it first");
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
            for (String str : colFamily) {
                ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
                tableDescriptor.setColumnFamily(family);
            }
            admin.createTable(tableDescriptor.build());
            table = connection.getTable(tableName);
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup();
            try {
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(",");
            // expect 8 metadata columns followed by the comment text (which may itself contain commas)
            if (words.length - 1 <= 8) {
                return;
            }
            String[] pre = Arrays.copyOfRange(words, 0, 8);
            String[] comment_lines = Arrays.copyOfRange(words, 8, words.length - 1);
            String commentString = "";
            for (String comment : comment_lines) {
                commentString += comment;
            }
            if (!isNumeric(pre[0])) {
                // skip the CSV header line (its first column is not a numeric id)
                return;
            }
            List<String> comment = withoutstopwords(commentString);
            System.out.println(comment);
            double score = score(comment);
            int fen = fenfen(score);
            // per-review result goes to HBase, keyed by the first CSV column (the numeric id)
            insertData(pre[0], "information", "score", "" + score);
            insertData(pre[0], "information", "level", "" + fen);
            // emit <column 2, level> with a count of 1 so the reducer can total the reviews per level
            context.write(new Text(pre[1] + "," + fen + ","), new IntWritable(1));
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        public IntSumReducer() {
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }
}
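Besides the counts written to /user/hadoop/output, each review's score and level end up in the HBase "movie" table. The snippet below is a minimal sketch of how one might read a row back to verify the result; it assumes the same "slave1" ZooKeeper quorum as in setup(), and the row key "1" is a hypothetical review id, not something taken from the dataset.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckMovieRow {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "slave1");   // same quorum as in the mapper's setup()
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("movie"))) {
            Get get = new Get(Bytes.toBytes("1"));      // "1" is a hypothetical review id
            Result result = table.get(get);
            String score = Bytes.toString(result.getValue(Bytes.toBytes("information"), Bytes.toBytes("score")));
            String level = Bytes.toString(result.getValue(Bytes.toBytes("information"), Bytes.toBytes("level")));
            System.out.println("score=" + score + ", level=" + level);
        }
    }
}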