Flink getRuntimeContext().getMa
2019-12-09 本文已影响0人
shengjk1
我们都知道,当使用 获取 Mapstate 的时候
public void open(Configuration cfg) {
state = getRuntimeContext().getMapState(
new MapStateDescriptor<>("sum", MyType.class, Long.class));
}
跟进,进入 DefaultKeyedStateStore
@Override
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
requireNonNull(stateProperties, "The state properties must not be null");
try {
stateProperties.initializeSerializerUnlessSet(executionConfig);
//关键性方法,获得到原始的 state
MapState<UK, UV> originalState = getPartitionedState(stateProperties);
//返回一个包装之后的 MapState
return new UserFacingMapState<>(originalState);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
}
}
我们一起看一下,它是如何获取原始 state 的,跟进到 AbstractKeyedStateBackend
@Override
public <N, S extends State> S getPartitionedState(
final N namespace,
final TypeSerializer<N> namespaceSerializer,
final StateDescriptor<S, ?> stateDescriptor) throws Exception {
checkNotNull(namespace, "Namespace");
/*
如果 stateDescriptor name 与最新的 lastName 相同,则将最新的 state 返回
如若第一次访问,lastName==null
*/
if (lastName != null && lastName.equals(stateDescriptor.getName())) {
lastState.setCurrentNamespace(namespace);
return (S) lastState;
}
/*
第一次 previous ==null ,再次获取直接从缓冲中返回
*/
InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
if (previous != null) {
lastState = previous;
lastState.setCurrentNamespace(namespace);
lastName = stateDescriptor.getName();
return (S) previous;
}
//第一次会创建 对应的columnFamily,并返回相应的Rockdb State 对象 如 RocksDBListState
final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
//对 lastName 赋值
lastName = stateDescriptor.getName();
lastState = kvState;
kvState.setCurrentNamespace(namespace);
return state;
}
如果不是第一次访问,则直接从缓存中获取,若为第一个则创建。我们一起来看一下,具体的创建方法
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
checkNotNull(namespaceSerializer, "Namespace serializer");
checkNotNull(keySerializerProvider, "State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
//创建 state
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
//添加至缓存
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
//将 state 注册到相应的 task 中,具体是 task run的时候用的
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}
继续看一下 kvState 具体是如何创建的
public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateBackend<K> stateBackend,
TtlTimeProvider timeProvider) throws Exception {
Preconditions.checkNotNull(namespaceSerializer);
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(stateBackend);
Preconditions.checkNotNull(timeProvider);
return stateDesc.getTtlConfig().isEnabled() ?
new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
namespaceSerializer, stateDesc, stateBackend, timeProvider)
.createState() :
stateBackend.createInternalState(namespaceSerializer, stateDesc);
}
咱们就以 stateBackend.createInternalState 为例,二者有很多公用的逻辑
继续跟进至 RocksDBMapState
@SuppressWarnings("unchecked")
static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBMapState<>(
registerResult.f0,
registerResult.f1.getNamespaceSerializer(),
(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),
(Map<UK, UV>) stateDesc.getDefaultValue(),
backend);
}
至此为止 RocksDBMapState 创建完成,也就是说至此,第一次调用生成的 MapState 已完成。即
mapState = getRuntimeContext().getMapState(
new MapStateDescriptor<>("sum", MyType.class, Long.class));
对应的 MapState 已生成,该方法调用完毕。