Maxwell Bootstrap可以将MySQL中已经存在的数据批量同步到Kafka中, *** 作步骤如下:
修改“/opt/maxwell-1.28.2/config.properties”配置文件停止maxwell进程,在当前config.properties配置文件最后一行添加配置“client_id”,此配置项是指定当前maxwell启动后连接mysql的实例id,名字自取,在全量同步数据时需要使用到。
#指定maxwell 当前连接mysql的实例id,名字自取 client_id=maxwell_first将testdb数据库中person表全量同步数据
mysql> select * from person; +------+------+------+ | id | name | age | +------+------+------+ | 1 | zs | 100 | | 2 | ls | 19 | | 3 | ww | 20 | | 4 | s1 | 21 | | 5 | s2 | 22 | | 6 | s3 | 23 | | 4 | s1 | 21 | | 5 | s2 | 22 | | 6 | s3 | 23 | +------+------+------+ 9 rows in set (0.00 sec重新启动Maxwell,然后启动maxwell-bootstrap全量导数据
maxwell-bootstrap脚本可以指定MySQL数据库及表参数,同步MySQL指定库下对应表的全量数据,同时可以指定where条件。
#重启maxwell [root@node4 bin]# maxwell --config ../config.properties
同步testdb.person表的全量数据
[root@node4 bin]# ./maxwell-bootstrap --database testdb --table person --host node1 --user root --password 123456 --client_id maxwell_first connecting to jdbc:mysql://node1:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&zeroDateTimeBehavior=convertToNull connecting to jdbc:mysql://node1:3306 user: root, passwd: '123456'
查看kafka中数据
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic maxwell_topic {"database":"maxwell","table":"bootstrap","type":"insert","ts":1641296971,"xid":11953,"commit":true,"data":{"id":6,"database_name":"testdb","table_name":"person","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":9,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell_first","comment":null}} {"database":"testdb","table":"person","type":"bootstrap-start","ts":1641296971,"data":{}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":1,"name":"zs","age":100}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":2,"name":"ls","age":19}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":3,"name":"ww","age":20}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":4,"name":"s1","age":21}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":5,"name":"s2","age":22}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":6,"name":"s3","age":23}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":4,"name":"s1","age":21}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":5,"name":"s2","age":22}} {"database":"testdb","table":"person","type":"bootstrap-insert","ts":1641296971,"data":{"id":6,"name":"s3","age":23}} {"database":"testdb","table":"person","type":"bootstrap-complete","ts":1641296971,"data":{}}
注意:maxwell-bootstrap命令执行后,可以在对应的topic中查看到数据,这里对应的topic是maxwell进程对应config.properties文件中配置的topic。同时maxwell-bootstrap命令指定的client_id 需要与maxwell进行对应的config.properties配置文件中配置的一样。–where是指定条件,只会全量导入满足条件的数据,有了where条件可以使maxwell-bootstrap进程配合maxwell实时同步进程将一张表数据无缝同步到Kafka中。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)