一、通过Flink向ES中大批量跑入数据
public static void main(String[] args) throws Exception { // 构建Flink环境对象 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Source : 数据的输入 DataStreamSourcesource = env.socketTextStream("localhost", 9999); // 使用ESBuilder构建输出 List hosts = new ArrayList<>(); hosts.add(new HttpHost("127.0.0.1", 9200, "http")); ElasticsearchSink.Builder esBuilder = new ElasticsearchSink.Builder<>(hosts, new ElasticsearchSinkFunction () { @Override public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { Map jsonMap = new HashMap<>(); jsonMap.put("data", s); IndexRequest indexRequest = Requests.indexRequest(); indexRequest.index("flink-index"); indexRequest.id("9001"); indexRequest.source(jsonMap); requestIndexer.add(indexRequest); } }); // Sink : 数据的输出 esBuilder.setBulkFlushMaxActions(1); source.addSink(esBuilder.build()); // 执行 *** 作 env.execute("flink-es"); }
二、通过Sparkstreaming向ES中跑入大批量数据
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest") val ssc = new StreamingContext(sparkConf, Seconds(3)) val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) ds.foreachRDD( rdd => { rdd.foreach( data => { val client = new RestHighLevelClient( RestClient.builder(new HttpHost("localhost",9200, "http")) ) val ss = data.split(" ") val request = new IndexRequest() request.index("product").id(ss(0)) val json = s""" | { "data" : "${ss(1)}" } |""".stripMargin request.source(json, XContentType.JSON) val response: IndexResponse = client.index(request, RequestOptions.DEFAULT) println(response.getResult) client.close() } ) } ) ssc.start() ssc.awaitTermination() }
三、通过Kafka向ES中跑入数据
public static void main(String[] args) throws Exception { BulkRequest bulkRequest = new BulkRequest(); //初始化 ack List acks = new linkedList<>(); Messagemessage = messages.poll(); DataEsSinker.reduceQueueSize(message.getPayload()); Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); Map source = jackSon.readValue(message.getPayload(), Map.class); String id = source.get("_id") == null ? "" : source.get("_id").toString(); if (StringUtils.isNotBlank(id) ) { bulkRequest.add(new UpdateRequest(this.indexName, "doc", URLEncoder.encode(id, "utf-8")).doc(source).upsert(source)); acks.add(acknowledgment); } if (bulkRequest.numberOfActions() > 0) { if (bulkResponse.hasFailures()) { for (Acknowledgment acknowledgment : acks) { if (acknowledgment != null) { acknowledgment.acknowledge(); } } } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)