flink 采用批量 async io 方式写入hbase
逐条写入 HBase 速度太慢,因此采用批量方式:每 2000 条数据作为一个批次写入 HBase,以提高写入性能。
设置一个 3 秒的翻滚窗口,将数据聚合在一起,然后通过 Async IO 以批量方式异步写入 HBase。
// Source stream of run data wrapped in the project's FdcData envelope.
val RunDataDS: DataStream[FdcData[RunData]] = getDatas()

/**
 * Gathers every RunData element of a window into a single List so the
 * downstream async sink can write the whole window as one HBase batch.
 */
class RunDataProcessWindowFunction extends ProcessWindowFunction[RunData, List[RunData], String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       input: Iterable[RunData],
                       out: Collector[List[RunData]]): Unit = {
    // The window's elements can be materialised directly — no mutable
    // ListBuffer / manual loop needed.
    out.collect(input.toList)
  }
}

// 3-second tumbling event-time window, keyed by tool, aggregates records
// into per-window batches before the HBase write.
// NOTE(review): event-time windows only fire if a watermark strategy is
// assigned upstream — confirm the source assigns timestamps/watermarks.
val alarmRuleResultStream: DataStream[List[RunData]] = RunDataDS
  .map(_.datas)
  .keyBy(_.toolName)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .process(new RunDataProcessWindowFunction())

// Async IO writes each window batch to HBase: ordered results, 6000 ms
// timeout per request, at most 100 in-flight requests.
AsyncDataStream.orderedWait(
  alarmRuleResultStream,
  new TestDataHbaseSink(ProjectConfig.Hbase_RUNDATA_TABLE),
  6000, TimeUnit.MILLISECONDS, 100)
  .name("Hbase Sink")
自定义sink: TestDataHbaseSink 继承 RichAsyncFunction
val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
批量写入hbase
import com.hzw.fdc.scalabean.{RunData, RunEventData}
import com.hzw.fdc.util.{ExceptionInfo, ProjectConfig}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
// BUG FIX: the HBase config class is HBaseConfiguration (capital B), not HbaseConfiguration.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/**
 * Async HBase sink for window batches of RunData.
 *
 * Each call to [[asyncInvoke]] receives one window's worth of records and
 * writes them through a BufferedMutator in chunks of 2000 Puts, flushing the
 * remainder (and the mutator's buffer, via close()) at the end.
 *
 * @param tableName destination HBase table name
 */
class TestDataHbaseSink(tableName: String) extends RichAsyncFunction[List[RunData], String] {

  // Number of Puts sent to HBase per mutate() call.
  private val BatchSize = 2000

  var connection: Connection = _

  // BUG FIX: logger was created with classOf[WindowEndRunDataHbaseSink],
  // mis-attributing every log line; tag it with this class instead.
  private val logger: Logger = LoggerFactory.getLogger(classOf[TestDataHbaseSink])

  override def open(parameters: Configuration): Unit = {
    // Load global job configuration. Renamed so it no longer shadows the
    // `parameters: Configuration` method argument.
    val globalParams = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[ParameterTool]
    ProjectConfig.getConfig(globalParams)

    // Build the HBase connection from the project's ZooKeeper settings.
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", ProjectConfig.Hbase_ZOOKEEPER_QUORUM)
    conf.set("hbase.zookeeper.property.clientPort", ProjectConfig.Hbase_ZOOKEEPER_PROPERTY_CLIENTPORT)
    connection = ConnectionFactory.createConnection(conf)
  }

  /**
   * Writes one window batch to HBase and completes the async result.
   *
   * BUG FIX: the original never called resultFuture.complete(...), so every
   * async request sat until the orderedWait timeout (6000 ms) and failed the
   * job. The future is now completed on both the success and failure paths.
   */
  override def asyncInvoke(runEventDataList: List[RunData], resultFuture: ResultFuture[String]): Unit = {
    val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
    val puts: ListBuffer[Put] = new ListBuffer()
    var count = 0
    try {
      for (runStartEvent <- runEventDataList) {
        try {
          val runid = s"${runStartEvent.toolName}--${runStartEvent.chamberName}--${runStartEvent.runStartTime}"
          // Salted row key: a small hash prefix spreads sequential runs across
          // regions to avoid write hot-spotting.
          val key = s"${runStartEvent.toolName}_${runStartEvent.chamberName}".hashCode % 10
          val put = new Put(Bytes.toBytes(s"${key}_${runStartEvent.toolName}_${runStartEvent.chamberName}_${runStartEvent.runStartTime}"))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_ID"), Bytes.toBytes(runid))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("TOOL_NAME"), Bytes.toBytes(runStartEvent.toolName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("CHAMBER_NAME"), Bytes.toBytes(runStartEvent.chamberName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_START_TIME"), Bytes.toBytes(runStartEvent.runStartTime))
          puts.append(put)
          count += 1
          // Flush a full batch and reset the buffer.
          if (count % BatchSize == 0) {
            table.mutate(puts.asJava)
            puts.clear()
            count = 0
          }
        } catch {
          // A single bad record is logged and skipped; the batch continues.
          case ex: Exception =>
            logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)} data: $runStartEvent")
        }
      }
      // Flush the trailing partial batch (< BatchSize records).
      table.mutate(puts.asJava)
      resultFuture.complete(Iterable("success"))
    } catch {
      case ex: Exception =>
        logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)}")
        // Complete even on failure so the async operator does not time out.
        resultFuture.complete(Iterable("failed"))
    } finally {
      // BufferedMutator.close() flushes any buffered mutations before
      // releasing its resources.
      table.close()
    }
  }

  override def close(): Unit = {
    connection.close()
    super.close()
  }

  /** Returns the given string, or "" when it is null. */
  def hasLength(str: String): String =
    if (str != null) str else ""
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)