ExecNodeBase在flink sql中的解析流程位置
sql -> Operation -> RelNode -> Optimized RelNode -> ExecNode -> Transformation
在 Flink SQL 中的作用:ExecNode 由上游优化后的 RelNode 转换而来,并负责将自身翻译成 Transformation 算子
ExecNodeBase 基本属性
- id(唯一标识)
- description(描述)
- outputType(节点返回类型)
- inputProperties(输入属性)
- inputEdges(输入的边)
- transformation(该节点对应的transformation算子)
源码解读
下面就以StreamExecWatermarkAssigner为例子
/**
 * Stream exec node that turns a watermark expression into a watermark-assigning
 * {@link Transformation}.
 *
 * <p>It extends {@link ExecNodeBase} and additionally implements {@link StreamExecNode} and
 * {@link SingleTransformationTranslator}. The node has exactly one input and carries two pieces
 * of state of its own: the watermark expression and the index of the rowtime field. Both are
 * exposed as JSON properties.
 */
public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

    /** JSON property name under which the watermark expression is stored. */
    public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";

    /** JSON property name under which the rowtime field index is stored. */
    public static final String FIELD_NAME_ROWTIME_FIELD_INDEX = "rowtimeFieldIndex";

    /** Expression from which the watermark generator code is produced. */
    @JsonProperty(FIELD_NAME_WATERMARK_EXPR)
    private final RexNode watermarkExpr;

    /** Rowtime field index handed to the {@link WatermarkAssignerOperatorFactory}. */
    @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX)
    private final int rowtimeFieldIndex;

    /**
     * Creates a node with a freshly allocated id and a single input property.
     *
     * @param watermarkExpr expression used to generate the watermark generator; must not be null
     * @param rowtimeFieldIndex index of the rowtime field
     * @param inputProperty property of the node's only input
     * @param outputType row type produced by this node
     * @param description description of this node
     */
    public StreamExecWatermarkAssigner(
            RexNode watermarkExpr,
            int rowtimeFieldIndex,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                watermarkExpr,
                rowtimeFieldIndex,
                getNewNodeId(),
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Creator used by Jackson (and by the convenience constructor above).
     *
     * @param watermarkExpr expression used to generate the watermark generator; must not be null
     * @param rowtimeFieldIndex index of the rowtime field
     * @param id identifier of this node
     * @param inputProperties input properties; must contain exactly one element
     * @param outputType row type produced by this node
     * @param description description of this node
     */
    @JsonCreator
    public StreamExecWatermarkAssigner(
            @JsonProperty(FIELD_NAME_WATERMARK_EXPR) RexNode watermarkExpr,
            @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX) int rowtimeFieldIndex,
            @JsonProperty(FIELD_NAME_ID) int id,
            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
        // Everything generic to an exec node is handled by ExecNodeBase.
        super(id, inputProperties, outputType, description);

        // A watermark assigner is a one-input node.
        checkArgument(inputProperties.size() == 1);

        this.watermarkExpr = checkNotNull(watermarkExpr);
        // Forwarded to WatermarkAssignerOperatorFactory in translateToPlanInternal.
        this.rowtimeFieldIndex = rowtimeFieldIndex;
    }

    /**
     * Translates this node into a {@link Transformation}.
     *
     * <p>The input edge is translated first; a watermark generator is then code-generated from
     * {@link #watermarkExpr}, wrapped in a {@link WatermarkAssignerOperatorFactory} and attached
     * to the input transformation as a {@link OneInputTransformation}. The resulting
     * transformation reuses the parallelism of its input.
     *
     * @param planner planner from which the table configuration is obtained
     * @return the transformation representing this node
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        final ExecEdge upstreamEdge = getInputEdges().get(0);

        // Translate the upstream side of the edge before building this node's operator.
        final Transformation<RowData> upstreamTransformation =
                (Transformation<RowData>) upstreamEdge.translateToPlan(planner);

        final TableConfig config = planner.getTableConfig();
        final RowType upstreamRowType = (RowType) upstreamEdge.getOutputType();

        // Code-generate the watermark generator for the input row type.
        final GeneratedWatermarkGenerator generatedWatermarkGenerator =
                WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
                        config,
                        upstreamRowType,
                        watermarkExpr,
                        JavaScalaConversionUtil.toScala(Optional.empty()));

        final WatermarkAssignerOperatorFactory assignerFactory =
                new WatermarkAssignerOperatorFactory(
                        rowtimeFieldIndex,
                        sourceIdleTimeoutMillis(config),
                        generatedWatermarkGenerator);

        // Finally wire the operator factory onto the input transformation.
        return new OneInputTransformation<>(
                upstreamTransformation,
                getDescription(),
                assignerFactory,
                InternalTypeInfo.of(getOutputType()),
                upstreamTransformation.getParallelism());
    }

    /** Reads {@code TABLE_EXEC_SOURCE_IDLE_TIMEOUT} from the table config, in milliseconds. */
    private static long sourceIdleTimeoutMillis(TableConfig config) {
        return config.getConfiguration()
                .get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT)
                .toMillis();
    }
}