flink设置watermark以及事件时间字段源码分析
背景
1.1、提取时间戳字段,用于事件事件语义处理数据
1.2、设置水位线(水印)watermark
TimestampAssigner 核心接口介绍
TimestampAssigner 时间分配器接口 实现类关系图:提取时间戳字段方法:
TimestampAssigner 时间戳分配器, 提取数据流中的时间戳字段,
AssignerWithPeriodicWatermarks  //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor  //进入flink系统时间分配器TimestampAssigner 实现类
AssignerWithPeriodicWatermarks  //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor  //进入flink系统时间分配器设置时间戳、水印方法
DataStream类设置时间戳的方法:assignTimestampsAndWatermarks,指定watermark
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }1、AssignerWithPeriodicWatermarks接口:
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    @Nullable
    Watermark getCurrentWatermark();
}
    官方实现类:
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成 AscendingTimestampExtractor //升序数据周期性生成 IngestionTimeExtractor //进入flink系统时间分配器
因此我们一般选择使用实现类即可
2、AscendingTimestampExtractor 周期性生成watermark,升序数据
//实现类提取时间戳字段方法,调用者实现
  public abstract long extractAscendingTimestamp(T element);//根据数据流时间戳,计算watermark的时间戳 --升序处理数据
@Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
    //数据流中获取的时间戳
        final long newTimestamp = extractAscendingTimestamp(element);
        //如果当前数据的时俱戳大于当前已知的时间戳中的,则更新watermark中的时间戳
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
        //否则打印日志
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }
打印日志处理方法:
public static final class LoggingHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);
        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);
        }
    }获取当前watermark的方法
    @Override
    public final Watermark getCurrentWatermark() {
        //默认延迟1毫秒
        //如果当前时间戳等于Long.MIN_VALUE 则返回Long.MIN_VALUE,否则返回最大时间戳-1
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }AscendingTimestampExtractor 继承 AscendingTimestampExtractor
@PublicEvolving
@Deprecated
public abstract class AscendingTimestampExtractor<T>
    extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {
}
3、BoundedOutOfOrdernessTimestampExtractor 周期性的乱序数据
1、在创建对象时,默认给了一个最大的时间戳, Long.MIN_VALUE + this.maxOutOfOrderness;
2、来一条数据,判断当前时间戳和最大时间戳的大小,如果当前时间戳大于最大时间戳,则更新
3、生成watermark,用最大时间戳减去最大延迟,也就是watermark中的时间戳调慢的时间,比如原本是3点结束的窗口,延迟为1分钟,那么watermark中的时间应该为2分59秒
4、默认是 Long.MIN_VALUE是防止出现最大的时间戳减去最大延迟为负数,watermark中的时间戳为负数,出现时间倒转
BoundedOutOfOrdernessTimestampExtractor 有参构造函数:
    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp; //截至目前最大的时间戳
    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE; //上次watermark中时间戳
    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness; //最大延迟时间
    //构造函数  maxOutOfOrderness为乱序可容忍的最大程度,单位可以为milliseconds  seconds minutes等等
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        //如果延迟时间小于0,抛出异常
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        //最大延迟转换为毫秒数
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        //计算最大的默认的时间戳 防止数据溢出,这里要要加上最大延迟
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }
实现类重写提取时间戳字段的方法: 调用者使用,提取出指定字段的数据并返回当前时间戳的大小
public abstract long extractTimestamp(T element);extractTimestamp重载方法,用于更新最大的时间戳,每来一条数据进行判断
@Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        //获取当前数据的时间戳大小
        long timestamp = extractTimestamp(element);
        //如果当前数据的时间戳大小大于目前最大的时间戳,则赋值
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        //如果当前数据的时间戳小于目前最大的时间戳,则不变
        return timestamp;
    }获取watermark中的时间戳:
@Override
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    //当前时间的最大时间戳 - 最大延迟时间 =watermark中的时间戳
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    // 如果当前的最大时间戳延迟后的时间戳大于上次的watermark中的时间戳,则更新watermark
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}4、接口AssignerWithPunctuatedWatermarks
每一条数据都生成watermark的接口
    public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}没有实现类,需我们自己实现
 public static class Test implements AssignerWithPunctuatedWatermarks {
        /**
         * 生成Watermark
         *
         * @param lastElement        上一条数据
         * @param extractedTimestamp 水印的时间戳
         * @return
         */
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) {
            return null;
        }
        //提取时间戳字段
        @Override
        public long extractTimestamp(Object element, long previousElementTimestamp) {
            return 0;
        }
    }
Watermark类介绍
@PublicEvolving
public final class Watermark extends StreamElement {
    /** The watermark that signifies end-of-event-time. */
    public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
    // ------------------------------------------------------------------------
    /** The timestamp of the watermark in milliseconds. */
    private final long timestamp;
    /**
     * Creates a new watermark with the given timestamp in milliseconds.
     */
    //构造函数,传入时间戳
    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
    /**
     * Returns the timestamp associated with this {@link Watermark} in milliseconds.
     */
    //获取当前水印的时间戳大小
    public long getTimestamp() {
        return timestamp;
    }
    // ------------------------------------------------------------------------
    @Override
    public boolean equals(Object o) {
        return this == o ||
                o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
    }
    @Override
    public int hashCode() {
        return (int) (timestamp ^ (timestamp >>> 32));
    }
    @Override
    public String toString() {
        return "Watermark @ " + timestamp;
    }
}
总结:
1、Watermark可以理解为一个带着时间戳的空数据或者带着时间戳的标志数据,和其他数据一样,一条一条的处理
2、Watermark只能一直递增
3、Watermark计算方式为当前时间戳减去延迟时间 ,实现窗口延迟
4、window的执行由watermark触发,watermark机制结合window实现
5、升序数据-AscendingTimestampExtractor
乱序数据-BoundedOutOfOrdernessTimestampExtractor
6、BoundedOutOfOrdernessTimestampExtractor比AscendingTimestampExtractor区别就在于,使用了一个最大的时间戳的值,
来对每个数据进行判断,大于则更新,不大于则不更新。而AscendingTimestampExtractor后面的数据如果小于则会出现预警日志
以上仅为个人学习时的理解,如果不确定,麻烦大佬指正!
