POM
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.8</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.12.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.19</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_2.12</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>
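The POM references version properties that the post does not show. A minimal <properties> block consistent with the pinned artifacts above might look like the following; the Scala binary version and Flink version follow from scala-library 2.12.8 and flink-hbase_2.12 1.7.2, while the Hadoop and HBase versions are pure assumptions:

<properties>
    <scala.version>2.12</scala.version>
    <flink.version>1.7.2</flink.version>
    <!-- assumed versions; substitute whatever your cluster actually runs -->
    <hadoop.version>2.7.7</hadoop.version>
    <hbase.version>1.4.9</hbase.version>
</properties>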
Code
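In outline: the job consumes JSON strings from the Kafka topic idnum-topic-1; for every record containing an idnum field it derives sex, birthday and address_code from the ID number, then writes the whitelisted fields into the info column family of the HBase table cdp_user_base_info through a BufferedMutator that flushes every 100 records or every 20 seconds.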
package cl.flink

import com.alibaba.fastjson._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema

import java.time.LocalDate
import java.util.Properties
import scala.collection.mutable

object Kafka2Hbase {
  val propertiesC = new Properties()

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka is the data source
    propertiesC.load(this.getClass.getClassLoader.getResourceAsStream("kafka_test.properties"))
    val datas: DataStream[String] = environment.addSource(
      new FlinkKafkaConsumer[String]("idnum-topic-1", new SimpleStringSchema, propertiesC))
    datas.print()
    datas.addSink(new WriteHbaseRich)
    environment.execute()
  }
}

class WriteHbaseRich extends RichSinkFunction[String] {
  var conn: Connection = null
  var mutator: BufferedMutator = null
  var count = 0
  var flushtime = System.currentTimeMillis()

  val propertiesHbase = new Properties()
  propertiesHbase.load(this.getClass.getClassLoader.getResourceAsStream("hbase.properties"))

  // Weights and check characters for the 18-digit Chinese ID-number checksum (GB 11643)
  val WI = Array(7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2)
  val WI_check = Array('1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2')

  override def open(parameters: Configuration): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, propertiesHbase.getProperty("test.cluster"))
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
    val tableName: TableName = TableName.valueOf("cdp_user_base_info")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // Set a 1 MB write buffer; when it fills up, the data is flushed to HBase automatically
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    if (value.contains("idnum")) {
      val datas: JSONObject = analysis_idcard(JSON.parseObject(value))
      println(datas)
      val put: Put = new Put(Bytes.toBytes(datas.getString("idnum")))
      import collection.JavaConverters._
      val keys: mutable.Set[String] = datas.keySet().asScala
      val strings: Array[String] = Array("idnum", "phone", "uid", "name", "birthday",
        "address_code", "update_time", "sex", "address")
      for (key <- keys if strings contains key) {
        put.addColumn("info".getBytes(), key.getBytes(), datas.getString(key).getBytes())
      }
      mutator.mutate(put)
      // Flush every 100 records, or at least once every 20 seconds
      if (count >= 100 || System.currentTimeMillis() - flushtime > 20000) {
        mutator.flush()
        count = 0
        flushtime = System.currentTimeMillis()
        println("flush")
      }
      count = count + 1
    }
  }

  override def close(): Unit = {
    // Flush any buffered mutations before shutting down
    if (mutator != null) {
      mutator.flush()
      mutator.close()
    }
    if (conn != null) conn.close()
  }

  // Validate an 18-digit ID number against its checksum character
  def check_idcard(idcards: String): Boolean = {
    val b: Array[Char] = idcards.toCharArray
    var flag = false
    if (b.size == 18) {
      var x = 0
      for (i <- 0 to 16) {
        x += WI(i) * (b(i) - '0')
      }
      flag = WI_check(x % 11).equals(b(17))
    } else {
      println(s"Invalid ID number: '${idcards}'")
    }
    flag
  }

  // Derive sex, birthday and address code from a valid ID number
  def analysis_idcard(datas: JSONObject): JSONObject = {
    val idnum: String = datas.getString("idnum")
    if (idnum != null && idnum != "") {
      val idcard: String = idnum.trim
      try {
        if (idcard != null && !idcard.equals("") && check_idcard(idcard)) {
          // The 17th digit encodes sex: even for female, odd for male
          if (idcard.substring(16, 17).toInt % 2 == 0) {
            datas.put("sex", "女")
          } else {
            datas.put("sex", "男")
          }
          val bd: String = idcard.substring(6, 14)
          val birthday = s"${bd.substring(0, 4)}-${bd.substring(4, 6)}-${bd.substring(6, 8)}"
          datas.put("birthday", birthday)
          datas.put("address_code", idcard.substring(0, 6))
          datas.put("update_time", LocalDate.now())
        }
      } catch {
        case exception: Exception => println(s"Invalid ID number: '${idnum}'")
      }
    }
    datas
  }
}
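The code loads two classpath resources that the post does not include. A minimal sketch of what they might contain, inferred from how they are used (all host names, ports and the group id are assumptions):

kafka_test.properties, passed to FlinkKafkaConsumer:

    # Kafka consumer configuration (placeholder values)
    bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092
    group.id=idnum-hbase-sink

hbase.properties, read for the ZooKeeper quorum via getProperty("test.cluster"):

    # ZooKeeper quorum of the HBase cluster (placeholder values)
    test.cluster=zk01,zk02,zk03

As a worked example with a hypothetical message, {"idnum":"110101199003071575","uid":"u1"} passes the checksum (the weighted digit sum is 194, 194 % 11 = 7, and WI_check(7) = '5' matches the last character), so the record is enriched with sex "男" (17th digit 7 is odd), birthday "1990-03-07", address_code "110101" and update_time, then written to the info column family.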