-
基于RDD方式1:调用spark
- 通过SparkSession对象的createDataframe方法来将RDD转换为Dataframe,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)
# 首先构建一个RDD rdd[(name, age), 0] rdd = sc.textFile("../data/sql/people.txt"). map(lambda x: x.split(',')). map(lambda x: [x[0], int(x[1])]) # 需要做类型转换,因为类型从RDD中探测 # 构建DF方式1 df = spark.createDataframe(rdd, schema=['name', 'age'])
-
基于RDD方式2:
- 通过StructType对象来定义Dataframe的“表结构”转换RDD
rdd = sc.textFile("../data/sql/stu_score.txt"). map(lambda x: x.split(',')). map(lambda x: (int(x[0], x[1], int(x[2])))) # StructType类 # 这个类可以定义整个Dataframe中的Schema schema = StructType(). add("id", IntegerType(), nullable=False). add("name", StringType(), nullable=True). add("score", IntegerType(), nullable=False) # 一个add方法:定义一个趔的信息,如果有3个列,就写3个add,每一个add代表一个StructField # add方法:参数1:列名称,参数2:列类型,参数3:是否允许为空 df = spark.createDataframe(rdd, schema)
-
基于RDD方式3
- 使用RDD的toDF方法转换RDD
# StructType类 # 这个类可以定义整个Dataframe中的Schema schema = StructType(). add("id", IntegerType(), nullable=False). add("name", StringType(), nullable=True). add("score", IntegerType(), nullable=False) # 方式1:只传列名,类型靠推断,是否允许为空是true df = rdd.toDF(['id', 'subject', 'score']) df.printSchema() df.show() # 方式2:传入完整版的Schema描述对象StructType df = rdd.toDF(schema) df.printSchema() df.show()
-
基于Pandas的Dataframe
- 将Pandas的Dataframe对象,转变为分布式的SparkSQL
# 构建Pandas的DF pdf = pd.Dataframe({ "id": [1, 2, 3], "name": ["张大仙", "王晓晓", "王大锤"], "age": [11, 11, 11] }) # 将Pandas的DF对象转换成Spark的DF df = spark.createDataframe(pdf)
-
读取外部数据
- 通过SparkSQL的统一API进行数据读取构建Dataframe
sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......") .option("K", "V") # option可选 .schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT") .load("被读取文件的路径,支持本地文件系统和HDFS")
- 读取text数据源,使用forma(“text”)读取文本数据,读取到的Dataframe只会有一个趔,列名称默认值为:value
schema = StructType().add("data", StringType(), nullable=True) df = spark.read.format("text") .chema(schema) .load("../data/sql/people.txt")
- 读取json数据源,使用format(“json”)读取json数据源
df = spark.read.format("json"). load("../data/sql/people.json") # JSON类型一般不用写.schema,json自带,json带有列名和列类型(字符串和数字) df.printSchema() df.show()
- 读取csv数据源,使用format(“csv”)读取csv数据
df = spark.read.format("csv") .option("sep", ",") # 列分隔符 .option("header", False) # 是否有CSV标头 .option("encoding", "utf-8") # 编码 .schema("name STRING, age INT, job STRING") # 指定列名和类型 .load("../data/sql/people.csv") # 路径 df.printSchema() df.show()
- 读取parquet数据源,使用format(“parquet”)读取parquet数据
# parquet自带schema,直接load啥也不需要了 df = spark.read.format("parquet"). load("../data/sql/users.parquet") df.printSchema() df.show()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)