1.状态计算概述
2.flink state
(1)flink state是什么?
state就是task/operator计算的中间结果
无状态计算:相同的输入得到相同的结果,只根据输入的数据无需借助其他数据就可以计算出我们想要的结果
有状态计算:相同的输入得到可能是不同的结果,计算过程会需要中间结果或者历史结果进行联合处理。
(2)有状态计算的场景
(3)state状态的类型
Manageed State & Raw State
从flink收接管角度分类:
1)ManageState(托管状态)是Flink自动管理的State(推荐使用)
2)RawState(原始状态)是原生态State需要用户自己管理
从是否与key相关分类:
ManageState:
1)keyed state 都是基于keyedstream,只有keyedstream可以使用,与key绑定
2)operator state 是非keyedstream时候使用,与operator绑定
RawState:operator state
keyedstate:
operatestate:
flink状态管理-ManageState-KeyedState-ValueState
1.value state案例
总结:使用valuestate可以帮助我们存储keyedstream中key对象的value数据,而且无需关心kv之间的映射。
代码:
package cn.itcast.flink.state.keyedstate
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
object ValueStateDemo {
def main(args: Array[String]): Unit = {
// 创建流运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 设置并行度,cpu核数
env.setParallelism(1)
// 使用socket数据源
val socketDs: DataStream[String] = env.socketTextStream("127.0.0.1", 9999)
// 使用keyBy分组
val tupleDs: DataStream[(String, Int)] = socketDs.map(line => {
val arr: Array[String] = line.split(",")
(arr(0), arr(1).trim.toInt)
})
// 分组
val keyStream: KeyedStream[(String, Int), Tuple] = tupleDs.keyBy(0)
//keyStream.maxBy(1)
// 使用valuestate来存储两两比较之后的最大值,新数据到来之后如果比原来的数据大,则把该值更新为状态值,保证状态中一直存储的是最大值
// 需要通过上下文来获取keyedstate valuestate
val maxDs: DataStream[(String, Int)] = keyStream.map(
// 使用richfunction操作,需要通过上下文
new RichMapFunction[(String, Int), (String, Int)] {
// 声明一个valuestate(不是创建),value state无需要关心key是谁以及kv之间的映射,flink维护
var maxValueState: ValueState[Int] = _
// 通过上下文才能获取真正的state,上下文这种操作在执行一次的方法中使用并获取真正的状态对象
override def open(parameters: Configuration): Unit = {
// 定义一个state描述其 参数:state的名称,数据类型的字节码文件
val maxValueDesc: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("myValue", classOf[Int])
// 根据上线文基于描述器获取state
maxValueState = getRuntimeContext.getState(maxValueDesc)
}
// 业务逻辑,可以获取到state数据
override def map(value: (String, Int)): (String, Int) = {
// value是一条新数据,需要与原来最大值(valuestate)进行比较判断
// 获取valuestate中的数据
val maxNumInState: Int = maxValueState.value()
if (value._2 > maxNumInState) {
maxValueState.update(value._2)
}
(value._1, maxValueState.value())
}
}
)
// 打印数据
maxDs.print()
// 启动
env.execute()
}
}
2.map state案例
与valuestate类似,只是更改了描述器。
代码:
package cn.itcast.flink.state.keyedstate
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object MapStateDemo {
def main(args: Array[String]): Unit = {
// 创建流运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 加载数据
val collectionDs: DataStream[(String, Int)] = env.fromCollection(List(
("java", 1),
("scala", 1),
("go", 1),
("java", 1),
("java", 2),
("go", 2),
("java", 1),
("scala", 2)
))
// 数据分组
val keyStream: KeyedStream[(String, Int), Tuple] = collectionDs.keyBy(0)
// 数据转换
val mapStateDs: DataStream[(String, Int)] = keyStream.map(
new RichMapFunction[(String, Int), (String, Int)] {
var sumMapState: MapState[String, Int] = null
override def open(parameters: Configuration): Unit = {
val mapStateDesc: MapStateDescriptor[String, Int] = new MapStateDescriptor[String, Int]("sumMap", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
sumMapState = getRuntimeContext.getMapState(mapStateDesc)
}
override def map(value: (String, Int)): (String, Int) = {
val key: String = value._1
val stateValue: Int = sumMapState.get(key)
sumMapState.put(key, stateValue + value._2)
(key, sumMapState.get(key))
}
}
)
// 打印数据
mapStateDs.print()
// 启动
env.execute()
}
}
总结:
上面mapstate案例也可以使用valuestate实现,选择不同的数据结构主要是考虑取数据方便以及结合你的业务。
flink状态管理-RawState-OperatorState-ListState
operatorState
总结:
1.operator staet 需要实现checkpointfunction
重写两个方法:
a:初始化状态的方法:initalizeState,通过定义liststate描述器来获取状态
b:集合checkpoint用法,snapshotState,我们定义把之前历史数据清空,然后加入新的state数据
2.run方法中我们通过liststate来获取之前的存储的state数据
listState
案例:模仿kafkaconsumer定义一个operator state
需求:自定义一个source实现checkpointedfunction,定义一个liststate存储我们source中自定义的一个偏移量数据(没法送一条数据该值(offset)加1);让程序出现异常看是否能从liststate中恢复该偏移量数据继续发送。
步骤:
1.获取执行环境
2.设置检查点机制;路径,重启策略
3.自定义数据源
(1)需要继承SourceFunction和CheckpointedFunction
(2)设置liststate,通过上下文对象context获取
(3)数据处理,保留offset
(4)制作快照
4.数据打印
5.触发执行
总结:
1.实现operator state需要实现Checkpoint的function重写两个方法,一个是初始化方法,另一个是进行Checkpoint操作时我们把之前状态清空,加入新的状态数据
2.ListState获取其中数据,使用get获取到iterator,遍历iterator可以获取到其中的数据