wal日志即write ahead log预写式日志,简称wal日志。wal日志可以说是PostgreSQL中十分重要的部分,相当于oracle中的redo日志。
当数据库中数据发生变更时:
change发生时:先要将变更后内容计入wal buffer中,再将变更后的数据写入data buffer;
commit发生时:wal buffer中数据刷新到磁盘;
checkpoint发生时:将所有data buffer刷新的磁盘。
可以想象,如果没有wal日志,那么数据库中将会发生什么?
首先,当我们在数据库中更新数据时,如果没有wal日志,那么每次更新都会将数据刷到磁盘上,并且这个动作是随机i/o,性能可想而知。并且没有wal日志,关系型数据库中事务的ACID如何保证呢?
因此wal日志重要性可想而知。其中心思想就是:先写入日志文件,再写入数据
什么是复制槽制槽(replication slot)在postgresql9.4版本中被引入,引入之初是为了防止备库需要的xlog日志在主库被删除,主库会会根据备库返回的信息确认哪些xlog已不再需要,,才能进行清理。同时主库不会移除那些导致恢复冲突的行,关于恢复冲突,前面有一篇文章讲到过可以通过设置hot_standby_feedback、max_standby_streaming_delay等参数进行预防,但是这些参数只有在主备关系正常时才能起到作用,而replication slot能够确保在主备断连后主库的xlog仍不被清理。
复制槽分为物理复制槽physical replication slot和逻辑复制槽logic replication slot。物理复制槽一般结合流复制一起使用,能够很好的保证备库需要的日志不会在主库删除,本文重点讨论逻辑复制槽。
Logic replication slots一般被用于逻辑异步复制,一个很好的应用就是用于异构数据库之间的逻辑复制。大致原理是将源端xlog进行解码,解析成具体sql,然后到目标端进行回放。支持逻辑解码需要将wal_level设置为logic,logic会在replica的基础上增加一些信息以支持逻辑解码,该模式会增大wal日志的数量,尤其是大量的update,delete *** 作的库。
需要关注的问题
对于逻辑复制槽,有下面几点需要注意:
①一个逻辑复制槽只能解码一个database,但是一个database可以有多个复制槽进行解码,同一个复制槽可能同时有多个接收端进行订阅。
②复制槽的消息只发送一次,同时它不关心接收端的状态,如果接收端执行失败,那么复制槽不会向前推进,接收端成功后继续从上次失败的位点继续进行消费。
③不支持DDL、列存、压缩表的解码,DDL一般需要需要创建额外的触发器来进行处理,但可以做到表级订阅。
④逻辑复制不能保证数据不丢失,不能用作数据容灾,但是可以用于数据迁移,在主库有大事务的情况下延迟较大。
⑤不使用的复制槽一定要及时删除。
通过复制槽,从库订阅主库,可以保证从库在没有收到主库的日志之前,主库不会删除从库未读的部分。也因此不用的槽要即时删除,否则会导致日志积压
***flink-cdc***的就是通过创建复制槽订阅PG来实现实时监控数据库变化的。
flink-cdc配置以及使用1.maven依赖
4.0.0 com.flink database1.0-SNAPSHOT 1.13.0 2.11 2.2.0 1.8 1.8 1.8 UTF-8 false false 1.8 1.8 2.5 2.11 2.4.2 2.3.1 org.apache.flink flink-scala_2.111.13.0 org.apache.flink flink-streaming-scala_2.111.13.0 com.alibaba.ververica flink-connector-postgres-cdc1.4.0 org.apache.flink flink-java${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version}${flink.version} org.apache.flink flink-clients_2.11${flink.version} org.apache.flink flink-table-planner-blink_2.11${flink.version} com.alibaba fastjson1.2.75 org.springframework spring-beans5.2.8.RELEASE org.apache.maven.plugins maven-shade-plugin2.4.3 package shade *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA reference.conf PgsqlToMysqlTest false maven-compiler-plugin 3.5.1 ${maven.compiler.target} ${maven.compiler.source} true true org.apache.maven.plugins maven-surefire-plugin2.12.4 ${maven.test.skip} org.apache.rat apache-rat-plugin0.12 .gitignore .travis.yml .asf.yaml README.md org.jacoco jacoco-maven-plugin0.8.7 prepare-agent prepare-agent org.eluder.coveralls coveralls-maven-plugin4.3.0 javax.xml.bind jaxb-api${jaxb-api.version} maven-checkstyle-plugin 2.17 verify verify style/rmq_checkstyle.xml UTF-8 true true false false check org.apache.maven.plugins maven-javadoc-plugin2.10.4 true javadocs en
再看看代码怎么写
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PgsqlToMysqlTest { public static void main(String[] args) throws Exception { // 设置flink表环境变量 EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); // 获取flink流环境变量 StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); exeEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); exeEnv.getCheckpointConfig().setCheckpointTimeout(60000); // make sure 500 ms of progress happen between checkpoints exeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // allow only one checkpoint to be in progress at the same time exeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation exeEnv.getCheckpointConfig() .enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); FsStateBackend stateBackend = new FsStateBackend("file:\\D:\fsdata"); // // stateBackend.resolveCheckpoint("D:\fsdata\dda9ae98c2b864ba8448d2c5eee2e5c3\chk-6"); // 固定延迟重启(最多尝试3次,每次间隔10s) // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 10000L)); // 失败率重启(在10分钟内最多尝试3次,每次至少间隔1分钟) // exeEnv.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), // Time.minutes(1))); // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); exeEnv.setStateBackend(stateBackend); // exeEnv.setDefaultSavepointDirectory(); exeEnv.setParallelism(2); // 表执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings); // 拼接souceDLL String sourceDDL = "CREATE TABLE pgsql_source (n" + " id int,n" + " name STRING,n" + " py_code STRING,n" + " seq_no int,n" + " description STRINGn" + ") WITH (n" + " 'connector' = 'postgres-cdc',n" + " 'hostname' = '192.168.159.100',n" + " 'port' = '5431',n" + " 'username' = 'postgres',n" + " 'password' = '123',n" + " 'database-name' = 'postgres',n" + " 'schema-name' = 'public',n" + " 'debezium.snapshot.mode' = 'initial',n" + " 'decoding.plugin.name' = 'pgoutput',n" + " 'debezium.slot.name' = 'pgsql_source_li2',n" + " 'table-name' = 'test_copy2_copy1'n" + ")"; // 执行source表ddl tableEnv.executeSql(sourceDDL); String transformSQL = "select * from pgsql_source"; Table tableResult = tableEnv.sqlQuery(transformSQL); DataStream> dataStream = tableEnv.toRetractStream(tableResult, Row.class); dataStream.print(); // StreamGraph graph = new StreamGraph() exeEnv.execute("jobname"); // 执行sink表ddl // 执行逻辑sql语句 // TableResult tableResult = tableEnv.executeSql(transformSQL); // tableEnv.execute(""); // 控制塔输出 // tableResult.print(); } }
说明两个配置
debezium.snapshot.mode = 'initial'
initial :默认设置,第一次启动创建数据库快照,后面根据记录偏移量继续读
never:从不建立快照,如果本地无偏移量,从最后的log开始读
always:每次启动都建立快照
exporter: 功能和inintial相同,不同之处在于其不会对表上锁,使用SET TRANSACTION ISOLATION LEVEL REPEATABLE READ,可重复读的隔离级别
实现类io.debezium.connector.postgresql.snapshot.ExportedSnapshotter
public OptionalsnapshotTableLockingStatement(Duration lockTimeout, Set tableIds) { return Optional.empty(); } public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { if (newSlotInfo != null) { String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; n" + snapSet; } else { return super.snapshotTransactionIsolationLevelStatement(newSlotInfo); } }
custom :用户自定义 快照,配合debezium.snapshot.custom.class使用
什么是快照?
之前说过wal日志实际上会删除,因此单纯读wal日志,并不能读到全数据库的数据
,因此在第一次启动flink程序时,需要对数据库相应表做一个快照,将全表的数据拿到flink处理对应源码位置io.debezium.connector.postgresql.spi.Snapshotter
default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;"; } default OptionalsnapshotTableLockingStatement(Duration lockTimeout, Set tableIds) { String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); tableIds.forEach((tableId) -> { statements.append("LOCK TABLE ").append(tableId.toDoubleQuotedString()).append(" IN ACCESS SHARE MODE;").append(lineSeparator); }); return Optional.of(statements.toString()); }
可以看出快照需要锁表(exporter除外),IN ACCESS SHARE MODE说明锁表不影响插入读写,但是如果有全表更新 *** 作, 会被阻塞。
开启串行,只读的事务。Snapshotter子类有这几种配置的实现,有兴趣的可以看看。
debezium.slot.name = 'pgsql_source_li2'
这就是flink-cdc创建的逻辑复制槽。
使用flink-cdc碰到的一些问题 1 能不能保证EXACTLY_ONCE一致性要求假设在默认snapshot.mode=initial下在第一次启动程序时,会对数据路进行快照读,读取当前全量数据,后面根据记录的偏移量继续读取数据,这样既不丢失数据,也不重复读,是保证了EXACTLY_ONCE一致性的。即使flink程序重启,在启动的时候指定savePoint Path也可以继续之前的偏移量,读取到未接收的数据。
这里分享一个技巧,flink在本地Idea运行没有提供设置savepPoint的方法。
***org.apache.flink.client.deployment.executors.LocalExecutor#execute***方法中断点设置
如果我们对数据开窗计算,那么乱序可能导致窗口提前关闭导致数据丢失,而在对表做快照时,会将全表数据全部拿到flink处理,就很可能导致乱序数据产生,那么flink-cdc有没有解决这个问题呢,我们知道waterMark时周期成的(源码位置org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#onProcessingTime),
一种解决思路时,在waterMark还没生成之前,将全部快照数据处理掉,那么就不会丢失数据。对于单一slot是单线程处理任务的,如果突然来了一批数据,那么生成waterMark的任务必须等到这批数据全部处理完毕才能继续。因此在批数据未处理完毕时,尽管批数据乱序,但是不存在窗口关闭,丢失数据问题
如果有多并行度多槽生成watermrk呢?
多并行度情况下,数据被分散到多个槽,并且不再是一次处理一批数据,处理数据和waterMark会一起生成,如果一次读一批,就可能会有丢数据的风险
因此从读数据源到设置waterMark设置单并行度1,那么就可以避免数据乱序导致的丢失数据问题
看下io.debezium.connector.postgresql.spi.Snapshotter#buildSnapshotQuery快照查询的sql
public OptionalbuildSnapshotQuery(TableId tableId) { StringBuilder q = new StringBuilder(); q.append("SELECt * FROM "); q.append(tableId.toDoubleQuotedString()); return Optional.of(q.toString()); }
很遗憾,并未提供排序的接口。
但是就没办法了吗?
还记得之前的自定义快照custom吗。
继承InitialSnapshotter自定义快照做简单排序
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter; import io.debezium.relational.TableId; import jdk.nashorn.internal.runtime.options.Option; import java.util.Optional; public class OrderSnapShoter extends InitialSnapshotter { @Override public OptionalbuildSnapshotQuery(TableId tableId) { Optional sql = super.buildSnapshotQuery(tableId); return Optional.of(sql.get() + "order by id"); } }
配置改一下
'debezium.snapshot.mode' = 'custom' 'debezium.snapshot.custom.class' = 'xxx.OrderSnapShoter'
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)