优秀的编程知识分享平台

网站首页 > 技术文章 正文

FlinkSQL 动态加载 UDF 实现思路

nanyue 2025-05-30 16:33:57 技术文章 7 ℃

导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job。尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方便提交。但我们在开发的过程中想对用户自定义 UDF Jar 进行管理,想将 UDF Jar 存储管理在阿里云 OSS ,在 Job 中通过动态加载的方式将 UDF Jar 加载进来,取代之前将 UDF 和 Job 打成一个 fat jar 的方式。下面将从几点展开讨论:

  • 将 UDF 写到 Job 中并打成一个 fat jar 的实现方式
  • 动态加载 UDF Jar 代码调整
  • 代码调整后存在的问题
  • 解决 UDF Jar URL 分发的思路

环境

  • Flink 1.11.2
  • 部署方式:Flink on Kubernetes
  • 部署模式: Session Cluster

将 UDF 写到 Job 中并打成一个 fat jar 的方式

下面是一个简单采用 FlinkSQL 编写 Job 的例子。使用 datagen 连接器作为 Source 生成数据, print 作为 Sink 将结果打印到控制台。自定义的一个简单 UDF自定义函数(returnSelf)。

   public static void main(String[] args) throws Exception {

        //创建流运行时环境
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        //采用BlinkPlanner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //创建StreamTable环境
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.setParallelism(1);

        bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");
        bsTableEnv.executeSql("CREATE TABLE sourceTable (" +
                "    f_sequence INT," +
                "    f_random INT," +
                "    f_random_str STRING," +
                "    ts AS localtimestamp," +
                "    WATERMARK FOR ts AS ts" +
                "    ) WITH (" +
                "    'connector' = 'datagen'," +
                "    'rows-per-second'='5'," +
                "    'fields.f_sequence.kind'='sequence'," +
                "    'fields.f_sequence.start'='1'," +
                "    'fields.f_sequence.end'='1000'," +
                "    'fields.f_random.min'='1'," +
                "    'fields.f_random.max'='1000'," +
                "    'fields.f_random_str.length'='10'" +
                ")");
        bsTableEnv.executeSql("CREATE TABLE sinktable (" +
                "   f_random_str STRING" +
                ") WITH (" +
                "   'connector' = 'print'" +
                ")");
        bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");
    }

要将该 Job 提交给远程 Flink 集群时,我们需要将 Job(包括自定义 UDF) 打成一个 fat Jar。但这并不是我们期望的操作,由于打成 fat jar 会显得比较臃肿,同时不方便管理 UDF Jar ,有些 UDF 具有通用性,可复用。所以我们希望将自定义的UDF Jar 独立出来保存管理,并在 Job 中通过动态加载的方式使用,如下图:

动态加载 UDF Jar 代码调整

  • 将 returnSelf 并独立打成一个 UDF Jar 上传到阿里云OSS。
  • 在 Job 的 main() 方法中新增动态加载的代码
 public static void main(String[] args) throws Exception {

        //创建流运行时环境
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        //采用BlinkPlanner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //创建StreamTable环境
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.setParallelism(1);

        // 动态加载
        String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";
        loadJar(new URL(funJarPath));

        bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");
        bsTableEnv.executeSql("CREATE TABLE sourceTable (" +
                "    f_sequence INT," +
                "    f_random INT," +
                "    f_random_str STRING," +
                "    ts AS localtimestamp," +
                "    WATERMARK FOR ts AS ts" +
                "    ) WITH (" +
                "    'connector' = 'datagen'," +
                "    'rows-per-second'='5'," +
                "    'fields.f_sequence.kind'='sequence'," +
                "    'fields.f_sequence.start'='1'," +
                "    'fields.f_sequence.end'='1000'," +
                "    'fields.f_random.min'='1'," +
                "    'fields.f_random.max'='1000'," +
                "    'fields.f_random_str.length'='10'" +
                ")");
        bsTableEnv.executeSql("CREATE TABLE sinktable (" +
                "   f_random_str STRING" +
                ") WITH (" +
                "   'connector' = 'print'" +
                ")");
        bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");
    }


    //动态加载Jar
    public static void loadJar(URL jarUrl) {
        //从URLClassLoader类加载器中获取类的addURL方法
        Method method = null;
        try {
            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e1) {
            e1.printStackTrace();
        }
        // 获取方法的访问权限
        boolean accessible = method.isAccessible();
        try {
            //修改访问权限为可写
            if (accessible == false) {
                method.setAccessible(true);
            }
            // 获取系统类加载器
            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
            //jar路径加入到系统url路径里
            method.invoke(classLoader, jarUrl);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            method.setAccessible(accessible);
        }
    }

修改后,我们将 UDF jar 存放到 OSS 中进行管理。当 Job 需要依赖某个 UDF 时,只需要通过动态加载就可以完成。动态加载使用 URLClassLoader 实现,使用被管理于 OSS 的 UDF Jar 的 URL 将 Jar 加载进 JVM 中,并取得 returnSelf 类。

代码调整后存在的问题

运行结果:代码调整后,在本地 IDEA 运行程序(即,启动了 Mini Cluster集群)是可以成功运行的。但是当发布到远程 Flink 集群上时(采用 Flink on K8S , Session Cluster 部署模式),会出现找不到 UDF 异常,如下:

Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf

