Elasticsearch7.15.2 mysql8.0.26 logstash-input-jdbc 数据增量索引构建

Elasticsearch7.15.2 mysql8.0.26 logstash-input-jdbc 数据增量索引构建,第1张

Elasticsearch7.15.2 mysql8.0.26 logstash-input-jdbc 数据增量索引构建




文章目录
          • 一、基础软件安装
            • 1. 安装mysql
            • 2. Elasticsearch7.15.2 安装部署
            • 3. kibana 安装部署
            • 4. logstash-input-jdbc 安装部署
          • 二、数据库准备
            • 2.1. 创建数据库
            • 2.2. 表结构初始化
            • 2.3. 数据初始化
          • 三、logstash 配置mysql
            • 3.1. 创建目录
            • 3.2. 上传mysql驱动
            • 3.3. 创建jdbc.conf
            • 3.4. 创建jdbc.sql
            • 3.5. 创建last_value_meta
            • 3.6. 赋予权限
          • 四、启动中间件
            • 4.1. 启动mysql
            • 4.2. 启动es7
            • 4.3. 启动kibana
            • 4.4. 点评搜索索引定义
            • 4.5. 启动logstash
            • 4.6. 验证全量索引
            • 4.7. 增量索引同步

一、基础软件安装 1. 安装mysql

Mysql 8.0 安装教程 Linux Centos7

2. Elasticsearch7.15.2 安装部署

Elasticsearch7.15.2 安装、部署(linux环境)

3. kibana 安装部署

kibana 一分钟下载、安装、部署linux

4. logstash-input-jdbc 安装部署

logstash-input-jdbc 下载安装 linux

二、数据库准备 2.1. 创建数据库

创建dianpingdb数据库

2.2. 表结构初始化

表结构sql

2.3. 数据初始化

数据初始化

三、logstash 配置mysql 3.1. 创建目录
cd /app/logstash-7.15.2/bin
mkdir mysql
3.2. 上传mysql驱动

将mysql驱动上传到mysql目录下面

cd mysql
ll

3.3. 创建jdbc.conf
vim jdbc.conf

添加内容:

input {
     jdbc {
        # 设置 timezone 统一时间
        jdbc_default_timezone => "Asia/Shanghai"
     	# mysql 数据库连接,dianpingdb为数据库名称
     	jdbc_connection_string => "jdbc:mysql://localhost:3306/dianpingdb?useUnicode=true&characterEncoding=UTF8&autoReconnect=true"
     	# 用户名和密码
     	jdbc_user => "root"
     	jdbc_password => "123456"
     	# 驱动
     	jdbc_driver_library => "/app/logstash-7.15.2/bin/mysql/mysql-connector-java-8.0.27.jar"
     	# 驱动类名
     	jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
     	jdbc_page_size => "50000"
        # 记录上一次更新时间
        last_run_metadata_path => "/app/logstash-7.15.2/bin/mysql/last_value_meta"
     	# 执行的sql 文件路径+名称
     	statement_filepath => "/app/logstash-7.15.2/bin/mysql/jdbc.sql"
     	# 设置监听间隔 字段含义(由左至右)分、时、天、月、年 全部* 默认每分钟实时更新
     	schedule => "* * * * *"
     }
}

output {
	elasticsearch {
	   # ES 的IP地址及端口
	   hosts => ["localhost:9200"]
	   # 索引名称
	   index => "shop"
	   document_type => "_doc"
	   # 自增ID 需要关联数据库中有一个id字段,对应索引的id号
	   document_id => "%{id}"
	}
	stdout {
	   # JSON格式输出
	   codec => json_lines
	}
}
3.4. 创建jdbc.sql
vim jdbc.conf

添加内容:

SELECt a.id,a.name,a.tags,CONCAT(a.latitude,',',a.longitude) AS location,a.remark_score,a.price_per_man,a.category_id,b.`name` as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag FROM shop a INNER JOIN category b on a.category_id =b.id INNER JOIN seller c on c.id = a.seller_id where a.updated_at > :sql_last_value or b.updated_at > :sql_last_value or c.updated_at > :sql_last_value

格式化后:

SELECT
	a.id,
	a.NAME,
	a.tags,
	CONCAT( a.latitude, ',', a.longitude ) AS location,
	a.remark_score,
	a.price_per_man,
	a.category_id,
	b.`name` AS category_name,
	a.seller_id,
	c.remark_score AS seller_remark_score,
	c.disabled_flag AS seller_disabled_flag 
FROM
	shop a
	INNER JOIN category b ON a.category_id = b.id
	INNER JOIN seller c ON c.id = a.seller_id 
WHERe
	a.updated_at > : sql_last_value 
	OR b.updated_at > : sql_last_value 
	OR c.updated_at > : sql_last_value
3.5. 创建last_value_meta
vim last_value_meta

添加内容:
数据是2019年的,因此,填写一个早一点的日期即可

2010-11-11 11-11-11
3.6. 赋予权限

*** 作ELK均使用es用户,在安装es7是此用户已经创建,企业中不允许使用root和软件限制root用户 *** 作;因此,这样可以做到权限分明

chown es.es /app/logstash-7.15.2/ -R
四、启动中间件 4.1. 启动mysql

linux7.x

systemctl start mysql
systemctl status mysql

linux6.x

service mysql start
service mysql status 
4.2. 启动es7
su - es
cd /app/elasticsearch-7.15.2/
bin/elasticsearch -d
4.3. 启动kibana

启动方式任选一种

su - es
cd /app/kibana-7.15.2-linux-x86_64/
  • 前台启动
bin/kibana
  • 后台启动
nohup bin/kibana &

注意,由于kibana是前台启动因此,此命令窗口不能关闭

4.4. 点评搜索索引定义



# 点评搜索索引定义初始化es7
PUT /shop
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  }
  , "mappings": {
    "properties": {
      "id":{"type": "integer"},
      "name":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},
      "tags":{"type": "text","analyzer": "whitespace","fielddata": true},
       "location":{"type": "geo_point"},
       "remark_score":{"type": "double"},
       "price_per_man":{"type": "integer"},
       "category_id":{"type": "integer"},
       "category_name":{"type": "keyword"},
       "seller_id":{"type": "integer"},
       "seller_remark_score":{"type": "double"},
       "seller_disabled_flag":{"type": "integer"}
    }
  }
}


#搜索shop文档索引
GET /shop/_search


4.5. 启动logstash
su - es
cd /app/logstash-7.15.2/bin
./logstash -f mysql/jdbc.conf

控制台日志:红色部分是咱们的sql,蓝色部分是同步的数据

4.6. 验证全量索引
# 搜索shop文档索引
GET /shop/_search

4.7. 增量索引同步

手动修改数据库数据,模拟增量同步
原数据

修改后数据

提交事务,观察logstash控制台和es

# 搜索shop文档索引
GET /shop/_search
{
  "query": {
    "match": {
      "name": "面馆"
    }
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5574604.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存