Flink batched Async I/O writes to HBase


Flink writes to HBase in batches using Async I/O.

    Writing records to HBase one at a time is too slow, so we write in batches: every 2,000 records go out in a single batch, which noticeably improves write throughput.

    A three-second tumbling window first aggregates the records per key; each window's list is then written to HBase asynchronously via Async I/O.

val RunDataDS: DataStream[FdcData[RunData]] = getDatas()

    
    // Collects all RunData records of one key's window into a single List,
    // so the downstream async sink receives whole batches
    class RunDataProcessWindowFunction extends ProcessWindowFunction[RunData, List[RunData], String, TimeWindow] {

      override def process(key: String, context: Context, input: Iterable[RunData], out: Collector[List[RunData]]): Unit = {
        out.collect(input.toList)
      }
    }

    // Tumbling window to aggregate the data
    val alarmRuleResultStream: DataStream[List[RunData]] = RunDataDS
      .map(_.datas)
      .keyBy(_.toolName)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .process(new RunDataProcessWindowFunction())
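
Note: TumblingEventTimeWindows only fires when watermarks advance, so timestamps and watermarks must be assigned upstream (not shown in the original). A minimal sketch using Flink's WatermarkStrategy API (1.11+), assuming RunData.runStartTime is an epoch-millisecond Long — the field type is an assumption:

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

    // Assign event timestamps and bounded out-of-orderness watermarks before windowing.
    // Assumption: RunData.runStartTime holds the event time as epoch milliseconds.
    val withWatermarks: DataStream[FdcData[RunData]] = RunDataDS
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[FdcData[RunData]](Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[FdcData[RunData]] {
            override def extractTimestamp(element: FdcData[RunData], recordTs: Long): Long =
              element.datas.runStartTime
          })
      )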

    // Write to HBase via async I/O
    AsyncDataStream.orderedWait(
      alarmRuleResultStream,
      new TestDataHbaseSink(ProjectConfig.Hbase_RUNDATA_TABLE),
      6000,
      TimeUnit.MILLISECONDS,
      100)
      .name("Hbase Sink")

Custom sink: TestDataHbaseSink extends RichAsyncFunction.

The sink writes through a BufferedMutator, which buffers Puts on the client and sends them to HBase in batches:

val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
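
By default the mutator flushes when its internal write buffer fills or when close() is called. If you want flushes at a predictable size, the buffer can be capped via BufferedMutatorParams — a sketch, where the 4 MB value is an assumption rather than taken from the original:

    import org.apache.hadoop.hbase.client.BufferedMutatorParams

    // Flush automatically once the buffered Puts exceed ~4 MB (value is an assumption)
    val params = new BufferedMutatorParams(TableName.valueOf(tableName))
      .writeBufferSize(4 * 1024 * 1024)
    val table: BufferedMutator = connection.getBufferedMutator(params)

The complete sink: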

import com.hzw.fdc.scalabean.RunData
import com.hzw.fdc.util.{ExceptionInfo, ProjectConfig}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer


class TestDataHbaseSink(tableName: String) extends RichAsyncFunction[List[RunData], String] {
  var connection: Connection = _
  private val logger: Logger = LoggerFactory.getLogger(classOf[TestDataHbaseSink])

  override def open(parameters: Configuration): Unit = {
    // Fetch the global job parameters (renamed so it does not shadow the method argument)
    val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    ProjectConfig.getConfig(globalParams)


    // Create the HBase connection
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", ProjectConfig.Hbase_ZOOKEEPER_QUORUM)
    conf.set("hbase.zookeeper.property.clientPort", ProjectConfig.Hbase_ZOOKEEPER_PROPERTY_CLIENTPORT)
    connection = ConnectionFactory.createConnection(conf)
  }

  
  override def asyncInvoke(runEventDataList: List[RunData], resultFuture: ResultFuture[String]): Unit = {

    // One BufferedMutator per call; it is flushed and closed in the finally block below
    val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))

    val puts: ListBuffer[Put] = new ListBuffer()
    var count = 0

    try {

      for (runStartEvent <- runEventDataList) {
        try {

          val runid = s"${runStartEvent.toolName}--${runStartEvent.chamberName}--${runStartEvent.runStartTime}"

          // Salt the rowkey with a 0-9 prefix to spread writes across regions;
          // math.abs guards against a negative hashCode
          val key = math.abs(s"${runStartEvent.toolName}_${runStartEvent.chamberName}".hashCode % 10)
          val put = new Put(Bytes.toBytes(s"${key}_${runStartEvent.toolName}_${runStartEvent.chamberName}_${runStartEvent.runStartTime}"))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_ID"), Bytes.toBytes(runid))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("TOOL_NAME"), Bytes.toBytes(runStartEvent.toolName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("CHAMBER_NAME"), Bytes.toBytes(runStartEvent.chamberName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_START_TIME"), Bytes.toBytes(runStartEvent.runStartTime))

          puts.append(put)
          count = count + 1

          // Write a batch to HBase every 2000 records; flush() forces the
          // buffered Puts out instead of waiting for the write buffer to fill
          if (count % 2000 == 0) {
            table.mutate(puts.asJava)
            table.flush()
            puts.clear()
            count = 0
          }

        }catch {
          case ex: Exception => logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)} data: $runStartEvent")
        }
      }
      // Hand any remaining Puts to the mutator; close() flushes them
      table.mutate(puts.asJava)
    } catch {
      case ex: Exception => logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)}")
    } finally {
      table.close()
      // Complete the async request; without this every invocation would
      // eventually hit the orderedWait timeout (6000 ms)
      resultFuture.complete(List("success"))
    }

  }


  
  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
    super.close()
  }


  // Null-safe helper: returns the string itself, or "" when null
  def hasLength(str: String): String = {
    if (str != null) str else ""
  }

}
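
One caveat: asyncInvoke runs on the operator's thread, and the HBase calls above are blocking, so as written the writes are not truly asynchronous. A common pattern is to hand the blocking write to a dedicated thread pool and complete the future from there. A minimal sketch, assuming the mutation logic above has been factored into a hypothetical writeBatch helper (the pool size of 10 is also an assumption):

    import java.util.concurrent.Executors
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}

    // Dedicated pool for the blocking HBase calls, one per task instance
    @transient private lazy val hbaseEc: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

    override def asyncInvoke(batch: List[RunData], resultFuture: ResultFuture[String]): Unit = {
      Future {
        writeBatch(batch)  // hypothetical helper wrapping the BufferedMutator logic above
        "success"
      }(hbaseEc).onComplete {
        case Success(msg) => resultFuture.complete(List(msg))
        case Failure(ex)  => resultFuture.completeExceptionally(ex)
      }(hbaseEc)
    }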
