import csv
import time

from kafka import KafkaProducer


def main():
    """Kafka producer: stream answer texts to the 'txt' topic.

    Reads the answers CSV, collects the texts (column 1) of the leading run
    of rows that share the same date (column 0), then sends them to Kafka
    one per second.
    """
    producer = KafkaProducer(bootstrap_servers=['121.196.222.214:9092'])
    with open('D:/QQfile/answer_results.csv', 'r', encoding='utf8') as fp:
        rows = list(csv.reader(fp))

    texts = []   # answer texts of the first same-date run of rows
    pos = 0      # index where the date first changes (0 if it never does)
    for i in range(len(rows) - 1):
        if rows[i][0] == rows[i + 1][0]:
            texts.append(rows[i][1])
        else:
            pos = i
            break
    print(pos)

    for text in texts:
        time.sleep(1)  # throttle: one message per second
        # Strip embedded newlines (the scraped original had replace('n','') —
        # a lost backslash that would have deleted every letter "n").
        payload = text.replace('\n', '').encode('utf-8')
        producer.send("txt", payload)
        print(payload)


if __name__ == '__main__':
    main()
# ---- Kafka consumer (next section of the original post) ----
import findspark

# findspark must run before any pyspark import so the Spark home is on sys.path.
findspark.init('/root/Downloads/spark-2.4.7-bin-hadoop2.7')

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition


def start():
    """Spark Streaming consumer: word-count the Kafka 'txt' topic.

    Reads 15-second micro-batches from the broker, splits each message on
    spaces, counts word occurrences per batch, prints the counts and saves
    them under /usr/wordcount/w. Blocks until the streaming context stops.
    """
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2).set('spark.io.compression.codec', 'snappy')
    sc = SparkContext(appName='txt', conf=sconf)
    ssc = StreamingContext(sc, 15)  # 15-second batch interval

    brokers = "121.196.222.214:9092"
    topic = 'txt'
    # Direct (receiver-less) stream; elements are (key, value) pairs.
    user_data = KafkaUtils.createDirectStream(
        ssc, [topic], kafkaParams={"metadata.broker.list": brokers})

    word_counts = (user_data
                   .flatMap(lambda kv: kv[1].split(' '))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    word_counts.pprint()
    word_counts.saveAsTextFiles("/usr/wordcount/w")

    ssc.start()
    ssc.awaitTermination()


if __name__ == '__main__':
    start()
import csv
import time

from kafka import KafkaProducer


def main():
    """Kafka producer: stream answer texts to the 'txt' topic.

    Reads the answers CSV, collects the texts (column 1) of the leading run
    of rows that share the same date (column 0), then sends them to Kafka
    one per second.
    """
    producer = KafkaProducer(bootstrap_servers=['121.196.222.214:9092'])
    with open('D:/QQfile/answer_results.csv', 'r', encoding='utf8') as fp:
        rows = list(csv.reader(fp))

    texts = []   # answer texts of the first same-date run of rows
    pos = 0      # index where the date first changes (0 if it never does)
    for i in range(len(rows) - 1):
        if rows[i][0] == rows[i + 1][0]:
            texts.append(rows[i][1])
        else:
            pos = i
            break
    print(pos)

    for text in texts:
        time.sleep(1)  # throttle: one message per second
        # Strip embedded newlines (the scraped original had replace('n','') —
        # a lost backslash that would have deleted every letter "n").
        payload = text.replace('\n', '').encode('utf-8')
        producer.send("txt", payload)
        print(payload)


if __name__ == '__main__':
    main()
JSON转换
import json


def parse_sentiment_counts(lines, day):
    """Parse "(label, count)" lines into {day: {sentiment_name: count}}.

    Each line looks like "(0, 123)" (possibly with quotes around fields);
    label 0 -> 'neutral', 1 -> 'postive', anything else -> 'negative'.
    Later lines with the same label overwrite earlier ones, matching the
    original script's dict-building behavior.
    """
    counts = {}
    for line in lines:
        # Strip parentheses, quotes and newlines. The scraped original had
        # replace('n','') and replace(''',''): lost backslash/escapes for
        # '\n' and "'".
        cleaned = (line.replace('(', '').replace(')', '')
                       .replace('\n', '').replace("'", ''))
        fields = [f.strip() for f in cleaned.split(',')]
        label, count = fields[0], fields[1]
        if label == '0':
            label = 'neutral'
        elif label == '1':
            # NOTE(review): original spelling 'postive' kept so downstream
            # consumers of the JSON keep working — confirm before fixing
            # to 'positive'.
            label = 'postive'
        else:
            label = 'negative'
        counts[label] = int(count)
    return {day: counts}


def main():
    """Convert the day's sentiment-count dump to result_mode.json."""
    filename = 'C:/Users/2020-01-20.txt'
    day = '2020-01-20'
    with open(filename, 'r', encoding='UTF-8') as fp:
        lines = fp.readlines()
    result = parse_sentiment_counts(lines, day)
    with open("result_mode.json", "w") as fp:
        fp.write(json.dumps(result, indent=4))


if __name__ == '__main__':
    main()
JSON转换WORDCOUNTS
import json


def parse_word_counts(lines, day):
    """Parse "(word, count)" lines into {day: {word: count}}.

    Each line looks like "(hello, 42)" (possibly with quotes around
    fields). Later lines with the same word overwrite earlier ones,
    matching the original script's dict-building behavior.
    """
    counts = {}
    for line in lines:
        # Strip parentheses, quotes and newlines. The scraped original had
        # replace('n','') and replace(''',''): lost backslash/escapes for
        # '\n' and "'".
        cleaned = (line.replace('(', '').replace(')', '')
                       .replace('\n', '').replace("'", ''))
        fields = [f.strip() for f in cleaned.split(',')]
        counts[str(fields[0])] = int(fields[1])
    return {day: counts}


def main():
    """Convert the word-count dump in word.txt to result.json."""
    filename = 'word.txt'
    day = '2020-01-20'
    with open(filename, 'r', encoding='UTF-8') as fp:
        lines = fp.readlines()
    result = parse_word_counts(lines, day)
    with open("result.json", "w") as fp:
        fp.write(json.dumps(result, indent=4))


if __name__ == '__main__':
    main()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)