优秀的编程知识分享平台

网站首页 > 技术文章 正文

聊聊flink StreamOperator的initializeState方法

nanyue 2024-08-30 20:44:59 技术文章 5 ℃

本文主要研究一下flink StreamOperator的initializeState方法

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {
?
 /**
 * Runnable entry point of the task (excerpt; parts marked {@code //......} are elided).
 *
 * <p>The visible code proceeds in this order: it moves the task out of its initial state
 * (CREATED to DEPLOYING, or exits early for FAILED / CANCELING), builds a
 * {@code RuntimeEnvironment}, loads the invokable, switches DEPLOYING to RUNNING, reports the
 * RUNNING state, calls {@code invokable.invoke()}, finishes the produced partitions and finally
 * attempts the RUNNING to FINISHED transition.
 *
 * @throws IllegalStateException if the task is in none of the states CREATED, FAILED or
 *     CANCELING when the method starts
 */
 public void run() {
?
 // ----------------------------
 // Initial State transition
 // ----------------------------
 while (true) {
 // the state is re-read on every iteration; the loop repeats whenever a
 // transitionState(...) attempt below returns false
 ExecutionState current = this.executionState;
 if (current == ExecutionState.CREATED) {
 if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
 // success, we can start our work
 break;
 }
 }
 else if (current == ExecutionState.FAILED) {
 // we were immediately failed. tell the TaskManager that we reached our final state
 notifyFinalState();
 if (metrics != null) {
 metrics.close();
 }
 return;
 }
 else if (current == ExecutionState.CANCELING) {
 if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
 // we were immediately canceled. tell the TaskManager that we reached our final state
 notifyFinalState();
 if (metrics != null) {
 metrics.close();
 }
 return;
 }
 }
 else {
 // any other state is rejected here: metrics are closed first, then the method fails
 if (metrics != null) {
 metrics.close();
 }
 throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
 }
 }
?
 // all resource acquisitions and registrations from here on
 // need to be undone in the end
 Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
 AbstractInvokable invokable = null;
?
 try {
 // ----------------------------
 // Task Bootstrap - We periodically
 // check for canceling as a shortcut
 // ----------------------------
?
 //......
?
 // ----------------------------------------------------------------
 // call the user code initialization methods
 // ----------------------------------------------------------------
?
 TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
?
 // the environment bundles the task's identifiers, configuration and services and is
 // handed to the invokable when it is instantiated below
 Environment env = new RuntimeEnvironment(
 jobId,
 vertexId,
 executionId,
 executionConfig,
 taskInfo,
 jobConfiguration,
 taskConfiguration,
 userCodeClassLoader,
 memoryManager,
 ioManager,
 broadcastVariableManager,
 taskStateManager,
 accumulatorRegistry,
 kvStateRegistry,
 inputSplitProvider,
 distributedCacheEntries,
 producedPartitions,
 inputGates,
 network.getTaskEventDispatcher(),
 checkpointResponder,
 taskManagerConfig,
 metrics,
 this);
?
 // now load and instantiate the task's invokable code
 invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
?
 // ----------------------------------------------------------------
 // actual task core work
 // ----------------------------------------------------------------
?
 // we must make strictly sure that the invokable is accessible to the cancel() call
 // by the time we switched to running.
 this.invokable = invokable;
?
 // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
 if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
 throw new CancelTaskException();
 }
?
 // notify everyone that we switched to running
 taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
?
 // make sure the user code classloader is accessible thread-locally
 executingThread.setContextClassLoader(userCodeClassLoader);
?
 // run the invokable
 invokable.invoke();
?
 // make sure, we enter the catch block if the task leaves the invoke() method due
 // to the fact that it has been canceled
 if (isCanceledOrFailed()) {
 throw new CancelTaskException();
 }
?
 // ----------------------------------------------------------------
 // finalization of a successful execution
 // ----------------------------------------------------------------
?
 // finish the produced partitions. if this fails, we consider the execution failed.
 for (ResultPartition partition : producedPartitions) {
 if (partition != null) {
 partition.finish();
 }
 }
