2020-08-28

2020-08-28  本文已影响0人  loukey_j

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class FlinkCheckpointTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment steamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
steamEnv.enableCheckpointing(1000L*2);
steamEnv
.addSource(new FSource()).setParallelism(4)
.transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
.process(new FCombine()).name("事务预处理").setParallelism(4)
.addSink(new FSubmit()).name("提交事务").setParallelism(1)
;
steamEnv.execute("test");
}

static class FSource extends RichParallelSourceFunction<String>{
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
int I =0;
while (true){
I = I + 1;
sourceContext.collect(Thread.currentThread().getId() +"-" +I);
Thread.sleep(1000);
}
}
@Override
public void cancel() {}
}

static class FStart extends AbstractStreamOperator<String> implements OneInputStreamOperator<String,String>{
   volatile Long ckid = 0L;
    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        String value = streamRecord.getValue() + "-" + ckid;
        streamRecord.replace(value);
        log("收到数据: " + value + "..ckid:" + ckid);
        output.collect(streamRecord);
    }
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        log("开启事务: " + checkpointId);
        ckid = checkpointId;
        super.prepareSnapshotPreBarrier(checkpointId);
    }
}

static class FCombine extends ProcessFunction<String,String> implements CheckpointedFunction {
    List ls = new ArrayList<String>();
    Collector<String> collector =null;
    volatile Long ckid = 0L;

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        StringBuffer sb = new StringBuffer();
        ls.forEach(x->{sb.append(x).append(";");});
        log("批处理 " + functionSnapshotContext.getCheckpointId() + ": 时收到数据:" + sb.toString());
        Thread.sleep(5*1000);
        collector.collect("["+sb.toString() + "-"+ckid+"]");
        ls.clear();
        log("批处理 " + functionSnapshotContext.getCheckpointId() + " 完成");
        //Thread.sleep(5*1000);
        //Thread.sleep(20*1000);
    }
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {        }
    @Override
    public void processElement(String s, Context context, Collector<String> out) throws Exception {
        if(StringUtils.isNotBlank(s)){
            ls.add(s);
        }
        log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + ckid);
        if(collector ==null){
            collector = out;
        }
    }
}

static class FSubmit extends RichSinkFunction<String> implements  CheckpointedFunction, CheckpointListener {
    List ls = new ArrayList<String>();
    volatile Long ckid = 0L;
    @Override
    public void notifyCheckpointComplete(long l) throws Exception {
        ckid = l;
        StringBuffer sb = new StringBuffer();
        ls.forEach(x->{sb.append(x).append("||");});
        log("submit notifyCheckpointComplete " + l + " over data:list size" + ls.size()+ "; detail" + sb.toString());
        Thread.sleep(100000);
        ls.clear();
    }
    @Override
    public void invoke(String value, Context context) throws Exception {
        if(StringUtils.isNotBlank(value)){
            ls.add(value);
        }
        log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" + ckid);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        StringBuffer sb = new StringBuffer();
        ls.forEach(x->{sb.append(x).append("||");});
        log("submit snapshotState " + context.getCheckpointId() + " over data:list size" + ls.size()+ "; detail" + sb.toString());
        ls.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }
}
public static void log(String s){
    String name = Thread.currentThread().getName();
    System.out.println(new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())+":"+name + ":" + s);
}

}

上一篇 下一篇

猜你喜欢

热点阅读