pyspark多线程DF写Hive,出现重复数据及解决办法

pyspark多线程DF写Hive,出现重复数据及解决办法,第1张

pyspark多线程DF写Hive,出现重复数据及解决办法

背景:

        数据中某字段A需要进行转换,批次拉取后进行行处理

        为提高效率,将大批次分为10个小批次,分线程处理

read_df = hive_context.sql(hivesql)
allrows = read_df.collect()
#此处将大批次分为10个小批次,分线程处理
temp_list = list_of_groups(allrows, 10)
    # step3 line handel
    threads = []
    for i in range(len(temp_list)):
        th = threading.Thread(target=lineHandel, args=(temp_list[i],))
        threads.append(th)
    for i in range(len(temp_list)):
        threads[i].start()
    for i in range(len(temp_list)):
        threads[i].join()

行处理逻辑省略。。。

问题出现点:

        处理完成后的Dataframe我使用registerTempTable函数的方式写入Hive

分析原因:

        临时表线程共享,会导致多个线程多次写入的 *** 作

解决方法:

        直接使用

DF.write.insertInto("database.table")

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5635886.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存