KAFKA 知乎

KAFKA 知乎,第1张

KAFKA 知乎 KAFKA module
import csv
from kafka import KafkaProducer
import time

def main():
    ##生产模块
    producer = KafkaProducer(bootstrap_servers=['121.196.222.214:9092'])
    with open('D:/QQfile/answer_results.csv','r',encoding='utf8')as fp:
        reader=csv.reader(fp)
        data_temp=[]
        for row in reader:
            data_temp.append(row)
        pre=[]
        date=[]
        pos=0
        for i in range(len(data_temp)-1):
            if data_temp[i][0]==data_temp[i+1][0]:
                date=data_temp[i][0]
                pre.append(data_temp[i][1])
            else:
                pos=i
                break
        print(pos)
        for i in range(len(pre)):
            string=pre[i]
            time.sleep(1)
            producer.send("txt", bytes(string.replace('n','').encode('utf-8')))
            print(bytes(string.replace('n','').encode('utf-8')))
if __name__ == '__main__':
    main()
KAFKA消费者
import findspark
findspark.init('/root/Downloads/spark-2.4.7-bin-hadoop2.7')
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition


def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 2).set('spark.io.compression.codec','snappy')
    sc = SparkContext(appName='txt', conf=sconf)
    ssc = StreamingContext(sc, 15)
    brokers = "121.196.222.214:9092"
    topic = 'txt'
    user_data = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers})
    gender_users = user_data.flatMap(lambda x:x[1].split(' ')).map(lambda gender: (gender, 1)).reduceByKey(lambda a, b: a + b)
    gender_users.pprint()
    gender_users.saveAsTextFiles("/usr/wordcount/w")
    ssc.start()
    ssc.awaitTermination()


if __name__ == '__main__':
    start()
import csv
from kafka import KafkaProducer
import time

def main():
    ##生产模块
    producer = KafkaProducer(bootstrap_servers=['121.196.222.214:9092'])
    with open('D:/QQfile/answer_results.csv','r',encoding='utf8')as fp:
        reader=csv.reader(fp)
        data_temp=[]
        for row in reader:
            data_temp.append(row)
        pre=[]
        date=[]
        pos=0
        for i in range(len(data_temp)-1):
            if data_temp[i][0]==data_temp[i+1][0]:
                date=data_temp[i][0]
                pre.append(data_temp[i][1])
            else:
                pos=i
                break
        print(pos)
        for i in range(len(pre)):
            string=pre[i]
            time.sleep(1)
            producer.send("txt", bytes(string.replace('n','').encode('utf-8')))
            print(bytes(string.replace('n','').encode('utf-8')))
if __name__ == '__main__':
    main()

JSON转换

import json

filename='C:/Users/2020-01-20.txt'
with open(filename, 'r',encoding='UTF-8') as fp:
    lines = fp.readlines()
    dict_all={}
    day='2020-01-20'
    dict_day={}
    key=[]
    value=[]
    for line in lines:
        list=[]
        line=line.replace('(','').replace(')','').replace('n','').replace(''','').replace(')','')
        list=line.split(',')
        for i in range(len(list)):
            list[i]=list[i].strip()
        if list[0]=='0':
            list[0]='neutral'
        elif list[0]=='1':
            list[0]='postive'
        else:
            list[0]='negative'
        key.append(list[0])
        value.append(list[1])
    for i in range(len(key)):
        dict_day[str(key[i])]=int(value[i])
    dict_all[day]=dict_day
    str_json=json.dumps(dict_all)
    with open("result_mode.json", "w") as fp:
        fp.write(json.dumps(dict_all,indent=4))

JSON转换WORDCOUNTS

import json

filename='word.txt'
with open(filename, 'r',encoding='UTF-8') as fp:
    lines = fp.readlines()
    dict_all={}
    day='2020-01-20'
    dict_day={}
    key=[]
    value=[]
    for line in lines:
        list=[]
        line=line.replace('(','').replace(')','').replace('n','').replace(''','').replace(')','')
        list=line.split(',')
        for i in range(len(list)):
            list[i]=list[i].strip()
        key.append(list[0])
        value.append(list[1])
    for i in range(len(key)):
        dict_day[str(key[i])]=int(value[i])
    dict_all[day]=dict_day
    str_json=json.dumps(dict_all)
    with open("result.json", "w") as fp:
        fp.write(json.dumps(dict_all,indent=4))

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5706227.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存