背景:
数据中某字段A需要进行转换,批次拉取后进行行处理
为提高效率,将大批次分为10个小批次,分线程处理
read_df = hive_context.sql(hivesql) allrows = read_df.collect() #此处将大批次分为10个小批次,分线程处理 temp_list = list_of_groups(allrows, 10) # step3 line handel threads = [] for i in range(len(temp_list)): th = threading.Thread(target=lineHandel, args=(temp_list[i],)) threads.append(th) for i in range(len(temp_list)): threads[i].start() for i in range(len(temp_list)): threads[i].join()
行处理逻辑省略。。。
问题出现点:
处理完成后的Dataframe我使用registerTempTable函数的方式写入Hive
分析原因:
临时表线程共享,会导致多个线程多次写入的 *** 作
解决方法:
直接使用
DF.write.insertInto("database.table")
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)