大批量向Elasticsearch中跑入数据的方式

大批量向Elasticsearch中跑入数据的方式(配图 1)

大批量向Elasticsearch中跑入数据的方式

一、通过Flink向ES中大批量跑入数据

 public static void main(String[] args) throws Exception {

        // Build the Flink streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read newline-delimited records from a local socket.
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        // Build the Elasticsearch sink (single local node over HTTP).
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost("127.0.0.1", 9200, "http"));
        ElasticsearchSink.Builder<String> esBuilder = new ElasticsearchSink.Builder<>(hosts,
                 new ElasticsearchSinkFunction<String>() {

                     @Override
                     public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                         // Wrap the raw line in a one-field JSON document.
                         Map<String, Object> jsonMap = new HashMap<>();
                         jsonMap.put("data", s);

                         IndexRequest indexRequest = Requests.indexRequest();
                         indexRequest.index("flink-index");
                         // BUG FIX: the original hard-coded indexRequest.id("9001"),
                         // which makes every record overwrite the same document —
                         // the opposite of a bulk load. Omitting the id lets
                         // Elasticsearch auto-generate one per record.
                         indexRequest.source(jsonMap);

                         requestIndexer.add(indexRequest);
                     }
                 });

        // Sink: flush in batches. The original flushed after every single
        // action (1), which defeats bulk indexing entirely.
        esBuilder.setBulkFlushMaxActions(1000);
        source.addSink(esBuilder.build());

        // Submit the job.
        env.execute("flink-es");

    }

二、通过Spark Streaming向ES中跑入大批量数据

 def main(args: Array[String]): Unit = {

        // Local streaming context with a 3-second micro-batch interval.
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        // Each input line is expected as "<id> <payload>" separated by a space.
        val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        ds.foreachRDD { rdd =>
            // PERF/BUG FIX: the original built a new RestHighLevelClient for
            // EVERY record inside rdd.foreach, which is extremely expensive and
            // leaks connections under load. foreachPartition creates exactly one
            // client per partition and guarantees it is closed via finally.
            rdd.foreachPartition { records =>
                val client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("localhost", 9200, "http"))
                )
                try {
                    records.foreach { data =>
                        val ss = data.split(" ")

                        val request = new IndexRequest()
                        request.index("product").id(ss(0))
                        val json =
                            s"""
                              | {  "data" : "${ss(1)}" }
                              |""".stripMargin
                        request.source(json, XContentType.JSON)

                        val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)
                        println(response.getResult)
                    }
                } finally {
                    // Close the client even if a record in this partition failed.
                    client.close()
                }
            }
        }

        ssc.start()
        ssc.awaitTermination()
    }

三、通过Kafka向ES中跑入数据

 public static void main(String[] args) throws Exception {
        BulkRequest bulkRequest = new BulkRequest();
        // Acknowledgments collected for every message that was added to the
        // bulk request, committed only after the bulk succeeds.
        // BUG FIX: was "new linkedList<>()" (lowercase l) — does not compile.
        List<Acknowledgment> acks = new LinkedList<>();
        Message message = messages.poll();
        DataEsSinker.reduceQueueSize(message.getPayload());

        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        // Payload is expected to be a JSON object; its "_id" field (when
        // present) becomes the Elasticsearch document id.
        Map source = jackSon.readValue(message.getPayload(), Map.class);
        String id = source.get("_id") == null ? "" : source.get("_id").toString();

        if (StringUtils.isNotBlank(id)) {
            // Upsert: create the document when absent, update it when present.
            bulkRequest.add(new UpdateRequest(this.indexName, "doc", URLEncoder.encode(id, "utf-8")).doc(source).upsert(source));
            acks.add(acknowledgment);
        }
        if (bulkRequest.numberOfActions() > 0) {
            // NOTE(review): this fragment never executes the bulk request —
            // "bulkResponse" must come from client.bulk(bulkRequest,
            // RequestOptions.DEFAULT) before it can be inspected; confirm
            // against the full class this snippet was taken from.
            // BUG FIX: acknowledge offsets only when the bulk SUCCEEDED.
            // The original acknowledged when hasFailures() was true, which
            // commits (and therefore loses) failed messages while never
            // committing the successful ones.
            if (!bulkResponse.hasFailures()) {
                // BUG FIX: loop variable renamed — the original reused
                // "acknowledgment", duplicating the outer local (compile error).
                for (Acknowledgment ack : acks) {
                    if (ack != null) {
                        ack.acknowledge();
                    }
                }
            }
        }
 }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5720227.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存