Flink:找到 job 最近的 checkpoint 目录,外带反向生成重启脚本
2022-09-15 本文已影响0人
川流不息attitude
思路就是:根据文件最后修改时间排序,然后遍历目录,找到后就返回,是不是很简单啊?不过很多人应该不会写,其实就是递归。顺带反向生成 shell 脚本,用于重启 Flink job。
public class RestartJob {

    /**
     * Entry point. Scans a directory of job config files, looks up each job's
     * newest checkpoint ("chk*") directory, and generates two shell scripts:
     * restartJob.sh ("flink run -s <checkpoint>" for jobs that have one) and
     * startJob.sh (plain "flink run" for jobs without any checkpoint).
     *
     * Optional positional args override the defaults:
     *   args[0] job config dir, args[1] checkpoint root dir,
     *   args[2] flink bin dir,  args[3] job jar path.
     */
    public static void main(String[] args) throws IOException {
        String dir = "D:\\job";
        String checkpointDir = "D:\\checkpoint\\";
        String flinkBin = "/opt/app/flink-1.14.5/bin";
        String jarPath = "/opt/app/dby-flink-job-data.jar";
        if (args != null && args.length >= 2) {
            if (args[0] != null) {
                dir = args[0];
            }
            if (args[1] != null) {
                checkpointDir = args[1];
            }
            if (args.length >= 3 && args[2] != null) {
                flinkBin = args[2];
            }
            if (args.length >= 4 && args[3] != null) {
                jarPath = args[3];
            }
        }
        List<String> jobs = new ArrayList<>();
        List<JobData> restartJob = new ArrayList<>();
        File file = new File(dir);
        if (file.isDirectory()) {
            // Each file in the job directory is a YAML job definition.
            File[] listFiles = file.listFiles();
            if (listFiles != null) {
                for (File listFile : listFiles) {
                    Job job = JobUtils.read(listFile.getPath());
                    String name = job.getName();
                    // Checkpoints for this job live under <checkpointDir><jobName>.
                    File[] files = new File(checkpointDir + name).listFiles();
                    if (files == null) {
                        // No checkpoint directory at all -> cold start.
                        jobs.add(listFile.getAbsolutePath());
                        continue;
                    }
                    // Sort ascending by last-modified; getPath() walks from the
                    // newest entry backwards. Long.compare avoids the sign bug of
                    // casting a long difference down to int.
                    List<File> sorted = Arrays.stream(files)
                            .sorted((f1, f2) -> Long.compare(f1.lastModified(), f2.lastModified()))
                            .collect(Collectors.toList());
                    String path = getPath(sorted);
                    if (!"".equals(path)) {
                        JobData jobData = new JobData();
                        jobData.setPath(path);
                        jobData.setName(listFile.getAbsolutePath());
                        restartJob.add(jobData);
                    } else {
                        jobs.add(listFile.getAbsolutePath());
                    }
                }
            }
            System.out.println(restartJob.size() + " " + restartJob);
            System.out.println("=============");
            System.out.println(jobs.size() + " " + jobs);
        }
        writeRestartJob(restartJob, flinkBin, jarPath);
        writeStartJob(jobs, flinkBin, jarPath);
    }

    /** Value object pairing a job config path with its newest checkpoint path. */
    @Data
    public static class JobData {
        String path;
        String name;
    }

    /**
     * Generates startJob.sh: one plain "flink run" command per job, with a
     * "sleep 10s" between consecutive commands (none after the last).
     */
    public static void writeStartJob(List<String> jobs, String flinkBin, String jarPath) {
        List<String> commands = new ArrayList<>();
        for (String job : jobs) {
            commands.add(flinkBin + "/flink run " + " " + jarPath + " " + job);
        }
        writeScript(new File("startJob.sh"), commands);
    }

    /**
     * Generates restartJob.sh: one "flink run -s <checkpoint>" command per job,
     * with a "sleep 10s" between consecutive commands (none after the last).
     */
    public static void writeRestartJob(List<JobData> restartJob, String flinkBin, String jarPath) {
        List<String> commands = new ArrayList<>();
        for (JobData job : restartJob) {
            commands.add(flinkBin + "/flink run -s " + job.getPath() + " " + jarPath + " " + job.getName());
        }
        writeScript(new File("restartJob.sh"), commands);
    }

