Flink in Practice

Flink internals: JobGraph

2019-11-26  by 神奇的考拉

An annotated reading of the JobGraph class, following its source code.

1. Fields

/** List of task vertices included in this job graph. */
// The vertices of this job, keyed by JobVertexID so every vertex ID is unique within the job; a LinkedHashMap preserves insertion order
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

/** The job configuration attached to this job. */
// The configuration attached to this job
private final Configuration jobConfiguration = new Configuration();

/** ID of this job. May be set if specific job id is desired (e.g. session management) */
// The job ID distinguishes this job from others; it may be set explicitly (e.g. for session management) or generated randomly
private final JobID jobID;

/** Name of this job. */
// The name under which the job runs
private final String jobName;

/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 * job manager after it has been executed. */
// How long (in seconds) the corresponding ExecutionGraph is kept on the JobManager after the job has executed; it is removed once this timeout expires
private long sessionTimeout = 0;

/** flag to enable queued scheduling */
// Flag to enable queued scheduling: tasks may wait in a queue for slots to become available instead of failing immediately when resources are short
private boolean allowQueuedScheduling;

/** The mode in which the job is scheduled */
// The mode in which the job is scheduled; defaults to lazy scheduling from the sources
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

// --- checkpointing ---
// The fields below configure checkpointing
/** Job specific execution config */
// The job-specific ExecutionConfig, kept in serialized form; it cannot be modified after serialization
private SerializedValue<ExecutionConfig> serializedExecutionConfig;

/** The settings for the job checkpoints */
// The checkpointing settings for this job
private JobCheckpointingSettings snapshotSettings;

/** Savepoint restore settings. */
// Savepoint restore settings, commonly used when recovering a job from a savepoint; defaults to none()
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

// --- attached resources ---
// Resources attached to the job for execution
/** Set of JAR files required to run this job. */
// Paths of additional user JAR files; extra JARs are registered here so they can be shipped with the job
private final List<Path> userJars = new ArrayList<Path>();

/** Set of custom files required to run this job. */
// Custom files the job needs at runtime, registered as distributed-cache entries via userArtifacts
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();

/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();

/** List of classpaths required to run this job. */
// Classpath entries required to run this job
private List<URL> classpaths = Collections.emptyList();
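
Most of these fields are populated through JobGraph's public setters rather than directly. A minimal sketch of that wiring (the paths and names are hypothetical; the setter names follow the same source file):

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;

public class JobGraphFieldsDemo {
   public static void main(String[] args) {
      JobGraph jobGraph = new JobGraph(null, "demo-job");   // random JobID, explicit name

      // attached resources: ship an extra user JAR with the job
      jobGraph.addJar(new Path("file:///tmp/udfs.jar"));

      // savepoint restore: resume from a savepoint path, allowing state
      // that no longer maps to any operator to be skipped
      jobGraph.setSavepointRestoreSettings(
            SavepointRestoreSettings.forPath("file:///tmp/savepoint-1", true));

      // scheduling: schedule all tasks eagerly and let them queue for slots
      jobGraph.setScheduleMode(ScheduleMode.EAGER);
      jobGraph.setAllowQueuedScheduling(true);

      System.out.println(jobGraph.getJobID() + " / " + jobGraph.getName());
   }
}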

2. Constructors

JobGraph provides several constructors; the two most relevant are shown below.
Constructor one:

/**
 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
 * the given name and the given execution configuration (see {@link ExecutionConfig}).
 * The ExecutionConfig will be serialized and can't be modified afterwards.
 *
 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 * @param jobName The name of the job.
 */
 // Builds a JobGraph from the given jobId and jobName; both fall back to defaults:
 // 1. jobId: may be supplied explicitly, otherwise a random JobID is generated
 // 2. jobName: "(unnamed job)" is used when none is given
 // 3. ExecutionConfig: configures how the program executes; common options include
 //    (see the sketch after the constructor):
 //  3.1 parallelism: the default parallelism of the program
 //  3.2 retries: how many times a failed execution is retried
 //  3.3 delay: the pause between two retries
 //  3.4 execution mode: batch or pipelined; pipelined is the default
 //  3.5 the "closure cleaner": pre-processes function implementations (e.g. anonymous
 //      inner classes), removing unused references to the enclosing scope to fix
 //      serialization problems and shrink the closure's size
 //  3.6 registering types and serializers: to speed up handling of generic types and
 //      POJOs, subclasses of the declared types must be registered manually as well
public JobGraph(JobID jobId, String jobName) {
   this.jobID = jobId == null ? new JobID() : jobId;
   this.jobName = jobName == null ? "(unnamed job)" : jobName;

   try {
      setExecutionConfig(new ExecutionConfig());
   } catch (IOException e) {
      // this should never happen, since an empty execution config is always serializable
      throw new RuntimeException("bug, empty execution config is not serializable");
   }
}
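
The numbered ExecutionConfig options above correspond to setters on ExecutionConfig. A minimal sketch (MyEvent is a hypothetical POJO; the retry/delay setters are deprecated in later Flink versions in favor of restart strategies):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;

public class ExecutionConfigDemo {
   // hypothetical POJO, used only to illustrate type registration (3.6)
   public static class MyEvent { public long timestamp; }

   public static void main(String[] args) {
      ExecutionConfig config = new ExecutionConfig();
      config.setParallelism(4);                          // 3.1 default parallelism
      config.setNumberOfExecutionRetries(3);             // 3.2 retries on failure
      config.setExecutionRetryDelay(10_000L);            // 3.3 delay between two retries, in ms
      config.setExecutionMode(ExecutionMode.PIPELINED);  // 3.4 batch vs. pipelined (the default)
      config.disableClosureCleaner();                    // 3.5 closure cleaner toggle
      config.registerPojoType(MyEvent.class);            // 3.6 register a POJO subtype
   }
}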

Constructor two:
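
The varargs overload delegates to constructor one and then registers each supplied vertex (paraphrased from the same source file):

/**
 * Constructs a new job graph with the given job ID (or a random one, if {@code null}
 * is passed), the given name, and the given job vertices.
 */
public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
   this(jobId, jobName);

   for (JobVertex vertex : vertices) {
      addVertex(vertex);
   }
}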

3. Building the JobGraph topology

// Build the topology starting from the sources.
// 1. From the vertices registered in the job graph, produce a list sorted in topological order.
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
   // early out on empty lists
   if (this.taskVertices.isEmpty()) {
      return Collections.emptyList();
   }

   List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
   Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

   // start by finding the vertices with no input edges
   // and the ones with disconnected inputs (that refer to some standalone data set)
   // First, find the source vertices registered in the job graph.
   {
     // The LinkedHashSet deduplicates vertices (no vertex can be recorded twice) while
     // keeping insertion order.
     // 1. Iterate over every JobVertex in the set.
     // 2. A vertex with no connected inputs is treated as a source.
     // 3. Append each source to the sorted list and remove it from the set, so it
     //    cannot be visited again.
      Iterator<JobVertex> iter = remaining.iterator();
      while (iter.hasNext()) {
         JobVertex vertex = iter.next();

         if (vertex.hasNoConnectedInputs()) {
            sorted.add(vertex);
            iter.remove();
         }
      }
   }

   int startNodePos = 0;

   // traverse from the nodes that were added until we found all elements
   // The loop above collected the sources (and vertices whose inputs refer only to
   // standalone data sets). Now place the remaining vertices.
   // 1. startNodePos only ever advances through vertices already placed in 'sorted';
   //    if it runs past the end while vertices remain, the graph must contain a cycle.
   // 2. The result is the topological order of the JobGraph that is built on top of
   //    the StreamGraph.
   while (!remaining.isEmpty()) {

      // first check if we have more candidates to start traversing from. if not, then the
      // graph is cyclic, which is not permitted
      // The next traversal start must still lie inside 'sorted'; otherwise the graph
      // is cyclic and cannot form a DAG topology.
      if (startNodePos >= sorted.size()) {
         throw new InvalidProgramException("The job graph is cyclic.");
      }

      JobVertex current = sorted.get(startNodePos++);
      addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
   }

   return sorted;
}
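
Before turning to the helper it calls, a quick usage sketch of the sort (vertex names are hypothetical): build source -> map -> sink and ask for the topological order.

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

public class TopoSortDemo {
   public static void main(String[] args) {
      JobVertex source = new JobVertex("source");
      JobVertex map = new JobVertex("map");
      JobVertex sink = new JobVertex("sink");

      // wire the DAG: each call creates an IntermediateDataSet on the
      // upstream vertex and a JobEdge into the downstream vertex
      map.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE,
            ResultPartitionType.PIPELINED);
      sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.PIPELINED);

      JobGraph jobGraph = new JobGraph(null, "topo-demo", source, map, sink);

      // prints the vertices in topological order: source, then map, then sink
      System.out.println(jobGraph.getVerticesSortedTopologicallyFromSources());
   }
}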

// Recursively appends every vertex that has no predecessors left in 'remaining':
// 1. Collect the IntermediateDataSets produced by the start vertex.
// 2. Iterate over each data set's consumers (JobEdges).
// 3. Take each edge's target vertex and check whether it is still in the set;
//    if not, skip the edge.
//  3.1 For a target still in the set, inspect all of its inputs, skipping the
//      edge we arrived through.
//  3.2 A target none of whose other inputs is produced by a vertex still in the
//      set has no new predecessors and is appended to 'target'.
//  3.3 Recurse from each appended vertex until no more vertices qualify.
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
  
   // forward traverse over all produced data sets and all their consumers
   // Starting from the current vertex, fetch the IntermediateDataSets it produces,
   // then each data set's consuming JobEdges, and finally each edge's target vertex.
   // A target no longer in the set is ignored. A target still in the set is checked
   // backwards: walk its inputs looking for an unvisited predecessor, skipping the
   // edge we came through (its producer is the current vertex, which has already
   // been placed, so re-checking it would be pointless).
   for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
      for (JobEdge edge : dataSet.getConsumers()) {

         // a vertex can be added, if it has no predecessors that are still in the 'remaining' set
         JobVertex v = edge.getTarget();
         if (!remaining.contains(v)) {
            continue;
         }

         boolean hasNewPredecessors = false;
          
         // check the target's remaining inputs
         for (JobEdge e : v.getInputs()) {
            // skip the edge through which we came
            // its producer is the current vertex, which has already been placed
            if (e == edge) {
               continue;
            }

            // e.getSource() is the IntermediateDataSet feeding this input edge;
            // its producer is the upstream vertex. If that producer is still in
            // 'remaining', the target has an unvisited predecessor.
            IntermediateDataSet source = e.getSource();
            if (remaining.contains(source.getProducer())) {
               hasNewPredecessors = true;
               break;
            }
         }
         
         // The target is in the set and has no unvisited predecessors: append it
         // to 'target', remove it from the set, and recurse from it.
         if (!hasNewPredecessors) {
            target.add(v);
            remaining.remove(v);
            addNodesThatHaveNoNewPredecessors(v, target, remaining);
         }
      }
   }
}
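
To isolate the algorithm from Flink's types, here is a self-contained re-implementation of the same idea on a plain adjacency list (all names hypothetical; the recursion is folded into the outer sweep): a node is emitted once none of its predecessors is still "remaining".

import java.util.*;

public class NoNewPredecessorsSketch {

   static List<String> topoSort(Map<String, List<String>> successors) {
      // invert the edges once so predecessor checks are cheap
      Map<String, List<String>> predecessors = new HashMap<>();
      successors.forEach((from, tos) -> {
         predecessors.computeIfAbsent(from, k -> new ArrayList<>());
         for (String to : tos) {
            predecessors.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
         }
      });

      List<String> sorted = new ArrayList<>();
      Set<String> remaining = new LinkedHashSet<>(predecessors.keySet());

      // seed with the sources, like the hasNoConnectedInputs() loop above
      remaining.removeIf(n -> {
         if (predecessors.get(n).isEmpty()) {
            sorted.add(n);
            return true;
         }
         return false;
      });

      int startNodePos = 0;
      while (!remaining.isEmpty()) {
         if (startNodePos >= sorted.size()) {
            throw new IllegalStateException("The graph is cyclic.");
         }
         String current = sorted.get(startNodePos++);
         for (String next : successors.getOrDefault(current, List.of())) {
            // emit 'next' once no predecessor of it is still remaining
            if (remaining.contains(next)
                  && predecessors.get(next).stream().noneMatch(remaining::contains)) {
               sorted.add(next);
               remaining.remove(next);
            }
         }
      }
      return sorted;
   }

   public static void main(String[] args) {
      // diamond DAG: a -> b, a -> c, b -> d, c -> d
      Map<String, List<String>> g = new LinkedHashMap<>();
      g.put("a", List.of("b", "c"));
      g.put("b", List.of("d"));
      g.put("c", List.of("d"));
      g.put("d", List.of());
      System.out.println(topoSort(g)); // [a, b, c, d]
   }
}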


4. JobGraph structure

(figure: diagram of the JobGraph structure)

5. Source

Full source: JobGraph.java
