如何拆卸SparkSession并在一个应用程序中创建一个新的SparkSession?

如何拆卸SparkSession并在一个应用程序中创建一个新的SparkSession?,第1张

如何拆卸SparkSession并在一个应用程序中创建一个新的SparkSession?

长话短说,Spark(包括PySpark)并非旨在在单个应用程序中处理多个上下文。如果您对故事的JVM感兴趣,我建议阅读SPARK-2243(已解决,
无法解决 )。

在PySpark中做出了许多设计决定,这些决定反映了这些决定,包括但不限于单例Py4J网关。实际上,您不能

SparkContexts
在一个应用程序中包含多个。
SparkSession
不仅绑定
SparkContext
而且本身也带来问题,例如如果使用本地Hive元存储,则处理本地(独立)Hive元存储。而且,有些函数在
SparkSession.builder.getOrCreate
内部使用,并且
取决于您现在看到的行为 。一个明显的例子是UDF注册。如果存在多个SQL上下文,则其他功能可能会表现出意外的行为(例如
RDD.toDF
)。

我个人认为,多重环境不仅不受支持,而且还违反了单一责任原则。您的业​​务逻辑不应与所有设置,清理和配置详细信息有关。

我的个人建议如下:

  • 如果应用程序包含多个可组成的一致模块,并受益于具有缓存和公共metastore的单个执行环境,则在应用程序入口点初始化所有必需的上下文,并在必要时将它们传递给各个管道:

    • main.py

          from pyspark.sql import SparkSession

      import collect
      import process

      if name == “main”:
      spark: SparkSession = …

      # Pass data between modulescollected = collect.execute(spark)processed = process.execute(spark, data=collected)...spark.stop()
    • collect.py
      /
      process.py

          from pyspark.sql import SparkSession

      def execute(spark: SparkSession, data=None):

  • 否则(根据您的描述,这里似乎是这种情况),我将设计入口点以执行单个管道,并使用外部worfklow管理器(例如Apache Airflow或Toil)来处理执行。

它不仅更清洁,而且允许更灵活的故障恢复和调度。

建设者当然可以做同样的事情,但是就像一个聪明的人曾经说过的: 显式胜于隐式。

* `main.py` import argparse    from pyspark.sql import SparkSession    import collect    import process    pipelines = {"collect": collect, "process": process}    if __name__ == "__main__":        parser = argparse.ArgumentParser()        parser.add_argument('--pipeline')        args = parser.parse_args()        spark: SparkSession = ...        # Execute a single pipeline only for side effects        pipelines[args.pipeline].execute(spark)        spark.stop()* `collect.py`/`process.py`与上一点一样。

我会以一种方式或另一种方式保留建立上下文的一个地方,而将其拆解的只有一个地方。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5662148.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存