flink写入hbase

flink写入hbase(配图 1,原文此处为插图)

flink写入hbase

POM

    
        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-scala_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-clients_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-kafka_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-blink_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-scala-bridge_2.12
            ${flink.version}
        

        
        
            org.apache.kafka
            kafka_2.12
            2.2.0
        

        
        
            org.scala-lang
            scala-library
            2.12.8
        

        
        
            org.scala-lang
            scala-reflect
            2.12.8
        

        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        
        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
        

        
            com.alibaba
            fastjson
            1.2.19
        

        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-common
            ${hbase.version}
        
        
            org.apache.flink
            flink-hbase_2.12
            1.7.2
        
    

代码

package cl.flink

import java.time.LocalDate
import java.util.Properties

import scala.collection.mutable

import com.alibaba.fastjson._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

object Kafka2Hbase {

  // Kafka consumer settings; populated from the classpath inside main().
  val propertiesC = new Properties()

  /** Entry point: consumes strings from a Kafka topic and sinks them into HBase. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The data source is Kafka; broker/group settings come from kafka_test.properties.
    propertiesC.load(this.getClass.getClassLoader.getResourceAsStream("kafka_test.properties"))

    val stream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("idnum-topic-1", new SimpleStringSchema, propertiesC))

    stream.print()
    stream.addSink(new WriteHbaseRich)
    env.execute()
  }
}


/**
 * Flink sink that parses id-card JSON records and writes them into the HBase
 * table `cdp_user_base_info` (column family `info`), row-keyed by id number.
 *
 * Puts are buffered through a BufferedMutator and flushed every
 * `FlushThreshold` records or after `FlushIntervalMs` milliseconds.
 */
class WriteHbaseRich extends RichSinkFunction[String] {
  var conn: Connection = null
  val scan: Scan = null // NOTE(review): unused; kept only so existing references don't break
  var mutator: BufferedMutator = null
  var count = 0
  var flushtime = System.currentTimeMillis()

  // HBase connection settings (zookeeper quorum, etc.) loaded from the classpath.
  val propertiesHbase = new Properties()
  propertiesHbase.load(this.getClass.getClassLoader.getResourceAsStream("hbase.properties"))

  // Weights and check digits of the 18-digit Chinese id-card checksum (GB 11643-1999).
  val WI = Array(7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2)
  val WI_check = Array('1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2')

  // Flush the buffered mutator after this many records or this many milliseconds.
  private val FlushThreshold = 100
  private val FlushIntervalMs = 20000L

  /** Opens the HBase connection and a buffered mutator for the target table. */
  override def open(parameters: Configuration): Unit = {
    // BUGFIX: `HbaseConfiguration` / `Hbase_CLIENT_*` were misspelled and do not
    // exist in the HBase client API (correct: HBaseConfiguration, HBASE_CLIENT_*).
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, propertiesHbase.getProperty("test.cluster"))
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)

    val tableName: TableName = TableName.valueOf("cdp_user_base_info")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // 1 MiB client-side write buffer; HBase flushes automatically once it fills.
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /** Parses one JSON record, enriches it via [[analysis_idcard]], and buffers a Put. */
  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    if (value.contains("idnum")) {
      // BUGFIX: return type was spelled `JSonObject` (no such fastjson class).
      val datas: JSONObject = analysis_idcard(JSON.parseObject(value))

      println(datas)

      val put: Put = new Put(Bytes.toBytes(datas.getString("idnum")))
      import collection.JavaConverters._
      val keys: mutable.Set[String] = datas.keySet().asScala
      // Whitelist of columns written to the `info` family (duplicate "address" removed).
      val strings: Array[String] =
        Array("idnum", "phone", "uid", "name", "birthday", "address_code", "update_time", "sex", "address")
      for (key <- keys if strings contains key) {
        put.addColumn("info".getBytes(), key.getBytes(), datas.getString(key).getBytes())
      }
      mutator.mutate(put)
      count += 1
      // Flush every FlushThreshold records (the old comment claimed 2000; the code
      // used 100 — the constant now documents the real threshold) or after a timeout.
      if (count >= FlushThreshold || System.currentTimeMillis() - flushtime > FlushIntervalMs) {
        mutator.flush()
        count = 0
        flushtime = System.currentTimeMillis()
        println("flush")
      }
    }
  }

  /** Flushes pending writes and releases HBase resources. */
  override def close(): Unit = {
    // BUGFIX: buffered puts were silently lost on shutdown because the mutator
    // was never flushed/closed; close order is mutator first, then connection.
    if (mutator != null) {
      try mutator.flush()
      finally mutator.close()
    }
    if (conn != null) conn.close()
  }

  /**
   * Validates the checksum of an 18-character Chinese id-card number.
   * Returns true only when the input is 18 chars, the first 17 are digits
   * (previously non-digits produced a garbage checksum), and the last char
   * matches the weighted check digit; lower-case 'x' is now accepted.
   */
  def check_idcard(idcards: String): Boolean = {
    val chars: Array[Char] = idcards.toCharArray
    if (chars.length == 18 && chars.take(17).forall(_.isDigit)) {
      val sum = (0 to 16).map(i => WI(i) * (chars(i) - '0')).sum
      WI_check(sum % 11) == chars(17).toUpper
    } else {
      println(s"错误的身份证号:'${idcards}'")
      false
    }
  }

  /**
   * Derives sex, birthday, address_code and update_time from the "idnum" field
   * when the id-card number passes the checksum; mutates and returns `datas`.
   */
  def analysis_idcard(datas: JSONObject): JSONObject = {
    val idnum: String = datas.getString("idnum")
    if (idnum != null && idnum != "") {
      val idcard: String = idnum.trim
      try {
        // BUGFIX: the untrimmed `idnum` was passed to check_idcard while the
        // trimmed `idcard` was used everywhere else; whitespace broke validation.
        if (idcard.nonEmpty && check_idcard(idcard)) {
          // 17th digit (index 16): odd = male (男), even = female (女).
          if (idcard.substring(16, 17).toInt % 2 == 0) {
            datas.put("sex", "女")
          } else {
            datas.put("sex", "男")
          }
          // Digits 7-14 are YYYYMMDD of birth.
          val bd: String = idcard.substring(6, 14)
          val birthday = s"${bd.substring(0, 4)}-${bd.substring(4, 6)}-${bd.substring(6, 8)}"
          datas.put("birthday", birthday)
          datas.put("address_code", idcard.substring(0, 6))
          datas.put("update_time", LocalDate.now())
        }
      } catch {
        // BUGFIX: message typo ("身份z号"); best-effort semantics kept on parse errors.
        case exception: Exception => println(s"错误的身份证号:'${idnum}'")
      }
    }
    datas
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701884.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存