Flink源码解析

一文搞懂 flink state key 的设置方式

2020-07-03  本文已影响0人  shengjk1

1. 疑问

前一篇文章 一文搞懂 Flink window 元素的顺序问题 我们已经知道了,state 的获取、更新、清除等都与 key 相关。那么 key 是如何设置的呢?

2.解释

这需要从 StreamTask 的 run 方法说起。以 OneInputStreamTask 为例,当程序启动开始消费消息时,会进行 OneInputStreamTask 的 run 方法,

@Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
        //处理输入的消息
        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }

最终调用的是 inputProcessor.processInput() 方法,除了生成 watermark 之外,就是往下游发送记录


                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            //set KeyContext setCurrentKey
            //设置 keyContext (提供了用来查询和设置 keyed operation 的 current key 的接口)
                            streamOperator.setKeyContextElement1(record);
            //这里开始调用用户自己的代码
                            streamOperator.processElement(record);
                        }

一路追踪下去,到 AbstractStreamOperator

//自定义的 KeySelector 在此处起作用
    private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
        if (selector != null) {
            Object key = selector.getKey(record.getValue());
            setCurrentKey(key);
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setCurrentKey(Object key) {
        if (keyedStateBackend != null) {
            try {
                // need to work around type restrictions
                @SuppressWarnings("unchecked,rawtypes")
                AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
                
                //设置 keyedStateBackend currentKey
                rawBackend.setCurrentKey(key);
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while setting the current key context.", e);
            }
        }
    }

然后会调用 RocksDBKeyedStateBackend

@Override
    public void setCurrentKey(K newKey) {
        super.setCurrentKey(newKey);
        // 每个 key 都会调用一次 将 key group and key 写入 byte[] 中,每次开始写入前都会清空,后续 state 的操作都会从这个 byte[] 中读
        sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
    }

当查询、更新以及清除 state 时,由 一文搞懂 Flink window 元素的顺序问题 我们可以知道 ,有一个 serializeCurrentKeyWithGroupAndNamespace() 方法,最终进入 buildCompositeKeyNamespace

@Nonnull
    public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
        try {
            // 每次真正操作时候,serializeNamespace
            serializeNamespace(namespace, namespaceSerializer);
            // 将已序列化的 key_group,key,namespace 作为一个整体 copy 出来,这也就是 state key
            final byte[] result = keyOutView.getCopyOfBuffer();
            // 重置,类似于重置游标,去除 namespace bytes
            resetToKey();
            return result;
        } catch (IOException shouldNeverHappen) {
            throw new FlinkRuntimeException(shouldNeverHappen);
        }
    }

至此 state key 就设置完成了,然后就可以按照新设置的 key 进行查询了。

上一篇下一篇

猜你喜欢

热点阅读