Flink 输出至 Elasticsearch

Flink 输出至 Elasticsearch,第1张

【1】引入

pom.xml

依赖

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.12</artifactId> <version>1.10.0</version> </dependency>

【2】

ES6 Scala

代码,自动导入的

scala

包需要修改为

scala._

否则会出现错误。

package com.zzx.flinkimport java.utilimport org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.client.Requests object EsSinkTest { def main(args: Array[String]): Unit = { // 创建一个流处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //从文件中读取数据并转换为 类 val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt") //转换 val dataStream: DataStream[SensorReading] = inputStreamFromFile .map( data => { var dataArray = data.split(",") SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble) }) //定义一个 HttpHosts val httpHost = new util.ArrayList[HttpHost]() //默认 9200 我的修改为了 9201 httpHost.add(new HttpHost("192.168.1.12",9200,"http")) httpHost.add(new HttpHost("127.0.0.1",9200,"http")) //定义一个 ElasticSearchFuntion *** 作 es的function val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] { //element 每一条数据 通过 index 发送 override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = { //包装写入 es 的数据 val dataSource = new util.HashMap[String,String]() dataSource.put("sensor_id",element.id) dataSource.put("temp",element.temperature.toString) dataSource.put("ts",element.timestamp.toString) //index val indexRequest = Requests.indexRequest() .index("sensor_temp") .`type`("readingdata") .source(dataSource) index.add(indexRequest) println("saved successfully " + element.toString) } } //输出值 es dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build()) env.execute("es") } }

【3】

ES6

输出展示

Flink 输出至 Elasticsearch,​ [点击并拖拽以移动] ​​,第2张

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/tougao/13518479.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2024-01-26
下一篇 2024-01-28

发表评论

登录后才能评论

评论列表(0条)

保存