一文搞懂 flink state key 的设置方式
2020-07-03 本文已影响0人
shengjk1
1. 疑问
前一篇文章 一文搞懂 Flink window 元素的顺序问题 我们已经知道了,state 的获取、更新、清除等都与 key 相关。那么 key 是如何设置的呢?
2.解释
这需要从 StreamTask 的 run 方法说起。以 OneInputStreamTask 为例,当程序启动开始消费消息时,会进行 OneInputStreamTask 的 run 方法,
@Override
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
//处理输入的消息
while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
}
}
最终调用的是 inputProcessor.processInput() 方法,除了生成 watermark 之外,就是往下游发送记录
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
//set KeyContext setCurrentKey
//设置 keyContext (提供了用来查询和设置 keyed operation 的 current key 的接口)
streamOperator.setKeyContextElement1(record);
//这里开始调用用户自己的代码
streamOperator.processElement(record);
}
一路追踪下去,到 AbstractStreamOperator
//自定义的 KeySelector 在此处起作用
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
if (selector != null) {
Object key = selector.getKey(record.getValue());
setCurrentKey(key);
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
@SuppressWarnings("unchecked,rawtypes")
AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
//设置 keyedStateBackend currentKey
rawBackend.setCurrentKey(key);
} catch (Exception e) {
throw new RuntimeException("Exception occurred while setting the current key context.", e);
}
}
}
然后会调用 RocksDBKeyedStateBackend
@Override
public void setCurrentKey(K newKey) {
super.setCurrentKey(newKey);
// 每个 key 都会调用一次 将 key group and key 写入 byte[] 中,每次开始写入前都会清空,后续 state 的操作都会从这个 byte[] 中读
sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
}
当查询、更新以及清除 state 时,由 一文搞懂 Flink window 元素的顺序问题 我们可以知道 ,有一个 serializeCurrentKeyWithGroupAndNamespace() 方法,最终进入 buildCompositeKeyNamespace
@Nonnull
public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
try {
// 每次真正操作时候,serializeNamespace
serializeNamespace(namespace, namespaceSerializer);
// 将已序列化的 key_group,key,namespace 作为一个整体 copy 出来,这也就是 state key
final byte[] result = keyOutView.getCopyOfBuffer();
// 重置,类似于重置游标,去除 namespace bytes
resetToKey();
return result;
} catch (IOException shouldNeverHappen) {
throw new FlinkRuntimeException(shouldNeverHappen);
}
}
至此 state key 就设置完成了,然后就可以按照新设置的 key 进行查询了。