Flink: finding each job's latest checkpoint directory, plus generating restart scripts

2022-09-15  川流不息attitude

The idea is simple: sort the checkpoint directories by their last-modified time, walk them from newest to oldest, and return as soon as one containing a checkpoint is found. It is really just a directory walk, yet many people never get around to writing it. As a bonus, the tool also generates shell scripts from the results to restart the Flink jobs.
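For reference, here is a minimal sketch of the layout the scanner assumes (all job names, ids and paths below are illustrative): one directory of job YAML files, and a checkpoint root with one sub-directory per job name, under which Flink creates a per-job-id directory holding the chk-* folders:

    D:\job\                         # one YAML definition per job
        order-etl.yaml
        user-etl.yaml
    D:\checkpoint\                  # checkpoint root, one folder per job name
        order-etl\
            1f2d3c4b5a69788f\       # Flink job id
                chk-42\
                chk-43\             # newest retained checkpoint
        user-etl\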

import lombok.Data;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class RestartJob {
    public static void main(String[] args) throws IOException {
        String dir = "D:\\job";
        String checkpointDir = "D:\\checkpoint\\";

        String flinkBin = "/opt/app/flink-1.14.5/bin";

        String jarPath = "/opt/app/dby-flink-job-data.jar";

        if(args != null && args.length >= 2){
            if(  null != args[0]){
                dir = args[0];
            }
            if( null != args[1]){
                checkpointDir = args[1];
            }
            if(args.length >= 3){
                if( null != args[2]){
                    flinkBin = args[2];
                }
            }

            if(args.length >= 4){
                if( null != args[3]){
                    jarPath = args[3];
                }
            }
        }

        List<String> jobs = new ArrayList<>();
        List<JobData> restartJob = new ArrayList<>();
        File file = new File(dir);

        if(file.isDirectory()){
            // list the job definition (YAML) files
            File[] listFiles = file.listFiles();

            for (int i = 0; i < listFiles.length; i++) {
                File listFile = listFiles[i];
                Job job = JobUtils.read(listFile.getPath());

                String name = job.getName();
                String ck = checkpointDir + name;
                // list this job's checkpoint directories (one per job id)
                File[] files = new File(ck).listFiles();
                if (files == null) {
                    // no checkpoint directory yet: the job has to be started from scratch
                    jobs.add(listFile.getAbsolutePath());
                    continue;
                }

                // sort by last-modified time, oldest first (avoids the int overflow of subtracting timestamps)
                List<File> sorted = Arrays.stream(files)
                        .sorted(Comparator.comparingLong(File::lastModified))
                        .collect(Collectors.toList());

                // newest directory that actually contains a chk-* folder, or "" if none
                String path = getPath(sorted);

                if(!"".equals(path)){
                    //System.out.println(path);
                    JobData jobData = new JobData();
                    jobData.setPath(path);
                    jobData.setName(listFile.getAbsolutePath());

                    restartJob.add(jobData);
                }else {
                    jobs.add(listFile.getAbsolutePath());
                }
            }

            System.out.println(restartJob.size() + " " + restartJob);
            System.out.println("=============");
            System.out.println(jobs.size() +" " +jobs);
        }

        writeRestartJob(restartJob,flinkBin,jarPath);
        writeStartJob(jobs,flinkBin,jarPath);
    }

    @Data
    public static class JobData{
        String path;

        String name;
    }

    public static void writeStartJob(List<String> jobs, String flinkBin, String jarPath) {
        File startJobSh = new File("startJob.sh");
        try (FileOutputStream fileOutputStream = new FileOutputStream(startJobSh)) {
            String line = System.getProperty("line.separator");
            fileOutputStream.write("#!/bin/bash".getBytes(StandardCharsets.UTF_8));
            fileOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
            for (int i = 0; i < jobs.size(); i++) {
                // flink run <jar> <job yaml> for jobs without any checkpoint
                fileOutputStream.write((flinkBin + "/flink run " + jarPath + " " + jobs.get(i)).getBytes(StandardCharsets.UTF_8));
                if (i != jobs.size() - 1) {
                    fileOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
                    fileOutputStream.write("sleep 10s".getBytes(StandardCharsets.UTF_8));
                    fileOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void writeRestartJob(List<JobData> restartJob, String flinkBin, String jarPath) {
        File restartJobSh = new File("restartJob.sh");
        try (FileOutputStream fileOutputStream = new FileOutputStream(restartJobSh)) {
            String line = System.getProperty("line.separator");
            fileOutputStream.write("#!/bin/bash".getBytes(StandardCharsets.UTF_8));
            fileOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
            for (int i = 0; i < restartJob.size(); i++) {
                // flink run -s <checkpoint path> <jar> <job yaml>
                fileOutputStream.write((flinkBin + "/flink run -s " + restartJob.get(i).getPath() + " " + jarPath + " " + restartJob.get(i).getName()).getBytes(StandardCharsets.UTF_8));
                if (i != restartJob.size() - 1) {
                    fileOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
                    fileOutputStream.write("sleep 10s".getBytes(StandardCharsets.UTF_8));
                    fileOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String getPath(List<File> files) {
        // walk from the newest directory backwards until one contains a chk-* folder
        int i = files.size() - 1;
        while (i >= 0) {
            File[] listFiles = files.get(i).listFiles();
            if (listFiles != null) {
                List<File> chk = Arrays.stream(listFiles)
                        .filter(e -> e.getName().startsWith("chk"))
                        .collect(Collectors.toList());
                if (!chk.isEmpty()) {
                    String path = new File(files.get(i), chk.get(0).getName()).getPath();
                    // normalize Windows separators so the path works in the generated shell script
                    return path.replace("\\", "/");
                }
            }
            i--;
        }
        return "";
    }
}
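For illustration only, with the defaults above restartJob.sh ends up containing commands of the following shape (the checkpoint paths and YAML names are made up), while startJob.sh holds plain flink run lines for jobs that have no checkpoint yet:

    #!/bin/bash
    /opt/app/flink-1.14.5/bin/flink run -s /data/checkpoint/order-etl/1f2d3c4b5a69788f/chk-43 /opt/app/dby-flink-job-data.jar /opt/app/job/order-etl.yaml
    sleep 10s
    /opt/app/flink-1.14.5/bin/flink run -s /data/checkpoint/user-etl/9a8b7c6d5e4f3a2b/chk-17 /opt/app/dby-flink-job-data.jar /opt/app/job/user-etl.yaml

The JobUtils helper below loads each job's YAML definition: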

import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.error.YAMLException;

import java.io.FileInputStream;
import java.io.IOException;

/**
 * @Author liyichuan
 * @Date 2022/7/26 16:47
 */
public final class JobUtils {

    private JobUtils() {
        throw new IllegalStateException("Utils");
    }

    public static void valid(Job job) {
        if (job == null) {
            throw new IllegalArgumentException("Config [job] must not be null");
        }
        if (job.getParallelism() == null) {
            throw new IllegalArgumentException("Config [job.parallelism] must not be null");
        }
        if (job.getParallelism() <= 0) {
            throw new IllegalArgumentException("Config [job.parallelism] must be greater than 0");
        }
        if (job.getCheckpoint() == null) {
            job.setCheckpoint(new Job.Checkpoint());
        }

        if (job.getCheckpoint().getInterval() == null) {
            throw new IllegalArgumentException("Config [job.checkpoint.interval] must not be null");
        }
        if (job.getCheckpoint().getInterval() <= 0) {
            throw new IllegalArgumentException("Config [job.checkpoint.interval] must be greater than 0");
        }
        if (job.getCheckpoint().getTimeout() == null) {
            throw new IllegalArgumentException("Config [job.checkpoint.timeout] must not be null");
        }
        if (job.getCheckpoint().getTimeout() <= 0) {
            throw new IllegalArgumentException("Config [job.checkpoint.timeout] must be greater than 0");
        }
        if (job.getCheckpoint().getMinPauseBetweenCheckpoints() == null) {
            throw new IllegalArgumentException("Config [job.checkpoint.minPauseBetweenCheckpoints] must not be null");
        }
        if (job.getCheckpoint().getMinPauseBetweenCheckpoints() <= 0) {
            throw new IllegalArgumentException("Config [job.checkpoint.minPauseBetweenCheckpoints] must be greater than 0");
        }
        if (job.getCheckpoint().getMaxConcurrentCheckpoints() == null) {
            throw new IllegalArgumentException("Config [job.checkpoint.maxConcurrentCheckpoints] must not be null");
        }
        if (job.getCheckpoint().getMaxConcurrentCheckpoints() <= 0) {
            throw new IllegalArgumentException("Config [job.checkpoint.maxConcurrentCheckpoints] must be greater than 0");
        }
        if (job.getSavepoint() == null) {
            job.setSavepoint(new Job.Savepoint());
        }
        if (job.getSavepoint().getEnabled() == null) {
            job.getSavepoint().setEnabled(false);
        }
    }

    public static Job read(String jobPath) {
        Job job;
        try (FileInputStream in = new FileInputStream(jobPath)) {
            job = new Yaml().loadAs(in, Job.class);
        } catch (IOException e) {
            throw new YAMLException("Failed to read job configuration file: " + jobPath, e);
        }
        return job;
    }

}

import lombok.Data;

import java.util.List;

/**
 * @Author liyichuan
 * @Date 2022/7/26 16:47
 */
@Data
public class Job {

    private String name;

    private Integer parallelism = 1;

    private Checkpoint checkpoint = new Checkpoint();

    private Savepoint savepoint = new Savepoint();

    private List<String> pipeline;

    @Data
    public static class Checkpoint {

        private String dir;

        private Long interval = 60_000L;

        private Long minPauseBetweenCheckpoints = 5000L;

        private Long timeout = 600_000L;

        private Integer maxConcurrentCheckpoints = 1;

    }

    @Data
    public static class Savepoint {
        private Boolean enabled = true;
    }

}
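For context, a YAML file that JobUtils.read maps onto this Job bean could look like the following (the values are only an example; the keys must match the bean properties):

    name: order-etl
    parallelism: 2
    checkpoint:
      dir: /data/checkpoint/order-etl
      interval: 60000
      minPauseBetweenCheckpoints: 5000
      timeout: 600000
      maxConcurrentCheckpoints: 1
    savepoint:
      enabled: true
    pipeline:
      - source
      - transform
      - sink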

Bonus: the Maven plugin configuration for building an executable fat jar.

Command: mvn assembly:assembly

 <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <archive>
              <manifest>
                <mainClass>org.example.RestartJob</mainClass>
              </manifest>
            </archive>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
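Putting it together: build the fat jar and run it with the four optional arguments (job directory, checkpoint root, Flink bin directory, job jar). The artifact name below depends on your artifactId and is only an example:

    mvn assembly:assembly
    java -jar target/restart-job-jar-with-dependencies.jar /opt/app/job /data/checkpoint/ /opt/app/flink-1.14.5/bin /opt/app/dby-flink-job-data.jar

The tool writes restartJob.sh and startJob.sh into the working directory; review them, then execute them to bring the jobs back up.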