Overall structure
Config
```scala
package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser

case class Config(
                   env: String = "",
                   username: String = "",
                   password: String = "",
                   url: String = "",
                   cluster: String = "",
                   startDate: String = "",
                   endDate: String = "",
                   proxyUser: String = "",
                   topK: Int = 25
                 )

object Config {

  private val logger = LoggerFactory.getLogger("Config")

  def parseConfig(obj: Object, args: Array[String]): Config = {
    // 1. Derive the program name from the class name
    val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")
    // 2. Build a parser that turns the command-line arguments into a Config
    val parser = new OptionParser[Config]("spark sql " + programName) {
      // 2.1 Usage header
      head(programName, "v1.0")
      // 2.2 Options: -e/--env fills the env field, the trailing text() is the help message
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      opt[String]('x', "proxyUser").required().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      // Only LabelGenerator needs the ClickHouse connection options
      programName match {
        case "LabelGenerator" =>
          logger.info("LabelGenerator")
          opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
          opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
          opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
          opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
        case _ =>
      }
    }
    // 3. Parse the arguments and check that all required values are present
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None =>
        logger.error("can not parse args")
        System.exit(-1)
        null
    }
  }
}
```
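As a quick illustration of how the parser maps spark-submit style arguments onto the case class, a minimal sketch (the argument values mirror the submit script in the testing section below and are examples only):

```scala
import com.fuwei.bigdata.profile.LabelGenerator
import com.fuwei.bigdata.profile.conf.Config

object ConfigDemo {
  def main(args: Array[String]): Unit = {
    // Example values only, matching the options used in the spark-submit call later
    val demoArgs = Array(
      "-e", "prod",
      "-x", "root",
      "-n", "fw-insert",
      "-p", "fw-001",
      "-u", "jdbc:clickhouse://10.206.0.4:8321",
      "-c", "1"
    )
    val conf = Config.parseConfig(LabelGenerator, demoArgs)
    println(conf) // Config(prod,fw-insert,fw-001,jdbc:clickhouse://10.206.0.4:8321,1,,,root,25)
  }
}
```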
LabelGenerator
```scala
package com.fuwei.bigdata.profile

import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

object LabelGenerator {

  private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    // 1. Parse the command-line arguments
    val params: Config = Config.parseConfig(LabelGenerator, args)
    // 2. Get a SparkSession
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
    //val spark: SparkSession = SparkUtils.getSparkSession("dev", "test")
    import spark.implicits._

    // 3. Read the phone-number location data and register it as a temp view
    val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt")
      .toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
    df.createOrReplaceTempView("phone_info")

    // 4.1 User base SQL
    val userSql =
      """
        |select
        |t1.distinct_id as uid,
        |t1.gender,
        |t1.age,
        |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
        |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
        |t2.model
        |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
        |on t1.distinct_id = t2.uid
        |""".stripMargin
    val userDF: DataFrame = spark.sql(userSql)
    userDF.createOrReplaceTempView("user_info")

    // 4.2 Base feature SQL: join with phone_info to resolve the region
    val baseFeatureSql =
      """
        |select
        |t1.uid,
        |t1.gender,
        |t1.age,
        |t1.email_suffix,
        |t1.model,
        |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
        |from user_info as t1 left join phone_info as t2
        |on
        |t2.phone = substring(t1.mobile,0,7)
        |""".stripMargin

    // 4.3 Create the Hive target table if it does not exist
    spark.sql(
      """
        |create table if not exists dws_news.user_profile_base(
        |uid string,
        |gender string,
        |age string,
        |email_suffix string,
        |model string,
        |region string
        |)
        |stored as parquet
        |""".stripMargin)

    // 4.4 Run the query and cache the result (unpersist once we are done with it)
    val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
    baseFeaturedDF.cache()
    // Write the query result into the Hive table (i.e. onto HDFS)
    baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")

    // 5. Prepare the ClickHouse table; meta is (column list, placeholder list)
    val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF, params)

    // 6.1 Build the INSERT statement for ClickHouse
    val insertCHSql =
      s"""
         |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
         |""".stripMargin
    logger.warn(insertCHSql)

    // 6.2 Insert the data into ClickHouse partition by partition
    baseFeaturedDF.foreachPartition(partition => {
      TableUtils.insertBaseFeaturedTable(partition, insertCHSql, params)
    })
    baseFeaturedDF.unpersist() // release the cached data

    // 7. Release resources
    spark.stop()
    logger.info("job has success")
  }
}
```
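For the six columns written above, the meta tuple returned by getClickHouseUserProfileBaseTable is just the column list plus a matching run of JDBC placeholders, so the generated statement comes out roughly as follows (a sketch, assuming the schema shown above):

```scala
// Illustration only: with the six columns above, meta comes out as
val meta = ("uid,gender,age,email_suffix,model,region", "?,?,?,?,?,?")
// so the INSERT statement sent to ClickHouse is
// insert into app_news.user_profile_base(uid,gender,age,email_suffix,model,region) values(?,?,?,?,?,?)
```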
ClickHouseUtils
```scala
package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object ClickHouseUtils {

  /** Build a ClickHouse DataSource from username/password/JDBC url. */
  def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val properties = new ClickHouseProperties()
    properties.setUser(username)
    properties.setPassword(password)
    val dataSource = new ClickHouseDataSource(url, properties)
    dataSource
  }

  /**
   * Convert a Spark column-definition string ("name type,name type,...")
   * into the equivalent ClickHouse column-definition string.
   */
  def df2TypeName2CH(dfCol: String): String = {
    dfCol.split(",").map(line => {
      val fields: Array[String] = line.split(" ")
      val fName: String = fields(0)
      val fType: String = dfType2chType(fields(1)) // convert the Spark type to a ClickHouse type
      fName + " " + fType // the result becomes e.g. "age String, gender String"
    }).mkString(",")
  }

  /** Map a Spark SQL type name to the corresponding ClickHouse type. */
  def dfType2chType(fieldType: String): String = {
    fieldType.toLowerCase() match {
      case "string" => "String"
      case "integer" => "Int32"
      case "long" => "Int64"
      case "bigint" => "Int64"
      case "double" => "Float64"
      case "float" => "Float32"
      case "timestamp" => "Datetime"
      case _ => "String"
    }
  }
}
```
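A small sketch of what the conversion produces (the column names and types here are made up for illustration):

```scala
// Illustration only: Spark schema fields rendered as "name type" pairs
// are rewritten with ClickHouse type names.
val sparkCols = "uid string,age integer,score double,ts timestamp"
val chCols = ClickHouseUtils.df2TypeName2CH(sparkCols)
println(chCols) // uid String,age Int32,score Float64,ts Datetime
```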
SparkUtils (this connection logic can be reused by other jobs)
```scala
package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {

  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  def getSparkSession(env: String, appName: String): SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    env match {
      case "prod" =>
        conf.setAppName(appName + "_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case "dev" =>
        conf.setMaster("local[*]").setAppName(appName + "_dev").set("spark.sql.hive.metastore.jars", "maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case _ =>
        logger.error("not match env")
        System.exit(-1)
        null
    }
  }
}
```
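A minimal usage sketch, assuming a local dev environment with Hive configured (the app name is arbitrary):

```scala
// Run locally against Hive in dev mode and do a trivial query.
val spark = SparkUtils.getSparkSession("dev", "ProfileTest")
spark.sql("show databases").show()
spark.stop()
```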
TableUtils
```scala
package com.fuwei.bigdata.profile.utils

import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement

object TableUtils {

  private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

  val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news"        // database to create
  val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base"  // table to create

  /** Insert one partition of the DataFrame into ClickHouse. */
  def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
    // 1. Get a ClickHouse connection and prepare the INSERT statement
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    val ps: PreparedStatement = connection.prepareStatement(insertCHSql)

    var batchCount = 0
    val batchSize = 2000
    var lastBatchTime = System.currentTimeMillis()

    // 2. Fill the placeholders for every row
    partition.foreach(row => {
      var index = 1 // placeholder index
      row.schema.fields.foreach(field => {
        field.dataType match {
          case StringType => ps.setString(index, row.getAs[String](field.name))
          case LongType => ps.setLong(index, row.getAs[Long](field.name))
          case IntegerType => ps.setInt(index, row.getAs[Int](field.name))
          case _ => logger.error(s"type is err,${field.dataType}")
        }
        index += 1
      })
      // 3. Add the row to the current batch
      ps.addBatch()
      batchCount += 1
      // 4. Flush the batch when it is full or older than 3 seconds
      val currentTime = System.currentTimeMillis()
      if (batchCount >= batchSize || lastBatchTime < currentTime - 3000) {
        ps.executeBatch()
        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime) / 1000} s")
        batchCount = 0
        lastBatchTime = currentTime
      }
    })

    // 5. Flush whatever is left once the loop has finished
    ps.executeBatch()
    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime) / 1000} s")

    // 6. Release resources
    ps.close()
    connection.close()
  }

  /**
   * (Re)create the ClickHouse table that matches the DataFrame schema and
   * return (column list, placeholder list) for the INSERT statement.
   */
  def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
    // The schema holds all column metadata; foldLeft accumulates three strings at once:
    // ("uid,gender,...", "uid string,gender string,...", "?,?,...")
    val (fieldNames, fieldTypes, pl) = baseFeaturedDF.schema.fields.foldLeft(("", "", ""))(
      (z, f) => {
        if (z._1.nonEmpty && z._2.nonEmpty && z._3.nonEmpty) { // not the first field: append with separators
          (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
        } else {
          (f.name, f.name + " " + f.dataType.simpleString, "?")
        }
      }
    )
    // Translate the Spark column definitions into ClickHouse column definitions
    val chCol = ClickHouseUtils.df2TypeName2CH(fieldTypes)

    // ClickHouse cluster name from the config
    val cluster: String = params.cluster

    // SQL: create the database
    val createCHDatabaseSql =
      s"""
         |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
         |""".stripMargin
    // SQL: create the table
    val createCHTableSql =
      s"""
         |create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
         |ENGINE = MergeTree()
         |ORDER BY(uid)
         |""".stripMargin
    // SQL: drop the old table
    val dropCHTableSql =
      s"""
         |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
         |""".stripMargin

    // Connect to ClickHouse and run the three statements
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    logger.warn(createCHDatabaseSql)
    var ps: PreparedStatement = connection.prepareStatement(createCHDatabaseSql) // create database
    ps.execute()
    logger.warn(dropCHTableSql)
    ps = connection.prepareStatement(dropCHTableSql) // drop old table
    ps.execute()
    logger.warn(createCHTableSql)
    ps = connection.prepareStatement(createCHTableSql) // create table
    ps.execute()
    ps.close()
    connection.close()
    logger.info("success!!!!!!!!!")

    (fieldNames, pl)
  }
}
```
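To make the foldLeft concrete, here is a sketch with a reduced two-column schema (the column names are illustrative only); it shows the three accumulated strings and the ClickHouse column definitions derived from them:

```scala
import com.fuwei.bigdata.profile.utils.ClickHouseUtils
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Illustration only: a reduced two-column schema
val schema = StructType(Seq(
  StructField("uid", StringType),
  StructField("ts", LongType)
))
val (fieldNames, fieldTypes, pl) = schema.fields.foldLeft(("", "", ""))((z, f) =>
  if (z._1.nonEmpty) (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
  else (f.name, f.name + " " + f.dataType.simpleString, "?")
)
// fieldNames = "uid,ts", fieldTypes = "uid string,ts bigint", pl = "?,?"
println(ClickHouseUtils.df2TypeName2CH(fieldTypes)) // uid String,ts Int64
```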
pom.xml
Maven coordinates: `com.fuwei.bigdata:user-profile:1.0-SNAPSHOT`. Key properties: Scala 2.11.12, Spark 2.4.5, dependency scope `${scope.type}` = `compile`.

Dependencies:
- com.alibaba:fastjson `${json.version}` (1.2.3)
- org.apache.spark:spark-core_2.11 / spark-sql_2.11 / spark-hive_2.11 / spark-avro_2.11 / spark-mllib_2.11 `${spark.version}` (2.4.5)
- mysql:mysql-connector-java 5.1.47
- log4j:log4j 1.2.17
- commons-codec:commons-codec 1.6
- org.scala-lang:scala-library / scala-reflect `${scala.version}` (2.11.12)
- com.github.scopt:scopt_2.11 4.0.0-RC2
- org.apache.hudi:hudi-spark-bundle_2.11 0.5.2-incubating
- com.hankcs:hanlp portable-1.7.8
- org.apache.hive:hive-jdbc 1.2.1 (excluding javax.mail:mail and org.eclipse.jetty.aggregate:*)
- ru.yandex.clickhouse:clickhouse-jdbc 0.2.4

Repository: alimaven (http://maven.aliyun.com/nexus/content/groups/public/).

Build: sources under src/main/scala and src/test/scala; plugins are maven-assembly-plugin (jar-with-dependencies descriptor, `single` goal bound to the package phase as `make-assembly`), scala-maven-plugin (compile/testCompile goals, `-dependencyfile ${project.build.directory}/.scala_dependencies`), and maven-archetype-plugin 2.2.
Testing
1. Copy `core-site.xml`, `yarn-site.xml`, and `hive-site.xml` into the project's `resources` directory.
2. Run `clean and package` to build the jar.
3. The Hive metastore service must be running.
4. YARN/HDFS must be running.
5. ClickHouse/chproxy must also be running.
6. Write the spark-submit script for the jar:

```bash
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name log2hudi \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.LabelGenerator \
/data/jar/user-profile.jar \
-e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1
```

7. Test with `clickhouse-client` (a JDBC-based check is sketched below):

```bash
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert
```
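Beyond the interactive client, a quick sanity check is to count the rows that landed in `app_news.user_profile_base` over JDBC, reusing ClickHouseUtils. A minimal sketch (the URL and credentials mirror the submit script above and are assumptions here):

```scala
import java.sql.ResultSet
import com.fuwei.bigdata.profile.utils.ClickHouseUtils

object CheckLoad {
  def main(args: Array[String]): Unit = {
    // URL and credentials below mirror the submit script and are assumptions for this sketch
    val ds = ClickHouseUtils.getDataSource("fw-insert", "fw-001", "jdbc:clickhouse://10.206.0.4:8321")
    val conn = ds.getConnection
    val rs: ResultSet = conn.createStatement()
      .executeQuery("select count() from app_news.user_profile_base")
    if (rs.next()) println(s"rows loaded: ${rs.getLong(1)}")
    rs.close()
    conn.close()
  }
}
```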