网站首页 > 技术文章 正文
然后我们来看代码,可以看到,首先,我们说这个dataStream,这里,数据流,我们可以直接
.assignTimestampsAndWatermarks() 这里设置waterMark
首先我们去看可以看到参数中,这里是
AssignerWithPeriodicWatermarks这个参数.
我们可以去看一下这个参数的源码
我们去看一下这个
AssignerWithPeriodicWatermarks可以看到这个是,继承至TimestampAssigner
而这个TimestampAssigner是个,可以看到意思是个时间提取器,意思就是,提取了时间戳以后,然后
把时间戳封装成一个waterMark对吧.
然后我们这里传入一个,实现了上面的TimestampAssigner这个接口的类就可以了,可以看到上面是,new
BoundedOutOfOrdernessTimestampExtractor 这个可以看到字面上是,有界的乱序时间戳提取器对吧,Orderness有序,OutOfOrderness是无序.
然后这里我们做什么?我们只需要把数据流中的时间戳,提取出来就可以了对吧?
只是提取出来就可以了吗?
注意这里的方法我们可以去看一下
BoundedOutOfOrdernessTimestampExtractor的底层
我们去看看extractTimestamp,这个提取时间戳方法,我们继续看底层
去看实现的类,
AssignerWithPeriodicWatermarks
然后再看这个接口,继承的TimestampAssigner
可以看到这个继承的最终的,TimestampAssigner中的extractTimestamp这个方法
上面有个注释说,这个参数需要是用毫秒的单位才行.
可以看到这个值,如果他提取的时候,提取不到时间值的话,他会默认给出一个最小值.
这个最小值可以看到就是Long型的最小值对吧.
然后我们去看我们的数据,我们的数据是秒为单位的
所以这里,我们element.getTimestamp() * 1000L;这样就是毫秒了.
然后我们再看,还在报错对吧.是
BoundedOutOfOrdernessTimestampExtractor这个构造方法,缺少参数
我们去看这个参数是maxOutOfOrderness对吧
这个表示乱序程度
还记得我们之前说吗?
这里要使用乱序程度最大的,才能把尽可能多的迟到数据包含进去.
比如上面5-2=3 5-3=2 所以这里就要选择3作为延迟时间对吧.
然后这里我们,就暂时随便写一个比如:
Time.seconds(2)
我们写成2秒,把延迟两秒的数据,纳入进来对吧
所以解决乱序问题实际上就是,1.首先
指定
env.setStreamTimeCharacteristic这里指定,数据的时间按照
TimeCharacteristic.EventTime,也就是数据的生成时间.
然后再就是直接
.dataStream.assignTimestampsAndWatermarks()
这个方法,中,去new
BoundedOutOfOrdernessTimestampExtractor,然后重写
extractTimestamp方法,来提取数据中的,时间戳
然后,再
BoundedOutOfOrdernessTimestampExtractor给这个构造方法,指定数据的延迟时间
就可以了.
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>(){
}) 然后我们再看,除了用上面的
BoundedOutOfOrdernessTimestampExtractor,这个以外,
我们还可以使用,new
AscendingTimestampExtractor这个类对吧,这个是干嘛的呢?
我们可以看到asc是升序对吧.
我们去看,假如数据,真的是不乱序的,上面的那样,1,2,3,4,5...这样流过来的,这个时候
我们就可以直接用这个
AscendingTimestampExtractor这个了,可以看到
他的这个构造方法中并没有乱序程度那个参数对吧,因为他不是用来处理乱序数据的
他是用来处理正常顺序数据的.
然后我们再来看,这里我写上注释
可以看到给升序数据设置waterMark就按照上面就行,就是正常顺序数据的处理.
然后我们继续看,
给乱序数据设置waterMark就是对乱序数据的处理.
然后我们再看他底层是怎么弄的呢?
我们看dataStream的
assignTimestampsAndWatermarks这个方法,里面
AssignerWithPeriodicWatermarks这个参数
可以从名字上看的出来:
AssignerWithPeriodicWatermarks这个是周期性的水位线.
就是说,每隔一定的周期就设置一个waterMark对吧,并不是,每个数据来了以后
都设置一个waterMark对吧.
然后我们再看如果我们想每来一个数据,就设置一个waterMark呢?
这样我们怎么做?
可以看到有个,
AssignerWithPunctuatedWatermarks,这个类
可以看到这个是,Punctuated断点似的添加waterMark
可以看到他也是同样继承TimestampAssigner的,里面有提取extractTimestamp时间戳的方法对吧
然后我们再看
AssignerWithPunctuatedWatermarks里面还包含一个
checkAndGetNextWatermark对吧,这个方法
这个方法有lastElement这个是流过来的上一个数据,和当前的extractedTimestamp时间戳
得到一个waterMark对吧,也就是一个数据插入一个waterMark了
然后看到这个TimestampAssigner,里面有extractTimestamp方法用来提取时间戳
可以看到waterMark中最主要的其实就是有timestamp这个时间戳对吧
然后我们再看
AssignerWithPeriodicWatermarks 这个类可以看到
里面有的方法是getCurrentWatermark对吧,这个方法可以看到,就没有参数了
因为他周期性的去插入waterMark,不需要根据上一个数据和已经提取的时间戳对吧.
然后我们可以去看看这个
AssignerWithPeriodicWatermarks周期性提取waterMark
去看一下他这个方法,底层怎么实现的
首先这个类
BoundedOutOfOrdernessTimestampExtractor,是继承至
AssignerWithPeriodicWatermarks类的,在这个类中
重要的一个变量:currentMaxTimestamp,当前最大的时间戳
然后lastEmittedWatermark 这个是上一次发出的这个waterMark对吧.
这个默认值是long型的最小值.可以看到这里他并没有用waterMark作为自己的类型,而是
直接使用的long对吧.
然后我们看这里还要式maxOutOfOrderness这个就是最大乱序程度,也就是延迟时间对吧.
然后我们来看这部分的源码,首先看构造方法,可以看到这里,
首先得到这个乱序程度maxOutOfOrderness,拿到这个乱序程度以后,然后
先记录下来.
然后再去:currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
可以看到当前最大的waterMark,这里currentMaxTimestamp 为什么开始的时候,要去
用最小的long值取加上maxOutOfOrderness.
我们知道根据之前我们的分析,在处理乱序的时候,我们需要用当前的waterMark 减去 -
最大乱序程度对吧,但是上面没有减去,而是加上了,为什么?
可以看到他是在构造方法中做的,就是为了避免,如果一开始的时候,就需要生产waterMark,
但这个时候,我们知道本来应该是,lastEmittedWatermark - maxOutOfOrderness对吧,但是
刚开始的时候,这个还没有waterMark的时候,这个lastEmittedWatermark 是long型的最小值,这个
最小值如果再去,减去maxOutOfOrderness 减去一个值的话,就会导致溢出,变成一个很大的值
了,这样就会报错.所以他这里初始化的时候,在构造方法中没有减去,而是加,为了避免报错.
然后我们还可以看到,
BoundedOutOfOrdernessTimestampExtractor中的,继承过来的这个
extractTimestamp这个是抽象的对吧,这个是需要我们去实现,然后把时间戳拿出来返回的
然后我们主要看这个,getCurrentWatermark
可以看到他的代码,跟我们之前的分析一样,就是用当前最大的waterMark,减去乱序程度
对吧,然后再判断这个减去以后得到的值,和上一个waterMark比较,是否比原来的大,如果
比原来的大,那么就把上一个waterMark更新成这个大的最新的值,如果不是,那么就
还是保持原来的waterMark不变对吧.
这个就是这个
BoundedOutOfOrdernessTimestampExtractor的waterMark生成的底层原理.
用来处理乱序数据.
然后我们再去看
AscendingTimestampExtractor,看这个
可以看到他也是实现了
AssignerWithPeriodicWatermarks周期性的waterMark对吧
可以看到
AscendingTimestampExtractor里面的getCurrentWatermark
这个升序处理的waterMark,其实就是每个数据来了后面都跟一个waterMark对吧
看看他的这个getCurrentWatermark怎么实现的,可以看到,判断
currentTimestamp看看他和long的最小值对比,如果是最小值就返回最小值,初始化的时候,用对吧
否则可以看到,就返回currentTimestamp-1对吧.
这里减1实际上就是减去1毫秒对吧.
然后我们再看一下如何我们自己去定义一个waterMark,可以看到首先
我们自己定义了一个周期性的waterMark对吧,MyPeriodicAssigner对吧
可以看到我这里定义了延迟是延迟60*1000L毫秒,也就是延迟1分钟.
然后这里getCurrentWatermark,我们返回了一个,waterMark(maxTs - bound);
其实就是当前的值,减去我们设置的周期对吧.
然后我们看一下我们实现的extractTimestamp,这里可以看到就是,从数据中获取的时间戳和
maxTs做对比,从中获取大的那个,然后赋值给,maxTs,这个最大的时间戳,然后再返回
数据的时间戳就可以了,这里maxTs拿到最新的最大值,然后减去延迟的1分钟,然后
就可以返回waterMark了.
然后我们再看一个,我们自己定义的断点式的,Assigner,可以看到首先我们还是,声明了一个延迟
时间是60 * 1000L毫秒.
,然后我们实现checkAndGetNextWaterMark,这里面可以看到,判断,上一个数据,可以看到
上一个数据的id,如果是传感器1的话,sensor_1的话,我们再返回waterMark,而且返回的时候
也是用,提取的时间戳,减去延迟的1分钟时间,如果不是sensor_1这个传感器的话,就
不添加waterMark,也就是返回null
然后我们再实现提取extractTimestamp,提取时间戳的处理,这里
直接返回数据的getTimestamp就可以了.
这样我们也实现了自定义的断点式的处理乱序数据了.
- 上一篇: 第三篇:数据类型——1基本数据类型
- 下一篇: 特气系统设计工程中常用的中英文对照表
猜你喜欢
- 2025-09-06 特气系统设计工程中常用的中英文对照表
- 2025-09-06 第三篇:数据类型——1基本数据类型
- 2025-09-06 Java语言基础(一)-java主类结构_java的主类结构
- 2025-09-06 @程序员,你真的了解内存吗?_程序员内存重要还是cpu重要
- 2025-09-06 【C语言·003】基本数据类型的字节表示与取值范围边界
- 2025-09-06 如何选择自己喜欢的颜色,这个VBA颜色选择器一定要学会
- 2025-09-06 Spring Boot3 中解决返回接口 Long 类型数据精度损失问题全解析
- 2025-09-06 C语言入门:数据类型_c语言数据类型总结
- 2025-06-24 C语言的数据类型(c语言的数据类型说明保留字)
- 2025-06-24 java语言怎么学习?(java语言入门知识)
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- asynccallback (71)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)