如果
id用于选择行,则不能这样做。您有2种选择,
每次都选择所有行,然后使用query将它们发送到ES
SELECt * FROM blog_pro
,根据您的情况,我认为这不是一个好选择。创建一个新列
last_modified_time
,其中包含记录(行)的最后修改的时间戳。然后使用它来过滤行。注意属性tracking_column_type => "timestamp"
`statement =>”SELECt * FROM blog_pro WHERe last_modiefied_time
:sql_last_value” use_column_value =>true tracking_column =>last_modified_time
tracking_column_type => “timestamp”`
这是完整的logstash配置
input { jdbc { jdbc_connection_string =>"jdbc:mysql://192.168.3.57:3306/blog_pro" jdbc_user =>"dush" jdbc_password =>"dush" jdbc_driver_library =>"F:logstash-6.2.2binmysql-connector-java-5.1.6.jar" jdbc_driver_class =>"com.mysql.jdbc.Driver" schedule =>"* * * * *" statement =>"SELECt * FROM blog_pro WHERe last_modified_time >:sql_last_value" use_column_value =>true tracking_column =>last_modified_time tracking_column_type => "timestamp" } }output { #output to elasticsearch elasticsearch { hosts => [ "192.168.1.245:9201" ] action=>update # "%{id}" - > primary key of the table document_id => "%{id}" doc_as_upsert =>true }}
请注意,您可能需要清除索引并使用此配置开始索引。我对此进行了测试,并且工作正常。
Elasticsearch版本= 5.xx
logstash版本= 6.2.2
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)