优秀的编程知识分享平台

网站首页 > 技术文章 正文

Flink状态管理state(flink状态管理内部原理)

nanyue 2024-08-23 18:40:04 技术文章 5 ℃

1.状态计算概述

2.flink state

(1)flink state是什么?

state就是task/operator计算的中间结果

无状态计算:相同的输入得到相同的结果,只根据输入的数据无需借助其他数据就可以计算出我们想要的结果

有状态计算:相同的输入得到可能是不同的结果,计算过程会需要中间结果或者历史结果进行联合处理。

(2)有状态计算的场景

(3)state状态的类型

Manageed State & Raw State

从flink收接管角度分类:

1)ManageState(托管状态)是Flink自动管理的State(推荐使用)

2)RawState(原始状态)是原生态State需要用户自己管理


从是否与key相关分类:

ManageState:

1)keyed state 都是基于keyedstream,只有keyedstream可以使用,与key绑定

2)operator state 是非keyedstream时候使用,与operator绑定

RawState:operator state

keyedstate:

operatestate:


flink状态管理-ManageState-KeyedState-ValueState

1.value state案例

总结:使用valuestate可以帮助我们存储keyedstream中key对象的value数据,而且无需关心kv之间的映射。

代码:

package cn.itcast.flink.state.keyedstate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object ValueStateDemo {
  def main(args: Array[String]): Unit = {
    // 创建流运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置并行度,cpu核数
    env.setParallelism(1)
    // 使用socket数据源
    val socketDs: DataStream[String] = env.socketTextStream("127.0.0.1", 9999)
    // 使用keyBy分组
    val tupleDs: DataStream[(String, Int)] = socketDs.map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0), arr(1).trim.toInt)
    })

    // 分组
    val keyStream: KeyedStream[(String, Int), Tuple] = tupleDs.keyBy(0)

    //keyStream.maxBy(1)
    // 使用valuestate来存储两两比较之后的最大值,新数据到来之后如果比原来的数据大,则把该值更新为状态值,保证状态中一直存储的是最大值
    // 需要通过上下文来获取keyedstate valuestate
    val maxDs: DataStream[(String, Int)] = keyStream.map(
      // 使用richfunction操作,需要通过上下文
      new RichMapFunction[(String, Int), (String, Int)] {
        // 声明一个valuestate(不是创建),value state无需要关心key是谁以及kv之间的映射,flink维护
        var maxValueState: ValueState[Int] = _
        // 通过上下文才能获取真正的state,上下文这种操作在执行一次的方法中使用并获取真正的状态对象
        override def open(parameters: Configuration): Unit = {
          // 定义一个state描述其 参数:state的名称,数据类型的字节码文件
          val maxValueDesc: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("myValue", classOf[Int])
          // 根据上线文基于描述器获取state
          maxValueState = getRuntimeContext.getState(maxValueDesc)
        }
        // 业务逻辑,可以获取到state数据
        override def map(value: (String, Int)): (String, Int) = {
          // value是一条新数据,需要与原来最大值(valuestate)进行比较判断
          // 获取valuestate中的数据
          val maxNumInState: Int = maxValueState.value()

          if (value._2 > maxNumInState) {
            maxValueState.update(value._2)
          }
          (value._1, maxValueState.value())
        }
      }
    )

    // 打印数据
    maxDs.print()
    // 启动
    env.execute()
  }
}

2.map state案例

与valuestate类似,只是更改了描述器。

代码:

package cn.itcast.flink.state.keyedstate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

object MapStateDemo {
  def main(args: Array[String]): Unit = {

    // 创建流运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 加载数据
    val collectionDs: DataStream[(String, Int)] = env.fromCollection(List(
      ("java", 1),
      ("scala", 1),
      ("go", 1),
      ("java", 1),
      ("java", 2),
      ("go", 2),
      ("java", 1),
      ("scala", 2)
    ))

    // 数据分组
    val keyStream: KeyedStream[(String, Int), Tuple] = collectionDs.keyBy(0)

    // 数据转换
    val mapStateDs: DataStream[(String, Int)] = keyStream.map(
      new RichMapFunction[(String, Int), (String, Int)] {
        var sumMapState: MapState[String, Int] = null

        override def open(parameters: Configuration): Unit = {
          val mapStateDesc: MapStateDescriptor[String, Int] = new MapStateDescriptor[String, Int]("sumMap", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))

          sumMapState = getRuntimeContext.getMapState(mapStateDesc)
        }

        override def map(value: (String, Int)): (String, Int) = {
          val key: String = value._1

          val stateValue: Int = sumMapState.get(key)
          sumMapState.put(key, stateValue + value._2)

          (key, sumMapState.get(key))
        }
      }
    )
    // 打印数据
    mapStateDs.print()
    // 启动
    env.execute()
  }

}

总结:

上面mapstate案例也可以使用valuestate实现,选择不同的数据结构主要是考虑取数据方便以及结合你的业务。


flink状态管理-RawState-OperatorState-ListState

operatorState

总结:

1.operator staet 需要实现checkpointfunction

重写两个方法:

a:初始化状态的方法:initalizeState,通过定义liststate描述器来获取状态

b:集合checkpoint用法,snapshotState,我们定义把之前历史数据清空,然后加入新的state数据

2.run方法中我们通过liststate来获取之前的存储的state数据

listState

案例:模仿kafkaconsumer定义一个operator state

需求:自定义一个source实现checkpointedfunction,定义一个liststate存储我们source中自定义的一个偏移量数据(没法送一条数据该值(offset)加1);让程序出现异常看是否能从liststate中恢复该偏移量数据继续发送。

步骤:

1.获取执行环境

2.设置检查点机制;路径,重启策略

3.自定义数据源

(1)需要继承SourceFunction和CheckpointedFunction

(2)设置liststate,通过上下文对象context获取

(3)数据处理,保留offset

(4)制作快照

4.数据打印

5.触发执行

总结:

1.实现operator state需要实现Checkpoint的function重写两个方法,一个是初始化方法,另一个是进行Checkpoint操作时我们把之前状态清空,加入新的状态数据

2.ListState获取其中数据,使用get获取到iterator,遍历iterator可以获取到其中的数据

Tags:

最近发表
标签列表