茶叶电子商务网站建设的结论,称多县公司网站建设,广东工程承包网站,中国免费建设网站网址背景
TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇#xff0c;但是想要实现精准一次的输出#xff0c;实际使用中需要注意几个方面#xff0c;否则不仅仅达不到精准一次输出#xff0c;反而可能导致数据丢失但是想要实现精准一次的输出实际使用中需要注意几个方面否则不仅仅达不到精准一次输出反而可能导致数据丢失连至少一次的语义都不能达到
TwoPhaseCommitSinkFunction注意事项
TwoPhaseCommitSinkFunction是通过在两阶段提交协议实现的事务大概简化为一下步骤 1 在收到检查点分隔符的时候开启事务并把记录都写到开启的事务中 2. 开始进行状态的保存时把检查点id对应的事务结束掉做好准备提交的准备并开启下一个事务
public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder ! null,bug: no transaction object when performing state snapshot);long checkpointId context.getCheckpointId();LOG.debug({} - checkpoint {} triggered, flushing transaction {},name(),context.getCheckpointId(),currentTransactionHolder);//当前检查点对应的事务做好准备比如进行stream.flush等准备好提交事务preCommit(currentTransactionHolder.handle);// 把当前检查点id对应的事务添加到状态中pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug({} - stored pending transactions {}, name(), pendingCommitTransactions);currentTransactionHolder beginTransactionInternal();LOG.debug({} - started new transaction {}, name(), currentTransactionHolder);// 把当前检查点id对应的事务添加到状态中state.clear();state.add(new State(this.currentTransactionHolder,new ArrayList(pendingCommitTransactions.values()),userContext));}
收到检查点完成的通知notify方法提交第二步中检查点id对应的事务注意这一步不是每次flink在进行检查点的时候都会通知这种情况下某一次的notify方法就需要把前几次的事务一起进行提交了另外如果提交某个检查点的事务失败那么应用会重启并且在重启后的initSnapshot方法中再次进行事务提交如果还是失败这个过程一直持续 public final void notifyCheckpointComplete(long checkpointId) throws Exception {IteratorMap.EntryLong, TransactionHolderTXN pendingTransactionIterator pendingCommitTransactions.entrySet().iterator();Throwable firstError null;while (pendingTransactionIterator.hasNext()) {Map.EntryLong, TransactionHolderTXN entry pendingTransactionIterator.next();Long pendingTransactionCheckpointId entry.getKey();TransactionHolderTXN pendingTransaction entry.getValue();if (pendingTransactionCheckpointId checkpointId) {continue;}LOG.info({} - checkpoint {} complete, committing transaction {} from checkpoint {},name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {// 提交事务commit(pendingTransaction.handle);} catch (Throwable t) {//事务失败时记录异常后面会把异常抛出导致应用重启if (firstError null) {firstError t;}}LOG.debug({} - committed checkpoint transaction {}, name(), pendingTransaction);// 事务成功后移除当前的事务pendingTransactionIterator.remove();}if (firstError ! null) {// 事务提交失败会抛出异常导致job异常中止throw new FlinkRuntimeException(Committing one of transactions failed, logging first encountered failure,firstError);}}总结
1。事务不能提交失败如果失败会导致作业失败然后重新提交如果最终没有成功提交那么数据会丢失 2。数据库服务端的事务超时时间不能设置太短不能仅仅大于检查点的间隔大小原因是上面说的flink有可能丢失检查点完成后的通知消息所以服务端的事务超时时间要设置的足够大.