优秀的编程知识分享平台

网站首页 > 技术文章 正文

聊聊flink的TableFunction(flink table api join)

nanyue 2024-07-25 06:09:44 技术文章 14 ℃

本文主要研究一下flink的TableFunction

实例

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
 private String separator = " ";
 
 public Split(String separator) {
 this.separator = separator;
 }
 
 public void eval(String str) {
 for (String s : str.split(separator)) {
 // use collect(...) to emit a row
 collect(new Tuple2<String, Integer>(s, s.length()));
 }
 }
}
?
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ... // table schema: [a: String]
?
// Register the function.
tableEnv.registerFunction("split", new Split("#"));
?
// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
 .select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
 .select("a, word, length");
?
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
  • 本实例自定义了Split function,并通过TableEnvironment.registerFunction注册,最后在Table的api或者TableEnvironment.sqlQuery中使用;这里的Split定义了public的eval方法,用于发射数据

UserDefinedFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala

abstract class UserDefinedFunction extends Serializable {
 /**
 * Setup method for user-defined function. It can be used for initialization work.
 *
 * By default, this method does nothing.
 */
 @throws(classOf[Exception])
 def open(context: FunctionContext): Unit = {}
?
 /**
 * Tear-down method for user-defined function. It can be used for clean up work.
 *
 * By default, this method does nothing.
 */
 @throws(classOf[Exception])
 def close(): Unit = {}
?
 /**
 * @return true if and only if a call to this function is guaranteed to always return
 * the same result given the same parameters; true is assumed by default
 * if user's function is not pure functional, like random(), date(), now()...
 * isDeterministic must return false
 */
 def isDeterministic: Boolean = true
?
 final def functionIdentifier: String = {
 val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
 getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
 }
?
 /**
 * Returns the name of the UDF that is used for plan explain and logging.
 */
 override def toString: String = getClass.getSimpleName
?
}
  • UserDefinedFunction定义了open、close、functionIdentifier方法

TableFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala

abstract class TableFunction[T] extends UserDefinedFunction {
?
 // ----------------------------------------------------------------------------------------------
?
 /**
 * Emit an output row.
 *
 * @param row the output row
 */
 protected def collect(row: T): Unit = {
 collector.collect(row)
 }
?
 // ----------------------------------------------------------------------------------------------
?
 /**
 * The code generated collector used to emit row.
 */
 private var collector: Collector[T] = _
?
 /**
 * Internal use. Sets the current collector.
 */
 private[flink] final def setCollector(collector: Collector[T]): Unit = {
 this.collector = collector
 }
?
 // ----------------------------------------------------------------------------------------------
?
 /**
 * Returns the result type of the evaluation method with a given signature.
 *
 * This method needs to be overridden in case Flink's type extraction facilities are not
 * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
 * method. Flink's type extraction facilities can handle basic types or
 * simple POJOs but might be wrong for more complex, custom, or composite types.
 *
 * @return [[TypeInformation]] of result type or null if Flink should determine the type
 */
 def getResultType: TypeInformation[T] = null
?
 /**
 * Returns [[TypeInformation]] about the operands of the evaluation method with a given
 * signature.
 *
 * In order to perform operand type inference in SQL (especially when NULL is used) it might be
 * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
 * By default Flink's type extraction facilities are used for this but might be wrong for
 * more complex, custom, or composite types.
 *
 * @param signature signature of the method the operand types need to be determined
 * @return [[TypeInformation]] of operand types
 */
 def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
 signature.map { c =>
 try {
 TypeExtractor.getForClass(c)
 } catch {
 case ite: InvalidTypesException =>
 throw new ValidationException(
 s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
 s"automatically determined. Please provide type information manually.")
 }
 }
 }
?
}
  • TableFunction继承了UserDefinedFunction,定义了collect、setCollector、getResultType、getParameterTypes方法

ProcessOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java

@Internal
public class ProcessOperator<IN, OUT>
 extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
 implements OneInputStreamOperator<IN, OUT> {
?
 private static final long serialVersionUID = 1L;
?
 private transient TimestampedCollector<OUT> collector;
?
 private transient ContextImpl context;
?
 /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
 private long currentWatermark = Long.MIN_VALUE;
?
 public ProcessOperator(ProcessFunction<IN, OUT> function) {
 super(function);
?
 chainingStrategy = ChainingStrategy.ALWAYS;
 }
?
 @Override
 public void open() throws Exception {
 super.open();
 collector = new TimestampedCollector<>(output);
?
 context = new ContextImpl(userFunction, getProcessingTimeService());
 }
?
 @Override
 public void processElement(StreamRecord<IN> element) throws Exception {
 collector.setTimestamp(element);
 context.element = element;
 userFunction.processElement(element.getValue(), context, collector);
 context.element = null;
 }
?
 @Override
 public void processWatermark(Watermark mark) throws Exception {
 super.processWatermark(mark);
 this.currentWatermark = mark.getTimestamp();
 }
?
 //......
}
  • ProcessOperator的processElement方法调用了userFunction.processElement,这里userFunction为CRowCorrelateProcessRunner

CRowCorrelateProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala

class CRowCorrelateProcessRunner(
 processName: String,
 processCode: String,
 collectorName: String,
 collectorCode: String,
 @transient var returnType: TypeInformation[CRow])
 extends ProcessFunction[CRow, CRow]
 with ResultTypeQueryable[CRow]
 with Compiler[Any]
 with Logging {
?
 private var function: ProcessFunction[Row, Row] = _
 private var collector: TableFunctionCollector[_] = _
 private var cRowWrapper: CRowWrappingCollector = _
?
 override def open(parameters: Configuration): Unit = {
 LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
 val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
 LOG.debug("Instantiating TableFunctionCollector.")
 collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
 this.cRowWrapper = new CRowWrappingCollector()
?
 LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
 val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
 val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
 LOG.debug("Instantiating ProcessFunction.")
 function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
 FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
 FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
 FunctionUtils.openFunction(collector, parameters)
 FunctionUtils.openFunction(function, parameters)
 }
?
 override def processElement(
 in: CRow,
 ctx: ProcessFunction[CRow, CRow]#Context,
 out: Collector[CRow])
 : Unit = {
?
 cRowWrapper.out = out
 cRowWrapper.setChange(in.change)
?
 collector.setCollector(cRowWrapper)
 collector.setInput(in.row)
 collector.reset()
?
 function.processElement(
 in.row,
 ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
 cRowWrapper)
 }
?
 override def getProducedType: TypeInformation[CRow] = returnType
?
 override def close(): Unit = {
 FunctionUtils.closeFunction(collector)
 FunctionUtils.closeFunction(function)
 }
}
  • CRowCorrelateProcessRunner的processElement方法调用了function.processElement,这里function会去调用Split的eval方法

小结

  • TableFunction继承了UserDefinedFunction,定义了collect、setCollector、getResultType、getParameterTypes方法;UserDefinedFunction定义了open、close、functionIdentifier方法
  • 自定义TableFunction的话,除了继承TableFunction类外,还需要定义一个public的eval方法,该方法的参数类型需要依据使用场景来定义,比如本实例中调用split的时候传入的是table的a字段,该字段为String类型,因而eval方法的入参就定义为String类型
  • ProcessOperator的processElement方法调用了userFunction.processElement,这里userFunction为CRowCorrelateProcessRunner;CRowCorrelateProcessRunner的processElement方法调用了function.processElement,这里function会去调用Split的eval方法

doc

  • Table Functions

Tags:

最近发表
标签列表