Elasticsearch集群部署 客户端连接小入门

Elasticsearch集群部署 客户端连接小入门,第1张

Elasticsearch集群部署 客户端连接小入门

部署版本: elasticsearch-6.5.3

准备三台服务器 19.168.174.204,192.168.174.205,192.168.174.206

第一步 : 每台服务器 都创建一个es 用户, elasticsearch不支持root 启动

useradd es

第二步: 在root 用户下 ,修改es 用户文件打开数

ulimit -Hn 查看硬限制

vi /etc/security/limits.conf

es soft nofile 65536
es hard nofile 65536

注意一定要重启虚拟机,使其生效

修改 vi /etc/security/limits.d/20-nproc.conf ~> 文件名以 -nproc.conf结尾

soft nproc 1024 修改为 soft nproc 2048

修改 vi /etc/sysctl.conf ~>在root用户下进行修改

添加下面配置:
vm.max_map_count=655360
并执行命令:
sysctl -p

第三步:上传tar 包

elasticsearch-6.5.3.tar.gz

tar -zxvf elasticsearch-6.5.3.tar.gz 

mv elasticsearch-6.5.3 elasticsearch

第四步:修改配置文件 elasticsearch.yml

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: bigdata
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: es-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /opt/soft/elasticsearch/data
#
# Path to log files:
#
path.logs: /opt/soft/elasticsearch/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: hdp0
node.master: true
node.data: true


#
# Set a custom port for HTTP:
#
#http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["hdp0", "hdp1","hdp2"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of master-eligible nodes / 2 + 1):
#
discovery.zen.minimum_master_nodes: 2

bootstrap.memory_lock: false
http.cors.enabled: true
http.cors.allow-origin: "*"
# For more information, consult the zen discovery module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

分发到其他服务器

scp -r elasticsearch   hdp1:$PWD
scp -r elasticsearch   hdp2:$PWD

三台 root 赋权  否则会报权限不够 es 无法启动

chmod 777 -R elasticsearch

修改其他的node

hdp1服务器配置文件修改:
node.name: es-2
network.host: hdp1
hdp2服务器配置文件修改:
node.name: es-3
network.host: hdp2

切换到用es 

./elasticsearch -d

web 测试

 由于 上面的web 显示是一个JSON页面,实在不美观.那就使用 elasticsearch-head.运行elasticsearch-head会用到grunt,而grunt需要npm包管理器,所以nodejs是必须要安装的。选择一台服务器部署

第一步:下载node-v12.16.1-linux-x64.tar.xz

tar -xvf node-v12.16.1-linux-x64.tar.xz  ##解压

ln -s /opt/soft/node-v12.16.1-linux64/bin/node /usr/bin/node  ## 创建软连接
ln -s /opt/soft/node-v12.16.1-linux64/bin/npm /usr/bin/npm  ## 创建软连接

##查看版本信息
node -v 
npm -v
##这里一定配置下淘宝镜像,这样很快

npm config set registry http://registry.cnpmjs.org ##注册官网镜像(使用淘宝较快)
npm config set registry https://registry.npm.taobao.org ##注册淘宝镜像
npm install -g grunt-cli

下载phontomjs

##下载
https://bitbucket.org/ariya/phantomjs/downloads/phantomjs-2.1.1-linux-x86_64.tar.bz2

##安装解压的工具
yum install -y bzip2

##安装Git
yum install -y git
##使用git下载head插件:
cd /usr/local
git clone git://github.com/mobz/elasticsearch-head.git

## 安装 elasticsearch-head 依赖的包
cd ./elasticsearch-head
rm -rf ./node_modules ##重新安装时一定要删除
npm install --unsafe-perm

修改配置文件

添加hostname
1、修改 Gruntfile.js 在connect-->server-->options下面添加:hostname:'*' ,允许所有IP可
以访问
[root@hadoop01 elasticsearch-head]# vi ./Gruntfile.js
修改如下:
connect: {
server: {
options: {
hostname: '*', ###新增
port: 9100,
base: '.',
keepalive: true
}
}
}

2、修改默认连接
[root@hadoop01 elasticsearch-head]# vi ./_site/app.js
修改如下:
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") ||
"http://hdp0:9200"; #localhost修改成hdp0

 启动

1、先使用es用户启动es集群
2、使用root用户启动head插件
./node_modules/grunt/bin/grunt server #前端

启动
nohup ./node_modules/grunt/bin/grunt server> /var/log/head.log 2>&1 & #前端启动

测试进程

ps -ef | grep server

使用web浏览器访问:http://hdp0:9100

 scala程序编写,连接es 尝试添加数据

pom.xml

        
     6.5.3




       
        
            org.elasticsearch
            elasticsearch
            ${es.version}
            ${scope}
        

        
        
            org.elasticsearch.client
            transport
            ${es.version}
            ${scope}
        

        
        
            io.netty
            netty-all
            4.1.25.Final
        

es-config.json

{
  "config": {
    "cluster.name": "bigdata",
    "client.transport.sniff": true
  },
  "address": [
    {
      "ip": "hdp0",
      "port": 9300
    },
    {
      "ip": "hdp1",
      "port": 9300
    },
    {
      "ip": "hdp2",
      "port": 9300
    }
  ]
}
ESConfigUtil.scala 读取配置文件
package com.qianfeng.bigdata.realtime.flink.util.es

import java.net.{InetAddress, InetSocketAddress}

import com.qianfeng.bigdata.realtime.flink.constant.QRealTimeConstant
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.http.HttpHost


object ESConfigUtil {

