from pyspark import SparkConf, SparkContext, StorageLevel
import jieba

# Submit with: /opt/module/spark/bin/spark-submit /opt/Code/searchSouGou.py


def context_jieba(data):
    """Segment a query string into words using jieba's search mode."""
    seg = jieba.cut_for_search(data)
    l = list()
    for word in seg:
        l.append(word)
    return l


def filter_words(data):
    """Drop stray single characters left over from segmenting brand names."""
    return data not in ['谷', '帮', '客']


def append_words(data):
    """Restore truncated brand names and attach a count of 1."""
    if data == '传智播':
        data = '传智播客'
    if data == '院校':
        data = '院校帮'
    if data == '博学':
        data = '博学谷'
    return (data, 1)


def extract_user_and_word(data):
    # Input is a tuple like (user_id, query content)
    user_id = data[0]
    content = data[1]
    words = context_jieba(content)
    return_list = list()
    for word in words:
        if filter_words(word):
            return_list.append((user_id + '_' + append_words(word)[0], 1))
    return return_list


if __name__ == '__main__':
    conf = SparkConf().setAppName("SouGou").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile('file:///opt/Data/SogouQ.txt')

    # TODO 1: analyse the keywords users searched for
    split_rdd = file_rdd.map(lambda line: line.split('\t'))
    # split_rdd is reused by all three jobs, so cache it on disk
    split_rdd.persist(StorageLevel.DISK_ONLY)

    context_rdd = split_rdd.map(lambda x: x[2])
    words_rdd = context_rdd.flatMap(context_jieba)
    filter_rdd = words_rdd.filter(filter_words)
    final_words_rdd = filter_rdd.map(append_words)
    result = final_words_rdd.reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1) \
        .take(5)
    print('Result 1: ', result)

    # TODO 2: analyse user + keyword combinations
    user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
    user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)
    result = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1) \
        .take(5)
    print('Result 2: ', result)

    # TODO 3: count searches per hour to find the busiest time periods
    time_rdd = split_rdd.map(lambda x: x[0])
    hour_with_one_rdd = time_rdd.map(lambda x: (x.split(':')[0], 1))
    result = hour_with_one_rdd.reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1) \
        .collect()
    print('Result 3:', result)
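For reference, a minimal sketch of what jieba's search-mode segmentation produces, assuming jieba is installed (pip install jieba); the sample query below is made up, and the exact split depends on jieba's dictionary, which is why the script patches fragments like '传智播' back together afterwards:

import jieba

# cut_for_search over-segments compound words so both short and long forms are searchable;
# it returns a generator, so wrap it in list() to inspect the result
sample = '我喜欢传智播客'  # hypothetical query text
print(list(jieba.cut_for_search(sample)))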