Add the mongo-spark dependency to the pom file, as shown below. The _2.11 suffix must match the project's Scala version, and the version property should resolve to a mongo-spark-connector release compatible with your Spark build:

<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

2. Spark writes data to MongoDB
import com.mongodb.WriteConcern
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkMongoWriteJob {

  def main(args: Array[String]): Unit = {
    // Spark configuration
    val conf = new SparkConf()
      .setAppName("SparkMongoWriteJob")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    // Sample rows to write
    val testList = List(
      Test(1, "张三"),
      Test(2, "李四"),
      Test(3, "王五"),
      Test(4, "赵六"),
      Test(5, "秦七")
    )
    val dataDF = ss.createDataFrame(testList)
    dataDF.show()

    // Write the DataFrame to MongoDB
    val config = WriteConfig.create(
      "test",                // database name
      "test",                // collection name
      "mongodb://localhost", // connection URI
      2,                     // max batch size
      WriteConcern.ACKNOWLEDGED)
    MongoSpark.save(dataDF, config)

    // Release resources
    ss.stop()
  }

  // Sample case class
  case class Test(_id: Int, name: String)
}
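The same write can also go through Spark's generic DataFrameWriter API instead of MongoSpark.save. Below is a minimal sketch under the assumptions already used above (local MongoDB, database test, collection test); the helper name writeViaDataFrameWriter is made up for illustration:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: the same write as above, expressed with the DataFrameWriter API.
// The uri/database/collection options mirror the WriteConfig fields.
def writeViaDataFrameWriter(dataDF: DataFrame): Unit = {
  dataDF.write
    .format("com.mongodb.spark.sql.DefaultSource") // the connector's data source
    .mode(SaveMode.Append)                         // insert into the existing collection
    .option("uri", "mongodb://localhost")
    .option("database", "test")
    .option("collection", "test")
    .save()
}

SaveMode.Append leaves existing documents in place, while SaveMode.Overwrite would drop the target collection before writing.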
3. Spark reads data from MongoDB

import java.util

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkMongoReadJob {

  def main(args: Array[String]): Unit = {
    // Spark configuration
    val conf = new SparkConf()
      .setAppName("SparkMongoReadJob")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    // Read settings for the connector
    val map = new util.HashMap[String, String]()
    map.put("spark.mongodb.input.database", "test")            // database name
    map.put("spark.mongodb.input.collection", "test")          // collection name
    map.put("spark.mongodb.input.uri", "mongodb://localhost")  // connection URI
    val readConfig = ReadConfig.create(map)

    val dataDF = MongoSpark.load(ss, readConfig)
    dataDF.show()

    // Release resources
    ss.stop()
  }
}
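When only part of a collection is needed, the filter can run inside MongoDB rather than in Spark. One way is the RDD API's withPipeline, which sends an aggregation pipeline to the server so that only matching documents are shipped back. A minimal sketch reusing ss and readConfig from the job above; the helper name countLargeIds and the $match condition are made up for illustration:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.SparkContext
import org.bson.Document

// Hypothetical helper: count documents with _id > 2, filtering server-side.
def countLargeIds(sc: SparkContext, readConfig: ReadConfig): Long = {
  val mongoRDD = MongoSpark.load(sc, readConfig) // MongoRDD[Document]
  // The $match stage is executed by MongoDB before any data reaches Spark
  val filtered = mongoRDD.withPipeline(Seq(Document.parse("{ $match: { _id: { $gt: 2 } } }")))
  filtered.count()
}

Called as countLargeIds(ss.sparkContext, readConfig), this would count the documents whose _id is greater than 2, i.e. the rows 3, 4 and 5 from the write example.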