?
 // try to mark the task as finished
 // if that fails, the task was canceled/failed in the meantime
 if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
 throw new CancelTaskException();
 }
 }
 catch (Throwable t) {
 // handler body is elided in this excerpt
 //......
 }
 finally {
 // cleanup body is elided in this excerpt
 //......
 }
 }
 
 //......
}
  • Task的run方法会调用invokable.invoke(),这里的invokable为StreamTask

StreamTask.invoke

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 extends AbstractInvokable
 implements AsyncExceptionHandler {
?
 /**
 * Runs the stream task (excerpt; the {@code finally} body is elided).
 *
 * <p>The visible code sets up the checkpoint exception handlers, state backend, checkpoint
 * storage, timer service and operator chain, calls {@code init()}, then, while holding
 * {@code lock}, calls {@code initializeState()} followed by {@code openAllOperators()}. After
 * {@code run()} returns it closes the operators under the same lock, quiesces the timer
 * service, flushes the outputs and disposes the operators.
 *
 * @throws Exception if any step fails; a {@code CancelTaskException} is thrown at the three
 *     points where the {@code canceled} flag is found set
 */
 @Override
 public final void invoke() throws Exception {
?
 // becomes true only after tryDisposeAllOperators() has returned at the end of the try block
 boolean disposed = false;
 try {
 // -------- Initialize ---------
 LOG.debug("Initializing {}.", getName());
?
 asyncOperationsThreadPool = Executors.newCachedThreadPool();
?
 CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
?
 synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
 getExecutionConfig().isFailTaskOnCheckpointError(),
 getEnvironment());
?
 asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
?
 stateBackend = createStateBackend();
 checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
?
 // if the clock is not already set, then assign a default TimeServiceProvider
 if (timerService == null) {
 ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
 "Time Trigger for " + getName(), getUserCodeClassLoader());
?
 timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
 }
?
 operatorChain = new OperatorChain<>(this, streamRecordWriters);
 headOperator = operatorChain.getHeadOperator();
?
 // task specific initialization
 init();
?
 // save the work of reloading state, etc, if the task is already canceled
 if (canceled) {
 throw new CancelTaskException();
 }
?
 // -------- Invoke --------
 LOG.debug("Invoking {}", getName());
?
 // we need to make sure that any triggers scheduled in open() cannot be
 // executed before all operators are opened
 synchronized (lock) {
?
 // both the following operations are protected by the lock
 // so that we avoid race conditions in the case that initializeState()
 // registers a timer, that fires before the open() is called.
?
 initializeState();
 openAllOperators();
 }
?
 // final check to exit early before starting to run
 if (canceled) {
 throw new CancelTaskException();
 }
?
 // let the task do its work
 isRunning = true;
 run();
?
 // if this left the run() method cleanly despite the fact that this was canceled,
 // make sure the "clean shutdown" is not attempted
 if (canceled) {
 throw new CancelTaskException();
 }
?
 LOG.debug("Finished task {}", getName());
?
 // make sure no further checkpoint and notification actions happen.
 // we make sure that no other thread is currently in the locked scope before
 // we close the operators by trying to acquire the checkpoint scope lock
 // we also need to make sure that no triggers fire concurrently with the close logic
 // at the same time, this makes sure that during any "regular" exit where still
 synchronized (lock) {
 // this is part of the main logic, so if this fails, the task is considered failed
 closeAllOperators();
?
 // make sure no new timers can come
 timerService.quiesce();
?
 // only set the StreamTask to not running after all operators have been closed!
 // See FLINK-7430
 isRunning = false;
 }
?
 // make sure all timers finish
 timerService.awaitPendingAfterQuiesce();
?
 LOG.debug("Closed operators for task {}", getName());
?
 // make sure all buffered data is flushed
 operatorChain.flushOutputs();
?
 // make an attempt to dispose the operators such that failures in the dispose call
 // still let the computation fail
 tryDisposeAllOperators();
 disposed = true;
 }
 finally {
 // cleanup body is elided in this excerpt
 //......
 }
 }
