如何使用JDBC源在(Py)Spark中写入和读取数据?

如何使用JDBC源在(Py)Spark中写入和读取数据?,第1张

如何使用JDBC源在(Py)Spark中写入和读取数据? 写数据
  1. 提交应用程序或启动Shell时,包括适用的JDBC驱动程序。您可以使用例如
    --packages
    bin/pyspark --packages group:name:version

或结合

driver-class-path
jars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

也可以

PYSPARK_SUBMIT_ARGS
在启动JVM实例之前使用环境变量来设置这些属性,或使用
conf/spark-defaults.conf
set
spark.jars.packages
spark.jars
/来设置这些属性
spark.driver.extraClassPath

  1. 选择所需的模式。Spark JDBC编写器支持以下模式:
* `append`:将此:class:的内容追加`Dataframe`到现有数据中。* `overwrite`:覆盖现有数据。* `ignore`:如果数据已经存在,请静默忽略此 *** 作。* `error` (默认情况):如果数据已经存在,则引发异常。

不支持更新或其他细粒度的修改

    mode = ...
  1. 准备JDBC URI,例如:

    # You can enpre credentials in URI or pass
    separately using properties argumentof jdbc method or options

    url = “jdbc:postgresql://localhost/foobar”

  2. (可选)创建JDBC参数字典。

    properties = {"user": "foo","password": "bar"

    }

properties
/
options
还可以用于设置支持的JDBC连接属性。

  1. 采用
    Dataframe.write.jdbc
    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

保存数据(

pyspark.sql.DataframeWriter
有关详细信息,请参阅)。

已知问题

  • 使用
    --packages
    java.sql.SQLException: No suitable driver found for jdbc: ...
    )包含驱动程序后,找不到合适的驱动程序

假设没有驱动程序版本不匹配可以解决此问题,则可以将

driver
类添加到中
properties
。例如:

    properties = {    ...    "driver": "org.postgresql.Driver"}
  • 使用
    df.write.format("jdbc").options(...).save()
    可能会导致:

java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许创建表为select。

解决方案未知。

  • 在Pyspark 1.3中,您可以尝试直接调用Java方法:
    df._jdf.insertIntoJDBC(url, "baz", True)
读取数据
  1. 遵循 写数据中的 步骤1-4 __
  2. 用途
    sqlContext.read.jdbc
    sqlContext.read.jdbc(url=url, table="baz", properties=properties)

sqlContext.read.format("jdbc")

    (sqlContext.read.format("jdbc")    .options(url=url, dbtable="baz", **properties)    .load())

已知问题和陷阱

  • 找不到合适的驱动程序-请参阅:写入数据
  • Spark SQL支持JDBC源的谓词下推,尽管并非所有谓词都可以下推。它也没有委派限制或聚合。可能的解决方法是用有效的子查询替换
    dbtable
    /
    table
    参数。
  • 默认情况下,JDBC数据源使用单个执行程序线程顺序加载数据。为确保分布式数据加载,您可以:

    • 提供分区
      column
      (必须
      IntegeType
      lowerBound
      upperBound
      numPartitions
    • 提供互斥谓词的列表,
      predicates
      每个所需分区一个。

看到:

* [通过JDBC从RDBMS读取时,对spark进行分区](https://stackoverflow.com/q/43150694/6910411),* [从JDBC源迁移数据时如何优化分区?](https://stackoverflow.com/q/52603131/6910411),* [如何使用Dataframe和JDBC连接提高慢速Spark作业的性能?](https://stackoverflow.com/q/32188295/6910411)* [使用JDBC导入Postgres时如何对Spark RDD进行分区?](https://stackoverflow.com/q/39597971/6910411)
  • 在分布式模式(具有分区列或谓词)中,每个执行程序都在其自己的事务中运行。如果同时修改源数据库,则不能保证最终视图将保持一致。
在哪里找到合适的驱动程序:
  • Maven存储库(以获取用于

    --packages
    选择所需版本的所需坐标,并从Gradle选项卡中以
    compile-group:name:version
    替换各个字段的形式复制数据)或Maven Central存储库:

    • PostgreSQL的
    • 的MySQL
其他选择

根据数据库的不同,可能存在专门的来源,并且在某些情况下是首选的来源:

  • Greenplum-关键Greenplum-Spark连接器
  • Apache Phoenix- Apache Spark插件
  • Microsoft SQL Server- Azure SQL数据库和SQL Server的Spark连接器
  • Amazon Redshift- Databricks Redshift连接器(当前版本仅在专有Databricks Runtime中可用。已停产的开源版本,可在GitHub上获得)。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5649974.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存