spark->es快速导入数据

spark->es快速导入数据,第1张

elasticsearch-spark 提供了saveToEs api以支持快速导入数据。但es集群线程池有限,在大量写入数据的同时,对cpu的压力非常大,影响线上es的查询服务。如果能参考hbase 的bulkload方法,对es也采用“bulkload”模式,写入性能会有巨大提升。核心思想是通过spark作业生成es的lucene文件,并通过网络传输,写入es的数据文件。

本方案参考滴滴的fastIndex: 滴滴FastIndex

采用spark改写,部分特性适应了公司的原始流程,会有不一样的地方。如您采用的是spark saveToEs需要通过该方法进行改写,可参考。

git地址为: https://github.com/Dengyu123/fast-es-rdd

hadoop和mongodb的连接器

<dependency>

<groupId>org.mongodb.mongo-hadoop</groupId>

<artifactId>mongo-hadoop-core</artifactId>

<version>1.4.2</version>

</dependency>

java连接mongodb连接器

<dependency>

<groupId>org.mongodb</groupId>

<artifactId>mongo-java-driver</artifactId>

<version>2.13.0</version>

</dependency>

2.使用示例

import com.mongodb.hadoop.MongoOutputFormat

import org.apache.hadoop.conf.Configuration

import org.apache.spark.api.java.JavaPairRDD

import org.apache.spark.api.java.JavaRDD

import org.apache.spark.api.java.JavaSparkContext

import org.apache.spark.api.java.function.Function

import org.bson.BSONObject

import scala.Tuple2

import java.util.Date

import java.util.List

/**

* Created by Administrator on 2015/12/8.

*/

public class ConnectMongo {

public static void main(String args[]){

JavaSparkContext sc =new JavaSparkContext("local","test")

Configuration config =new Configuration()

//解释 主机:端口号/数据库名.Collection名

config.set("mongo.input.uri","mongodb://127.0.0.1:27017/lang.sanlu")

config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/lang.output")

//读取

JavaPairRDD<Object, BSONObject>mongoRDD = sc.newAPIHadoopRDD(config, com.mongodb.hadoop.MongoInputFormat.class, Object.class, BSONObject.class)

//BasonObject->text

JavaRDD<text>result = mongoRDD.map(

new Function<Tuple2<Object, BSONObject>, text>() {

public text call(Tuple2<Object, BSONObject>v1) throws Exception {

String title = (String) v1._2().get("title")

Date date =(Date) v1._2().get("date")

List<String>paragraph = (List<String>) v1._2().get("paragraph")

return new text(title,date,paragraph)

}

}

)

//copy lang.sanlu to lang.output

mongoRDD.saveAsNewAPIHadoopFile("file:///copy",Object.class, Object.class, MongoOutputFormat.class, config)

}

}


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/sjk/6699717.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-27
下一篇 2023-03-27

发表评论

登录后才能评论

评论列表(0条)

保存