?
 private void initializeState() throws Exception {
?
 StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
?
 for (StreamOperator<?> operator : allOperators) {
 if (null != operator) {
 operator.initializeState();
 }
 }
 }
?
 //......
}
  • StreamTask的invoke方法会调用initializeState方法,该方法会遍历operatorChain上的allOperators(StreamOperator),调用其initializeState方法;比如这里的operator为StreamSource

StreamOperator.initializeState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java

/**
 * Stream operator interface (excerpt; further members are elided with {@code //......}).
 *
 * <p>As declared here it extends {@code CheckpointListener}, {@code KeyContext},
 * {@code Disposable} and {@code Serializable}.
 *
 * @param <OUT> type parameter of the operator as declared in the interface header
 */
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
 /**
 * Provides a context to initialize all state in the operator.
 *
 * @throws Exception if initializing the operator's state fails
 */
 void initializeState() throws Exception;
?
 //......
}
  • StreamOperator接口定义了initializeState方法用于初始化operator的state

StreamSource.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

/**
 * Source operator (excerpt; the class body is elided with {@code //......}).
 *
 * <p>It extends {@code AbstractUdfStreamOperator<OUT, SRC>} and declares
 * {@code StreamOperator<OUT>} explicitly.
 *
 * @param <OUT> element type parameter passed on to the superclass and to {@code SourceFunction}
 * @param <SRC> user function type, bounded by {@code SourceFunction<OUT>}
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
?
 //......
}
  • StreamSource继承了AbstractUdfStreamOperator,它没有覆盖initializeState,而AbstractUdfStreamOperator也没有覆盖initializeState方法,因而执行的是AbstractUdfStreamOperator的父类AbstractStreamOperator的initializeState

AbstractStreamOperator.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
 implements StreamOperator<OUT>, Serializable {
?
 /**
 * Sets up this operator's state backends and timer service manager, then calls the
 * {@code initializeState(StateInitializationContext)} hook.
 *
 * <p>The method obtains a {@code StreamOperatorStateContext} from the containing task's
 * {@code StreamTaskStateInitializer}, stores the operator and keyed state backends it
 * provides, and wraps them together with the raw state inputs in a
 * {@code StateInitializationContextImpl} that is passed to the hook. The method is
 * {@code final}, so subclasses customise initialization through the hook only.
 *
 * @throws Exception if obtaining the state context or the hook itself fails; a failed
 *     {@code Preconditions.checkNotNull} on the containing task, its cancelables or its state
 *     initializer also aborts the method
 */
 @Override
 public final void initializeState() throws Exception {
?
 final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
?
 final StreamTask<?, ?> containingTask =
 Preconditions.checkNotNull(getContainingTask());
 final CloseableRegistry streamTaskCloseableRegistry =
 Preconditions.checkNotNull(containingTask.getCancelables());
 final StreamTaskStateInitializer streamTaskStateManager =
 Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
?
 final StreamOperatorStateContext context =
 streamTaskStateManager.streamOperatorStateContext(
 getOperatorID(),
 getClass().getSimpleName(),
 this,
 keySerializer,
 streamTaskCloseableRegistry,
 metrics);
?
 this.operatorStateBackend = context.operatorStateBackend();
 this.keyedStateBackend = context.keyedStateBackend();
?
 // the keyed state store is only created when the context supplies a keyed state backend
 if (keyedStateBackend != null) {
 this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
 }
?
 timeServiceManager = context.internalTimerServiceManager();
?
 CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
 CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
?
 try {
 StateInitializationContext initializationContext = new StateInitializationContextImpl(
 context.isRestored(), // information whether we restore or start for the first time
 operatorStateBackend, // access to operator state backend
 keyedStateStore, // access to keyed state backend
 keyedStateInputs, // access to keyed state stream
 operatorStateInputs); // access to operator state stream
?
 initializeState(initializationContext);
 } finally {
 // both raw state inputs are handed to closeFromRegistry(...) even if the hook above threw
 closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
 closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
 }
 }
