Preface
This article takes a look at Flink's TableFunction.
Example
    // The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
    public class Split extends TableFunction<Tuple2<String, Integer>> {
        private String separator = " ";

        public Split(String separator) {
            this.separator = separator;
        }

        public void eval(String str) {
            for (String s : str.split(separator)) {
                // use collect(...) to emit a row
                collect(new Tuple2<String, Integer>(s, s.length()));
            }
        }
    }

    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    Table myTable = ...         // table schema: [a: String]

    // Register the function.
    tableEnv.registerFunction("split", new Split("#"));

    // Use the table function in the Java Table API. "as" specifies the field names of the table.
    myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");
    myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");

    // Use the table function in SQL with LATERAL and TABLE keywords.
    // CROSS JOIN a table function (equivalent to "join" in Table API).
    tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
    // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
    tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
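- This example defines a custom Split function, registers it through TableEnvironment.registerFunction, and then uses it either in the Table API or in TableEnvironment.sqlQuery. Split declares a public eval method, which emits output rows by calling collect.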
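The difference between join (CROSS JOIN) and leftOuterJoin (LEFT JOIN ... ON TRUE) only shows up when the table function emits no rows for some input. The following is a minimal plain-Java sketch of those semantics that does not use Flink at all; `LateralJoinSketch`, `split`, `crossJoin`, and `leftOuterJoin` are hypothetical names introduced for illustration. One assumption to note: the stand-in `split` emits nothing for an empty string so that the zero-row case is visible, which the Split class above does not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration only; none of these names are Flink APIs.
class LateralJoinSketch {

    // Stand-in for Split.eval: one (word, length) pair per token.
    // Assumption for this sketch: null or empty input emits no rows.
    static List<Object[]> split(String str, String separator) {
        List<Object[]> out = new ArrayList<>();
        if (str == null || str.isEmpty()) {
            return out;
        }
        for (String s : str.split(separator)) {
            out.add(new Object[] {s, s.length()});
        }
        return out;
    }

    // Cross join: an input row for which the function emits nothing disappears.
    static List<String> crossJoin(List<String> rows, Function<String, List<Object[]>> fn) {
        List<String> result = new ArrayList<>();
        for (String a : rows) {
            for (Object[] r : fn.apply(a)) {
                result.add(a + "," + r[0] + "," + r[1]);
            }
        }
        return result;
    }

    // Left outer join: such an input row is kept and padded with nulls.
    static List<String> leftOuterJoin(List<String> rows, Function<String, List<Object[]>> fn) {
        List<String> result = new ArrayList<>();
        for (String a : rows) {
            List<Object[]> emitted = fn.apply(a);
            if (emitted.isEmpty()) {
                result.add(a + ",null,null");
            }
            for (Object[] r : emitted) {
                result.add(a + "," + r[0] + "," + r[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a#bc", "");
        Function<String, List<Object[]>> fn = s -> split(s, "#");
        System.out.println(crossJoin(rows, fn));     // two rows, both from "a#bc"
        System.out.println(leftOuterJoin(rows, fn)); // the same two rows plus one null-padded row
    }
}
```

With the two input rows used in `main`, the cross join yields only the rows derived from "a#bc", while the left outer join additionally keeps the empty input row with null for word and length.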
UserDefinedFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala
    abstract class UserDefinedFunction extends Serializable {
      /**
        * Setup method for user-defined function. It can be used for initialization work.
        *
        * By default, this method does nothing.
        */
      @throws(classOf[Exception])
      def open(context: FunctionContext): Unit = {}

      /**
        * Tear-down method for user-defined function. It can be used for clean up work.
        *
        * By default, this method does nothing.
        */
      @throws(classOf[Exception])
      def close(): Unit = {}

      /**
        * @return true if and only if a call to this function is guaranteed to always return
        *         the same result given the same parameters; true is assumed by default
        *         if user's function is not pure functional, like random(), date(), now()...
        *         isDeterministic must return false
        */
      def isDeterministic: Boolean = true

      final def functionIdentifier: String = {
        val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
        getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
      }

      /**
        * Returns the name of the UDF that is used for plan explain and logging.
        */
      override def toString: String = getClass.getSimpleName

    }
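- UserDefinedFunction defines the open, close, isDeterministic, and functionIdentifier methods and overrides toString. open and close do nothing by default, isDeterministic returns true by default, and functionIdentifier combines the class's canonical name with an MD5 hash of the serialized function instance.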
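Since functionIdentifier hashes the serialized instance, two instances of the same class with different field values should end up with different identifiers. Here is a minimal plain-Java sketch of that idea. `IdentifierSketch`, `SplitLike`, and `identifierOf` are hypothetical names, and the sketch hashes the raw serialized bytes with the JDK's MessageDigest rather than going through Flink's EncodingUtils, so the hash values will not match what Flink produces.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Plain-Java approximation of the idea; not Flink's EncodingUtils.
class IdentifierSketch {

    // Hypothetical stand-in for a configurable function such as Split.
    static class SplitLike implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String separator;

        SplitLike(String separator) {
            this.separator = separator;
        }
    }

    // Canonical class name with '.' replaced by '$', then "$" and the MD5 hex of the serialized object.
    static String identifierOf(Serializable fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            byte[] digest = MessageDigest.getInstance("MD5").digest(bytes.toByteArray());
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return fn.getClass().getCanonicalName().replace('.', '$') + "$" + hex;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same class, different separator: the hash suffix differs.
        System.out.println(identifierOf(new SplitLike("#")));
        System.out.println(identifierOf(new SplitLike(",")));
    }
}
```

The practical consequence is that the identifier distinguishes differently configured instances of one function class, not just different classes.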
TableFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala
    abstract class TableFunction[T] extends UserDefinedFunction {

      // ----------------------------------------------------------------------------------------------

      /**
        * Emit an output row.
        *
        * @param row the output row
        */
      protected def collect(row: T): Unit = {
        collector.collect(row)
      }

      // ----------------------------------------------------------------------------------------------

      /**
        * The code generated collector used to emit row.
        */
      private var collector: Collector[T] = _

      /**
        * Internal use. Sets the current collector.
        */
      private[flink] final def setCollector(collector: Collector[T]): Unit = {
        this.collector = collector
      }

      // ----------------------------------------------------------------------------------------------

      /**
        * Returns the result type of the evaluation method with a given signature.
        *
        * This method needs to be overridden in case Flink's type extraction facilities are not
        * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
        * method. Flink's type extraction facilities can handle basic types or
        * simple POJOs but might be wrong for more complex, custom, or composite types.
        *
        * @return [[TypeInformation]] of result type or null if Flink should determine the type
        */
      def getResultType: TypeInformation[T] = null

      /**
        * Returns [[TypeInformation]] about the operands of the evaluation method with a given
        * signature.
        *
        * In order to perform operand type inference in SQL (especially when NULL is used) it might be
        * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
        * By default Flink's type extraction facilities are used for this but might be wrong for
        * more complex, custom, or composite types.
        *
        * @param signature signature of the method the operand types need to be determined
        * @return [[TypeInformation]] of operand types
        */
      def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
        signature.map { c =>
          try {
            TypeExtractor.getForClass(c)
          } catch {
            case ite: InvalidTypesException =>
              throw new ValidationException(
                s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
                s"automatically determined. Please provide type information manually.")
          }
        }
      }

    }
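- TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods. collect forwards each row to a collector that the runtime sets through setCollector; getResultType returns null by default, which leaves the result type to Flink's type extraction.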
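The collect/setCollector pair is a small injection pattern: the function never owns its output, it just writes to whatever collector was set before eval runs. A minimal plain-Java sketch of the pattern follows. `CollectorSketch`, `MiniTableFunction`, and `MiniSplit` are hypothetical stand-ins, and `java.util.function.Consumer` takes the place of Flink's Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java mimic of the collect/setCollector pattern; these are not Flink classes.
class CollectorSketch {

    // Hypothetical miniature of TableFunction[T].
    abstract static class MiniTableFunction<T> {
        private Consumer<T> collector;

        // Called by the "runtime" before eval, like setCollector in TableFunction.
        final void setCollector(Consumer<T> collector) {
            this.collector = collector;
        }

        // Called by subclasses from eval to emit one row.
        protected void collect(T row) {
            collector.accept(row);
        }
    }

    // Hypothetical counterpart of Split, emitting "word:length" strings.
    static class MiniSplit extends MiniTableFunction<String> {
        public void eval(String str) {
            for (String s : str.split("#")) {
                collect(s + ":" + s.length());
            }
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        MiniSplit split = new MiniSplit();
        split.setCollector(sink::add); // inject the output target
        split.eval("a#bc");            // eval emits through collect
        System.out.println(sink);      // prints [a:1, bc:2]
    }
}
```

Because the collector is swapped in from outside, the same function instance can emit into different outputs on different calls without knowing anything about them.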
ProcessOperator
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java
    @Internal
    public class ProcessOperator<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {

        private static final long serialVersionUID = 1L;

        private transient TimestampedCollector<OUT> collector;

        private transient ContextImpl context;

        /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
        private long currentWatermark = Long.MIN_VALUE;

        public ProcessOperator(ProcessFunction<IN, OUT> function) {
            super(function);

            chainingStrategy = ChainingStrategy.ALWAYS;
        }

        @Override
        public void open() throws Exception {
            super.open();
            collector = new TimestampedCollector<>(output);

            context = new ContextImpl(userFunction, getProcessingTimeService());
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            collector.setTimestamp(element);
            context.element = element;
            userFunction.processElement(element.getValue(), context, collector);
            context.element = null;
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            this.currentWatermark = mark.getTimestamp();
        }

        //......
    }
- ProcessOperator's processElement method calls userFunction.processElement; here userFunction is a CRowCorrelateProcessRunner.
CRowCorrelateProcessRunner
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
    class CRowCorrelateProcessRunner(
        processName: String,
        processCode: String,
        collectorName: String,
        collectorCode: String,
        @transient var returnType: TypeInformation[CRow])
      extends ProcessFunction[CRow, CRow]
      with ResultTypeQueryable[CRow]
      with Compiler[Any]
      with Logging {

      private var function: ProcessFunction[Row, Row] = _
      private var collector: TableFunctionCollector[_] = _
      private var cRowWrapper: CRowWrappingCollector = _

      override def open(parameters: Configuration): Unit = {
        LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
        val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
        LOG.debug("Instantiating TableFunctionCollector.")
        collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
        this.cRowWrapper = new CRowWrappingCollector()

        LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
        val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
        val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
        LOG.debug("Instantiating ProcessFunction.")
        function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
        FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
        FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
        FunctionUtils.openFunction(collector, parameters)
        FunctionUtils.openFunction(function, parameters)
      }

      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, CRow]#Context,
          out: Collector[CRow])
        : Unit = {

        cRowWrapper.out = out
        cRowWrapper.setChange(in.change)

        collector.setCollector(cRowWrapper)
        collector.setInput(in.row)
        collector.reset()

        function.processElement(
          in.row,
          ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
          cRowWrapper)
      }

      override def getProducedType: TypeInformation[CRow] = returnType

      override def close(): Unit = {
        FunctionUtils.closeFunction(collector)
        FunctionUtils.closeFunction(function)
      }
    }
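- CRowCorrelateProcessRunner's processElement method calls function.processElement; here function is the ProcessFunction that open compiles from generated code, and it is what ends up calling Split's eval method.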
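Before delegating, processElement copies the input's change flag into cRowWrapper, so every row the table function emits for that input carries the same flag. A minimal plain-Java sketch of that wrapping step is below. `ChangeFlagSketch`, `MiniCRow`, `WrappingCollector`, and `evalSplit` are hypothetical stand-ins for CRow, CRowWrappingCollector, and the generated function; the sketch leaves out the compiled TableFunctionCollector and the setInput/reset calls entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java mimic of the runner's per-element steps; these are not Flink classes.
class ChangeFlagSketch {

    // Hypothetical stand-in for CRow: a row plus a change flag.
    static class MiniCRow {
        final String row;
        final boolean change;

        MiniCRow(String row, boolean change) {
            this.row = row;
            this.change = change;
        }

        // Rendering true as "+" and false as "-" is only for this sketch's printout.
        @Override
        public String toString() {
            return (change ? "+" : "-") + row;
        }
    }

    // Hypothetical stand-in for CRowWrappingCollector: tags each emitted row with the current flag.
    static class WrappingCollector implements Consumer<String> {
        Consumer<MiniCRow> out;
        boolean change;

        @Override
        public void accept(String row) {
            out.accept(new MiniCRow(row, change));
        }
    }

    // Stand-in for the generated function that ends up calling eval.
    static void evalSplit(String str, Consumer<String> collector) {
        for (String s : str.split("#")) {
            collector.accept(s);
        }
    }

    // Same order as in processElement: wire the output, copy the flag, then evaluate.
    static void processElement(MiniCRow in, WrappingCollector wrapper, Consumer<MiniCRow> out) {
        wrapper.out = out;
        wrapper.change = in.change;
        evalSplit(in.row, wrapper);
    }

    public static void main(String[] args) {
        List<MiniCRow> sink = new ArrayList<>();
        WrappingCollector wrapper = new WrappingCollector();
        processElement(new MiniCRow("a#bc", true), wrapper, sink::add);
        processElement(new MiniCRow("a#bc", false), wrapper, sink::add);
        System.out.println(sink); // prints [+a, +bc, -a, -bc]
    }
}
```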
Summary
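- TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods; UserDefinedFunction defines the open, close, isDeterministic, and functionIdentifier methods.
- To write a custom TableFunction, besides extending the TableFunction class you also need to define a public eval method. Its parameter types depend on how the function is called: in this example split is called with column a of the table, which is a String, so eval takes a String parameter.
- ProcessOperator's processElement method calls userFunction.processElement, where userFunction is a CRowCorrelateProcessRunner; CRowCorrelateProcessRunner's processElement method in turn calls function.processElement, and that generated function ends up calling Split's eval method.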
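TableFunction declares no abstract eval, so the method has to be found by name and matched against the argument types used at the call site. The sketch below shows what such a lookup could look like with plain Java reflection. It is an approximation for illustration, not Flink's actual resolution code, and `EvalLookupSketch`, `SplitLike`, and `findEval` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation of a name-and-signature lookup; not Flink's resolution logic.
class EvalLookupSketch {

    // Hypothetical function class with a single public eval(String).
    static class SplitLike {
        final List<String> emitted = new ArrayList<>();

        public void eval(String str) {
            for (String s : str.split("#")) {
                emitted.add(s);
            }
        }
    }

    // Returns the first public method named "eval" whose parameters accept the given argument types.
    static Method findEval(Class<?> udfClass, Class<?>... argTypes) {
        for (Method m : udfClass.getMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (!m.getName().equals("eval") || params.length != argTypes.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                matches &= params[i].isAssignableFrom(argTypes[i]);
            }
            if (matches) {
                return m;
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        SplitLike split = new SplitLike();
        Method eval = findEval(SplitLike.class, String.class);
        eval.invoke(split, "a#bc");
        System.out.println(split.emitted);                           // prints [a, bc]
        System.out.println(findEval(SplitLike.class, Integer.class)); // prints null
    }
}
```

A call with a String argument finds eval(String), while a call with an Integer argument finds nothing, which is why the eval signature has to follow the column types the function is applied to.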
doc
- Table Functions