logstash-input-jdbc 同步 elasticsearch

logstash-input-jdbc 同步 elasticsearch,第1张

logstash-input-jdbc 同步 elasticsearch

安装 logstash

https://www.elastic.co/cn/downloads/past-releases#logstash

yum安装logstash 6.8

编辑yum repo

vi /etc/yum.repos.d/logstash.repo

存入内容

[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

安装

 yum install logstash
 #通过yum安装的logstash
 #程序目录 
 /usr/share/logstash
 #配置文件目录:
 /etc/logstash

安装 logstash-input-jdbc

cd /usr/share/logstash
./bin/logstash-plugin install logstash-input-jdbc

安装成功

Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful

同步mysql表—全量
配置
vi config/conf.d/log2021_q4.conf

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.15.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "test"
    jdbc_password => "GOftest"
    statement => "SELECt * FROM log2021_Q4"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    schedule => "*/10 * * * *"
  }
}
 
filter {
   json {
        source => "message"
        remove_field => ["message"]
    }
}
 
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "localhost:8607"
    index => "log2021_q4"
    document_id => "%{id}"
  }        
} 

执行

cd 
nohup bin/logstash -f config/conf.d/log2021_q4.conf > logs/logstash.out &

同步mysql表—增量

配置文件
vi config/conf.d/log2021_q4_inc.conf

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.15.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "test"
    jdbc_password => "GOftest"
    statement => "SELECt * FROM log2021_Q4 where id > :sql_last_value"
    use_column_value => true
    tracking_column => id
    record_last_run => true
    last_run_metadata_path => "/usr/share/logstash/inc_data/jc_app_log2021_q4.txt"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    schedule => "* * * * *"
  }
}

filter {
   json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "localhost:8607"
    index => "log2021_q4"
    document_id => "%{id}"
  }
}

logstash Could not index event to Elasticsearch 错误原因

1、index名必须小写

  elasticsearch {
    hosts => "localhost:8607"
    index => "log2021_q4"
    document_id => "%{id}"
  }   

2、因elasticsearch配置问题导致

解决方法:

action.auto_create_index: false #禁用自动创建index

将false改成true即可,或者删除这个配置,查阅官方文档改配置是默认开启的。

多个conf文件的执行

需要配置pipeline
vi conf/pipelines.yml

- pipeline.id: log2021_q3
 pipeline.workers: 1
 pipeline.batch.size: 1
 path.config: "/usr/share/logstash/config/conf.d/log2021_q3.conf"
- pipeline.id: log2021_q4_inc
 queue.type: persisted
 path.config: "/usr/share/logstash/config/conf.d/log2021_q4_inc.conf"

执行logstash

nohup ./bin/logstash > logs/out.log &

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4686653.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存