分析:这是由于 Flink 的部署方式有多种。在本地运行的启动的是 MiniCluster,即 JobManager 和 TaskManager 在同一个JVM 进程中。而我们在远程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 进程。

在 Session 模式下,客户端在 main() 方法开始执行直到 env.execute() 方法之前需要完成以下三件事情

  • 获取作业所需的依赖项
  • 通过执行环境分析并取得逻辑计划,即StreamGraph→JobGraph
  • 将依赖项和JobGraph上传到集群中

只有在这些都完成之后,才会通过env.execute() 方法触发 Flink 运行时真正地开始执行作业。所以在本地运行的 Mini Cluster,因为都处于同一个 JVM 进程,客户端运行 main() 方法进行动态加载后将依赖项和 JobGraph 提交给 JobMananger 再由 TaskManager 执行 Job。

而当在远程集群时,客户端实现动态加载 Jar 后将依赖项和 JobGraph 提交给 JobMananger,但是由于 JobMananger 和 TaskMananger 是处于不同的 JVM进程中,且没有对自定义 UDF Jar URL 进行分发,这会让 TaskMananger 在运行任务时出现 Class Not Found 异常,这是因为 TaskMananger 没有进行类加载,JVM 中没有 returnSelf 类所导致。

解决 UDF Jar 分发的思路

基于以上问题我们查阅了一些相关资料及阅读源码,以以下三点为条件

  • 基于采用 Session 模式部署
  • 基于 REST API 提交 Job 而不采用命令行方式
  • 不改动 Flink 源码

分析:官网提供了一个 -C 参数,大致用法就是把用户自定义 Jar 放到一个 JobMananger 和 TaskMananger 都能访问到的存储地方,然后通过命令行方式启动 Job 时使用 -C 参数,后面加上自定义 Jar 的URLs 就可以实现分发。

但是我们平台由于采用 REST API,而提交 Job 的 API 并没有提供该参数,所以在不改变 Flink 源码的前提下进行源码研究,最后发现可以在 main 中将 UDF Jar 的 URL 加到配置项 pipeline.classpaths 中,也就是曲线救国实现了 -C 的效果。在 main 中增加以下代码片段:

Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
        configurationField.setAccessible(true);
        Configuration o = (Configuration)configurationField.get(bsEnv);
        Field confData = Configuration.class.getDeclaredField("confData");
        confData.setAccessible(true);
        Map<String,Object> temp = (Map<String,Object>)confData.get(o);
        List<String> jarList = new ArrayList<>();
        jarList.add(funJarPath);
        temp.put("pipeline.classpaths",jarList);

完整代码

public static void main(String[] args) throws Exception {

        //创建流运行时环境
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        //采用BlinkPlanner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //创建StreamTable环境
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.setParallelism(1);

        // 动态加载
        String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";
        loadJar(new URL(funJarPath));
        Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
        configurationField.setAccessible(true);
        Configuration o = (Configuration)configurationField.get(bsEnv);
        Field confData = Configuration.class.getDeclaredField("confData");
        confData.setAccessible(true);
        Map<String,Object> temp = (Map<String,Object>)confData.get(o);
        List<String> jarList = new ArrayList<>();
        jarList.add(funJarPath);
        temp.put("pipeline.classpaths",jarList);

        bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");
        bsTableEnv.executeSql("CREATE TABLE sourceTable (" +
                "    f_sequence INT," +
                "    f_random INT," +
                "    f_random_str STRING," +
                "    ts AS localtimestamp," +
                "    WATERMARK FOR ts AS ts" +
                "    ) WITH (" +
                "    'connector' = 'datagen'," +
                "    'rows-per-second'='5'," +
                "    'fields.f_sequence.kind'='sequence'," +
                "    'fields.f_sequence.start'='1'," +
                "    'fields.f_sequence.end'='1000'," +
                "    'fields.f_random.min'='1'," +
                "    'fields.f_random.max'='1000'," +
                "    'fields.f_random_str.length'='10'" +
                ")");
        bsTableEnv.executeSql("CREATE TABLE sinktable (" +
                "   f_random_str STRING" +
                ") WITH (" +
                "   'connector' = 'print'" +
                ")");
        bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");
    }


    //动态加载Jar
    public static void loadJar(URL jarUrl) {
        //从URLClassLoader类加载器中获取类的addURL方法
        Method method = null;
        try {
            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e1) {
            e1.printStackTrace();
        }
        // 获取方法的访问权限
        boolean accessible = method.isAccessible();
        try {
            //修改访问权限为可写
            if (accessible == false) {
                method.setAccessible(true);
            }
            // 获取系统类加载器
            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
            //jar路径加入到系统url路径里
            method.invoke(classLoader, jarUrl);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            method.setAccessible(accessible);
        }
    }

最后

以上就是在 Flink on K8S 集群 Session 模式下, FlinkSQL 动态加载 Jar 的解决方案。由于 REST API 没有提供 -C 效果,自定义 Jar URL 没有分发到 TaskMananger,导致 TaskMananger 没有进行类加载到其 JVM 中。通过在 Job 的 main 方法中增加动态加载方法及配置 pipeline.classpaths,可以达到不改动 Flink 源码的情况下实现 -C 效果。以上方案刚实现不久,还不保证是否有其他未知的问题,如果有更好的解决方案或者该方案中存在错误或者疏漏也欢迎提出共同讨论。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

Tags:

最近发表
标签列表