这是我能想到的最简单的DataFrame.我正在使用PySpark 1.6.1.
# one row of datarows = [ (1,2) ]cols = [ "a","b" ]df = sqlContext.createDataFrame(rows,cols)
所以数据框完全适合内存,没有对任何文件的引用,对我来说看起来很微不足道.
然而,当我收集数据时,它使用2000个执行程序:
df.collect()
在收集期间,使用2000执行者:
[Stage 2:===================================================>(1985 + 15) / 2000]
然后是预期的输出:
[Row(a=1,b=2)]
为什么会这样? DataFrame不应该完全在驱动程序的内存中吗?最佳答案所以我稍微研究了一下代码,试图弄清楚发生了什么.似乎sqlContext.createDataFrame确实没有尝试根据数据设置合理的参数值.
为什么2000任务?
Spark使用2000个任务,因为我的数据框有2000个分区. (尽管看起来像分区比行更明显是胡说八道.)
这可以通过以下方式看出:
>>> df.rdd.getNumPartitions()2000
为什么DataFrame有2000个分区?
发生这种情况是因为sqlContext.createDataFrame使用默认的分区数(在我的情况下为2000)结束,而不管数据的组织方式或数据的行数.
代码跟踪如下.
在sql / context.py中,sqlContext.createDataFrame函数调用(在本例中):
rdd,schema = self._createFromLocal(data,schema)
反过来调用:
return self._sc.parallelize(data),schema
sqlContext.parallelize函数在context.py中定义:
numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
没有检查行数,也无法从sqlContext.createDataFrame指定切片数.
如何更改DataFrame的分区数?
使用DataFrame.coalesce.
>>> smdf = df.coalesce(1)>>> smdf.rdd.getNumPartitions()1>>> smdf.explain()== Physical Plan ==Coalesce 1+- Scan ExistinGrdD[a#0L,b#1L]>>> smdf.collect()[Row(a=1,b=2)]
总结 以上是内存溢出为你收集整理的python – 为什么带有1行的DataFrame上的collect()使用2000个exectors?全部内容,希望文章能够帮你解决python – 为什么带有1行的DataFrame上的collect()使用2000个exectors?所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)