Environment: Spark 2.4.0
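All of the snippets below assume an existing SparkSession bound to the name spark. A minimal sketch for creating one (the app name is an arbitrary placeholder):

from pyspark.sql import SparkSession

# build (or reuse) a session; every read/write example below goes through it
spark = SparkSession.builder.appName("io_examples").getOrCreate()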
1 Reading and writing CSV files

# Read CSV
file_path = r"xx/xx/xx/"  # can be a directory path or a single file name
# df = spark.read.csv(file_path, header=True, inferSchema=True)
df = spark.read.format('csv').option("header", "true").option("inferSchema", "true").load(file_path)

# Write CSV
# df.write.csv(file_path)
df.write.save(path=file_path, format='csv', mode='overwrite', sep=',', header=True)
# df.repartition(1).write.save(path=file_path, format='csv', mode='overwrite', sep=',', header=True)  # write out a single CSV file
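inferSchema costs an extra pass over the data; for large or stable datasets a schema can be supplied explicitly instead. A sketch, with hypothetical column names:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# "name" and "age" are hypothetical columns; adjust to the actual file
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df = spark.read.csv(file_path, header=True, schema=schema)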
2 Reading and writing Parquet files

# Read parquet
file_path = r"xx/xx/xx/"
df = spark.read.parquet(file_path)

# Write parquet
df.write.parquet(file_path, mode="overwrite")
# df.repartition(1).write.parquet(file_path, mode="overwrite")  # write out a single parquet file
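Parquet output is often partitioned by a column so that downstream reads can prune directories. A sketch, assuming a hypothetical country column:

# "country" is a hypothetical column; each distinct value gets its own subdirectory
df.write.partitionBy("country").parquet(file_path, mode="overwrite")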
3 Reading and writing JSON files

# Read JSON
file_path = r"xx/xx/xx.json"
df = spark.read.json(file_path)

# Write JSON
df.write.json(file_path)
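By default spark.read.json expects one JSON object per line (JSON Lines); for a file containing a single pretty-printed document, the multiLine option can be set. A sketch:

# read a pretty-printed (multi-line) JSON document rather than JSON Lines
df = spark.read.option("multiLine", "true").json(file_path)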
4 Reading from Hive

from pyspark.sql import SparkSession

# Hive support must be enabled on the session before spark.sql can query Hive tables
spark = SparkSession.builder.appName("task_read_hive").enableHiveSupport().getOrCreate()
df = spark.sql("select * from table_1")
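Writing back to Hive goes through the same session. A sketch, using a hypothetical target table name:

# "table_2" is a hypothetical table name; mode="overwrite" replaces existing contents
df.write.mode("overwrite").saveAsTable("table_2")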
5 Reading from MySQL

# For spark-submit, the MySQL JDBC driver jar must be supplied (i.e. add 'mysql-connector-java-5.1.47.jar'):
# --jars xxx/xxx/mysql-connector-java-5.1.47.jar
mysql_url = 'jdbc:mysql://host_or_ip:3306/db_name?characterEncoding=UTF-8'
mysql_user = 'xxx'
mysql_pw = 'xxx'
table = "(select * from table_A) a"  # a subquery must be aliased to be used as dbtable
df = spark.read.format('jdbc').options(url=mysql_url, driver='com.mysql.jdbc.Driver', dbtable=table, user=mysql_user, password=mysql_pw).load()
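Writing to MySQL reuses the same JDBC options through DataFrameWriter. A sketch, assuming a hypothetical target table; mode='append' adds rows, while 'overwrite' drops and recreates the table by default:

# "table_B" is a hypothetical target table
df.write.format('jdbc').options(url=mysql_url, driver='com.mysql.jdbc.Driver', dbtable='table_B', user=mysql_user, password=mysql_pw).mode('append').save()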