Three Ways to Implement KeyBy for Dimension Table (Lookup) Joins in Flink SQL
Background
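Flink's LookupTableSource uses the values of one or more columns of the stream to load data from external storage (the dimension table) and uses that data to enrich the stream records with extra fields. When the dimension data changes infrequently, it is usually cached in TaskManager (TM) memory to raise throughput.
Currently, the operator generated for a Flink SQL lookup join receives its input through the Forward strategy. Records are not routed by join key, so every subtask can receive any key, and each subtask ends up caching largely the same dimension data, which keeps the cache hit rate low. If the lookup join key is used as the hash condition instead, each downstream subtask caches a different subset of the dimension data, which effectively raises the cache hit rate.
Our goal is to add a table option to the DDL that controls whether KeyBy is applied when loading data from that dimension table. When a query joins several dimension tables, each table's own option decides independently whether KeyBy is applied to its join.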
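The cache-locality argument can be made concrete with a toy simulation in plain Java (no Flink dependencies). It is a sketch under stated assumptions: round-robin assignment stands in for an input that is not partitioned by the join key, and `hashCode` modulo parallelism stands in for Flink's key-group hashing. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/**
 * Toy model, not Flink code: counts how many dimension keys each lookup-join subtask
 * would cache when records are spread without regard to the key (round-robin, a
 * stand-in for Forward from an unkeyed upstream) versus hash-partitioned by the key.
 */
public class CacheLocalityDemo {

    /** Per-subtask sets of cached keys when records are assigned round-robin. */
    static List<Set<Integer>> roundRobin(int[] keys, int parallelism) {
        List<Set<Integer>> caches = newCaches(parallelism);
        for (int i = 0; i < keys.length; i++) {
            caches.get(i % parallelism).add(keys[i]);
        }
        return caches;
    }

    /** Per-subtask sets of cached keys when records are routed by hash(key). */
    static List<Set<Integer>> hashed(int[] keys, int parallelism) {
        List<Set<Integer>> caches = newCaches(parallelism);
        for (int key : keys) {
            caches.get(Math.floorMod(Integer.hashCode(key), parallelism)).add(key);
        }
        return caches;
    }

    /** Total cache entries summed over all subtasks (a key cached twice counts twice). */
    static int totalEntries(List<Set<Integer>> caches) {
        int total = 0;
        for (Set<Integer> cache : caches) {
            total += cache.size();
        }
        return total;
    }

    private static List<Set<Integer>> newCaches(int parallelism) {
        List<Set<Integer>> caches = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            caches.add(new HashSet<>());
        }
        return caches;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int[] keys = new int[10_000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = random.nextInt(100); // 100 distinct user_id values
        }
        int parallelism = 4;
        System.out.println("round-robin entries: " + totalEntries(roundRobin(keys, parallelism)));
        System.out.println("hashed entries:      " + totalEntries(hashed(keys, parallelism)));
    }
}
```

Summed over all subtasks, the round-robin caches hold roughly parallelism × distinct-keys entries, while the hashed caches hold each key exactly once, so the same total cache memory covers more distinct keys.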
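AST Transformation Process
FlinkStreamProgram defines a series of optimization programs that are applied to the plan tree at successive stages. The stages most relevant to lookup joins are temporal_join_rewrite, logical, physical and physical_rewrite. The physical_rewrite stage mainly attaches traits such as ChangelogMode and MiniBatchInterval to the nodes of the final physical plan. The relational expression trees produced at the different stages:
[Figures: initial plan · after temporal_join rewrite · after logical optimization · after physical optimization · final execution tree]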
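Method 1
Add an optimization rule in the physical_rewrite stage. The extension is based on Flink 1.13.1 and uses a query that joins several MySQL dimension tables to demonstrate KeyBy for dimension tables.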
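- Add a new optimization rule, LookupJoinHashRule, to the FlinkStreamRuleSets#PHYSICAL_REWRITE stage.
It belongs in PHYSICAL_REWRITE because Flink satisfies a FlinkRelDistribution trait by creating a StreamPhysicalExchange physical node, so all we need to do is insert a StreamPhysicalExchange in front of the StreamPhysicalLookupJoin node of the finished physical plan.
- Add a lookup.enable_hash option to JdbcDynamicTableFactory to control KeyBy, and return it from optionalOptions(); otherwise the factory's option validation rejects the unknown key.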
public static final ConfigOption<String> LOOKUP_ENABLE_HASH =
ConfigOptions.key("lookup.enable_hash")
.stringType()
.defaultValue("false")
.withDescription("Dimension table join enable hash.");
- Add a method to CommonPhysicalLookupJoin that returns the dimension table's ObjectIdentifier, so that the table's metadata can be looked up in the CatalogManager.
CommonPhysicalLookupJoin#getTableIdentifier
def getTableIdentifier():ObjectIdentifier={
val tableIdentifier: ObjectIdentifier = temporalTable match {
case t: TableSourceTable => t.tableIdentifier
case t: LegacyTableSourceTable[_] => t.tableIdentifier
}
tableIdentifier
}
LookupJoinHashRule code:
public class LookupJoinHashRule extends RelOptRule {
public static final LookupJoinHashRule INSTANCE = new LookupJoinHashRule();
private LookupJoinHashRule() {
// note: this rule only matches StreamPhysicalLookupJoin nodes.
super(operand(StreamPhysicalLookupJoin.class, any()), "LookupJoinHashRule");
}
@Override
public boolean matches(RelOptRuleCall call) {
ObjectIdentifier tableIdentifier = ((StreamPhysicalLookupJoin) call.rel(0)).getTableIdentifier();
CatalogManager catalogManager = call.getPlanner().getContext().unwrap(FlinkContext.class).getCatalogManager();
CatalogManager.TableLookupResult tableLookupResult = catalogManager.getTable(tableIdentifier).get();
// note: read the dimension table's options
Map<String, String> options = tableLookupResult.getTable().getOptions();
String enabledHash = options.getOrDefault(JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.key(), JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.defaultValue());
return BooleanUtils.toBoolean(enabledHash);
}
@Override
public void onMatch(RelOptRuleCall relOptRuleCall) {
RelNode streamPhysicalLookupJoin = relOptRuleCall.rel(0);
JoinInfo joinInfo = ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).joinInfo();
// note: build a hash FlinkRelDistribution trait on the left (stream-side) join keys
FlinkRelDistribution requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true);
// note: insert a StreamPhysicalExchange between the StreamPhysicalLookupJoin and its input
RelNode hashInput = FlinkExpandConversionRule.satisfyDistribution(
FlinkConventions.STREAM_PHYSICAL(),
((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).getInput(),
requiredDistribution
);
// note: replace the join with a copy that reads from the new Exchange input
relOptRuleCall.transformTo(streamPhysicalLookupJoin.copy(streamPhysicalLookupJoin.getTraitSet(), Arrays.asList(hashInput)));
}
}
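What the rule does to the plan can be modelled without Flink. The sketch below is a toy under stated assumptions: `Node`, `hashEnabled` and `rewrite` are hypothetical stand-ins for RelNode, `matches()` and `onMatch()`, and the plan is a simple chain of single-input nodes. It mirrors the jdbc_table2 (hash on) / jdbc_table3 (hash off) setup used in the test of this section.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (hypothetical classes, not Flink's RelNode API) of LookupJoinHashRule:
 * if the dimension table enables hashing, put a hash Exchange between the lookup
 * join and its input; otherwise leave the plan unchanged.
 */
public class LookupJoinHashRewriteDemo {

    /** Minimal immutable plan node: a name plus an optional single input. */
    static final class Node {
        final String name;
        final Node input; // null for a source

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }

        @Override
        public String toString() {
            return input == null ? name : name + " <- " + input;
        }
    }

    /** Mirrors matches(): read 'lookup.enable_hash', defaulting to "false". */
    static boolean hashEnabled(Map<String, String> tableOptions) {
        return Boolean.parseBoolean(tableOptions.getOrDefault("lookup.enable_hash", "false"));
    }

    /** Mirrors onMatch(): wrap the join's input in a hash Exchange on the left join keys. */
    static Node rewrite(Node lookupJoin, Map<String, String> tableOptions, List<Integer> leftKeys) {
        if (!hashEnabled(tableOptions)) {
            return lookupJoin;
        }
        Node exchange = new Node("Exchange(hash" + leftKeys + ")", lookupJoin.input);
        return new Node(lookupJoin.name, exchange);
    }

    public static void main(String[] args) {
        Map<String, String> table2 = new HashMap<>();
        table2.put("lookup.enable_hash", "true");
        Map<String, String> table3 = new HashMap<>();
        table3.put("lookup.enable_hash", "false");

        Node source = new Node("Source(kafka_table)", null);
        // user_id is field 0 of the stream side for both joins.
        Node join2 = rewrite(new Node("LookupJoin(jdbc_table2)", source), table2, Collections.singletonList(0));
        Node join3 = rewrite(new Node("LookupJoin(jdbc_table3)", join2), table3, Collections.singletonList(0));
        // Prints: LookupJoin(jdbc_table3) <- LookupJoin(jdbc_table2) <- Exchange(hash[0]) <- Source(kafka_table)
        System.out.println(join3);
    }
}
```

Only the join whose table enables the option gets an Exchange, which is the per-table behaviour the DDL option is meant to provide.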
Running a test
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
tableEnvironment.executeSql("CREATE TABLE kafka_table (\n" +
" user_id int,\n" +
" order_amount bigint,\n" +
" sname String,\n" +
" log_ts TIMESTAMP(3),\n" +
" proctime as PROCTIME()" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.max.poll.records' = '1',\n" +
" 'topic' = 'mqTest02',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'latest-offset'\n" +
")");
// note: HASH enabled
tableEnvironment.executeSql("CREATE TABLE jdbc_table2 (\n" +
" id int,\n" +
" name varchar,\n" +
" description STRING,\n" +
" catalog STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'scan.partition.column' = 'id',\n" +
" 'scan.partition.num' = '2',\n" +
" 'lookup.enable_hash' = 'true',\n" +
" 'scan.partition.lower-bound' = '1',\n" +
" 'scan.partition.upper-bound' = '1000',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'table-name' = 'test1'\n" +
")");
// note: HASH disabled
tableEnvironment.executeSql("CREATE TABLE jdbc_table3 (\n" +
" id int,\n" +
" name varchar,\n" +
" description STRING,\n" +
" catalog STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'scan.partition.column' = 'id',\n" +
" 'scan.partition.num' = '2',\n" +
" 'lookup.enable_hash' = 'false',\n" +
" 'scan.partition.lower-bound' = '1',\n" +
" 'scan.partition.upper-bound' = '1000',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'table-name' = 'test2'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE fs_table (\n" +
" id bigint,\n" +
" name STRING,\n" +
" s3Name STRING,\n" +
" order_amount bigint,\n" +
" description STRING\n" +
") WITH (\n" +
"'connector' = 'print'" +
")");
tableEnvironment.executeSql("INSERT INTO fs_table select s1.user_id,s2.name,s3.name,s1.order_amount,s2.description " +
" from kafka_table s1 " +
" join jdbc_table2 FOR SYSTEM_TIME AS OF s1.proctime AS s2 " +
" ON s1.user_id=s2.id " +
" join jdbc_table3 FOR SYSTEM_TIME AS OF s1.proctime AS s3 " +
" ON s1.user_id=s3.id" +
"");
}
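Topology on YARN with hashing enabled for both dimension tables:
[Figure: both dimension tables with HASH enabled]
Topology on YARN with hashing enabled for one dimension table and disabled for the other:
[Figure: one dimension table with HASH enabled]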
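Method 2
Extend the translation from ExecNode to Transformation: modify the CommonExecLookupJoin exec node so that translateToPlanInternal inserts a PartitionTransformation in front of the lookup join operator. With this approach the physical plan tree is identical to the one generated without hashing; the repartitioning only appears in the generated Transformations. Note that, as written, the hash is applied to every lookup join regardless of the lookup.enable_hash option.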
public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
// validate whether the node is valid and supported.
validate(temporalTable);
final ExecEdge inputEdge = getInputEdges().get(0);
RowType inputRowType = (RowType) inputEdge.getOutputType();
RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
RowType resultRowType = (RowType) getOutputType();
validateLookupKeyType(lookupKeys, inputRowType, tableSourceRowType);
boolean isAsyncEnabled = false;
UserDefinedFunction userDefinedFunction =
LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
UserDefinedFunctionHelper.prepareInstance(
planner.getTableConfig().getConfiguration(), userDefinedFunction);
if (userDefinedFunction instanceof AsyncTableFunction) {
isAsyncEnabled = true;
}
boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
StreamOperatorFactory<RowData> operatorFactory;
if (isAsyncEnabled) {
operatorFactory =
createAsyncLookupJoin(
temporalTable,
planner.getTableConfig(),
lookupKeys,
(AsyncTableFunction<Object>) userDefinedFunction,
planner.getRelBuilder(),
inputRowType,
tableSourceRowType,
resultRowType,
isLeftOuterJoin);
} else {
operatorFactory =
createSyncLookupJoin(
temporalTable,
planner.getTableConfig(),
lookupKeys,
(TableFunction<Object>) userDefinedFunction,
planner.getRelBuilder(),
inputRowType,
tableSourceRowType,
resultRowType,
isLeftOuterJoin,
planner.getExecEnv().getConfig().isObjectReuseEnabled());
}
Transformation<RowData> inputTransformation =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
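// note: add a PartitionTransformation that hashes on the stream-side lookup key fields.
// lookupKeys maps dimension-table field indices to LookupKeys, so take the input field
// index from each FieldRefLookupKey (constant lookup keys have no input field).
int[] hashKeys = lookupKeys.values().stream()
.filter(key -> key instanceof LookupJoinUtil.FieldRefLookupKey)
.mapToInt(key -> ((LookupJoinUtil.FieldRefLookupKey) key).index)
.toArray();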
final RowDataKeySelector keySelector =
KeySelectorUtil.getRowDataSelector(hashKeys, InternalTypeInfo.of(inputRowType));
final StreamPartitioner<RowData> partitioner =
new KeyGroupStreamPartitioner<>(
keySelector, DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
final Transformation<RowData> partitionTransformation =
new PartitionTransformation<>(inputTransformation, partitioner);
// note: for demonstration, set the parallelism to the input's parallelism + 2
partitionTransformation.setParallelism(inputTransformation.getParallelism() + 2);
OneInputTransformation<RowData, RowData> inputTransform = new OneInputTransformation<>(
partitionTransformation,
getDescription(),
operatorFactory,
InternalTypeInfo.of(resultRowType),
partitionTransformation.getParallelism());
inputTransform.setParallelism(partitionTransformation.getParallelism());
inputTransform.setOutputType(InternalTypeInfo.of(resultRowType));
return inputTransform;
}
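KeyGroupStreamPartitioner routes each record in two steps: key to key group (out of the max parallelism, here DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128), then key group to subtask. The sketch below reproduces that structure in plain Java. Assumption: Flink scrambles `key.hashCode()` with its own murmur hash (`MathUtils.murmurHash`); this sketch uses the standard MurmurHash3 32-bit finalizer as a stand-in, so the concrete subtask numbers are not guaranteed to match Flink's. `KeyGroupSketch` and its methods are hypothetical names.

```java
/**
 * Simplified sketch of the key-group arithmetic behind Flink's KeyGroupStreamPartitioner.
 * The hash scrambler is a stand-in (MurmurHash3 fmix32), not Flink's exact function.
 */
public class KeyGroupSketch {

    /** Same value as Flink's KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM. */
    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;

    /** MurmurHash3 32-bit finalizer, used here to spread nearby hash codes apart. */
    static int scramble(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Step 1: map a key to one of maxParallelism key groups. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(scramble(key.hashCode()), maxParallelism);
    }

    /** Step 2: map a key group to a subtask; each subtask owns a contiguous key-group range. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Both steps combined: the downstream subtask index for a key. */
    static int channelFor(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 3;
        for (int userId = 1; userId <= 6; userId++) {
            System.out.println("user_id=" + userId + " -> subtask "
                    + channelFor(userId, DEFAULT_LOWER_BOUND_MAX_PARALLELISM, parallelism));
        }
    }
}
```

Because the mapping depends only on the key, every record with the same user_id reaches the same lookup subtask, which is what lets each subtask's cache hold a disjoint slice of the dimension table.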
Resulting topology:
[Figure: dimension table hash]
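Method 3
Request a hash FlinkRelDistribution trait on the lookup join's input while the join is converted from a logical to a physical node, and let the physical optimization turn that trait into a StreamPhysicalExchange. In StreamPhysicalLookupJoinRule, replace the default FlinkRelDistribution trait of the input FlinkLogicalRel with a hash distribution on the join keys; when the physical plan is optimized, the planner generates an Exchange node to satisfy this trait.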
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalLookupJoinRule#doTransform
private def doTransform(
join: FlinkLogicalJoin,
input: FlinkLogicalRel,
temporalTable: RelOptTable,
calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
val joinInfo = join.analyzeCondition
val cluster = join.getCluster
val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
// note: read the dimension table's options to decide whether to hash
val options = temporalTable match {
case t: TableSourceTable => t.catalogTable.getOptions
case _ => java.util.Collections.emptyMap[String, String]()
}
val enableHash = java.lang.Boolean.parseBoolean(
options.getOrDefault("lookup.enable_hash", "false"))
val inputTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
// note: replace the default FlinkRelDistribution trait with hash(leftKeys);
// the planner satisfies it by inserting a StreamPhysicalExchange
val requiredTrait = if (enableHash) {
inputTrait.replace(FlinkRelDistribution.hash(joinInfo.leftKeys, true))
} else {
inputTrait
}
val convInput = RelOptRule.convert(input, requiredTrait)
new StreamPhysicalLookupJoin(
cluster,
providedTrait,
convInput,
temporalTable,
calcProgram,
joinInfo,
join.getJoinType)
}
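The difference from Method 1 is that no rule inserts the Exchange explicitly: the join only states a required distribution, and the planner adds an Exchange when the input's distribution does not already satisfy it. A toy Java model of that trait-enforcement check follows; `Distribution`, `satisfies` and `enforce` are hypothetical stand-ins loosely modelled on a strict hash requirement (`requireStrict = true`), not Flink's FlinkRelDistribution API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Toy model (hypothetical, not Flink's API) of trait enforcement: the lookup join
 * requires a distribution on its input, and an Exchange is added only when the
 * input's provided distribution does not already satisfy that requirement.
 */
public class DistributionEnforcementDemo {

    enum Type { ANY, HASH }

    /** Stand-in for a distribution trait: a type plus hash key indices. */
    static final class Distribution {
        final Type type;
        final List<Integer> keys;

        private Distribution(Type type, List<Integer> keys) {
            this.type = type;
            this.keys = keys;
        }

        static Distribution any() {
            return new Distribution(Type.ANY, Collections.<Integer>emptyList());
        }

        static Distribution hash(Integer... keys) {
            return new Distribution(Type.HASH, Arrays.asList(keys));
        }

        /** ANY is satisfied by everything; HASH only by HASH on exactly the same keys. */
        boolean satisfies(Distribution required) {
            if (required.type == Type.ANY) {
                return true;
            }
            return type == Type.HASH && keys.equals(required.keys);
        }
    }

    /** Returns the input unchanged if it already satisfies the requirement, else wraps it in an Exchange. */
    static String enforce(String input, Distribution provided, Distribution required) {
        if (provided.satisfies(required)) {
            return input;
        }
        return "Exchange(hash" + required.keys + ") <- " + input;
    }

    public static void main(String[] args) {
        // lookup.enable_hash = true: the join requires hash on left key 0, the source provides ANY.
        System.out.println(enforce("Source(kafka_table)", Distribution.any(), Distribution.hash(0)));
        // lookup.enable_hash = false: the requirement stays ANY, so no Exchange is added.
        System.out.println(enforce("Source(kafka_table)", Distribution.any(), Distribution.any()));
    }
}
```

A side effect of this style is that an input already hashed on the same keys (for example by an upstream keyed operator) needs no second Exchange.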
Resulting topology:
[Figure: dimension table hash]