    /**
     * Writes a bash script: shebang line, then each command separated by a
     * "sleep 10s" line. try-with-resources guarantees the stream is closed even
     * on failure (the original leaked it on exception); errors are printed,
     * keeping the original best-effort behavior.
     */
    private static void writeScript(File script, List<String> commands) {
        String line = System.getProperty("line.separator");
        try (FileOutputStream out = new FileOutputStream(script)) {
            out.write("#!/bin/bash".getBytes(StandardCharsets.UTF_8));
            out.write(line.getBytes(StandardCharsets.UTF_8));
            for (int i = 0; i < commands.size(); i++) {
                out.write(commands.get(i).getBytes(StandardCharsets.UTF_8));
                if (i != commands.size() - 1) {
                    out.write(line.getBytes(StandardCharsets.UTF_8));
                    out.write("sleep 10s".getBytes(StandardCharsets.UTF_8));
                    out.write(line.getBytes(StandardCharsets.UTF_8));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Walks the candidate checkpoint directories from newest (end of the
     * sorted list) to oldest and returns the first "chk*" subdirectory found,
     * normalized to forward slashes; returns "" when none exists.
     */
    public static String getPath(List<File> files) {
        for (int i = files.size() - 1; i >= 0; i--) {
            File[] listFiles = files.get(i).listFiles();
            if (listFiles == null) {
                continue; // not a directory, or unreadable -> try an older entry
            }
            List<File> chk = Arrays.stream(listFiles)
                    .filter(e -> e.getName().startsWith("chk"))
                    .collect(Collectors.toList());
            if (!chk.isEmpty()) {
                String path = files.get(i).getPath() + "\\" + chk.get(0).getName();
                return path.replace("\\", "/");
            }
        }
        return "";
    }
}
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.error.YAMLException;
import java.io.FileInputStream;
import java.io.IOException;
/**
* @Author liyichuan
* @Date 2022/7/26 16:47
*/
public final class JobUtils {

    private JobUtils() {
        throw new IllegalStateException("Utils");
    }

    /**
     * Validates a job configuration, failing fast with an
     * IllegalArgumentException naming the offending config key. A missing
     * checkpoint or savepoint section is replaced with defaults rather than
     * rejected; a null savepoint.enabled defaults to false.
     *
     * @param job parsed job configuration; must not be null
     */
    public static void valid(Job job) {
        if (job == null) {
            throw new IllegalArgumentException("配置 [job] 不能为空");
        }
        requirePositive(job.getParallelism(), "job.parallelism");
        if (job.getCheckpoint() == null) {
            job.setCheckpoint(new Job.Checkpoint());
        }
        requirePositive(job.getCheckpoint().getInterval(), "job.checkpoint.interval");
        requirePositive(job.getCheckpoint().getTimeout(), "job.checkpoint.timeout");
        requirePositive(job.getCheckpoint().getMinPauseBetweenCheckpoints(),
                "job.checkpoint.minPauseBetweenCheckpoints");
        requirePositive(job.getCheckpoint().getMaxConcurrentCheckpoints(),
                "job.checkpoint.maxConcurrentCheckpoints");
        if (job.getSavepoint() == null) {
            job.setSavepoint(new Job.Savepoint());
        }
        if (job.getSavepoint().getEnabled() == null) {
            job.getSavepoint().setEnabled(false);
        }
    }

    /**
     * Checks a numeric config value is present and strictly positive.
     * Error messages match the original per-field checks byte-for-byte.
     *
     * @param value config value to check (may be null)
     * @param key   config path used in the error message, e.g. "job.parallelism"
     */
    private static void requirePositive(Number value, String key) {
        if (value == null) {
            throw new IllegalArgumentException("配置 [" + key + "] 不能为空");
        }
        if (value.longValue() <= 0) {
            throw new IllegalArgumentException("配置 [" + key + "] 必须大于0");
        }
    }

    /**
     * Loads a Job from a YAML file; the stream is closed via try-with-resources.
     *
     * @param jobPath path to the YAML job definition
     * @throws YAMLException wrapping the IOException when the file cannot be read
     */
    public static Job read(String jobPath) {
        try (FileInputStream in = new FileInputStream(jobPath)) {
            return new Yaml().loadAs(in, Job.class);
        } catch (IOException e) {
            throw new YAMLException("读取配置文件错误", e);
        }
    }
}
/**
* @Author liyichuan
* @Date 2022/7/26 16:47
*/
// YAML-mapped job configuration (loaded by JobUtils.read); Lombok @Data
// generates the getters/setters used by the restart tooling.
@Data
public class Job {
// Job name; also used as the checkpoint sub-directory name under the checkpoint root.
private String name;
// Operator parallelism; validated as > 0 by JobUtils.valid.
private Integer parallelism = 1;
private Checkpoint checkpoint = new Checkpoint();
private Savepoint savepoint = new Savepoint();
// Ordered pipeline step identifiers — semantics defined elsewhere; TODO confirm.
private List<String> pipeline;
// Checkpoint settings; all numeric fields validated as > 0 by JobUtils.valid.
@Data
public static class Checkpoint {
// Checkpoint storage directory (may be null; not validated).
private String dir;
// Interval between checkpoints, milliseconds.
private Long interval = 60_000L;
// Minimum pause between two checkpoints, milliseconds.
private Long minPauseBetweenCheckpoints = 5000L;
// Checkpoint timeout, milliseconds.
private Long timeout = 600_000L;
private Integer maxConcurrentCheckpoints = 1;
}
@Data
public static class Savepoint {
// NOTE(review): defaults to true here, but JobUtils.valid sets a null value
// to false — confirm which default is intended.
private Boolean enabled = true;
}
}
彩蛋:用 maven-assembly-plugin 打可执行 jar 的插件配置
命令 mvn assembly:assembly
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!-- Main-Class entry in the jar manifest so the jar is runnable via "java -jar" -->
<mainClass>org.example.RestartJob</mainClass>
</manifest>
</archive>
<descriptorRefs>
<!-- bundle all dependencies into a single fat jar -->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>