定义:
> sampleDF是样本数据框,具有用于查找目的的列表记录.
> sampleDS是一个RDD,其中包含元素列表.
> mapPingFunction是在sampleDF中查找sampleDS的元素,如果它们存在于sampleDF中则映射到1,如果不存在则映射到0.
我有一个映射函数如下:
def mapPingFunction(element): # The dataframe lookup! lookupResult = sampleDF.filter(sampleDF[0] == element).collect() if len(lookupResult) > 0: print lookupResult return 1 return 0
问题:
在映射函数之外访问sampleDF工作得非常好但是只要我在函数内部使用它,我就会收到以下错误:
py4j.Py4JException: Method __getnewargs__([]) does not exist at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:335) at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:344) at py4j.Gateway.invoke(Gateway.java:252) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:744)
我还尝试了什么:
我确实尝试保存一个临时表并在map函数中使用sqlContext select仍然无法使其工作.这是我得到的错误:
file "/usr/lib64/python2.6/pickle.py",line 286,in save f(self,obj) # Call unbound method with explicit self file "/usr/lib64/python2.6/pickle.py",line 649,in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py",line 686,in _batch_setitems save(v) file "/usr/lib64/python2.6/pickle.py",line 331,in save self.save_reduce(obj=obj,*rv) file "/opt/spark/python/pyspark/cloudpickle.py",line 542,in save_reduce save(state) file "/usr/lib64/python2.6/pickle.py",line 681,line 306,in save rv = reduce(self.proto)TypeError: 'JavaPackage' object is not callable
我要的是:
我试图通过简单的例子来简化我的问题.任何有关如何在地图功能中使用数据框的帮助都非常受欢迎.
最佳答案这不可能. Spark不支持对分布式数据结构(RDD,DataFrame,Datasets)的嵌套 *** 作.即使它确实执行了大量的工作也不是一个好主意.根据您显示的代码,您可能希望将RDD转换为DataFrame并执行join wit(rdd.map(x => (x,)).toDF(["element"]) .join(sampleDF,sampleDF[0] == df[0]) .groupBy("element") .agg(count("element") > 0))
在侧面说明打印内部地图是完全没用的,不提及它增加额外的IO开销. 总结
以上是内存溢出为你收集整理的如何在Spark中的map函数中使用数据帧?全部内容,希望文章能够帮你解决如何在Spark中的map函数中使用数据帧?所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)