Spark Operations on MongoDB: A Worked Example


1. Adding the Dependency

        Add the mongo-spark dependency to the project's pom.xml, as shown below:

<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
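If the project is built with sbt instead of Maven, the equivalent dependency line would look roughly like the sketch below; the 2.4.3 version number is only an assumption, so use whichever connector release matches your Spark and Scala versions (the pom above pins Scala 2.11 via the _2.11 artifact suffix).

// build.sbt - sketch only; the version number is an assumption
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.3"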

2. Writing Data from Spark to MongoDB
import com.mongodb.WriteConcern
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkMongoWriteJob {
  def main(args: Array[String]): Unit = {
    // Spark configuration
    val conf = new SparkConf()
      .setAppName("SparkMongoWriteJob")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    // Sample data to be written
    val testList = List(
      Test(1, "Zhang San"),
      Test(2, "Li Si"),
      Test(3, "Wang Wu"),
      Test(4, "Zhao Liu"),
      Test(5, "Qin Qi")
    )

    val dataDF = ss.createDataFrame(testList)
    dataDF.show()

    // Write the data to MongoDB
    val config = WriteConfig.create(
      "test",                 // database name
      "test",                 // collection name
      "mongodb://localhost",  // connection URI
      2,                      // max batch size
      WriteConcern.ACKNOWLEDGED)

    MongoSpark.save(dataDF, config)

    // Release resources
    ss.stop()
  }

  // Sample case class used as the document schema
  case class Test(_id: Int, name: String)
}
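For comparison, the same write can also go through the standard DataFrame writer API by pointing it at the connector's data source. This is only a sketch under the same assumptions as above (a local MongoDB instance and the test database / test collection); the unprefixed uri/database/collection option keys are the connector's shorthand for the spark.mongodb.output.* settings.

// Alternative write path via the DataFrame writer API (sketch, same assumptions as above)
dataDF.write
  .format("com.mongodb.spark.sql.DefaultSource")  // mongo-spark-connector data source
  .mode("append")                                 // insert documents into the collection
  .option("uri", "mongodb://localhost")           // connection URI
  .option("database", "test")                     // database name
  .option("collection", "test")                   // collection name
  .save()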
3. Reading Data from MongoDB with Spark
import java.util

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkMongoReadJob {
  def main(args: Array[String]): Unit = {
    // Spark configuration
    val conf = new SparkConf()
      .setAppName("SparkMongoReadJob")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    // Read options for the connector
    val map = new util.HashMap[String, String]()
    map.put("spark.mongodb.input.database", "test")            // database name
    map.put("spark.mongodb.input.collection", "test")          // collection name
    map.put("spark.mongodb.input.uri", "mongodb://localhost")  // connection URI

    val readConfig = ReadConfig.create(map)
    val dataDF = MongoSpark.load(ss, readConfig)
    dataDF.show()

    // Release resources
    ss.stop()
  }
}
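The same data can also be loaded through the DataFrame reader API instead of a ReadConfig. The sketch below assumes the local MongoDB instance and the test.test collection written in section 2; once loaded, the DataFrame can be filtered like any other, and the connector pushes simple predicates down to MongoDB.

// Alternative read path via the DataFrame reader API (sketch, same assumptions as above)
val altDF = ss.read
  .format("com.mongodb.spark.sql.DefaultSource")  // mongo-spark-connector data source
  .option("uri", "mongodb://localhost")           // connection URI
  .option("database", "test")                     // database name
  .option("collection", "test")                   // collection name
  .load()

altDF.filter(altDF("_id") > 2).show()  // simple predicates are pushed down to MongoDB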
