############################################## step-1: PySpark script implementing the daily data-extraction logic
######## concat_ws(',', collect_list(event_info)) combined with the previous step's row_number() time ordering: the combined data shows up ordered in the interactive engine, but once it is downloaded locally the order is gone.
######## Reason: collect_list gives no ordering guarantee after a shuffle; under distributed execution each node's slice is ordered, but pulling the slices back to the local side loses that order.
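The workaround used in the query below is to prefix every element with its zero-padded row_number(), let sort_array() put the collected array back in order (a lexical sort, which the padded rank turns into time order), and then strip the numeric prefix with regexp_replace. A minimal, self-contained sketch of that trick on a made-up toy table (the table and column names in this sketch are illustrative only, not from the production job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("order_demo").getOrCreate()

# Toy events: the per-imei order (by client_time) must survive the collect_list.
spark.createDataFrame(
    [("a", 1, "wifi"), ("a", 2, "4g"), ("a", 3, "wifi"), ("b", 1, "4g")],
    ["imei", "client_time", "network"],
).createOrReplaceTempView("events")

spark.sql("""
    select imei,
           regexp_replace(
               concat_ws(',',
                   sort_array(          -- lexical sort; the zero-padded rank restores time order
                       collect_list(concat_ws('##', lpad(cast(rank as string), 5, '0'), network)))),
               '\\\\d+##', '') as network_seq
    from (select imei, network,
                 row_number() over (partition by imei order by client_time) as rank
          from events) t
    group by imei
""").show(truncate=False)    # expected: a -> wifi,4g,wifi    b -> 4g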
## step-1: land the data on HDFS via the PySpark script below
from pyspark.sql import SparkSession
import sys
import argparse

spark = SparkSession.builder.appName("w9010794").getOrCreate()
spark.conf.set('hive.exec.dynamic.partition', 'true')
spark.conf.set('hive.optimize.sort.dynamic.partition', 'false')
spark.conf.set('hive.exec.dynamic.partition.mode', 'nonstrict')
spark.conf.set("spark.sql.shuffle.partitions", "3000")
spark.conf.set("spark.sql.parser.quotedRegexColumnNames", "true")
def down_data():
    # Build the per-imei, time-ordered network sequence for the given dayno,
    # excluding any package that appears on the blacklist.
    df = spark.sql(f'''
        select t3.imei, t3.model
              ,regexp_replace(
                   concat_ws('',
                       sort_array(
                           collect_list(
                               concat_ws('##', lpad(cast(rank as string), 5, '0'), network)))),
                   '\\\\d+##', '') as network
        from (select t2.imei, t2.model
                    ,t2.network
                    ,t1.pack_name as fff
                    ,row_number() over (partition by t2.imei order by t2.client_time asc) as rank
              from (select *
                    from dataintel_dw.PEEM00_app_launch_ind_h
                    where dayno={args.dayno}) t2
              left join (select pack_name
                         from dataintel_dw.PDEM10_app_blacklist_ind_d
                         where dayno=20210714) t1
                on t1.pack_name = t2.pack_name
             ) t3
        where t3.fff is null          -- fff is null = no match against the blacklist
        group by t3.imei, t3.model
    ''')
    ####### Earlier, failed attempts to write a single CSV file, kept for reference:
    ####### data_path = args.export_hdfs_path + '/PEEM00_data_' + args.dayno + '.csv'
    ####### df.repartition(1).option("header", "true").save(data_path)
    ####### df.write.csv('mycsv.csv')
    ####### df.write.format("com.databricks.spark.csv")...
    ####### The first write raised: 'DataFrame' object has no attribute 'option'
    ####### (.option() belongs to DataFrameWriter, so it must come after .write).
    return df
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='download_data')
    parser.add_argument('--dayno', type=str, required=True)
    parser.add_argument('--export_hdfs_path', type=str, required=True)
    args = parser.parse_args()
    df = down_data()
    df.write.save(f'''{args.export_hdfs_path}/data_{args.dayno}''',
                  format="csv", header=True, sep="\t", mode="overwrite")
####### Write the result to the path/filename built from the parsed arguments.
####### (Writing it out as a single file failed, so the plain directory write above is used instead.)
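On the failed single-file attempt noted in the comments: the error 'DataFrame' object has no attribute 'option' comes from calling .option() on the DataFrame itself instead of on the writer returned by .write. Below is a sketch of what that attempt was presumably aiming for, reusing df and args from the script above (the option values are assumptions, not part of the original job):

# repartition(1) collapses everything into one partition, so the output
# directory contains a single part-*.csv file (it is still a directory, not a bare file).
(df.repartition(1)
   .write
   .option("header", "true")
   .option("sep", "\t")
   .mode("overwrite")
   .csv(args.export_hdfs_path + '/PEEM00_data_' + args.dayno + '.csv'))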
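For the daily run, the script is launched through spark-submit with the two arguments defined by the argparse block, e.g. spark-submit step1_export.py --dayno 20210714 --export_hdfs_path /user/xxx/export; the script file name and the HDFS directory here are placeholders, only the two flag names come from the script itself.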