Flink CDC 源码与时序图（一）

2022-03-24  本文已影响0人  loukey_j

FLINK CDC 源码
时序图源文件见：https://www.processon.com/view/623d93751efad40756c5ab8b

FLINK CDC 源码时序图-对外.png

SourceEvent

SourceSplit

SplitEnumerator

MySqlSourceEnumerator

MySqlSplitAssigner

MySqlBinlogSplitAssigner

MySqlSnapshotSplitAssigner

MySqlHybridSplitAssigner

final int splitMetaGroupSize; // 来源于配置项 chunk-meta.group.size，默认值为 1000
boolean isBinlogSplitAssigned;
final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

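createBinlogSplit 的异同：下面第一段是 MySqlBinlogSplitAssigner 中的实现，直接通过 JDBC 读取当前 binlog 位点作为起始位点；第二段是 MySqlHybridSplitAssigner 中的实现，取所有已完成快照分片中最小的 binlog 位点作为起始位点，并附带已完成分片的元信息。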

/**
 * Builds the single binlog split, starting from the offset reported by {@code
 * currentBinlogOffset} and reading without a stopping offset.
 *
 * <p>No finished-snapshot-split metadata is attached: both collections are empty and the
 * trailing count is 0.
 *
 * @return a new binlog split with id {@code BINLOG_SPLIT_ID}
 * @throws FlinkRuntimeException if opening the JDBC connection, reading the offset or building
 *     the split fails
 */
private MySqlBinlogSplit createBinlogSplit() {
        // The connection only lives long enough to look up the starting offset;
        // try-with-resources closes it on every path.
        try (JdbcConnection connection = DebeziumUtils.openJdbcConnection(sourceConfig)) {
            final BinlogOffset startingOffset = currentBinlogOffset(connection);
            return new MySqlBinlogSplit(
                    BINLOG_SPLIT_ID,
                    startingOffset,
                    BinlogOffset.NO_STOPPING_OFFSET,
                    new ArrayList<>(), // no finished snapshot split infos
                    new HashMap<>(),
                    0); // NOTE(review): presumably the total finished-split count — confirm
        } catch (Exception e) {
            throw new FlinkRuntimeException("Read the binlog offset error", e);
        }
    }

 /**
     * Builds the binlog split that follows the snapshot phase.
     *
     * <p>The assigned snapshot splits are visited in {@code splitId} order. For each split,
     * its finished binlog offset is looked up in {@code getSplitFinishedOffsets()}, and a
     * {@link FinishedSnapshotSplitInfo} describing it is recorded. The earliest of those
     * offsets becomes the split's starting offset, or {@code BinlogOffset.INITIAL_OFFSET} when
     * no snapshot split was assigned. The split has no stopping offset.
     *
     * <p>When more split infos exist than {@code splitMetaGroupSize}, the split is created with
     * an empty info list and only the total count. Per the upstream comment, the metadata is
     * then transmitted in groups.
     *
     * <p>NOTE(review): a split missing from the finished-offset map yields a {@code null}
     * offset. That value is ignored while no minimum has been found yet. Once a minimum
     * exists, {@code isBefore} is invoked on the null value and throws a
     * NullPointerException.
     *
     * @return a new binlog split with id {@code BINLOG_SPLIT_ID}
     */
    private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> orderedSplits =
                new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
        orderedSplits.sort(Comparator.comparing(MySqlSplit::splitId));

        final Map<String, BinlogOffset> finishedOffsetById =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> splitInfos = new ArrayList<>(orderedSplits.size());

        BinlogOffset earliestOffset = null;
        for (MySqlSnapshotSplit snapshotSplit : orderedSplits) {
            final BinlogOffset splitOffset = finishedOffsetById.get(snapshotSplit.splitId());
            final boolean isEarlier =
                    earliestOffset == null || splitOffset.isBefore(earliestOffset);
            if (isEarlier) {
                earliestOffset = splitOffset;
            }
            splitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            snapshotSplit.getTableId(),
                            snapshotSplit.splitId(),
                            snapshotSplit.getSplitStart(),
                            snapshotSplit.getSplitEnd(),
                            splitOffset));
        }

        // Too much metadata to ship inside the split: send it separately in groups and
        // embed only the total count here.
        final int totalInfoCount = splitInfos.size();
        final boolean sendInGroups = totalInfoCount > splitMetaGroupSize;
        final List<FinishedSnapshotSplitInfo> embeddedInfos =
                sendInGroups ? new ArrayList<>() : splitInfos;
        final BinlogOffset startingOffset =
                earliestOffset != null ? earliestOffset : BinlogOffset.INITIAL_OFFSET;

        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                startingOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                embeddedInfos,
                new HashMap<>(),
                totalInfoCount);
    }

SourceReader

InputStatus

SourceReaderBase

SingleThreadMultiplexSourceReaderBase

MySqlSourceReader

RecordsWithSplitIds

SplitFetcher 与 SourceReader 之间传递数据的接口

MySqlRecords

MySqlSplitState

MySqlBinlogSplitState

MySqlSnapshotSplitState

SourceSplit

MySqlSplit

MySqlBinlogSplit

MySqlSnapshotSplit

SplitReader

SplitFetcher

Source

MySqlSource

WatermarkOutput

SourceOutput

ReaderOutput

MySqlSplitSerializer

SplitEnumeratorContext

PendingSplitsState

BinlogPendingSplitsState

SnapshotPendingSplitsState

HybridPendingSplitsState

ChunkSplitter

AssignerStatus

/**
 * The status of the split assigner's finite state machine. Note: we use the word "status" instead
 * of "state" to avoid confusion with Flink's state keyword. The assigner state machine transitions as follows.
 *
 * <pre>
 *        INITIAL_ASSIGNING(start)
 *              |
 *              |
 *          onFinish()
 *              |
 *              ↓
 *    INITIAL_ASSIGNING_FINISHED(end)
 *              |
 *              |
 *        suspend() // found newly added tables
 *              |
 *              ↓
 *          SUSPENDED --- wakeup() --→ NEWLY_ADDED_ASSIGNING --- onFinish() --→ NEWLY_ADDED_ASSIGNING_FINISHED(end)
 *              ↑                                                                  |
 *              |                                                                  |
 *              |----------------- suspend() //found newly added tables -----------|
 * </pre>
 */
上一篇 下一篇

猜你喜欢

热点阅读