  //定义两个变量
  var esConfigSocket: ESConfigSocket = null
  var esConfigHttpHost : ESConfigHttpHost = null
  //定义封装socket的配置类
  class ESConfigSocket(var config: java.util.HashMap[String, String], var transportAddresses: java.util.ArrayList[InetSocketAddress])
  //定义封装http的配置类
  class ESConfigHttpHost(var config: java.util.HashMap[String, String], var transportAddresses: java.util.ArrayList[HttpHost])

  
  def getConfigSocket(configPath: String): ESConfigSocket = {
    //使用类加载器加载配置文件
    val configStream = this.getClass.getClassLoader.getResourceAsStream(configPath)
    if (null == esConfigSocket) {
      val mapper = new ObjectMapper()
      val configJsonObject = mapper.readTree(configStream)

      val configJsonNode = configJsonObject.get("config")

      val config = {
        val configJsonMap = new java.util.HashMap[String, String]
        val it = configJsonNode.fieldNames()
        while (it.hasNext) {
          val key = it.next()
          configJsonMap.put(key, configJsonNode.get(key).asText())
        }
        configJsonMap
      }

      val addressJsonNode = configJsonObject.get("address")
      val addressJsonArray = classOf[ArrayNode].cast(addressJsonNode)
      val transportAddresses = {
        val transportAddresses = new java.util.ArrayList[InetSocketAddress]
        val it = addressJsonArray.iterator()
        while (it.hasNext) {
          val detailJsonNode: JsonNode = it.next()
          val ip = detailJsonNode.get("ip").asText()
          val port = detailJsonNode.get("port").asInt()
          transportAddresses.add(new InetSocketAddress(InetAddress.getByName(ip), port))
        }
        transportAddresses
      }

      esConfigSocket = new ESConfigSocket(config, transportAddresses)
    }
    esConfigSocket
  }


  
  def getConfigHttpHost(configPath: String): ESConfigHttpHost = {
    val configStream = this.getClass.getClassLoader.getResourceAsStream(configPath)
    if (null == esConfigHttpHost) {
      val mapper = new ObjectMapper()
      val configJsonObject = mapper.readTree(configStream)

      val configJsonNode = configJsonObject.get("config")

      val config = {
        val configJsonMap = new java.util.HashMap[String, String]
        val it = configJsonNode.fieldNames()
        while (it.hasNext) {
          val key = it.next()
          configJsonMap.put(key, configJsonNode.get(key).asText())
        }
        configJsonMap
      }

      val addressJsonNode = configJsonObject.get("address")
      val addressJsonArray = classOf[ArrayNode].cast(addressJsonNode)
      val transportAddresses = {
        val httpHosts = new java.util.ArrayList[HttpHost]
        val it = addressJsonArray.iterator()
        while (it.hasNext) {
          val detailJsonNode: JsonNode = it.next()
          val ip = detailJsonNode.get("ip").asText()
          val port = detailJsonNode.get("port").asInt()
          val schema = "http"

          val httpHost = new HttpHost(ip, port, schema)
          httpHosts.add(httpHost)
        }
        httpHosts
      }

      esConfigHttpHost = new ESConfigHttpHost(config, transportAddresses)
    }
    esConfigHttpHost
  }

  //测试
  def main(args: Array[String]): Unit = {
    println(getConfigSocket(QRealTimeConstant.ES_CONFIG_PATH).config.get("cluster.name"))

    println(getConfigHttpHost(QRealTimeConstant.ES_CONFIG_PATH).transportAddresses.get(0).getHostName)
  }
}

连接es的客户端 

ES6ClientUtil
package com.qianfeng.bigdata.realtime.flink.util.es

import java.net.InetSocketAddress

import com.qianfeng.bigdata.realtime.flink.constant.QRealTimeConstant
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
import org.slf4j.{Logger, LoggerFactory}
import java.util

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateResponse

import scala.collection.JavaConverters._
import scala.collection.mutable

object ES6ClientUtil {
  private val logger: Logger = LoggerFactory.getLogger(ES6ClientUtil.getClass)


  def buildTransportClient(esConfigPath:String = QRealTimeConstant.ES_CONFIG_PATH):PreBuiltTransportClient={
    //判断esconfigpath 是否为空

    if(esConfigPath == null){ throw new RuntimeException("esconfigpth is null ")}

    var transportClient:PreBuiltTransportClient =null

    val esConfig: ESConfigUtil.ESConfigSocket = ESConfigUtil.getConfigSocket(esConfigPath)
    val transAddrs: mutable.Buffer[InetSocketAddress] = esConfig.transportAddresses.asScala

    val settings: Settings.Builder = Settings.builder()

    for ((key,value) <- esConfig.config.asScala){
      settings.put(key,value)
    }
    transportClient = new  PreBuiltTransportClient(settings.build())

    for (transAddr <- transAddrs) {
      val address = new  TransportAddress(transAddr)
      transportClient.addTransportAddress(address)
    }
    transportClient
  }

  def main(args: Array[String]): Unit = {
    
    val transportClient: PreBuiltTransportClient = buildTransportClient()

    val indexName ="user"
    val esID="007"

    var value = new util.HashMap[String,String]()
    value.put("name","daqu")
    value.put("age","301")
    
    val indexRequest: IndexRequest = new  IndexRequest(indexName,"book",esID).source(value)
    val response: UpdateResponse = transportClient.prepareUpdate(indexName,"book",esID).setDoc(value).setUpsert(indexRequest).get()

    println(response.status().getStatus)

  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5704869.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存