?
 /**
 * Stream operators with state which can be restored need to override this hook method.
 *
 * <p>The implementation shown here has an empty body.
 *
 * @param context context that allows to register different states.
 * @throws Exception declared so that overriding implementations may throw
 */
 public void initializeState(StateInitializationContext context) throws Exception {
?
 }
?
 //......
}
  • AbstractStreamOperator实现了StreamOperator接口定义的initializeState方法,该方法会调用initializeState(initializationContext)方法,其子类AbstractUdfStreamOperator对该方法进行了覆盖

AbstractUdfStreamOperator.initializeState(initializationContext)

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 extends AbstractStreamOperator<OUT>
 implements OutputTypeConfigurable<OUT> {
?
 /**
 * Runs the superclass hook first and then passes the same context, together with
 * {@code userFunction}, to {@code StreamingFunctionUtils.restoreFunctionState}.
 *
 * @param context state initialization context forwarded to both calls
 * @throws Exception if either call throws
 */
 @Override
 public void initializeState(StateInitializationContext context) throws Exception {
 super.initializeState(context);
 StreamingFunctionUtils.restoreFunctionState(context, userFunction);
 }
 
 //......
}
  • initializeState(initializationContext)方法这里调用了StreamingFunctionUtils.restoreFunctionState

StreamingFunctionUtils.restoreFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

 public static void restoreFunctionState(
 StateInitializationContext context,
 Function userFunction) throws Exception {
?
 Preconditions.checkNotNull(context);
?
 while (true) {
?
 if (tryRestoreFunction(context, userFunction)) {
 break;
 }
?
 // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
 if (userFunction instanceof WrappingFunction) {
 userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
 } else {
 break;
 }
 }
 }
?
 private static boolean tryRestoreFunction(
 StateInitializationContext context,
 Function userFunction) throws Exception {
?
 if (userFunction instanceof CheckpointedFunction) {
 ((CheckpointedFunction) userFunction).initializeState(context);
?
 return true;
 }
?
 if (context.isRestored() && userFunction instanceof ListCheckpointed) {
 @SuppressWarnings("unchecked")
 ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
?
 ListState<Serializable> listState = context.getOperatorStateStore().
 getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
?
 List<Serializable> list = new ArrayList<>();
?
 for (Serializable serializable : listState.get()) {
 list.add(serializable);
 }
?
 try {
 listCheckpointedFun.restoreState(list);
 } catch (Exception e) {
?
 throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
 }
?
 return true;
 }
?
 return false;
 }
  • restoreFunctionState主要是调用了tryRestoreFunction方法,而该方法会判断,如果userFunction实现了CheckpointedFunction接口则调用其initializeState方法;如果userFunction实现了ListCheckpointed接口,并且context.isRestored()为true,那么就会从OperatorStateStore获取ListState,将里头的值转换为List,调用ListCheckpointed.restoreState方法

小结

  • Task的run方法会触发invokable.invoke(),这里的invokable为StreamTask,StreamTask的invoke方法会调用initializeState方法,该方法会遍历operatorChain上的allOperators(StreamOperator),调用其initializeState方法;比如这里的operator为StreamSource,它继承了AbstractUdfStreamOperator
  • StreamOperator接口定义了initializeState方法用于初始化operator的state,其抽象子类AbstractStreamOperator实现了initializeState方法,但是它内部会调用initializeState(initializationContext)方法,而其子类AbstractUdfStreamOperator对该方法进行了覆盖
  • AbstractUdfStreamOperator的initializeState(initializationContext)方法调用了StreamingFunctionUtils.restoreFunctionState,而后者会判断,如果userFunction实现了CheckpointedFunction接口则调用其initializeState方法;如果userFunction实现了ListCheckpointed接口,并且context.isRestored()为true,那么就会从OperatorStateStore获取ListState,将里头的值转换为List,调用ListCheckpointed.restoreState方法

doc

  • Working with State
最近发表
标签列表