记录-bigdata-使用Scala语言,使用Spark抽取MySQL指定数据表中的数据到HIVE的ODS层的表中

记录-bigdata-使用Scala语言,使用Spark抽取MySQL指定数据表中的数据到HIVE的ODS层的表中,第1张

有问题可以私聊我交流

我这里是完成编码之后,打包发送到集群上运行的!!!

1.使用IDEA创建MAVEN项目

pom配置如下


  4.0.0
  com.tledu
  llll
  1.0-SNAPSHOT
  ${project.artifactId}
  My wonderfull scala app
  2018
  
    
      My License
      http://....
      repo
    
  

  
    1.8
    1.8
    UTF-8
    2.11.11
    2.11
    4.2.0
  

  
    
      org.scala-lang
      scala-library
      ${scala.version}
    


    
      org.apache.spark
      spark-core_${scala.compat.version}
      2.3.2
      provided
    

    
      org.apache.spark
      spark-sql_${scala.compat.version}
      2.3.2
      provided
    

    
      org.apache.spark
      spark-hive_2.11
      2.0.2
      provided
    

    
      mysql
      mysql-connector-java
      8.0.23
    



    
    
      junit
      junit
      4.12
      test
    
    
      org.scalatest
      scalatest_${scala.compat.version}
      3.0.5
      test
    
    
      org.specs2
      specs2-core_${scala.compat.version}
      ${spec2.version}
      test
    
    
      org.specs2
      specs2-junit_${scala.compat.version}
      ${spec2.version}
      test
    
  

  
    src/main/scala
    src/test/scala
    
      
        
        net.alchim31.maven
        scala-maven-plugin
        3.3.2
        
          
            
              compile
              testCompile
            
            
              
                -dependencyfile
                ${project.build.directory}/.scala_dependencies
              
            
          
        
      
      
        org.apache.maven.plugins
        maven-surefire-plugin
        2.21.0
        
          
          true
        
      
      
        org.scalatest
        scalatest-maven-plugin
        2.0.0
        
          ${project.build.directory}/surefire-reports
          .
          TestSuiteReport.txt
          
          samples.AppTest
        
        
          
            test
            
              test
            
          
        
      

      
        maven-assembly-plugin
        
          
            jar-with-dependencies
          
        
        
          
            make-assembly
            package
            
              assembly
            
          
        
      
    
  

编码过程如下

// 1. 构建sparkSession 
    val sparkSession = SparkSession
      .builder()
      .appName("抽取mysql数据到hive")
      .enableHiveSupport() // 开启hive支持
      //.master("local[2]") // 指定运行模式,使用本地模式进行调试, 启动的时候指定即可,这个参数只在本地调试的时候使用
      .getOrCreate()

//定义函数,获取mysql链接
 def extractFromMysql(sparkSession: SparkSession, tableName: String): DataFrame = {
    val DB_URL = "jdbc:mysql://  ip地址   /库名"
    val jdbcMap = Map(
      "driver" -> "com.mysql.jdbc.Driver",
      "url" -> DB_URL,
      "dbtable" -> tableName,
      "user" -> "用户名",
      "password" -> "密码"
    )
    sparkSession.read.format("jdbc").options(jdbcMap).load()
  }

//调用函数获取dataframe
    val df = extractFromMysql(sparkSession, "tablename")

 // 加载hive表数据
    // 切换数据库
    sparkSession.sql("use hive库名")
// 读取数据
    // spark 可以直接 *** 作非事务表,但是无法 *** 作事务表
    val customerDF = sparkSession.sql(
      """
        | select * from customer
        |""".stripMargin)
    // hive表中的数据
    customerDF.show()

    // 把数据存进去,全量的数据存储
    df.write.mode(SaveMode.Append).format("hive").saveAsTable("customer")

    sparkSession.close()

加一段抽取到分区表的代码

// 1. 获取sparksession
    val sparkSession = SparkSession
      .builder()
      .appName("抽取分区表")
      .enableHiveSupport() // 开启hive支持
      // 增加了对hive的配置,设置的动态分区的大小
      .config("hive.exec.max.dynamic.partitions",5000)
      // 配置动态分区的支持
      .config("hive.exec.dynamic.partition.mode","nonstric")
      .getOrCreate()


 def extractFromMysql(sparkSession: SparkSession, tableName: String): DataFrame = {
      val DB_URL = "jdbc:mysql:// 数据库IP地址 / 库名"
      val jdbcMap = Map(
        "driver" -> "com.mysql.jdbc.Driver",
        "url" -> DB_URL,
        "dbtable" -> tableName,
        "user" -> "root",
        "password" -> "123456"
      )
      sparkSession.read.format("jdbc").options(jdbcMap).load()
    }


// 加载mysql中的数据
    val df = extractFromMysql(sparkSession, "ORDERS")
    df.createOrReplaceTempView("mysql_orders")
    //使用hive中的数据
    sparkSession.sql("use ods")

    
    sparkSession.sql(
      """
        |insert overwrite table orders partition(etldate='2022-03-27')
        |select * from mysql_orders limit 5000
        |""".stripMargin)


   //获取增量数据,这里是只把新增的数据抽取到ods层
        //val addDF = df.except(customerDF)

        //addDF.write.mode(SaveMode.Append).format("hive").saveAsTable("customer")
    sparkSession.close()

编码之后可能爆红,因为没有引入spark的jar包和mysql-connect的jar包

点击idea右上角的这里

 

 引入你的jar包

(我在这里引入完之后  习惯   先mvn clean install  再rebuild   再restart    如果你不习惯,就看下一句)

之后就好了  不行就   rebuild   和   restart

最后mvn clean   mvn package    执行打包 *** 作

2.在集群上建个空的hive表

可以用脚本建表(我用的脚本)

#! /bin/bash 

hive -e "
	use hive库名;
	CREATE  TABLE CUSTOMER (
		CUSTKEY INT comment '',
		NAME string comment '',
		ADDRESS string comment '',
		NATIONKEY string comment '',
                PHONE string comment '',
                ACCTBAL string comment '',
                MKTSEGMENT string comment '',
		COMMENT string comment ''
	)
	comment 'customer表'
	ROW FORMAT DELIMITED 
	FIELDS TERMINATED BY '#! /bin/bash
export HADOOP_CONF_DIR=/usr/hdp/3.1.0.0-78/hadoop/conf
/usr/hdp/3.1.0.0-78/spark2/bin/spark-submit \
--class 这里是你要运行的类 \
--master local[2] \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 2 \
/这里是你jar包的地址    最前面有这个/哦
1' 
	LINES TERMINATED BY '\n'
	STORED AS textfile
	TBLPROPERTIES(
		'transactional'='false'
	); 
"
3. 上传你打好的包

在集群上   rz -bye   即可上传

可以写个脚本运行你的包(我写的脚本)

 

sh start.sh     运行脚本    MYSQL数据就导入进HIVE数据库了

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/991609.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存