Flink Compute Resources: Slots and Operator Chains

2019-10-11  邵红晓

Operator Chains

Flink chains two operators into the same task when all of the following conditions hold:

  1. The upstream and downstream operators have the same parallelism.
  2. The downstream node has an in-degree of 1 (i.e. it receives input from no other node).
  3. Both nodes are in the same slot sharing group (slot groups are explained below).
  4. The downstream node's chaining strategy is ALWAYS (may chain with both its upstream and downstream; map, flatMap, filter, etc. default to ALWAYS).
  5. The upstream node's chaining strategy is ALWAYS or HEAD (HEAD can only chain with its downstream, not its upstream; sources default to HEAD).
  6. The partitioner between the two nodes is forward (records are handed directly to the downstream operator's subtask running locally).
  7. The user has not disabled chaining.
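The seven conditions above can be condensed into a single predicate. This is a minimal, self-contained sketch with simplified stand-in types (the names here are ours); in Flink itself the real check is `StreamingJobGraphGenerator.isChainable`, which operates on StreamEdge/StreamNode:

```java
import java.util.Objects;

// Sketch of the chainability check, mirroring the seven conditions above.
// Node and its fields are simplified stand-ins, not Flink's actual types.
public class ChainCheck {

    enum ChainingStrategy { ALWAYS, HEAD, NEVER }

    static class Node {
        final int parallelism;
        final int inDegree;            // number of incoming edges
        final String slotGroup;        // slot sharing group name
        final ChainingStrategy strategy;

        Node(int parallelism, int inDegree, String slotGroup, ChainingStrategy strategy) {
            this.parallelism = parallelism;
            this.inDegree = inDegree;
            this.slotGroup = slotGroup;
            this.strategy = strategy;
        }
    }

    /** Same parallelism, in-degree 1, same slot group, downstream ALWAYS,
     *  upstream ALWAYS or HEAD, forward partitioner, chaining not disabled. */
    static boolean isChainable(Node up, Node down, boolean isForward, boolean chainingEnabled) {
        return chainingEnabled
                && isForward
                && up.parallelism == down.parallelism
                && down.inDegree == 1
                && Objects.equals(up.slotGroup, down.slotGroup)
                && down.strategy == ChainingStrategy.ALWAYS
                && (up.strategy == ChainingStrategy.ALWAYS || up.strategy == ChainingStrategy.HEAD);
    }
}
```

Breaking any one condition (different parallelism, a non-forward partitioner, a user call to disableOperatorChaining, etc.) prevents the chain.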
Flink's stream partitioners:

  1. GlobalPartitioner: always emits to the channel with index 0, so all records land on one subtask (data skew).
  2. ForwardPartitioner: hands each record to the downstream operator's subtask running locally.
  3. ShufflePartitioner: picks a random output channel for each record.
public class ShufflePartitioner<T> extends StreamPartitioner<T> {

    private Random random = new Random();
    private final int[] returnArray = new int[1];

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        // pick one random channel out of all output channels
        returnArray[0] = random.nextInt(numberOfOutputChannels);
        return returnArray;
    }
}
  4. HashPartitioner: hashes the record's key to compute the channel index; it obtains a KeySelector instance through its constructor and uses it to extract the key from each record.
  5. BroadcastPartitioner: broadcasts each record to all downstream subtasks.
  6. RebalancePartitioner: the go-to remedy for data skew; distributes records evenly across all downstream channels in round-robin fashion.
  7. RescalePartitioner: partitions by the parallelism ratio, e.g. with downstream parallelism twice the upstream's, each upstream subtask cycles over only its two local downstream channels, rather than round-robining over all channels the way RebalancePartitioner does.
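Stripped of Flink's types, the channel-selection rules of several of these partitioners reduce to one-line functions over the channel count. A sketch with invented names (not Flink's API):

```java
import java.util.Random;

// Channel-selection rules of several partitioners above, reduced to plain
// functions over a channel count n. Names are ours, not Flink's.
public class Partitioners {
    static final Random RANDOM = new Random();

    // GlobalPartitioner: every record goes to channel 0 (hence the skew).
    static int global(int n) { return 0; }

    // ShufflePartitioner: a uniformly random channel per record.
    static int shuffle(int n) { return RANDOM.nextInt(n); }

    // HashPartitioner (simplified): channel derived from the key's hash.
    static int hash(Object key, int n) { return Math.abs(key.hashCode() % n); }

    // RebalancePartitioner: strict round-robin over all n channels.
    static int rebalance(int previous, int n) { return (previous + 1) % n; }
}
```

The hash rule here is a simplification; Flink's keyed partitioning additionally maps hashes through key groups for rescalable state.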

TaskManager slots

Slot sharing group (SlotSharingGroup) and CoLocationGroup (used by iterative streams). Subtasks of different operators in the same SlotSharingGroup may run in one slot; a CoLocationGroup additionally forces the i-th subtasks of its operators into the same slot.
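Because subtasks of different operators can share a slot, the number of slots a sharing group needs is just the highest operator parallelism inside that group. A small sketch of that bookkeeping, with invented types and names:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: slots needed per slot sharing group = the maximum operator
// parallelism inside that group. Names/types are invented for illustration.
public class SlotSharing {

    /** operatorGroups: operator name -> group name;
     *  parallelism: operator name -> parallelism.
     *  Returns group name -> number of slots that group requires. */
    static Map<String, Integer> slotsPerGroup(Map<String, String> operatorGroups,
                                              Map<String, Integer> parallelism) {
        Map<String, Integer> slots = new HashMap<>();
        for (Map.Entry<String, String> e : operatorGroups.entrySet()) {
            int p = parallelism.get(e.getKey());
            // a group needs as many slots as its widest operator
            slots.merge(e.getValue(), p, Math::max);
        }
        return slots;
    }
}
```

So a job with source parallelism 2, map parallelism 4, and sink parallelism 1, all in the default group, needs 4 slots in total.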

Flink compute resource management

Inside ResourceManager, the inner class ResourceActionsImpl is the callback through which the SlotManager asks the ResourceManager to start new workers, release idle ones, and report allocation failures:

private class ResourceActionsImpl implements ResourceActions {

        @Override
        public void releaseResource(InstanceID instanceId, Exception cause) {
            validateRunsInMainThread();

            ResourceManager.this.releaseResource(instanceId, cause);
        }

        @Override
        public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) {
            validateRunsInMainThread();
            return startNewWorker(resourceProfile);
        }

        @Override
        public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
            validateRunsInMainThread();

            JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
            if (jobManagerRegistration != null) {
                jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
            }
        }
    }
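To make the SlotManager-to-ResourceManager contract above concrete, here is a recording stub with simplified types (strings instead of InstanceID/ResourceProfile/JobID; names are ours, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;

// Stub of the ResourceActions contract above: it just records each request,
// so the call sequence the SlotManager would issue becomes visible.
public class RecordingResourceActions {
    final List<String> log = new ArrayList<>();

    // SlotManager asks to release an idle TaskManager
    void releaseResource(String instanceId, Exception cause) {
        log.add("release:" + instanceId);
    }

    // SlotManager asks for a new worker matching the given profile
    List<String> allocateResource(String profile) {
        log.add("allocate:" + profile);
        return List.of(profile); // pretend one worker of this profile was started
    }

    // SlotManager reports that an allocation for a job failed
    void notifyAllocationFailure(String jobId, String allocationId, Exception cause) {
        log.add("failed:" + jobId + "/" + allocationId);
    }
}
```

In the real implementation each of these methods first calls validateRunsInMainThread(), because the SlotManager and ResourceManager share a single main-thread executor.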
When scheduling a task, ExecutionVertex derives preferred TaskManager locations from where the producers of its inputs run, so that a consumer lands close to its data:

/**
     * Gets the location preferences of the vertex's current task execution, as determined by the locations
     * of the predecessors from which it receives input data.
     * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
     * method returns {@code null} to indicate no location preference.
     *
     * @return The preferred locations based in input streams, or an empty iterable,
     *         if there is no input-based preference.
     */
    public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
        // otherwise, base the preferred locations on the input connections
        if (inputEdges == null) {
            return Collections.emptySet();
        }
        else {
            Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
            Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());

            // go over all inputs
            for (int i = 0; i < inputEdges.length; i++) {
                inputLocations.clear();
                ExecutionEdge[] sources = inputEdges[i];
                if (sources != null) {
                    // go over all input sources
                    for (int k = 0; k < sources.length; k++) {
                        // look-up assigned slot of input source
                        CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
                        // add input location
                        inputLocations.add(locationFuture);
                        // inputs which have too many distinct sources are not considered
                        if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                            inputLocations.clear();
                            break;
                        }
                    }
                }
                // keep the locations of the input with the least preferred locations
                if (locations.isEmpty() || // nothing assigned yet
                        (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
                    // current input has fewer preferred locations
                    locations.clear();
                    locations.addAll(inputLocations);
                }
            }

            return locations.isEmpty() ? Collections.emptyList() : locations;
        }
    }
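The method above boils down to: per input, collect the distinct producer locations; discard an input whose locations exceed the threshold; keep the smallest remaining set. The same logic over plain strings, as a self-contained sketch (names ours):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Condensed form of getPreferredLocationsBasedOnInputs: per input, collect
// distinct producer locations, skip inputs with more than maxDistinct of
// them, and keep the input with the fewest distinct locations.
public class PreferredLocations {

    static Set<String> preferred(List<List<String>> inputs, int maxDistinct) {
        Set<String> best = new HashSet<>();
        for (List<String> producers : inputs) {
            Set<String> locs = new HashSet<>(producers);
            if (locs.size() > maxDistinct) {
                continue; // too many distinct sources: this input gives no preference
            }
            // keep the input with the fewest preferred locations so far
            if (best.isEmpty() || (!locs.isEmpty() && locs.size() < best.size())) {
                best = locs;
            }
        }
        return best;
    }
}
```

Preferring the input with the fewest distinct locations favors the narrowest data dependency, i.e. the one where co-location saves the most network traffic.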

Summary:

  1. It is recommended to set "Number of slots per TaskManager" to the highest operator parallelism in the job.
  2. Flink enables slot sharing by default, which makes full use of CPU and memory resources.
References:
    https://blog.jrwang.me/2019/flink-source-code-jobgraph/
    http://chenyuzhao.me/2017/02/09/flink-scheduler/
    https://blog.jrwang.me/2019/flink-source-code-resource-manager/#scheduler-%E5%92%8C-slotsharingmanager