2、单条数据插入,使用 json 字符串作为数据源io.searchbox jest6.3.1 net.java.dev.jna jna4.5.2 org.codehaus.janino commons-compiler2.7.8
3、单条数据插入,使用样例类作为数据源package com.zyj.gmall.common import io.searchbox.client.JestClientFactory import io.searchbox.client.config.HttpClientConfig import io.searchbox.core.Index object ESUtil { // 端口号,服务器如果没有配置,默认是9200 val esUrl = "http://hadoop103:9200" def main(args: Array[String]): Unit = { // 向es写数据 //1. 先有es的客户端 //1.1 创建一个客户端工厂 val factory = new JestClientFactory val conf = new HttpClientConfig.Builder(esUrl) .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍 .connTimeout(10 * 1000) // 连接到es的超时时间 .readTimeout(10 * 1000) // 读取数据的最大超时时间 .multiThreaded(true) // 是否允许多线程 .build() factory.setHttpClientConfig(conf) //1.2 从工厂获取一个客户端 val client = factory.getObject //2. es需要的数据(json,样例类) val data = """ |{ | "name": "zs", | "age": 20 |} |""".stripMargin //3. 写入(单次,批次) val index = new Index.Builder(data) .index("user") .`type`("_doc") // .id("1") // 可选 如果没有设置 id自动生成 .build() client.execute(index) //4. 关闭客户端(其实是把客户端还给工厂) client.shutdownClient() // 虽然过时了,但比较稳定 } }
4、单条数据插入,将对数据库的连接进行封装package com.zyj.gmall.common import io.searchbox.client.JestClientFactory import io.searchbox.client.config.HttpClientConfig import io.searchbox.core.Index object ESUtil { // 端口号,服务器如果没有配置,默认是9200 val esUrl = "http://hadoop103:9200" def main(args: Array[String]): Unit = { // 向es写数据 //1. 先有es的客户端 //1.1 创建一个客户端工厂 val factory = new JestClientFactory val conf = new HttpClientConfig.Builder(esUrl) .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍 .connTimeout(10 * 1000) // 连接到es的超时时间 .readTimeout(10 * 1000) // 读取数据的最大超时时间 .multiThreaded(true) // 是否允许多线程 .build() factory.setHttpClientConfig(conf) //1.2 从工厂获取一个客户端 val client = factory.getObject //2. es需要的数据(json,样例类) val data = User(30, "ww") //3. 写入(单次,批次) val index = new Index.Builder(data) .index("user") .`type`("_doc") // .id("1") // 可选 如果没有设置 id自动生成 .build() client.execute(index) //4. 关闭客户端(其实是把客户端还给工厂) client.shutdownClient() // 虽然过时了,但比较稳定 } } case class User(age: Int, name: String)
5、批量插入,样例类作为数据源,并封装连接package com.zyj.gmall.common import io.searchbox.client.JestClientFactory import io.searchbox.client.config.HttpClientConfig import io.searchbox.core.Index object ESUtil { // 端口号,服务器如果没有配置,默认是9200 val esUrl = "http://hadoop103:9200" // 创建一个客户端工厂 val factory = new JestClientFactory val conf = new HttpClientConfig.Builder(esUrl) .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍 .connTimeout(10 * 1000) // 连接到es的超时时间 .readTimeout(10 * 1000) // 读取数据的最大超时时间 .multiThreaded(true) // 是否允许多线程 .build() factory.setHttpClientConfig(conf) def insertSingle(index: String, source: Object, id: String = null) = { // 从工厂获取一个客户端 val client = factory.getObject // 写入(单次,批次) val action = new Index.Builder(source) .index(index) .`type`("_doc") .id(id) // id 如果是null,相当于没有传 .build() client.execute(action) // 关闭客户端(其实是把客户端还给工厂) client.shutdownClient() } def main(args: Array[String]): Unit = { val data = User(30, "xx") insertSingle("user", data) } } case class User(age: Int, name: String)
package com.zyj.gmall.common import io.searchbox.client.JestClientFactory import io.searchbox.client.config.HttpClientConfig import io.searchbox.core.{Bulk, Index} object ESUtil { // 端口号,服务器如果没有配置,默认是9200 val esUrl = "http://hadoop103:9200" // 创建一个客户端工厂 val factory = new JestClientFactory val conf = new HttpClientConfig.Builder(esUrl) .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍 .connTimeout(10 * 1000) // 连接到es的超时时间 .readTimeout(10 * 1000) // 读取数据的最大超时时间 .multiThreaded(true) // 是否允许多线程 .build() factory.setHttpClientConfig(conf) def insertBulk(index: String, sources: Iterator[Any]) = { val client = factory.getObject val bulk = new Bulk.Builder() .defaultIndex(index) .defaultType("_doc") sources.foreach { case (id: String, data) => val action = new Index.Builder(data).id(id).build() bulk.addAction(action) case data => val action = new Index.Builder(data).build() bulk.addAction(action) } client.execute(bulk.build()) client.shutdownClient() } def main(args: Array[String]): Unit = { val list = User(1, "aa") :: User(2, "bb") :: User(3, "cc") :: Nil insertBulk("user", list.toIterator) val list2 = ("100", User(1, "a")) :: ("200", User(1, "b")) :: ("300", User(3, "c")) :: Nil insertBulk("user", list2.toIterator) } } case class User(age: Int, name: String)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)