import findspark
findspark.init()

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a DataFrame from a list of Rows
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# Create a DataFrame from tuples with an explicit schema
df1 = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

# Create a DataFrame from a pandas DataFrame
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df3 = spark.createDataFrame(pandas_df)

# Create a DataFrame from an RDD of tuples
rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df4 = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])

# Optionally enable eager evaluation so DataFrames render directly in notebooks/REPLs
# spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

df.show(1, vertical=True)
'''
 a | 1
 b | 2.0
 c | string1
 d | 2000-01-01
 e | 2000-01-01 12:00:00
'''

# Select columns and compute descriptive statistics
df.select("a", "b", "c").describe().show()
'''
+-------+------------------+------------------+-------+
|summary|                 a|                 b|      c|
+-------+------------------+------------------+-------+
|  count|                 3|                 3|      3|
|   mean|2.3333333333333335|3.3333333333333335|   null|
| stddev|1.5275252316519468|1.5275252316519468|   null|
|    min|                 1|               2.0|string1|
|    max|                 4|               5.0|string3|
+-------+------------------+------------------+-------+
'''

df.collect()
'''
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=4, b=5.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
'''

df.take(1)
'''
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
'''

df.tail(1)
'''
[Row(a=4, b=5.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
'''

df.toPandas()
'''
   a    b        c           d                   e
0  1  2.0  string1  2000-01-01 2000-01-01 12:00:00
1  2  3.0  string2  2000-02-01 2000-01-02 12:00:00
2  4  5.0  string3  2000-03-01 2000-01-03 12:00:00
'''

from pyspark.sql import Column
from pyspark.sql.functions import upper

# Column expressions all evaluate to the same Column type
type(df.c) == type(upper(df.c)) == type(df.c.isNull())  # True

df.select(df.c).show()
'''
+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+
'''

df.withColumn('upper_c', upper(df.c)).show()
'''
+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
'''

df.filter(df.a == 1).show()
'''
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
'''

from pyspark.sql.functions import pandas_udf

# Vectorized (pandas) UDFs operate on whole pd.Series batches
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    return series + 1

@pandas_udf('long')
def pandas_reduce_one(series: pd.Series) -> pd.Series:
    return series - 1

df.select(pandas_plus_one(df.a)).show()
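# Quick usage sketch (added for illustration, assuming the same df as above):
# the second UDF is applied the same way; on column a = [1, 2, 4] it yields 0, 1, 3.
df.select(pandas_reduce_one(df.a)).show()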
def pandas_filter_func(iterator):
    # mapInPandas: filter each pandas batch, keeping rows where a == 1
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

df5 = spark.createDataFrame([
    ['red', 'banana', 1, 10],
    ['blue', 'banana', 2, 20],
    ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40],
    ['red', 'carrot', 5, 50],
    ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70],
    ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df5.show()

df5.groupby('color').avg().show()
'''
+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+
'''

def plus_mean(pandas_df):
    # Subtract the group mean of v1 within each color group
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df5.groupby('color').applyInPandas(plus_mean, schema=df5.schema).show()

df6 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))
df7 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    # Cogrouped as-of join: match each left row with the latest right row per id
    return pd.merge_asof(l, r, on='time', by='id')

df6.groupby('id').cogroup(df7.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()
'''
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+
'''

# Write and read back in CSV, Parquet, and ORC formats
df5.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()

df5.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

df5.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

# Register a temp view so the DataFrame can be queried with SQL
df5.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) FROM tableA").show()
'''
+--------+
|count(1)|
+--------+
|       8|
+--------+
'''

@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

# Register the pandas UDF so it can be called from SQL
spark.udf.register("add_one", add_one)
spark.sql("SELECT color, fruit, v1, add_one(v1) FROM tableA").show()
'''
+-----+------+---+-----------+
|color| fruit| v1|add_one(v1)|
+-----+------+---+-----------+
|  red|banana|  1|          2|
| blue|banana|  2|          3|
|  red|carrot|  3|          4|
| blue| grape|  4|          5|
|  red|carrot|  5|          6|
|black|carrot|  6|          7|
|  red|banana|  7|          8|
|  red| grape|  8|          9|
+-----+------+---+-----------+
'''

df5.selectExpr("add_one(v1)").show()
'''
+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+
'''

from pyspark.sql.functions import expr

df5.select(expr('count(*)') > 0).show()
'''
+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+
'''
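# Optional wrap-up (a minimal sketch, assuming the walkthrough runs as a standalone
# script rather than a shared notebook): drop the temp view and stop the
# SparkSession to release resources when finished.
spark.catalog.dropTempView("tableA")
spark.stop()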