
What is Flink's ExecNodeBase, and what is it for?

nanyue 2024-08-09 07:12:12

Where ExecNodeBase sits in the Flink SQL translation pipeline

SQL -> Operation -> RelNode -> Optimized RelNode -> ExecNode -> Transformation

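Its role in Flink SQL: take the optimized RelNodes produced by the previous stage and translate them into Transformations, the same operator building blocks that the DataStream API produces.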

Basic properties of ExecNodeBase

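  • id (unique identifier of the node)
  • description (human-readable description)
  • outputType (type of the rows the node produces)
  • inputProperties (per-input requirements, such as the required data distribution)
  • inputEdges (edges connecting this node to its input nodes)
  • transformation (the Transformation this node translates into)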
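To make these fields concrete, here is a toy Java model of the ExecNodeBase pattern. All class names (ExecNodeDemo, ToyTransformation, ToyExecNodeBase, ToyNode) are hypothetical, and the sketch is a simplification rather than Flink's real code. It assumes the behavior described above: each node gets a unique id, translates its inputs first, and caches the result in its transformation field, so a node shared by several downstream nodes is translated only once (a template method: a final translateToPlan wrapping an abstract translateToPlanInternal).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ExecNodeDemo {
    public static void main(String[] args) {
        // A diamond-shaped plan: one source feeding two branches that are joined again.
        ToyNode source = new ToyNode("Source");
        ToyNode left = new ToyNode("Calc-A", source);
        ToyNode right = new ToyNode("Calc-B", source);
        ToyNode join = new ToyNode("Join", left, right);

        ToyTransformation root = join.translateToPlan();
        // The shared source is translated once; both branches reference the same object.
        System.out.println(root.name + " source translations: " + source.translateCount);
        System.out.println(root.inputs.get(0).inputs.get(0) == root.inputs.get(1).inputs.get(0));
    }
}

// Hypothetical stand-in for Flink's Transformation: a name plus its input transformations.
final class ToyTransformation {
    final String name;
    final List<ToyTransformation> inputs;

    ToyTransformation(String name, List<ToyTransformation> inputs) {
        this.name = name;
        this.inputs = inputs;
    }
}

// Toy model of the ExecNodeBase idea (not Flink's actual class).
abstract class ToyExecNodeBase {
    private static int idCounter = 0;

    // Hands out a fresh, unique id, similar in spirit to getNewNodeId().
    static int getNewNodeId() {
        return ++idCounter;
    }

    private final int id;
    private final String description;
    private final List<ToyExecNodeBase> inputNodes; // stands in for inputEdges
    private ToyTransformation transformation;       // cached translation result
    int translateCount = 0;                          // demo only: how often translation ran

    ToyExecNodeBase(String description, List<ToyExecNodeBase> inputNodes) {
        this.id = getNewNodeId();
        this.description = description;
        this.inputNodes = inputNodes;
    }

    int getId() { return id; }
    String getDescription() { return description; }
    List<ToyExecNodeBase> getInputNodes() { return inputNodes; }

    // Translate once, then reuse the cached result on later calls.
    final ToyTransformation translateToPlan() {
        if (transformation == null) {
            translateCount++;
            transformation = translateToPlanInternal();
        }
        return transformation;
    }

    // Each concrete node builds its own transformation.
    protected abstract ToyTransformation translateToPlanInternal();
}

// A generic node: translates its inputs first, then wraps them in a new transformation.
final class ToyNode extends ToyExecNodeBase {
    ToyNode(String description, ToyExecNodeBase... inputs) {
        super(description, Arrays.asList(inputs));
    }

    @Override
    protected ToyTransformation translateToPlanInternal() {
        List<ToyTransformation> inputTransforms = new ArrayList<>();
        for (ToyExecNodeBase input : getInputNodes()) {
            inputTransforms.add(input.translateToPlan());
        }
        return new ToyTransformation(getDescription(), inputTransforms);
    }
}
```

Running ExecNodeDemo prints `Join source translations: 1` followed by `true`. Without the cache, the shared source would be translated twice and the job graph would contain two copies of it.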

Source code walkthrough

Let's use StreamExecWatermarkAssigner as the example.
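Before reading the source, it helps to know what this node ultimately does at runtime. For a DDL clause such as `WATERMARK FOR ts AS ts - INTERVAL '5' SECOND`, watermarkExpr is the expression `ts - INTERVAL '5' SECOND` and rowtimeFieldIndex is the position of `ts` in the input row. The toy sketch below uses hypothetical names and assumes rows modeled as `Object[]` with epoch-millisecond timestamps. It evaluates the expression for each row and keeps the largest result as the current watermark. The real operator runs generated code and emits watermarks periodically, which is omitted here.

```java
import java.util.function.ToLongFunction;

class WatermarkDemo {
    public static void main(String[] args) {
        // Column 1 holds the rowtime; the lambda plays the role of "ts - INTERVAL '5' SECOND".
        ToyWatermarkAssigner assigner =
                new ToyWatermarkAssigner(1, row -> (Long) row[1] - 5_000L);
        assigner.process(new Object[] {"a", 10_000L});
        assigner.process(new Object[] {"b", 8_000L}); // out-of-order row: watermark does not go back
        assigner.process(new Object[] {"c", 12_000L});
        System.out.println(assigner.currentWatermark()); // 7000
    }
}

// Toy model of per-row watermark assignment; not Flink's WatermarkAssignerOperator.
final class ToyWatermarkAssigner {
    private final int rowtimeFieldIndex;                  // position of the rowtime column
    private final ToLongFunction<Object[]> watermarkExpr; // stands in for the generated watermark code
    private long currentWatermark = Long.MIN_VALUE;

    ToyWatermarkAssigner(int rowtimeFieldIndex, ToLongFunction<Object[]> watermarkExpr) {
        this.rowtimeFieldIndex = rowtimeFieldIndex;
        this.watermarkExpr = watermarkExpr;
    }

    void process(Object[] row) {
        // A row without an event time cannot advance event time.
        if (row[rowtimeFieldIndex] == null) {
            throw new IllegalArgumentException("rowtime field must not be null");
        }
        long candidate = watermarkExpr.applyAsLong(row);
        // Watermarks are monotonic: keep the maximum seen so far.
        currentWatermark = Math.max(currentWatermark, candidate);
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```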

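    /**
     * Extends ExecNodeBase, the common base class of all ExecNodes.
     * StreamExecNode marks this as a streaming ExecNode; SingleTransformationTranslator
     * indicates that the node translates into exactly one Transformation.
     */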
    public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
            implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
        public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";
        public static final String FIELD_NAME_ROWTIME_FIELD_INDEX = "rowtimeFieldIndex";

        @JsonProperty(FIELD_NAME_WATERMARK_EXPR)
        private final RexNode watermarkExpr;

        @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX)
        private final int rowtimeFieldIndex;

        public StreamExecWatermarkAssigner(
                RexNode watermarkExpr,
                int rowtimeFieldIndex,
                InputProperty inputProperty,
                RowType outputType,
                String description) {
            this(
                    watermarkExpr,
                    rowtimeFieldIndex,
                    getNewNodeId(),
                    Collections.singletonList(inputProperty),
                    outputType,
                    description);
        }

        @JsonCreator
        public StreamExecWatermarkAssigner(
                @JsonProperty(FIELD_NAME_WATERMARK_EXPR) RexNode watermarkExpr,
                @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX) int rowtimeFieldIndex,
                @JsonProperty(FIELD_NAME_ID) int id,
                @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
                @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
                @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
            // Information kept by ExecNodeBase: id, input properties, output type, description
            super(id, inputProperties, outputType, description);
            checkArgument(inputProperties.size() == 1);
            // The watermark expression, as a Calcite RexNode
            this.watermarkExpr = checkNotNull(watermarkExpr);
            // Index of the rowtime (event-time) field in the input row; passed on to the runtime operator
            this.rowtimeFieldIndex = rowtimeFieldIndex;
        }

        // translateToPlanInternal: translates this node into a Transformation.
        // planner: mainly used to obtain context such as the table configuration.
        @SuppressWarnings("unchecked")
        @Override
        protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
            final ExecEdge inputEdge = getInputEdges().get(0);
            // First translate the input edge (whose source node is also an ExecNodeBase subclass) into a Transformation
            final Transformation<RowData> inputTransform =
                    (Transformation<RowData>) inputEdge.translateToPlan(planner);

            final TableConfig tableConfig = planner.getTableConfig();
            // Generate the watermark generator code from the config and input type (code generation is left for a future post)
            final GeneratedWatermarkGenerator watermarkGenerator =
                    WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
                            tableConfig,
                            (RowType) inputEdge.getOutputType(),
                            watermarkExpr,
                            JavaScalaConversionUtil.toScala(Optional.empty()));

            final long idleTimeout =
                    tableConfig
                            .getConfiguration()
                            .get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT)
                            .toMillis();

            final WatermarkAssignerOperatorFactory operatorFactory =
                    new WatermarkAssignerOperatorFactory(
                            rowtimeFieldIndex, idleTimeout, watermarkGenerator);
            // Finally, create the Transformation
            return new OneInputTransformation<>(
                    inputTransform,
                    getDescription(),
                    operatorFactory,
                    InternalTypeInfo.of(getOutputType()),
                    inputTransform.getParallelism());
        }
    }
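The idleTimeout passed to WatermarkAssignerOperatorFactory comes from `table.exec.source.idle-timeout`, whose default of 0 disables idleness detection. An input that stops producing rows would otherwise hold back the watermark of downstream operators, which take the minimum over their inputs; marking it idle lets them ignore it. A minimal sketch of the idea, with hypothetical names and explicit timestamps standing in for Flink's processing-time service:

```java
class IdlenessDemo {
    public static void main(String[] args) {
        ToyIdlenessDetector detector = new ToyIdlenessDetector(10_000L, 0L);
        detector.onRecord(1_000L);
        detector.onTimer(5_000L);
        System.out.println(detector.isIdle()); // false: only 4 s without data
        detector.onTimer(12_000L);
        System.out.println(detector.isIdle()); // true: 11 s without data
        detector.onRecord(13_000L);
        System.out.println(detector.isIdle()); // false: a new row reactivates the input
    }
}

// Toy model of source idleness detection; not Flink's actual operator code.
final class ToyIdlenessDetector {
    private final long idleTimeoutMillis; // 0 disables idleness detection
    private long lastRecordTime;
    private boolean idle = false;

    ToyIdlenessDetector(long idleTimeoutMillis, long startTimeMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastRecordTime = startTimeMillis;
    }

    // Every incoming row refreshes the timestamp and marks the input active again.
    void onRecord(long nowMillis) {
        lastRecordTime = nowMillis;
        idle = false;
    }

    // Invoked periodically (by a timer in a real operator) to check for inactivity.
    void onTimer(long nowMillis) {
        if (idleTimeoutMillis > 0 && nowMillis - lastRecordTime > idleTimeoutMillis) {
            idle = true;
        }
    }

    boolean isIdle() {
        return idle;
    }
}
```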
