长话短说,Spark(包括PySpark)并非旨在在单个应用程序中处理多个上下文。如果您对故事的JVM感兴趣,我建议阅读SPARK-2243(已解决,
无法解决 )。
在PySpark中做出了许多设计决定,这些决定反映了这些决定,包括但不限于单例Py4J网关。实际上,您不能
SparkContexts在一个应用程序中包含多个。
SparkSession不仅绑定
SparkContext而且本身也带来问题,例如如果使用本地Hive元存储,则处理本地(独立)Hive元存储。而且,有些函数在
SparkSession.builder.getOrCreate内部使用,并且
取决于您现在看到的行为 。一个明显的例子是UDF注册。如果存在多个SQL上下文,则其他功能可能会表现出意外的行为(例如
RDD.toDF)。
我个人认为,多重环境不仅不受支持,而且还违反了单一责任原则。您的业务逻辑不应与所有设置,清理和配置详细信息有关。
我的个人建议如下:
如果应用程序包含多个可组成的一致模块,并受益于具有缓存和公共metastore的单个执行环境,则在应用程序入口点初始化所有必需的上下文,并在必要时将它们传递给各个管道:
main.py
:from pyspark.sql import SparkSession
import collect
import processif name == “main”:
spark: SparkSession = …# Pass data between modulescollected = collect.execute(spark)processed = process.execute(spark, data=collected)...spark.stop()
collect.py
/process.py
:from pyspark.sql import SparkSession
def execute(spark: SparkSession, data=None):
…
否则(根据您的描述,这里似乎是这种情况),我将设计入口点以执行单个管道,并使用外部worfklow管理器(例如Apache Airflow或Toil)来处理执行。
它不仅更清洁,而且允许更灵活的故障恢复和调度。
建设者当然可以做同样的事情,但是就像一个聪明的人曾经说过的: 显式胜于隐式。
* `main.py` import argparse from pyspark.sql import SparkSession import collect import process pipelines = {"collect": collect, "process": process} if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--pipeline') args = parser.parse_args() spark: SparkSession = ... # Execute a single pipeline only for side effects pipelines[args.pipeline].execute(spark) spark.stop()* `collect.py`/`process.py`与上一点一样。
我会以一种方式或另一种方式保留建立上下文的一个地方,而将其拆解的只有一个地方。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)