63. Spark: Reading Data and Writing It to a Database

Supported Data Sources: JDBC

Requirement: use Spark to write a small dataset into MySQL, then read the records back from the database and print them.

Contents

Supported Data Sources: JDBC

Project Architecture

pom.xml Dependencies

Creating the Database

Business Logic

Complete Code

Running the Program

Project Summary


Project Architecture

(architecture diagram in the original post)

pom.xml Dependencies


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast</groupId>
    <artifactId>SparkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.11</scala.version>
        <spark.version>3.0.1</spark.version>
        <hadoop.version>2.7.5</hadoop.version>
    </properties>

    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark Streaming + Kafka integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark on Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Structured Streaming + Kafka integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark MLlib -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.7</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

        Note: the pom dependencies are a critical part of the project setup. Like a configuration file, this is where the required jars, the Scala version, and similar build settings are declared.

Creating the Database

The JDBC URL used below points at a database named bigdata, so create it (if it does not exist yet) along with the data table:

CREATE DATABASE IF NOT EXISTS bigdata;
USE bigdata;

CREATE TABLE `data` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Business Logic

1. Create the local environment and set the log level

val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")

2. Load the data and create an RDD

val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("tuomasi", 21), ("孙悟空", 19), ("猪八戒", 20)))

3. Iterate per partition

foreachPartition is used instead of foreach so that each partition shares a single database connection rather than opening one per record:

dataRDD.foreachPartition(iter => {
  //the body built up in steps 4-7 runs once per partition
})

4. Obtain the JDBC connection (DriverManager loads the MySQL driver automatically)

val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456")

5. Prepare the SQL statement

val sql: String = "INSERT INTO `data` (`id`, `name`, `age`) VALUES (NULL, ?, ?);"

val ps: PreparedStatement = conn.prepareStatement(sql)

6. Process the data

Each record's fields are bound to the placeholders and queued with addBatch; executeBatch then sends the whole batch to MySQL in one round trip:

iter.foreach(t => { //t is one (name, age) record
val name: String = t._1
val age: Int = t._2
ps.setString(1, name)
ps.setInt(2, age)
ps.addBatch()
})
ps.executeBatch()

7. Close the resources (the PreparedStatement should be closed before the Connection)

if (ps != null) ps.close()
if (conn != null) conn.close()
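
In production code the cleanup is usually guarded with try/finally so both resources are released even when the batch fails. Here is a minimal sketch of the same per-partition write with that guard; it uses the same table, URL, and credentials as the steps above, and the try/finally structure is an addition for illustration, not part of the original code:

dataRDD.foreachPartition(iter => {
  val conn: Connection = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456")
  val ps: PreparedStatement =
    conn.prepareStatement("INSERT INTO `data` (`id`, `name`, `age`) VALUES (NULL, ?, ?);")
  try {
    iter.foreach { case (name, age) =>
      ps.setString(1, name)
      ps.setInt(2, age)
      ps.addBatch()
    }
    ps.executeBatch()
  } finally {
    //release resources even if executeBatch throws: statement first, then connection
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
})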

8. Read from the database

The connection is wrapped in a function because JdbcRDD invokes it on the executors, once per partition:

val getConnection = () => DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456")

9. Set the query bounds and the number of partitions

val sql: String = "select id,name,age from data where id >= ? and id <= ?"
val studentTupleRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD[(Int, String, Int)](
      sc,
      getConnection,
      sql,
      1,      //lower bound: extract records with id from 1...
      20,     //upper bound: ...to 20 (bound to the two ? placeholders)
      1,      //number of partitions
      mapRow
    )

10. Result-set mapping function

val mapRow: ResultSet => (Int, String, Int) = (r: ResultSet) => {
      val id: Int = r.getInt("id")
      val name: String = r.getString("name")
      val age: Int = r.getInt("age")
      (id, name, age)
    }

11. Print the results

studentTupleRDD.foreach(println)

Complete Code
package org.example.spark

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

object RDD_DataSource {
  def main(args: Array[String]): Unit = {
    //TODO 0. env: create the local environment
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //TODO 1. source: load data / create the RDD
    //RDD[(name, age)]
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("tuomasi", 21), ("孙悟空", 19), ("猪八戒", 20)))

    //TODO 2. transformation
    //TODO 3. sink: output
    //Requirement: write the data into MySQL, then read it back out
    dataRDD.foreachPartition(iter => {
      //obtain a JDBC connection
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456")

      val sql: String = "INSERT INTO `data` (`id`, `name`, `age`) VALUES (NULL, ?, ?);"

      val ps: PreparedStatement = conn.prepareStatement(sql)

      iter.foreach(t => { //t is one (name, age) record
        val name: String = t._1
        val age: Int = t._2
        ps.setString(1, name)
        ps.setInt(2, age)
        ps.addBatch()
        //ps.executeUpdate()
      })
      ps.executeBatch()
      //close resources: the statement first, then the connection
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    })

    //read back from MySQL
    val getConnection = () => DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456")
    val sql: String = "select id,name,age from data where id >= ? and id <= ?"
    val mapRow: ResultSet => (Int, String, Int) = (r: ResultSet) => {
      val id: Int = r.getInt("id")
      val name: String = r.getString("name")
      val age: Int = r.getInt("age")
      (id, name, age)
    }
    val studentTupleRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD[(Int, String, Int)](
      sc,
      getConnection,
      sql,
      1,
      20,
      1,
      mapRow
    )
    studentTupleRDD.foreach(println)
  }
}
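
For comparison (an alternative, not part of the original walkthrough): the same write-then-read round trip can also be expressed with the DataFrame JDBC source from the spark-sql dependency already in the pom. The calls below are Spark's standard DataFrame JDBC reader/writer API; the object name DataFrameJdbcDemo is made up for this sketch:

package org.example.spark

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object DataFrameJdbcDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark").master("local[*]").getOrCreate()
    import spark.implicits._

    val url = "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")

    //write: append the (name, age) rows; MySQL fills in the auto-increment id
    Seq(("tuomasi", 21), ("孙悟空", 19), ("猪八戒", 20))
      .toDF("name", "age")
      .write.mode(SaveMode.Append).jdbc(url, "data", props)

    //read the table back and print it
    spark.read.jdbc(url, "data", props).show()

    spark.stop()
  }
}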

Running the Program

Console output: each record read back through the JdbcRDD is printed as an (id, name, age) tuple (screenshot in the original post).

Database check: the data table now contains the inserted rows (screenshot in the original post).

        Note: this is a teaching example. In real scenarios the data usually runs to tens of thousands of records or far more; good code proves itself at large data volumes, and tuning is a skill well worth mastering.
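
For example, one simple knob at scale (a sketch under assumptions, not part of the original code) is to flush the JDBC batch periodically so the driver never buffers an entire partition in memory, and to let MySQL Connector/J rewrite batches into multi-row INSERTs through the rewriteBatchedStatements URL parameter:

//sketch only: same table and credentials as above; the batch size of 1000 is an arbitrary choice
val url = "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8&rewriteBatchedStatements=true"
dataRDD.foreachPartition(iter => {
  val conn: Connection = DriverManager.getConnection(url, "root", "123456")
  val ps: PreparedStatement =
    conn.prepareStatement("INSERT INTO `data` (`id`, `name`, `age`) VALUES (NULL, ?, ?);")
  try {
    var pending = 0
    iter.foreach { case (name, age) =>
      ps.setString(1, name)
      ps.setInt(2, age)
      ps.addBatch()
      pending += 1
      if (pending == 1000) { ps.executeBatch(); pending = 0 } //flush to bound memory
    }
    if (pending > 0) ps.executeBatch() //flush the remainder
  } finally {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
})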

Project Summary

        Summary: gaps in knowledge are inevitable while coding. When you run into a problem, build the habit of reading the source code; it pays off many times over in later development. Logging and debugging are, of course, just as indispensable in day-to-day work.
