里面有更多知识分享,以及一些有意思的小项目~
环境spark 2.4.0
df查看信息(常用)df.schema # df表结构 df.columns # df各字段 df.dtypes # df各字段数据类型 df.first() # df的第一行数据 df.head() # df的第一行数据 # df.head(5) # df的前五行数据 df.show() # 展示 df 的部分行数据 # df.show(5) # 展示 df 的前五行数据 display(df) # 显示df的各字段数据类型 # display(df.toPandas()) # pandas样式展示df的部分行数据 df.count() # 统计df的总数据量选取df部分行作为新的df
#df = df.withColumn('index', F.monotonically_increasing_id()) # 不重复的自增列'index' # 但会出现不连续的情况,故不实用 # 用于添加自增列 def flat(l): for k in l: if not isinstance(k, (list, tuple)): yield k else: yield from flat(k) # 新增一列从0开始连续自增的'index'列 schema = df.schema.add(StructField('index', LongType())) rdd = df.rdd.zipWithIndex() rdd = rdd.map(lambda x: list(flat(x))) df = spark.createDataframe(rdd, schema) df = df.where(F.col('index').between(0, 49999)) # 选择 index 为 0-49999 共5万条数据 df = df.drop(df.index) # 删除'index'列空值填充
# 步骤1:NaN替换为Null【然后再fillna空值填充,才不会出问题(fillna不会对NaN进行填充)】 # 方式1 df = df.replace(float('nan'), None) # 方式2 import pyspark.sql.functions as F columns = df.columns for column in columns: df = df.withColumn(column, F.when(F.isnan(F.col(column)) | F.col(column).isNull(), None).otherwise(F.col(column))) # 步骤2 fillna空值填充 # 给 'a','b','c' 3列的空值填充为 '0' # 注意:'a','b','c' 3列 需要是 string类型,不然填充无效(如果 'a','b','c' 3列 为 int类型,则填充 0) df_train = df.fillna('0', ['a', 'b', 'c'])删除指定列
# 删除1列 df = df.drop('col1') df = df.drop(df.col1) # 删除多列 cols_drop = ['a', 'b', 'c'] df = df.drop(*cols_drop)选择指定列
df_1 = df.select('a', 'b', 'c') # 或者写成:df_1 = df.select(['a', 'b', 'c'])df某列重命名
df = df.withColumnRenamed('a', 'a_new')df新增一列
df = df.withColumn('a', F.lit(1)) # 给df添加'a'列,值为1df.where条件
# where(sql语句) df_1 = df.where("col1 not in {}".format([1,2,3,4])) # 选择df中'col1'列值不在 [1,2,3,4] 的所有行 作为 df_1整行去重
df = df.distinct()将df的某列转成list
# 方式1 import pandas as pd df_col1 = pd.Dataframe(df.select('a').toPandas(),columns=['a']) # df 为pyspark的df, ‘a’为指定列名 col1_list = list(df_col1['a'])
# 方式2 col1_list = [x['a'] for x in df.select('a').collect()] # df.collect() 的结果是 list(pyspark.sql.types.Row)类型数据对某列进行聚合成list
import pyspark.sql.functions as F df = df.groupBy('a').agg(F.collect_list('b').alias('b_list')) df = df.groupBy(['a','c']).agg(F.collect_list('b').alias('b_list'))对list类型列变成每个元素对应一行
import pyspark.sql.functions as F df = df.select('a', 'b', 'c', F.explode('d_list').alias('d'))将df映射成表写sql
df_test.createOrReplaceTempView('df_test') df_new = spark.sql("select a,b from df_test") # 从df_test中选择a,b两列作为df_new想了解更多,欢迎移步"文渊小站"
里面有更多知识分享,以及一些有意思的小项目~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)