Preface
This article takes a look at the initializeState method of Flink's StreamOperator.
Task.run
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
```java
public class Task implements Runnable, TaskActions, CheckpointListener {

    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
            else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            }
            else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------

            //......

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------

            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId, vertexId, executionId, executionConfig, taskInfo,
                jobConfiguration, taskConfiguration, userCodeClassLoader,
                memoryManager, ioManager, broadcastVariableManager, taskStateManager,
                accumulatorRegistry, kvStateRegistry, inputSplitProvider,
                distributedCacheEntries, producedPartitions, inputGates,
                network.getTaskEventDispatcher(), checkpointResponder,
                taskManagerConfig, metrics, this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------

            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {
            //......
        }
        finally {
            //......
        }
    }

    //......
}
```
- Task's run method moves the task from CREATED through DEPLOYING to RUNNING and then calls invokable.invoke(); here the invokable is a StreamTask
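The state handling in Task.run() amounts to a series of guarded transitions: each step succeeds only if nobody (for example a concurrent cancel or failure) has changed the state in the meantime. The following is a simplified model of that idea, not Flink's code. It assumes compare-and-set semantics for `transitionState`; the class `TaskStateSketch`, the `AtomicReference` field and `runHappyPath` are hypothetical stand-ins for Task's own fields and methods.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TaskStateSketch {

    // Simplified subset of the ExecutionState values used in Task.run()
    enum ExecutionState { CREATED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    // Hypothetical stand-in for Task's executionState field
    private final AtomicReference<ExecutionState> executionState =
            new AtomicReference<>(ExecutionState.CREATED);

    // Succeeds only if the current state is still 'expected'; if another thread
    // has already moved the task (e.g. to FAILED), this returns false.
    boolean transitionState(ExecutionState expected, ExecutionState next) {
        return executionState.compareAndSet(expected, next);
    }

    ExecutionState current() {
        return executionState.get();
    }

    // Hypothetical happy path: CREATED -> DEPLOYING -> RUNNING -> FINISHED
    boolean runHappyPath(Runnable invokable) {
        if (!transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
            return false;
        }
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            return false; // canceled or failed in the meantime
        }
        invokable.run(); // in Flink this is invokable.invoke(), i.e. StreamTask.invoke()
        return transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);
    }

    public static void main(String[] args) {
        TaskStateSketch task = new TaskStateSketch();
        boolean finished = task.runHappyPath(() -> System.out.println("invoke() called"));
        System.out.println(finished + " " + task.current()); // prints "true FINISHED"
    }
}
```

Using compare-and-set instead of a plain assignment means a cancel that lands between two steps is never silently overwritten; the next transition simply fails.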
StreamTask.invoke
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
```java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool();

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            // make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();

                // make sure no new timers can come
                timerService.quiesce();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();

            LOG.debug("Closed operators for task {}", getName());

            // make sure all buffered data is flushed
            operatorChain.flushOutputs();

            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            //......
        }
    }

    private void initializeState() throws Exception {

        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }

    //......
}
```
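- StreamTask's invoke method calls its private initializeState method, which iterates over all operators (StreamOperator instances) in the operatorChain, skips null entries, and calls initializeState on each. This happens while holding the lock, together with openAllOperators(), so that a timer registered during initializeState() cannot fire before open() has been called. In this example the operator is a StreamSource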
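The ordering guarantee described above (every operator's state is initialized, then every operator is opened, both inside one synchronized block) can be sketched as follows. This is a simplified model, not Flink's implementation: the `Operator` interface, `OperatorChainInitSketch` and the `recording` helper are hypothetical, and the operator order is simply the array order rather than whatever order OperatorChain.getAllOperators() returns.

```java
import java.util.ArrayList;
import java.util.List;

public class OperatorChainInitSketch {

    // Hypothetical minimal operator interface (not Flink's StreamOperator)
    interface Operator {
        void initializeState() throws Exception;
        void open() throws Exception;
    }

    private final Object lock = new Object();   // stands in for StreamTask's checkpoint lock
    private final Operator[] allOperators;       // may contain null slots, mirroring Flink's null check

    OperatorChainInitSketch(Operator... operators) {
        this.allOperators = operators;
    }

    void initializeAndOpen() throws Exception {
        // Both phases run under the same lock, so a timer callback that also
        // needs this lock cannot run between initializeState() and open().
        synchronized (lock) {
            for (Operator operator : allOperators) {
                if (operator != null) {
                    operator.initializeState();
                }
            }
            for (Operator operator : allOperators) {
                if (operator != null) {
                    operator.open();
                }
            }
        }
    }

    // Hypothetical helper: an operator that records which method was called
    static Operator recording(String name, List<String> log) {
        return new Operator() {
            @Override public void initializeState() { log.add(name + ".initializeState"); }
            @Override public void open() { log.add(name + ".open"); }
        };
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();
        new OperatorChainInitSketch(recording("source", log), null, recording("map", log))
                .initializeAndOpen();
        // prints [source.initializeState, map.initializeState, source.open, map.open]
        System.out.println(log);
    }
}
```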
StreamOperator.initializeState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java
```java
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {

    /**
     * Provides a context to initialize all state in the operator.
     */
    void initializeState() throws Exception;

    //......
}
```
- The StreamOperator interface defines the initializeState method, which initializes the operator's state
StreamSource.initializeState
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
```java
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    //......
}
```
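- StreamSource extends AbstractUdfStreamOperator and does not override initializeState(); AbstractUdfStreamOperator does not override the no-argument initializeState() either. The method that actually runs is therefore the initializeState() of AbstractUdfStreamOperator's parent class, AbstractStreamOperator, which declares it final, so subclasses cannot override it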
AbstractStreamOperator.initializeState
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
```java
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {

    @Override
    public final void initializeState() throws Exception {

        final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

        final StreamTask<?, ?> containingTask =
            Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamTaskStateInitializer streamTaskStateManager =
            Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

        final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                metrics);

        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();

        if (keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
        }

        timeServiceManager = context.internalTimerServiceManager();

        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

        try {
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                context.isRestored(), // information whether we restore or start for the first time
                operatorStateBackend, // access to operator state backend
                keyedStateStore, // access to keyed state backend
                keyedStateInputs, // access to keyed state stream
                operatorStateInputs); // access to operator state stream

            initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }

    /**
     * Stream operators with state which can be restored need to override this hook method.
     *
     * @param context context that allows to register different states.
     */
    public void initializeState(StateInitializationContext context) throws Exception {

    }

    //......
}
```
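- AbstractStreamOperator implements the initializeState() method defined by the StreamOperator interface. It obtains the operator state backend, the keyed state backend and the raw state inputs through the StreamTaskStateInitializer, wraps them in a StateInitializationContext, and passes that context to the initializeState(StateInitializationContext) hook, which is empty by default; its subclass AbstractUdfStreamOperator overrides this hook. The raw state inputs are closed in a finally block once the hook returns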
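The relationship between the final initializeState() and the overridable initializeState(StateInitializationContext) is the template method pattern. A minimal sketch of that shape follows, assuming heavily simplified types: `InitContext`, `BaseOperator` and `UdfOperator` are hypothetical stand-ins for StateInitializationContext, AbstractStreamOperator and AbstractUdfStreamOperator, and the context carries only the restore flag instead of the real state backends.

```java
import java.util.ArrayList;
import java.util.List;

public class TemplateMethodSketch {

    // Hypothetical stand-in for StateInitializationContext: only the restore flag
    static final class InitContext {
        private final boolean restored;
        InitContext(boolean restored) { this.restored = restored; }
        boolean isRestored() { return restored; }
    }

    // Shaped like AbstractStreamOperator: the entry point is final,
    // builds the context and delegates to an overridable hook.
    abstract static class BaseOperator {
        private final boolean restoredFromCheckpoint;
        final List<String> log = new ArrayList<>();

        BaseOperator(boolean restoredFromCheckpoint) {
            this.restoredFromCheckpoint = restoredFromCheckpoint;
        }

        public final void initializeState() {
            InitContext context = new InitContext(restoredFromCheckpoint);
            try {
                initializeState(context);
            } finally {
                // Flink closes the raw keyed/operator state inputs here
                log.add("close raw state inputs");
            }
        }

        // Empty hook by default, like initializeState(StateInitializationContext)
        public void initializeState(InitContext context) {
        }
    }

    // Shaped like AbstractUdfStreamOperator: overrides only the hook and calls super first
    static class UdfOperator extends BaseOperator {
        UdfOperator(boolean restored) { super(restored); }

        @Override
        public void initializeState(InitContext context) {
            super.initializeState(context);
            log.add("restoreFunctionState(restored=" + context.isRestored() + ")");
        }
    }

    public static void main(String[] args) {
        UdfOperator op = new UdfOperator(true);
        op.initializeState();
        // prints [restoreFunctionState(restored=true), close raw state inputs]
        System.out.println(op.log);
    }
}
```

Making the entry point final keeps the context setup and cleanup in one place, while subclasses only decide what to do with the prepared context.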
AbstractUdfStreamOperator.initializeState(initializationContext)
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
```java
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        StreamingFunctionUtils.restoreFunctionState(context, userFunction);
    }

    //......
}
```
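- AbstractUdfStreamOperator's initializeState(StateInitializationContext) first calls super.initializeState(context) and then calls StreamingFunctionUtils.restoreFunctionState(context, userFunction)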
StreamingFunctionUtils.restoreFunctionState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
```java
public static void restoreFunctionState(
        StateInitializationContext context,
        Function userFunction) throws Exception {

    Preconditions.checkNotNull(context);

    while (true) {

        if (tryRestoreFunction(context, userFunction)) {
            break;
        }

        // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
        if (userFunction instanceof WrappingFunction) {
            userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
        } else {
            break;
        }
    }
}

private static boolean tryRestoreFunction(
        StateInitializationContext context,
        Function userFunction) throws Exception {

    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).initializeState(context);

        return true;
    }

    if (context.isRestored() && userFunction instanceof ListCheckpointed) {
        @SuppressWarnings("unchecked")
        ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;

        ListState<Serializable> listState = context.getOperatorStateStore().
                getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

        List<Serializable> list = new ArrayList<>();

        for (Serializable serializable : listState.get()) {
            list.add(serializable);
        }

        try {
            listCheckpointedFun.restoreState(list);
        } catch (Exception e) {

            throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
        }

        return true;
    }

    return false;
}
```
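- restoreFunctionState delegates to tryRestoreFunction; if that does not succeed and the userFunction is a WrappingFunction, it unwraps it and tries again with the inner function. tryRestoreFunction works as follows: if userFunction implements the CheckpointedFunction interface, its initializeState method is called (whether or not the job is being restored); if userFunction implements the ListCheckpointed interface and context.isRestored() is true, it fetches the default ListState from the OperatorStateStore, copies its values into a List, and calls ListCheckpointed.restoreState with that list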
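The dispatch-and-unwrap logic can be exercised without a Flink cluster by modeling the interfaces involved. The sketch below mirrors the control flow of restoreFunctionState/tryRestoreFunction under simplifying assumptions: `Function`, `WrappingFunction`, `CheckpointedFunction`, `ListCheckpointed` and `Context` here are hypothetical, non-generic stand-ins, not the Flink types, and the "default list state" is just an `Iterable` handed to the context.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RestoreFunctionStateSketch {

    // Hypothetical minimal interfaces modeled on the Flink ones (NOT the Flink types)
    interface Function {}
    interface WrappingFunction extends Function { Function getWrappedFunction(); }
    interface CheckpointedFunction extends Function { void initializeState(Context context); }
    interface ListCheckpointed extends Function { void restoreState(List<Serializable> state); }

    // Hypothetical stand-in for StateInitializationContext: restore flag plus default list state
    static final class Context {
        final boolean restored;
        final Iterable<Serializable> defaultListState;

        Context(boolean restored, Iterable<Serializable> defaultListState) {
            this.restored = restored;
            this.defaultListState = defaultListState;
        }

        boolean isRestored() { return restored; }
    }

    static void restoreFunctionState(Context context, Function userFunction) {
        while (true) {
            if (tryRestoreFunction(context, userFunction)) {
                break;
            }
            // Unwrap and retry with the inner function, if there is one
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    static boolean tryRestoreFunction(Context context, Function userFunction) {
        if (userFunction instanceof CheckpointedFunction) {
            // Called on a fresh start as well as on a restore
            ((CheckpointedFunction) userFunction).initializeState(context);
            return true;
        }
        if (context.isRestored() && userFunction instanceof ListCheckpointed) {
            // Copy the list state into a List before handing it to restoreState()
            List<Serializable> list = new ArrayList<>();
            for (Serializable value : context.defaultListState) {
                list.add(value);
            }
            ((ListCheckpointed) userFunction).restoreState(list);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<Serializable> restoredValues = new ArrayList<>();
        ListCheckpointed inner = state -> restoredValues.addAll(state);
        WrappingFunction wrapper = () -> inner;

        restoreFunctionState(new Context(true, Arrays.<Serializable>asList(1L, 2L)), wrapper);
        System.out.println(restoredValues); // prints [1, 2]
    }
}
```

Note the asymmetry the sketch preserves: a CheckpointedFunction is always called, so it can register its state on a fresh start, while a ListCheckpointed function is only touched when there is something to restore.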
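Summary
- Task's run method triggers invokable.invoke(), where the invokable is a StreamTask. StreamTask's invoke method calls its initializeState method, which iterates over all operators (StreamOperator instances) in the operatorChain and calls initializeState on each; in this example the operator is a StreamSource, which extends AbstractUdfStreamOperator
- The StreamOperator interface defines the initializeState method for initializing the operator's state. Its abstract implementation AbstractStreamOperator implements initializeState() as a final method that internally calls the initializeState(StateInitializationContext) hook, and its subclass AbstractUdfStreamOperator overrides that hook
- AbstractUdfStreamOperator's initializeState(StateInitializationContext) calls StreamingFunctionUtils.restoreFunctionState, which checks the user function: if userFunction implements CheckpointedFunction, its initializeState method is called; if userFunction implements ListCheckpointed and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called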
doc
- Working with State