Spark DataFrame的代码构建

Spark DataFrame的代码构建,第1张

Spark DataFrame的代码构建
  • 基于RDD方式1:调用spark

    • 通过SparkSession对象的createDataframe方法来将RDD转换为Dataframe,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)
    # 首先构建一个RDD rdd[(name, age), 0]
    rdd = sc.textFile("../data/sql/people.txt").
    		map(lambda x: x.split(',')).
        	map(lambda x: [x[0], int(x[1])])	# 需要做类型转换,因为类型从RDD中探测
    # 构建DF方式1
    df = spark.createDataframe(rdd, schema=['name', 'age'])
    
  • 基于RDD方式2:

    • 通过StructType对象来定义Dataframe的“表结构”转换RDD
    rdd = sc.textFile("../data/sql/stu_score.txt").
        map(lambda x: x.split(',')).
        map(lambda x: (int(x[0], x[1], int(x[2]))))
    
    # StructType类
    # 这个类可以定义整个Dataframe中的Schema
    schema = StructType().
        add("id", IntegerType(), nullable=False).
        add("name", StringType(), nullable=True).
        add("score", IntegerType(), nullable=False)
    
    # 一个add方法:定义一个趔的信息,如果有3个列,就写3个add,每一个add代表一个StructField
    # add方法:参数1:列名称,参数2:列类型,参数3:是否允许为空
    df = spark.createDataframe(rdd, schema)	
    
  • 基于RDD方式3

    • 使用RDD的toDF方法转换RDD
    # StructType类
    # 这个类可以定义整个Dataframe中的Schema
    schema = StructType().
        add("id", IntegerType(), nullable=False).
        add("name", StringType(), nullable=True).
        add("score", IntegerType(), nullable=False)
    
    # 方式1:只传列名,类型靠推断,是否允许为空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    
    # 方式2:传入完整版的Schema描述对象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()
    
  • 基于Pandas的Dataframe

    • 将Pandas的Dataframe对象,转变为分布式的SparkSQL
    # 构建Pandas的DF
    pdf = pd.Dataframe({
        "id": [1, 2, 3],
        "name": ["张大仙", "王晓晓", "王大锤"],
        "age": [11, 11, 11]
    })
    # 将Pandas的DF对象转换成Spark的DF
    df = spark.createDataframe(pdf)
    
  • 读取外部数据

    • 通过SparkSQL的统一API进行数据读取构建Dataframe
    sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
        .option("K", "V")    # option可选
        .schema(StructType | String)    # STRING的语法如.schema("name STRING", "age INT")
        .load("被读取文件的路径,支持本地文件系统和HDFS")
    
    • 读取text数据源,使用forma(“text”)读取文本数据,读取到的Dataframe只会有一个趔,列名称默认值为:value
    schema = StructType().add("data", StringType(), nullable=True)
    df = spark.read.format("text")
        .chema(schema)
        .load("../data/sql/people.txt")
    
    • 读取json数据源,使用format(“json”)读取json数据源
    df = spark.read.format("json").
        load("../data/sql/people.json")
    # JSON类型一般不用写.schema,json自带,json带有列名和列类型(字符串和数字)
    df.printSchema()
    df.show()
    
    • 读取csv数据源,使用format(“csv”)读取csv数据
    df = spark.read.format("csv")
        .option("sep", ",")                            # 列分隔符
        .option("header", False)                       # 是否有CSV标头
        .option("encoding", "utf-8")                   # 编码
        .schema("name STRING, age INT, job STRING")    # 指定列名和类型
        .load("../data/sql/people.csv")                 # 路径
    df.printSchema()
    df.show()
    
    • 读取parquet数据源,使用format(“parquet”)读取parquet数据
    # parquet自带schema,直接load啥也不需要了
    df = spark.read.format("parquet").
        load("../data/sql/users.parquet")
    df.printSchema()
    df.show()
    

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5605982.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存