-- 创建数据源表映射 create table source( id bigint, name STRING )with (); -- 创建目标表映射 create table sink( id bigint, name STRING, PRIMARY KEY (id) NOT ENFORCED )with (); -- 将source表的数据插入到sink表 insert into sink select * from source;问题:我执行完这个之后把原来sink表中的数据覆盖了 原因:因为我们在DDL中定义了主键PRIMARY KEY,所以导致该任务触发了upsert *** 作。
看官方给的定义:如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式运行,用于与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。
官方链接:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
总结:所以说不是insert into导致的数据覆盖,而是因为我们在DDL中定义主键出发upsert *** 作的问题。
upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE *** 作编码为 upsert message,将 DELETE *** 作编码为 delete message ,将具有唯一键的动态表转换为流。
消费流的算子需要知道唯一键的属性,以便正确地应用 message。
与 retract 流的主要区别在于 UPDATE *** 作是用单个 message 编码的,因此upsert的效率比retraction的效率更高。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)