我有一份清单
l = [1,2,3]
和一个DataFrame
df = sc.parallelize([ ['p1','a'],['p2','b'],['p3','c'],]).toDF(('product','name'))
我想获得一个新的DataFrame,其中列表l作为另一列添加,即
+-------+----+---------+|product|name| new_col |+-------+----+---------+| p1| a| 1 || p2| b| 2 || p3| c| 3 |+-------+----+---------+
JOIN的方法,我加入df的时候
sc.parallelize([[1],[2],[3]])
失败了.使用withColumn的方法,如
new_df = df.withColumn('new_col',l)
失败,因为列表不是Column对象.
解决方法 因此,通过阅读一些有趣的东西 here,我已经确定你不能真正只是将随机/任意列附加到给定的DataFrame对象.看起来你想要的更多的是拉链而不是连接.我环顾四周找到了 this ticket,这让我觉得如果你有DataFrame而不是RDD对象你将无法压缩.我能够解决你的问题的唯一方法就是离开DataFrame对象的世界并返回到RDD对象.我还需要为连接创建索引,这可能适用于您的用例,也可能不适用.
l = sc.parallelize([1,3])index = sc.parallelize(range(0,l.count()))z = index.zip(l)rdd = sc.parallelize([['p1','c']])rdd_index = index.zip(rdd)# just in case!assert(rdd.count() == l.count())# perform an inner join on the index we generated above,then map it to look pretty.new_rdd = rdd_index.join(z).map(lambda (x,y): [y[0][0],y[0][1],y[1]])new_df = new_rdd.toDF(["product",'name','new_col'])
当我运行new_df.show()时,我得到:
+-------+----+-------+|product|name|new_col|+-------+----+-------+| p1| a| 1|| p2| b| 2|| p3| c| 3|+-------+----+-------+
旁注:我真的很惊讶这没用.看起来像外部联接?
from pyspark.sql import Rowl = sc.parallelize([1,3])new_row = Row("new_col_name")l_as_df = l.map(new_row).toDF()new_df = df.join(l_as_df)
当我运行new_df.show()时,我得到:
+-------+----+------------+|product|name|new_col_name|+-------+----+------------+| p1| a| 1|| p1| a| 2|| p1| a| 3|| p2| b| 1|| p3| c| 1|| p2| b| 2|| p2| b| 3|| p3| c| 2|| p3| c| 3|+-------+----+------------+总结
以上是内存溢出为你收集整理的python – PySpark:当列是列表时,向DataFrame添加一列全部内容,希望文章能够帮你解决python – PySpark:当列是列表时,向DataFrame添加一列所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)