Flink-全局快照、还原、两阶段提交

Flink-全局快照、还原、两阶段提交,第1张

Flink-全局快照、还原、两阶段提交

全局快照原理及过程
  • 快照原理
    • 屏障原理
    • 对齐机制
  • 事务-两阶段提交
    • 简介
    • 过程
    • 2pc优缺点
      • 优点
      • 缺点
    • flink基于2PC应用
      • flink与kafka集成中2PC的具体流程

快照原理

1.Chandy-Lamport算法的一种变体,称为异步屏障快照
2.容错性机制- 屏障(barrier)实现原理
主要是持续产生快照的方式实现的,快照主要包含两部分数据,一部分是数据流,另一部分是状态数据。
对应的快照机制有两个组成部分,一个是屏障(Barrier),一个是状态(State)。因为数据在DAG中流动,所以要想快照,需要满足,这个时刻之前的数据全部处理完,之后的数据一个都没有处理

屏障原理

流屏障(stream barrier),插入到数据流中,并随着数据流动,带有快照id。
一旦一个sink的operator收到所有输入数据流的屏障n,会向checkpoint的协调器发送快照n确认。当所有的sink都确认了快照n,系统认为当前n的快照已经完成

对齐机制

快的数据流会存入到 operator 的input buffer中,等所有屏障都到了之后,再先处理buffer中的数据,再处理

事务-两阶段提交 简介
  1. 是最基础的分布式一致性协议
  2. 引入了一个中心节点来同意所有节点的执行逻辑和进度,(coordinator),其他节点叫做参与者
过程

请求阶段

  1. 协调者向所有参与者发送准备请求和事务内容,询问是否可以准备事务提交,并等待参与者的响应。
  2. 参与者执行事务中的 *** 作,并记录undo日志(用于回滚)和redo日志(用于重放),但是不真正提交。
  3. 参与者向协调者返回刚才事务的执行结果,执行成功返回yes,否则返回no.

提交阶段(分成成功和失败两种情况)
若所有的参与者都返回yes,说明事务可以提交。

  1. 协调者向所有参与者发送commit请求。
  2. 参与者收到commit 请求后,将事务真正的提交上去,并释放占用的事务资源,并向协调者返回ack。
  3. 协调者收到所有参与者ack消息,事务成功完成。
    若有参与者返回no或者超时未返回,说明事务中断,需要回滚。
  4. 协调者向所有参与者发送rollback请求。
  5. 参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack。
  6. 协调者收到所有参与者的ack消息,事务回滚完成。
2pc优缺点 优点

原理简单,容易理解及实现

缺点
  1. 协调者存在单点问题
  2. 大并发下有阻塞问题,因为是同步过程
  3. 存在commit失败导致的不一致问题
flink基于2PC应用

flink的内部意图检查点机制和轻量级分布式快照算法ABS 保证exactly once .。二我们要实现端到端的精确一次的输出逻辑,则需要施加以下两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。
TwoPhaseCommitSinkFunction (基于2PC),帮助我们做了一些基础的工作

flink 官方推荐所有需要保证exactly once 的sink 逻辑都继承该抽象类。它具体定义如下四个抽象方法。需要我们去在子类中实现。

	//开始一个事务,返回事务信息的句柄
	protected abstract TXN beginTransaction() throws Exception;
	//预提交(即提交请求)阶段的逻辑   
	protected abstract void preCommit(TXN transaction) throws Exception;
    //正式提交阶段的逻辑
    protected abstract void commit(TXN transaction);
    //取消事务
    protected abstract void abort(TXN transaction);
flink与kafka集成中2PC的具体流程

(kafka0.11及以上,才支持幂等producer以及事务性,从而2PC才有存在的意义)
kafka 的事务和幂等性参考。

1.flink实现

	//FlinkKafkaProducer011.commit()方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。
	@Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                transaction.producer.commitTransaction();
            } finally {
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }
//该方法的调用点位于 TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中,顾名思义,当所有的检查点都成功后,会调用这个方法。
@Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
        Throwable firstError = null;
        while (pendingTransactionIterator.hasNext()) {
            Map.Entry> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }
            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
            pendingTransactionIterator.remove();
        }
        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }


从代码中可以看出,该方法每次从正在等待提交的事务句柄中取出一个,检查他的检查点ID,并调用commit()方法提交,这个阶段流程图为

可见,只有在所有的检查点都成功的这个前提下,写入才会成功。这符合前文描述2PC的流程。其中jobmanager为协调者,各个算子为参与者,并且中有sink一个参与者会执行提交。一旦有了检查点失败,notifyCheckpointComplete()方法不会执行,如果重试不成功的化。最后会调用abort()方法回滚事务。

@Override
    protected void abort(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            transaction.producer.abortTransaction();
            recycleTransactionalProducer(transaction.producer);
        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5687916.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存