Summary of problems encountered when using Flink

2021-02-22  卫渐行


import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * @Author: wenwei
 * @Date: 2020/9/8 22:15
 * Custom bucketing rule.
 * 1: The format used for the bucket name; the default is yyyy-MM-dd--HH.
 */
public class CustomBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

    private final String formatString;

    private final ZoneId zoneId;

    private transient DateTimeFormatter dateTimeFormatter;

    /**
     * Creates a new {@code CustomBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
     */
    public CustomBucketAssigner() {
        this(DEFAULT_FORMAT_STRING);
    }

    /**
     * Creates a new {@code CustomBucketAssigner} with the given date/time format string.
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket id.
     */
    public CustomBucketAssigner(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    /**
     * Creates a new {@code CustomBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public CustomBucketAssigner(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    /**
     * Creates a new {@code CustomBucketAssigner} with the given date/time format string using the given timezone.
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket path.
     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public CustomBucketAssigner(String formatString, ZoneId zoneId) {
        this.formatString = Preconditions.checkNotNull(formatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
    }

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        // The formatter is transient, so it is rebuilt lazily after deserialization.
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        // The bucket is derived from the current watermark, not from processing time.
        return "p_data_day=" + dateTimeFormatter.format(Instant.ofEpochMilli(context.currentWatermark()));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String toString() {
        return "CustomBucketAssigner{" +
                "formatString='" + formatString + '\'' +
                ", zoneId=" + zoneId +
                '}';
    }
}

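To see what bucket IDs the assigner above produces, the formatting step can be exercised on its own with plain `java.time`. This is a standalone sketch, not part of the Flink job: the class `BucketIdDemo` and the method `bucketId` are hypothetical names, and the timestamp argument stands in for the value returned by `context.currentWatermark()`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BucketIdDemo {

    // Mirrors the formatting step in getBucketId: fixed prefix + formatted timestamp.
    public static String bucketId(long epochMilli, String formatString, ZoneId zoneId) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        return "p_data_day=" + formatter.format(Instant.ofEpochMilli(epochMilli));
    }

    public static void main(String[] args) {
        long epoch = 0L; // 1970-01-01T00:00:00Z
        System.out.println(bucketId(epoch, "yyyy-MM-dd--HH", ZoneId.of("UTC")));
        System.out.println(bucketId(epoch, "yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));

        // 16:30 UTC on 2020-09-08 is already 2020-09-09 in Asia/Shanghai.
        long lateEvening = Instant.parse("2020-09-08T16:30:00Z").toEpochMilli();
        System.out.println(bucketId(lateEvening, "yyyy-MM-dd", ZoneId.of("UTC")));
        System.out.println(bucketId(lateEvening, "yyyy-MM-dd", ZoneId.of("Asia/Shanghai")));
    }
}
```

The same instant lands in different day buckets depending on the zone, so passing an explicit `ZoneId` to the assigner is likely safer than relying on `ZoneId.systemDefault()`, which can differ between machines.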

Problem 2: How does Flink determine window boundaries precisely?

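How should the window time be defined so that records are assigned to the correct event-time partition and the previous day's data never falls into the next partition? The window source code in Flink is a useful reference, in particular the way it computes the window start time: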

    /**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start (the time the event occurred).
     * @param offset The offset which window start would be shifted by. This is the offset allowed
     *               when defining TumblingEventTimeWindows; it defaults to zero.
     * @param windowSize The size of the generated windows.
     * @return window start
     *
     * Example: windowSize = 5s, offset = 0, timestamp = 2s and 7s:
     *   2 - (2 - 0 + 5) % 5 = 0
     *   7 - (7 - 0 + 5) % 5 = 5
     * Example: windowSize = 7s, offset = 0, timestamp = 2s and 7s:
     *   2 - (2 - 0 + 7) % 7 = 0
     *   7 - (7 - 0 + 7) % 7 = 7
     * Example: windowSize = 5s, offset = 1s, timestamp = 2s and 7s:
     *   2 - (2 - 1 + 5) % 5 = 1
     *   7 - (7 - 1 + 5) % 5 = 6
     * Example: windowSize = 7s, offset = 1s, timestamp = 2s and 7s:
     *   2 - (2 - 1 + 7) % 7 = 1
     *   7 - (7 - 1 + 7) % 7 = 1
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
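The formula can be checked outside Flink with a few lines of plain Java. The sketch below copies the arithmetic into a hypothetical `WindowStartDemo` class, runs two small cases, and then applies it to one-day windows. For day windows aligned to local midnight in UTC+8, the Flink documentation suggests an offset of `Time.hours(-8)`; the demo uses the equivalent value in milliseconds.

```java
import java.time.Instant;
import java.time.ZoneOffset;

public class WindowStartDemo {

    // Same arithmetic as getWindowStartWithOffset.
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        System.out.println(windowStart(2, 0, 5)); // prints 0
        System.out.println(windowStart(7, 1, 7)); // prints 1

        long day = 24 * 60 * 60 * 1000L;
        long hour = 60 * 60 * 1000L;

        // 00:30 on 2021-02-22 in UTC+8, expressed in UTC.
        long ts = Instant.parse("2021-02-21T16:30:00Z").toEpochMilli();

        // Without an offset the day window starts at UTC midnight,
        // which is 08:00 local time on the previous day.
        long startNoOffset = windowStart(ts, 0, day);
        // With an offset of -8 hours the window starts at local midnight.
        long startUtc8 = windowStart(ts, -8 * hour, day);

        ZoneOffset utc8 = ZoneOffset.ofHours(8);
        System.out.println(Instant.ofEpochMilli(startNoOffset).atOffset(utc8));
        System.out.println(Instant.ofEpochMilli(startUtc8).atOffset(utc8));
    }
}
```

With offset 0, an event at 00:30 local time on 22 February is grouped into a window that began at 08:00 local time on 21 February, which is exactly the "previous day's data in the wrong partition" situation. The -8 hour offset moves the boundary to local midnight.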

Problem 3: As the data volume kept growing, resolving IP addresses led to too many open file handles.

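import java.io.File;

import org.lionsoul.ip2region.DbConfig;
import org.lionsoul.ip2region.DbSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Ip2regionSingleton {

    private static Logger logger = LoggerFactory.getLogger(Ip2regionSingleton.class);

    // Created once per JVM, so the database file is opened only once.
    private static Ip2regionSingleton instance = new Ip2regionSingleton();

    private static DbConfig config;
    private static DbSearcher searcher;

    public DbSearcher getSearcher() {
        return searcher;
    }

    // Private constructor
    private Ip2regionSingleton() {

        String path = Ip2regionSingleton.class.getResource("/").getPath();
        String dbPath = path + "plugins/ip2region.db";
        File file = new File(dbPath);

        logger.info("singleton count:{}", "-------------------------------------------------------");

        if (file.exists()) {
            try {
                config = new DbConfig();
                searcher = new DbSearcher(config, dbPath);
            } catch (Exception e) {
                // Log the failure instead of swallowing it; searcher stays null.
                logger.error("failed to load ip2region db: {}", dbPath, e);
            }
        }
    }

    public static Ip2regionSingleton getInstance() {
        return instance;
    }
}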
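The point of the singleton is that the searcher, which presumably keeps the database file open, is created once per JVM instead of once per record or per call. The sketch below illustrates that with JDK classes only. It is a stand-in, not the ip2region API: `SharedResourceDemo`, `Resource`, and `Holder` are hypothetical names, and it uses the lazy initialization-on-demand holder idiom rather than the eager static field shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedResourceDemo {

    // Counts how many times the expensive resource was created.
    public static final AtomicInteger CREATED = new AtomicInteger();

    // Stand-in for a searcher that would hold an open file handle.
    public static final class Resource {
        Resource() {
            CREATED.incrementAndGet();
        }
    }

    // Initialization-on-demand holder: the JVM initializes INSTANCE once,
    // lazily and thread-safely, on first access to Holder.
    private static final class Holder {
        static final Resource INSTANCE = new Resource();
    }

    public static Resource getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(SharedResourceDemo::getInstance);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("resources created: " + CREATED.get());
    }
}
```

However many threads ask for the instance, the constructor runs once, so a resource that owns a file handle would open it once per JVM.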


Problem 4: How to resolve package dependency problems in a Flink project's pom file

Problem 5: How to guarantee end-to-end data consistency and ordering in Flink


Problem 6: How to keep advancing the watermark when no new events arrive, so that window computation is still triggered

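    // Nested inside the job class, which provides `logger`. Flink imports needed:
    //   org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    //   org.apache.flink.streaming.api.watermark.Watermark
    private static class CustomWatermarksPeriodc<T> implements AssignerWithPeriodicWatermarks<ActivityInfoDO> {
        private static final long serialVersionUID = 1L;
        // Allowed delay in milliseconds (30 s).
        private Long allowDelayTime = 30000L;

        @Override
        public long extractTimestamp(ActivityInfoDO topic, long l) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            // Assumption: the original guard was lost; fall back to the wall clock when push_time is missing.
            if (topic.getPush_time() == null || topic.getPush_time().isEmpty()) {
                return System.currentTimeMillis();
            }
            LocalDateTime localDateTime = LocalDateTime.parse(topic.getPush_time(), formatter);
            logger.info("extractTimestamp,currentWatermark:{}", localDateTime);
            return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Derived from the wall clock, so the watermark keeps advancing even when no events arrive.
            long watermark = System.currentTimeMillis() - allowDelayTime;
            logger.info("getCurrentWatermark, currentWatermark:{}", watermark);
            return new Watermark(watermark);
        }
    }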
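Why a wall-clock watermark triggers idle windows can be shown without Flink. An event-time window fires once the watermark reaches the window's last timestamp (`end - 1`), so a watermark computed as "now minus allowed delay" eventually passes that point even if no further events arrive. The sketch below is a simplified simulation under that assumption, not Flink's implementation; `IdleWatermarkDemo` and its methods are hypothetical names.

```java
public class IdleWatermarkDemo {

    // Watermark as in getCurrentWatermark: wall clock minus the allowed delay.
    public static long watermark(long nowMillis, long allowDelayMillis) {
        return nowMillis - allowDelayMillis;
    }

    // A window [start, end) is ready once the watermark reaches end - 1.
    public static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;  // window [0 s, 60 s)
        long allowDelay = 30_000L; // same 30 s as allowDelayTime

        // No new events arrive; only the simulated wall clock advances.
        for (long now = 60_000L; now <= 100_000L; now += 10_000L) {
            long wm = watermark(now, allowDelay);
            System.out.println("now=" + now + " watermark=" + wm
                    + " fires=" + windowFires(windowEnd, wm));
        }
    }
}
```

In this simulation the window first fires at `now=90000`, that is, 30 seconds of wall-clock time after the window end. The trade-off is that the watermark no longer follows event time: records that arrive more than `allowDelayTime` behind the wall clock, for example when replaying historical data, would be treated as late.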


Problem 8: Writing to the MySQL sink frequently fails with errors


When the intermediate state is not very large, Flink can manage it with FsStateBackend:

StateBackend fsStateBackend = new FsStateBackend(parameter.get("flink.state.path"));

 StateTtlConfig   ttlConfig = StateTtlConfig
//                .cleanupInRocksdbCompactFilter(1000L)


Note: everything above uses Flink 1.9.

