Reading data from Presto with Spark SQL and writing it to ClickHouse


Overall structure

Config

package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser


case class Config(
                 env:String = "",
                 username:String = "",
                 password:String = "",
                 url:String = "",
                 cluster:String = "",
                 startDate:String = "",
                 endDate:String = "",
                 proxyUser:String = "",
                 topK:Int = 25
                 )

object Config{

    private val logger = LoggerFactory.getLogger("Config")

    

    def parseConfig(obj:Object,args:Array[String]):Config = {
        //1. Derive the program name from the class name (strip the trailing "$" of the Scala object)
        val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")

        //2. Build a parser that parses the command-line arguments
        val parser = new OptionParser[Config]("spark sql "+programName) {
            //2.1 Usage header
            head(programName,"v1.0")
            //2.2 Each opt registers a -short/--long option; text() is the help description
            opt[String]('e',"env").required().action((x,config) => config.copy(env = x)).text("dev or prod")
            // -x avoids clashing with -n (username below) and matches the submit script at the end of this article
            opt[String]('x',"proxyUser").required().action((x,config) => config.copy(proxyUser = x)).text("proxy username")
            programName match {
                case "LabelGenerator" => {
                    logger.info("LabelGenerator")
                    opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
                    opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
                    opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
                    opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
                }
                case _ =>
            }
        }
        parser.parse(args,Config()) match { // parse the arguments into a Config instance
            case Some(conf) => conf
            case None => {
                logger.error("can not parse args")
                System.exit(-1)
                null
            }
        }
    }
}
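
For reference, here is a minimal sketch of how the parser is exercised; the argument values mirror the spark-submit example at the end of this article and are only illustrative:

// Illustrative only: parse the same flags the submit script passes
val args = Array(
    "-e", "prod",
    "-x", "root",
    "-n", "fw-insert",
    "-p", "fw-001",
    "-u", "jdbc:clickhouse://10.206.0.4:8321",
    "-c", "1"
)
val conf = Config.parseConfig(LabelGenerator, args)
println(conf.env)       // prod
println(conf.username)  // fw-insert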

LabelGenerator

package com.fuwei.bigdata.profile

import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory


object LabelGenerator {

   private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)

        //1. Parse the arguments
        val params: Config = Config.parseConfig(LabelGenerator, args)

        //2. Get the SparkSession
        val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
        //val spark: SparkSession = SparkUtils.getSparkSession("dev", "test")
        import spark.implicits._

        //3. Read the phone-number location data (tab separated)
        val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt").toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
        df.createOrReplaceTempView("phone_info") // register a temporary view

        //4.1 userSql: basic user attributes
        val userSql =
            """
              |select
              |t1.distinct_id as uid,
              |t1.gender,
              |t1.age,
              |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
              |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
              |t2.model
              |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
              |on t1.distinct_id = t2.uid
              |""".stripMargin


        val userDF: DataFrame = spark.sql(userSql)
        userDF.createOrReplaceTempView("user_info")

        //4.2 baseFeatureSql: join against phone_info to derive the region
        val baseFeatureSql =
            """
              |select
              |t1.uid,
              |t1.gender,
              |t1.age,
              |t1.email_suffix,
              |t1.model,
              |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
              |from user_info as t1 left join phone_info as t2
              |on
              |t2.phone = substring(t1.mobile,0,7)
              |""".stripMargin

        //4.3 Create the Hive table
        spark.sql(
            """
              |create table if not exists dws_news.user_profile_base(
              |uid string,
              |gender string,
              |age string,
              |email_suffix string,
              |model string,
              |region string
              |)
              |stored as parquet
              |""".stripMargin)
        //4.4 Run the query to generate the DataFrame
        val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
        baseFeaturedDF.cache() // cache the result in memory; unpersist after use

        // Write the query result into the Hive table (i.e. persist the DataFrame to HDFS)
        baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")

        //5. Prepare the ClickHouse table; meta is a tuple of (column list, "?" placeholders)
        val meta = TableUtils.getClickHouseUserProfilebaseTable(baseFeaturedDF,params)

        //6. Insert the data into ClickHouse
        //6.1 The insert SQL
        val insertCHSql =
            s"""
               |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATAbase}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
               |""".stripMargin

        logger.warn(insertCHSql)

        //6.2 Insert the data into the ClickHouse table partition by partition
        baseFeaturedDF.foreachPartition(partition => {
            TableUtils.insertbaseFeaturedTable(partition,insertCHSql,params)
        })
        baseFeaturedDF.unpersist() // release the cached data
        //7. Release resources
        spark.stop()
        logger.info("job has succeeded")

    }
}

ClickHouseUtils

package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object ClickHouseUtils {

    
    def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
        val properties = new ClickHouseProperties()
        properties.setUser(username)
        properties.setPassword(password)
        val dataSource = new ClickHouseDataSource(url, properties)
        dataSource
    }

    
    def df2TypeName2CH(dfCol: String): String = {
        dfCol.split(",").map(line => {
            val fields: Array[String] = line.split(" ")
            val fName: String = fields(0)
            val fType: String = dfType2chType(fields(1)) // convert the Spark SQL type to the ClickHouse type
            fName + " " + fType // e.g. "age String", "gender String"
        }).mkString(",")
    }
    
    def dfType2chType(fieldType: String):String = {
        fieldType.toLowerCase() match {
            case "string" => "String"
            case "integer" => "Int32"
            case "long" => "Int64"
            case "bigint" => "Int64"
            case "double" => "Float64"
            case "float" => "Float32"
            case "timestamp" => "Datetime"
            case _ => "String"
        }
    }
}
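
A quick sanity check of the type mapping (the input string below is hypothetical, not taken from the project):

// Illustrative only: Spark SQL "name type" pairs converted to ClickHouse column definitions
val dfCols = "uid string,age integer,score double,ts timestamp"
val chCols = ClickHouseUtils.df2TypeName2CH(dfCols)
// chCols: "uid String,age Int32,score Float64,ts Datetime"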

SparkUtils (this connection helper can be reused by other jobs)

package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {
    private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

    def getSparkSession(env:String,appName:String):SparkSession = {
        val conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.sql.hive.metastore.version", "1.2.1")
            .set("spark.sql.cbo.enabled", "true")
            .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
            .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")

        env match {
            case "prod" => {
                conf.setAppName(appName+"_prod")

                SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
            }
            case "dev" => {
                conf.setMaster("local[*]").setAppName(appName+"_dev").set("spark.sql.hive.metastore.jars","maven")
                SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
            }
            case _ => {
                logger.error("not match env")
                System.exit(-1)
                null
            }
        }
    }

}
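
A minimal usage sketch, assuming the Hadoop/Hive config files are already on the classpath as described in the testing section below:

// Illustrative only: local dev session with an arbitrary app name
val spark = SparkUtils.getSparkSession("dev", "user-profile-test")
spark.sql("show databases").show()
spark.stop()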

TableUtils

package com.fuwei.bigdata.profile.utils

import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement


object TableUtils {
    
    def insertbaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
        //1. Get the ClickHouse data source
        val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
        val connection: ClickHouseConnection = dataSource.getConnection
        val ps: PreparedStatement = connection.prepareStatement(insertCHSql) // prepared statement for the insert

        var batchCount = 0
        val batchSize = 2000
        var lastBatchTime = System.currentTimeMillis()
        //2. Fill in the values for the placeholders
        partition.foreach(row => {
            var index = 1 // parameter index of the placeholder being set
            row.schema.fields.foreach(field => {
                field.dataType match {
                    case StringType => ps.setString(index,row.getAs[String](field.name))
                    case LongType => ps.setLong(index,row.getAs[Long](field.name))
                    case IntegerType => ps.setInt(index,row.getAs[Int](field.name))
                    case _ => logger.error(s"type is err,${field.dataType}")
                }
                index +=1
            })
            //3. Add the row to the batch
            ps.addBatch()
            batchCount += 1

            //4. Flush the batch every 2000 rows or at least every 3 seconds
            val currentTime = System.currentTimeMillis()
            if (batchCount >= batchSize || lastBatchTime < currentTime - 3000){
                ps.executeBatch() // execute the current batch
                logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime)/1000} s")
                batchCount = 0
                lastBatchTime = currentTime
            }
        })

        //5. After the loop, flush whatever is still buffered even if no flush condition was hit
        ps.executeBatch()
        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime)/1000} s")

        //6. Release resources
        ps.close()
        connection.close()

    }

    private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

    

        

    val USER_PROFILE_CLICKHOUSE_DATAbase = "app_news"        // target ClickHouse database
    val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base"  // target ClickHouse table


    def getClickHouseUserProfilebaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
        // schema exposes the DataFrame's metadata (field names and types)
        // foldLeft accumulates three strings: column names, "name type" pairs, and "?" placeholders
        val (fieldNames, fieldType, pl) = baseFeaturedDF.schema.fields.foldLeft(("", "", ""))(
            (z, f) => {
                // target shape: ("age,gender", "age string,gender string", "?,?")
                if (z._1.nonEmpty && z._2.nonEmpty && z._3.nonEmpty) {
                    // not the first field: append with separators
                    (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
                } else {
                    (f.name, f.name + " " + f.dataType.simpleString, "?")
                }
            }
        )

        val chCol = ClickHouseUtils.df2TypeName2CH(fieldType)


        //5. The ClickHouse cluster to connect to (not used further in this snippet)
        val cluster: String = params.cluster

        //6. SQL to create the database
        val createCHDatabaseSql =
            s"""
               |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATAbase}
               |""".stripMargin

        //7. SQL to create the table
        
        val createCHTableSql =
            s"""
               |create table ${USER_PROFILE_CLICKHOUSE_DATAbase}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
               |ENGINE = MergeTree()
               |ORDER BY(uid)
               |""".stripMargin

        //8. SQL to drop the table if it exists
        val dropCHTableSql =
            s"""
               |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATAbase}.${USER_PROFILE_CLICKHOUSE_TABLE}
               |""".stripMargin

        //9. Connect to ClickHouse and run the DDL
        val dataSource:ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username,params.password,params.url)

        val connection: ClickHouseConnection = dataSource.getConnection

        logger.warn(createCHDatabaseSql)
        var ps: PreparedStatement = connection.prepareStatement(createCHDatabaseSql) // create the database
        ps.execute()

        logger.warn(dropCHTableSql)
        ps = connection.prepareStatement(dropCHTableSql) // drop the table
        ps.execute()

        logger.warn(createCHTableSql)
        ps = connection.prepareStatement(createCHTableSql) // create the table
        ps.execute()

        ps.close()
        connection.close()
        logger.info("success!!!!!!!!!")
        (fileName,pl)
    }
}
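
To make the tuple built by getClickHouseUserProfilebaseTable concrete, assuming all six columns of the base-profile DataFrame arrive as strings (as in the Hive DDL above), the foldLeft yields roughly the following:

// Illustrative only: the accumulated strings for the user_profile_base schema
// fieldNames -> "uid,gender,age,email_suffix,model,region"
// fieldType  -> "uid string,gender string,age string,email_suffix string,model string,region string"
// pl         -> "?,?,?,?,?,?"
// df2TypeName2CH(fieldType) then gives:
// chCol      -> "uid String,gender String,age String,email_suffix String,model String,region String"
// and the final insert statement becomes:
// insert into app_news.user_profile_base(uid,gender,age,email_suffix,model,region) values(?,?,?,?,?,?)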

pom.xml

(The XML tags were stripped when this article was extracted; the coordinates that survive are summarized below.)

Project: com.fuwei.bigdata:user-profile:1.0-SNAPSHOT. Properties include scala.version 2.11.12, spark.version 2.4.5 and scope.type compile; the remaining extracted values (2.3.9, 2.10.1, 3.2.0, 2.6, 1.2.3) belong to json.version, maven-assembly-plugin.version, scala-maven-plugin.version and other properties, but the name-value pairing was lost.

Dependencies:
- com.alibaba:fastjson:${json.version}
- org.apache.spark:spark-core_2.11:${spark.version} (${scope.type})
- org.apache.spark:spark-sql_2.11:${spark.version} (${scope.type})
- org.apache.spark:spark-hive_2.11:${spark.version} (${scope.type})
- mysql:mysql-connector-java:5.1.47
- log4j:log4j:1.2.17 (${scope.type})
- commons-codec:commons-codec:1.6
- org.scala-lang:scala-library:${scala.version} (${scope.type})
- org.scala-lang:scala-reflect:${scala.version} (${scope.type})
- com.github.scopt:scopt_2.11:4.0.0-RC2
- org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating (${scope.type})
- org.apache.spark:spark-avro_2.11:${spark.version}
- com.hankcs:hanlp:portable-1.7.8
- org.apache.spark:spark-mllib_2.11:${spark.version} (${scope.type})
- org.apache.hive:hive-jdbc:1.2.1 (${scope.type}, excluding javax.mail:mail and org.eclipse.jetty.aggregate:*)
- ru.yandex.clickhouse:clickhouse-jdbc:0.2.4

Repository: alimaven, http://maven.aliyun.com/nexus/content/groups/public/ (updatePolicy never for both releases and snapshots).

Build: sourceDirectory src/main/scala, testSourceDirectory src/test/scala. Plugins: maven-assembly-plugin (jar-with-dependencies descriptor, single goal bound to the package phase), scala-maven-plugin (compile and testCompile goals, with -dependencyfile ${project.build.directory}/.scala_dependencies), and maven-archetype-plugin 2.2.

Testing

##1. Copy core-site.xml, yarn-site.xml and hive-site.xml into the project's resources directory
##2. clean and package
##3. The Hive metastore service must be running
##4. YARN/HDFS must be running
##5. ClickHouse/chproxy must also be running
##6. Write the spark-submit script for the packaged jar
${SPARK_HOME}/bin/spark-submit \
    --jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.executor.heartbeatInterval=120s \
    --conf spark.network.timeout=600s \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --name log2hudi \
    --conf spark.task.cpus=1 \
    --conf spark.executor.cores=4 \
    --conf spark.sql.shuffle.partitions=50 \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 1G \
    --executor-memory 3G \
    --num-executors 1 \
    --class com.fuwei.bigdata.profile.LabelGenerator \
    /data/jar/user-profile.jar \
    -e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1

##7. Verify the result with clickhouse-client
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert
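
Once connected, a quick sanity check on the target table created above (app_news.user_profile_base) might look like this; the queries are only illustrative:

select count() from app_news.user_profile_base;
select uid, gender, age, region from app_news.user_profile_base limit 10;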
