Deployed version: elasticsearch-6.5.3
Step 1: create an es user on every server, because Elasticsearch refuses to start as root.
useradd es
Step 2: as root, raise the open-file limit for the es user.
ulimit -Hn   # check the current hard limit
vi /etc/security/limits.conf
es soft nofile 65536
es hard nofile 65536
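vi /etc/security/limits.d/20-nproc.conf   (the file whose name ends in -nproc.conf)
Change soft nproc 1024 to soft nproc 4096: the Elasticsearch 6.x bootstrap check requires the es user to be able to create at least 4096 threads, so 2048 is not enough.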
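The two limit edits above can be scripted so they are safe to re-run on every server. This is a minimal sketch, assuming GNU sed and that the target files are passed as arguments; `ensure_line`, `set_es_nofile` and `raise_nproc` are hypothetical helper names, not part of any tool. It sets nproc to 4096, the minimum the Elasticsearch 6.x bootstrap check accepts.

```shell
#!/bin/sh
# Append a line to a file only if that exact line is not already present.
ensure_line() {
  file=$1; line=$2
  grep -qxF -- "$line" "$file" 2>/dev/null || printf '%s\n' "$line" >> "$file"
}

# Hypothetical helper: open-file limits for the es user (target: /etc/security/limits.conf).
set_es_nofile() {
  ensure_line "$1" "es soft nofile 65536"
  ensure_line "$1" "es hard nofile 65536"
}

# Hypothetical helper: rewrite any uncommented "soft nproc <number>" entry to 4096 (GNU sed).
raise_nproc() {
  sed -i -E 's/^([^#]*[[:space:]]soft[[:space:]]+nproc[[:space:]]+)[0-9]+/\14096/' "$1"
}

# Usage (as root):
#   set_es_nofile /etc/security/limits.conf
#   raise_nproc /etc/security/limits.d/20-nproc.conf
```

Entries such as `root soft nproc unlimited` are left alone, because the pattern only matches a numeric value.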
As root, edit /etc/sysctl.conf:
vi /etc/sysctl.conf
Add the line vm.max_map_count=655360, then apply it:
sysctl -p
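The same idea works for vm.max_map_count. This sketch (assuming GNU sed; `set_sysctl` is a hypothetical helper) replaces an existing entry instead of appending a duplicate. Elasticsearch's bootstrap check asks for at least 262144, so 655360 passes comfortably.

```shell
#!/bin/sh
# Hypothetical helper: set key=value in a sysctl-style file,
# replacing the key if it is already there, otherwise appending it.
set_sysctl() {
  file=$1; key=$2; value=$3
  if grep -qE "^[[:space:]]*$key[[:space:]]*=" "$file" 2>/dev/null; then
    sed -i -E "s|^[[:space:]]*$key[[:space:]]*=.*|$key=$value|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}

# Usage (as root), then reload:
#   set_sysctl /etc/sysctl.conf vm.max_map_count 655360
#   sysctl -p
```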
Step 3: upload elasticsearch-6.5.3.tar.gz (here to /opt/soft, matching path.data below), then unpack and rename it:
tar -zxvf elasticsearch-6.5.3.tar.gz
mv elasticsearch-6.5.3 elasticsearch
Step 4: edit the configuration file elasticsearch/config/elasticsearch.yml
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: bigdata
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: es-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /opt/soft/elasticsearch/data
#
# Path to log files:
#
path.logs: /opt/soft/elasticsearch/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: hdp0
node.master: true
node.data: true
#
# Set a custom port for HTTP:
#
#http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["hdp0", "hdp1", "hdp2"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of master-eligible nodes / 2 + 1):
#
discovery.zen.minimum_master_nodes: 2
bootstrap.memory_lock: false
http.cors.enabled: true
http.cors.allow-origin: "*"
#
# For more information, consult the zen discovery module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
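The discovery.zen.minimum_master_nodes value comes from the formula in the config comment: master-eligible nodes / 2 + 1, with integer division. A tiny helper (the name `quorum` is hypothetical) makes the arithmetic explicit. With the three master-eligible nodes here it gives 2, so the cluster can still elect a master with one node down, while a single isolated node cannot elect itself.

```shell
#!/bin/sh
# minimum_master_nodes = (number of master-eligible nodes / 2) + 1, integer division
quorum() {
  echo $(( $1 / 2 + 1 ))
}

# Usage:
#   quorum 3   # hdp0, hdp1, hdp2 are all master-eligible
```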
Copy the configured directory to the other two servers:
scp -r elasticsearch hdp1:$PWD
scp -r elasticsearch hdp2:$PWD
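On all three servers, as root, give the es user access to the directory; otherwise Elasticsearch fails to start with a permission error. Making es the owner is enough and avoids the world-writable files that chmod 777 -R elasticsearch would leave behind:
chown -R es:es elasticsearch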
On hdp1, change these lines in elasticsearch.yml:
node.name: es-2
network.host: hdp1
On hdp2, change them to:
node.name: es-3
network.host: hdp2
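The per-node edits above can be done without opening an editor. A minimal sketch, assuming GNU sed and the standard tarball layout (config in elasticsearch/config/); `set_node_identity` is a hypothetical helper that only touches the uncommented node.name and network.host lines.

```shell
#!/bin/sh
# Hypothetical helper: set node.name and network.host in an elasticsearch.yml (GNU sed, in place).
set_node_identity() {
  file=$1; name=$2; host=$3
  sed -i -E \
    -e "s|^node\.name:.*|node.name: $name|" \
    -e "s|^network\.host:.*|network.host: $host|" \
    "$file"
}

# Usage, on each copied node:
#   hdp1: set_node_identity elasticsearch/config/elasticsearch.yml es-2 hdp1
#   hdp2: set_node_identity elasticsearch/config/elasticsearch.yml es-3 hdp2
```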
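Start every node as the es user (not root) from the elasticsearch/bin directory; -d runs it in the background as a daemon:
./elasticsearch -d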
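Web test: open http://hdp0:9200 in a browser; the node answers with a JSON page describing itself and the cluster.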
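Besides the browser, the cluster can be checked from the shell through the `_cluster/health` endpoint, whose `status` field is green, yellow or red. A sketch; `health_status` is a hypothetical helper that extracts the field with sed so no JSON tool is needed, and it assumes the default compact JSON output (no `?pretty`).

```shell
#!/bin/sh
# Hypothetical helper: print the "status" value (green/yellow/red)
# from a compact _cluster/health JSON response read on stdin.
health_status() {
  sed -n 's/.*"status":"\([a-z]*\)".*/\1/p'
}

# Usage against the cluster:
#   curl -s http://hdp0:9200/_cluster/health | health_status
```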
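Because that page is raw JSON and hard to read, we use elasticsearch-head instead. elasticsearch-head runs through grunt, and grunt is installed with the npm package manager, so Node.js has to be installed first. Pick one server to deploy it on.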
tar -xvf node-v12.16.1-linux-x64.tar.xz   ## unpack
ln -s /opt/soft/node-v12.16.1-linux-x64/bin/node /usr/bin/node   ## create symlink
ln -s /opt/soft/node-v12.16.1-linux-x64/bin/npm /usr/bin/npm   ## create symlink
## check the versions
node -v
npm -v
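## Configure a mirror registry so installs are fast; only the last registry setting takes effect
npm config set registry http://registry.cnpmjs.org   ## cnpmjs mirror
npm config set registry https://registry.npm.taobao.org   ## Taobao mirror (faster); it has since moved to https://registry.npmmirror.com
npm install -g grunt-cli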
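## Download PhantomJS:
## https://bitbucket.org/ariya/phantomjs/downloads/phantomjs-2.1.1-linux-x86_64.tar.bz2
## Install the tool needed to unpack it
yum install -y bzip2
## Install Git
yum install -y git
## Fetch the head plugin with git (GitHub no longer serves the git:// protocol, so use https://)
cd /usr/local
git clone https://github.com/mobz/elasticsearch-head.git
## Install the packages elasticsearch-head depends on
cd ./elasticsearch-head
rm -rf ./node_modules   ## required when reinstalling
npm install --unsafe-perm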
Configure the hostname and the default connection, from the elasticsearch-head directory:
1. Edit Gruntfile.js: under connect --> server --> options, add hostname: '*' so that any IP can access it.
vi ./Gruntfile.js
Change it to:
connect: {
    server: {
        options: {
            hostname: '*',   // added
            port: 9100,
            base: '.',
            keepalive: true
        }
    }
}
2. Change the default connection address:
vi ./_site/app.js
Change it to:
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://hdp0:9200";   // localhost changed to hdp0
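The app.js change can also be applied with sed. A sketch, assuming GNU sed; `set_head_base_uri` is a hypothetical helper, and because it rewrites the first quoted "http://localhost:9200" on every line of the file, it is worth checking the result with a diff afterwards.

```shell
#!/bin/sh
# Hypothetical helper: point elasticsearch-head's default connection
# at a cluster node instead of localhost (GNU sed, in place).
set_head_base_uri() {
  file=$1; host=$2
  sed -i "s#\"http://localhost:9200\"#\"http://$host:9200\"#" "$file"
}

# Usage, from the elasticsearch-head directory:
#   set_head_base_uri ./_site/app.js hdp0
```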
1. Start the ES cluster first, as the es user.
2. Then start the head plugin as root:
./node_modules/grunt/bin/grunt server   # foreground start
nohup ./node_modules/grunt/bin/grunt server > /var/log/head.log 2>&1 &   # background start
ps -ef | grep server   # confirm it is running; the head UI is then served on port 9100 (the port set in Gruntfile.js)
Writing a Scala program that connects to ES and tries to add data. Maven dependencies (pom.xml):
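<properties>
    <es.version>6.5.3</es.version>
</properties>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${es.version}</version>
    <scope>${scope}</scope>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${es.version}</version>
    <scope>${scope}</scope>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.25.Final</version>
</dependency>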
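Connection config, a JSON file on the classpath whose path is held in QRealTimeConstant.ES_CONFIG_PATH; 9300 is the transport port used by the Java transport client:
{
  "config": {
    "cluster.name": "bigdata",
    "client.transport.sniff": true
  },
  "address": [
    { "ip": "hdp0", "port": 9300 },
    { "ip": "hdp1", "port": 9300 },
    { "ip": "hdp2", "port": 9300 }
  ]
}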
ESConfigUtil.scala: reads the JSON config file
package com.qianfeng.bigdata.realtime.flink.util.es

import java.net.{InetAddress, InetSocketAddress}

import com.qianfeng.bigdata.realtime.flink.constant.QRealTimeConstant
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.http.HttpHost

object ESConfigUtil {
  // Cached results, built on first use
  var esConfigSocket: ESConfigSocket = null
  var esConfigHttpHost: ESConfigHttpHost = null

  // Wraps the "config" map plus the transport (TCP) addresses
  class ESConfigSocket(var config: java.util.HashMap[String, String],
                       var transportAddresses: java.util.ArrayList[InetSocketAddress])

  // Wraps the "config" map plus the HTTP hosts
  class ESConfigHttpHost(var config: java.util.HashMap[String, String],
                         var transportAddresses: java.util.ArrayList[HttpHost])

  // Loads the JSON file from the classpath and always closes the stream
  private def readJson(configPath: String): JsonNode = {
    val configStream = this.getClass.getClassLoader.getResourceAsStream(configPath)
    if (configStream == null) {
      throw new IllegalArgumentException(s"ES config not found on classpath: $configPath")
    }
    try new ObjectMapper().readTree(configStream) finally configStream.close()
  }

  // Copies every field of the "config" object into a HashMap
  private def readConfigMap(root: JsonNode): java.util.HashMap[String, String] = {
    val configJsonNode = root.get("config")
    val configJsonMap = new java.util.HashMap[String, String]
    val it = configJsonNode.fieldNames()
    while (it.hasNext) {
      val key = it.next()
      configJsonMap.put(key, configJsonNode.get(key).asText())
    }
    configJsonMap
  }

  def getConfigSocket(configPath: String): ESConfigSocket = {
    if (null == esConfigSocket) {
      val root = readJson(configPath)
      val transportAddresses = new java.util.ArrayList[InetSocketAddress]
      val it = classOf[ArrayNode].cast(root.get("address")).iterator()
      while (it.hasNext) {
        val detailJsonNode: JsonNode = it.next()
        val ip = detailJsonNode.get("ip").asText()
        val port = detailJsonNode.get("port").asInt()
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName(ip), port))
      }
      esConfigSocket = new ESConfigSocket(readConfigMap(root), transportAddresses)
    }
    esConfigSocket
  }

  def getConfigHttpHost(configPath: String): ESConfigHttpHost = {
    if (null == esConfigHttpHost) {
      val root = readJson(configPath)
      val httpHosts = new java.util.ArrayList[HttpHost]
      val it = classOf[ArrayNode].cast(root.get("address")).iterator()
      while (it.hasNext) {
        val detailJsonNode: JsonNode = it.next()
        val ip = detailJsonNode.get("ip").asText()
        // The JSON lists transport port 9300; a REST (HTTP) client needs the HTTP port, 9200 by default
        val port = detailJsonNode.get("port").asInt()
        httpHosts.add(new HttpHost(ip, port, "http"))
      }
      esConfigHttpHost = new ESConfigHttpHost(readConfigMap(root), httpHosts)
    }
    esConfigHttpHost
  }

  // Quick test
  def main(args: Array[String]): Unit = {
    println(getConfigSocket(QRealTimeConstant.ES_CONFIG_PATH).config.get("cluster.name"))
    println(getConfigHttpHost(QRealTimeConstant.ES_CONFIG_PATH).transportAddresses.get(0).getHostName)
  }
}
ES6ClientUtil.scala: builds a TransportClient from that config and upserts one test document
package com.qianfeng.bigdata.realtime.flink.util.es

import java.net.InetSocketAddress
import java.util

import com.qianfeng.bigdata.realtime.flink.constant.QRealTimeConstant
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateResponse
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable

object ES6ClientUtil {
  private val logger: Logger = LoggerFactory.getLogger(ES6ClientUtil.getClass)

  def buildTransportClient(esConfigPath: String = QRealTimeConstant.ES_CONFIG_PATH): PreBuiltTransportClient = {
    // Fail fast if no config path was given
    if (esConfigPath == null) {
      throw new IllegalArgumentException("esConfigPath is null")
    }
    val esConfig: ESConfigUtil.ESConfigSocket = ESConfigUtil.getConfigSocket(esConfigPath)
    val transAddrs: mutable.Buffer[InetSocketAddress] = esConfig.transportAddresses.asScala

    // Copy every "config" entry (cluster.name, client.transport.sniff, ...) into the client settings
    val settings: Settings.Builder = Settings.builder()
    for ((key, value) <- esConfig.config.asScala) {
      settings.put(key, value)
    }

    val transportClient = new PreBuiltTransportClient(settings.build())
    // Register each node's transport address (port 9300)
    for (transAddr <- transAddrs) {
      transportClient.addTransportAddress(new TransportAddress(transAddr))
    }
    logger.info(s"ES transport addresses: $transAddrs")
    transportClient
  }

  def main(args: Array[String]): Unit = {
    val transportClient: PreBuiltTransportClient = buildTransportClient()
    try {
      val indexName = "user"
      val esID = "007"
      val value = new util.HashMap[String, String]()
      value.put("name", "daqu")
      value.put("age", "301")
      // Upsert: update the document if it exists, otherwise index `value` as a new document
      val indexRequest: IndexRequest = new IndexRequest(indexName, "book", esID).source(value)
      val response: UpdateResponse = transportClient.prepareUpdate(indexName, "book", esID)
        .setDoc(value)
        .setUpsert(indexRequest)
        .get()
      println(response.status().getStatus)
    } finally {
      transportClient.close()
    }
  }
}
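For a quick check without compiling the Scala program, the same upsert can be sent over the REST API on port 9200. In Elasticsearch 6.x, `POST /{index}/{type}/{id}/_update` with a `doc` part (applied when the document exists) and an `upsert` part (indexed when it does not) mirrors `prepareUpdate(...).setDoc(...).setUpsert(...)`. A sketch: `upsert_body` is a hypothetical helper that only builds the request body and does no JSON escaping, so the values must not contain quotes.

```shell
#!/bin/sh
# Hypothetical helper: build an _update body whose "doc" and "upsert" parts carry the same fields,
# matching setDoc(value) + setUpsert(new IndexRequest(...).source(value)) in ES6ClientUtil.
upsert_body() {
  printf '{"doc":{"name":"%s","age":"%s"},"upsert":{"name":"%s","age":"%s"}}' "$1" "$2" "$1" "$2"
}

# Usage against the cluster (ES 6.x URL layout /index/type/id/_update), then read the document back:
#   curl -s -XPOST 'http://hdp0:9200/user/book/007/_update' \
#        -H 'Content-Type: application/json' -d "$(upsert_body daqu 301)"
#   curl -s 'http://hdp0:9200/user/book/007?pretty'
```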