Partitioners in Flink (1.13)

Preface

Flink ships with seven officially defined partitioners plus one wrapper for custom partitioning, eight in total.

org.apache.flink.streaming.runtime.partitioner.StreamPartitioner is the parent of all partitioners; it is an abstract class:
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
- ChannelSelector
public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * Initializes the channel selector with the number of output channels.
     *
     * @param numberOfChannels the total number of output channels which are attached to respective
     *     output gate.
     */
    void setup(int numberOfChannels);

    /**
     * Returns the logical channel index, to which the given record should be written. It is illegal
     * to call this method for broadcast channel selectors and this method can remain not
     * implemented in that case (for example by throwing {@link UnsupportedOperationException}).
     *
     * @param record the record to determine the output channels for.
     * @return an integer number which indicates the index of the output channel through which the
     *     record shall be forwarded.
     */
    int selectChannel(T record);

    /**
     * Returns whether the channel selector always selects all the output channels.
     *
     * @return true if the selector is for broadcast mode.
     */
    boolean isBroadcast();
}
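To make the ChannelSelector contract concrete, here is a minimal round-robin implementation against a simplified stand-in of the interface. The names SimpleChannelSelector and RoundRobinSelector are illustrative, and the stand-in drops the IOReadableWritable bound of the real Flink interface:

```java
// Simplified stand-in for Flink's ChannelSelector (illustrative names,
// without the IOReadableWritable bound of the real interface).
interface SimpleChannelSelector<T> {
    void setup(int numberOfChannels);
    int selectChannel(T record);
    boolean isBroadcast();
}

// A round-robin selector: cycles through channels 0 .. numberOfChannels - 1.
class RoundRobinSelector<T> implements SimpleChannelSelector<T> {
    private int numberOfChannels;
    private int next = -1;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(T record) {
        next = (next + 1) % numberOfChannels;
        return next;
    }

    @Override
    public boolean isBroadcast() {
        // this selector picks exactly one channel per record
        return false;
    }
}
```

With setup(3), successive calls return 0, 1, 2, 0, ... regardless of the record passed in.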
- Implementing subclasses (all in org.apache.flink.streaming.runtime.partitioner):
  - RebalancePartitioner
  - RescalePartitioner
  - KeyGroupStreamPartitioner
  - GlobalPartitioner
  - ShufflePartitioner
  - ForwardPartitioner
  - CustomPartitionerWrapper
  - BroadcastPartitioner
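Of these, CustomPartitionerWrapper is the one wrapper for user-defined partitioning; in the DataStream API it is used via DataStream.partitionCustom(partitioner, keySelector). The user-facing contract can be sketched in plain Java. The Partitioner interface below is a local stand-in mirroring Flink's org.apache.flink.api.common.functions.Partitioner, and the first-character policy is just an example:

```java
// Local stand-in mirroring Flink's Partitioner<K> contract:
// map a key to a partition index in [0, numPartitions).
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

public class CustomPartitionDemo {
    public static void main(String[] args) {
        // Example policy: route by the key's first character, so keys
        // starting with the same letter always land on the same partition.
        Partitioner<String> byFirstChar =
                (key, numPartitions) -> key.charAt(0) % numPartitions;

        // 'a' is 97, and 97 % 4 = 1, so both keys go to partition 1
        System.out.println(byFirstChar.partition("alice", 4));   // 1
        System.out.println(byFirstChar.partition("avocado", 4)); // 1
    }
}
```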
Operators that repartition a stream

- keyBy
  Groups elements by key; a double hash of the key decides the downstream partition.
  Partitioner: KeyGroupStreamPartitioner
- shuffle
  Distributes the elements of the stream to partitions at random.
  Partitioner: ShufflePartitioner
- rebalance
  Distributes the elements of the stream evenly across all partitions; a performance optimization when the data is skewed.

By default (without an explicit partitioner), records flow 1:1 when upstream and downstream parallelism are equal; round-robin distribution is only chosen when the upstream parallelism is smaller than the downstream parallelism. Calling rebalance() explicitly forces round-robin either way.

Program:

    @Test
    public void rebalance() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism globally
        env.setParallelism(3);
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
        source.print("print>>>");
        // round-robin redistribution
        source.rebalance().print("rebalance>>>");
        System.out.println("-----------------------");
        env.execute();
    }
Result:
print>>>:3> zhangsan
rebalance>>>:2> zhangsan
print>>>:1> lisi
rebalance>>>:3> lisi
print>>>:2> wangwu
rebalance>>>:1> wangwu
print>>>:3> zhaoliu
rebalance>>>:2> zhaoliu
print>>>:1> sunqi
rebalance>>>:3> sunqi
print>>>:2> tianba
rebalance>>>:1> tianba
print>>>:3> zhaosanfeng
rebalance>>>:2> zhaosanfeng
print>>>:1> abcd
rebalance>>>:3> abcd
print>>>:2> wangfefe
rebalance>>>:1> wangfefe
Partitioner: RebalancePartitioner
- rescale
  Like rebalance, rescale distributes data evenly in a round-robin fashion, but it is more efficient: rescale does not fan out across the whole network and instead stays within local "pipelines".

How does rescale reduce network transfer?

Round-robin in rebalance: assume the upstream parallelism is 2 and the downstream parallelism is 4; every upstream subtask sends to every downstream subtask, so there are 2 * 4 = 8 channels in total.

Round-robin in rescale: the subtasks are split into groups, and each upstream subtask is responsible for only its own subset of the downstream subtasks.
Source-code explanation (javadoc of RescalePartitioner):

Partitioner that distributes the data equally by cycling through the output channels.

This distributes only to a subset of downstream nodes because
{@link org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator} instantiates a
{@link DistributionPattern#POINTWISE} distribution pattern when encountering {@code RescalePartitioner}.

<p>The subset of downstream operations to which the upstream operation sends elements depends on
the degree of parallelism of both the upstream and downstream operation. For example, if the
upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one
upstream operation would distribute elements to two downstream operations while the other
upstream operation would distribute to the other two downstream operations. If, on the other
hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4,
then two upstream operations will distribute to one downstream operation while the other two
upstream operations will distribute to the other downstream operation.

<p>In cases where the different parallelisms are not multiples of each other one or several
downstream operations will have a differing number of inputs from upstream operations.
Summary: rescale performs better than rebalance.
Use case: mitigating data skew when the data source itself is skewed.
Partitioner: RescalePartitioner
RebalancePartitioner

    // starting channel index (assumed 0 in the walkthrough below)
    private int nextChannelToSendTo;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

numberOfChannels is the downstream parallelism; assume it is 3.
1st call: nextChannelToSendTo = (0 + 1) % 3 = 1
2nd call: nextChannelToSendTo = (1 + 1) % 3 = 2
3rd call: nextChannelToSendTo = (2 + 1) % 3 = 0
4th call: nextChannelToSendTo = (0 + 1) % 3 = 1
RescalePartitioner

    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

numberOfChannels is the downstream parallelism; assume it is 3.
Selection order: 0, 1, 2, 0, ...
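The two counters can be compared side by side with a few lines of plain Java. This is a stand-alone re-implementation of the two selectChannel bodies, not the Flink classes, and the rebalance counter is assumed to start at 0 as in the walkthrough:

```java
public class PartitionerCounters {
    public static void main(String[] args) {
        int numberOfChannels = 3; // downstream parallelism

        int rebalance = 0; // RebalancePartitioner-style counter (assumed start)
        int rescale = -1;  // RescalePartitioner-style counter

        for (int call = 1; call <= 4; call++) {
            // RebalancePartitioner: increment first, then wrap with modulo
            rebalance = (rebalance + 1) % numberOfChannels;
            // RescalePartitioner: pre-increment, reset to 0 on overflow
            if (++rescale >= numberOfChannels) {
                rescale = 0;
            }
            System.out.println("call " + call + ": rebalance=" + rebalance
                    + ", rescale=" + rescale);
        }
        // rebalance yields 1, 2, 0, 1 and rescale yields 0, 1, 2, 0
    }
}
```

Both cycle through all channels; they differ only in the starting index and in how the wrap-around is expressed.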
KeyGroupStreamPartitioner

Core logic (the partitioner is created as):

    new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not extract key from " + record.getInstance().getValue(), e);
        }
        /*
         * key: the value the stream is grouped by
         * maxParallelism: the maximum supported parallelism, 1 << 7 = 128 (the system default)
         * numberOfChannels: the downstream parallelism, set by the user
         */
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, numberOfChannels);
    }
First hash:

    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        // key.hashCode() hashes the key once;
        // maxParallelism is the maximum parallelism, 1 << 7 = 128
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

Second hash:

    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        // keyHash: the result of key.hashCode()
        // murmurHash: applies a murmur hash to spread the values more evenly
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

Summary: the key is hashed twice, then taken modulo 128.
    /**
     * maxParallelism: the maximum parallelism, 1 << 7 = 128
     * parallelism: the downstream parallelism
     * keyGroupId: the result of hashing the key twice, modulo maxParallelism
     */
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
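With maxParallelism = 128 and parallelism = 16, the formula keyGroupId * parallelism / maxParallelism maps each block of 128 / 16 = 8 consecutive key groups to one operator. This can be checked directly with a stand-alone copy of the formula (not the Flink class itself):

```java
public class KeyGroupIndexDemo {
    // Same integer arithmetic as KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup
    static int operatorIndex(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 1 << 7; // 128
        int parallelism = 16;
        // 8 key groups per operator: groups 0..7 -> 0, 8..15 -> 1, ..., 120..127 -> 15
        System.out.println(operatorIndex(maxParallelism, parallelism, 0));   // 0
        System.out.println(operatorIndex(maxParallelism, parallelism, 10));  // 1
        System.out.println(operatorIndex(maxParallelism, parallelism, 127)); // 15
    }
}
```

Because the division is integer division, each operator receives a contiguous range of key groups rather than an interleaved set.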
Example:

    @Test
    public void a() {
        String key = "a";
        int maxParallelism = 1 << 7;
        int parallelism = 16;
        int keyGroupId = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
        int r = keyGroupId * parallelism / maxParallelism;
        System.out.println(r);
    }

Result:

    10

The program above shows how a concrete key is assigned to a specific subtask (slot). The downstream parallelism can be set with .setParallelism().
GlobalPartitioner
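The section above gives no source for GlobalPartitioner. In Flink, its selectChannel simply returns 0, so every record is routed to the first downstream channel and the whole stream is processed by a single subtask. The behavior amounts to this stand-alone sketch (not the Flink class itself):

```java
public class GlobalSketch {
    // GlobalPartitioner ignores the record and always picks channel 0.
    static int selectChannel() {
        return 0;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.print(selectChannel() + " "); // 0 0 0 0 0
        }
        System.out.println();
    }
}
```

Because all data converges on one subtask, global() should be used sparingly; it effectively removes downstream parallelism.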
ShufflePartitioner

The shuffle partitioning logic is simple:

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }

numberOfChannels is the downstream parallelism; if it is 12, the selected channel index ranges from 0 to 11 (random.nextInt is zero-based and excludes the bound).
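Random.nextInt(bound) returns values in [0, bound), which is easy to confirm with a quick stand-alone check (not the Flink class itself):

```java
import java.util.Random;

public class ShuffleRangeCheck {
    public static void main(String[] args) {
        Random random = new Random();
        int numberOfChannels = 12;
        for (int i = 0; i < 100_000; i++) {
            int channel = random.nextInt(numberOfChannels);
            // every selected index lies in 0..11, never 12
            if (channel < 0 || channel >= numberOfChannels) {
                throw new AssertionError("out of range: " + channel);
            }
        }
        System.out.println("all selections in [0, " + numberOfChannels + ")");
    }
}
```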