网站首页 > 技术文章 正文
导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job。尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方便提交。但我们在开发的过程中想对用户自定义 UDF Jar 进行管理,想将 UDF Jar 存储管理在阿里云 OSS ,在 Job 中通过动态加载的方式将 UDF Jar 加载进来,取代之前将 UDF 和 Job 打成一个 fat jar 的方式。下面将从几点展开讨论:
- 将 UDF 写到 Job 中并打成一个 fat jar 的实现方式
- 动态加载 UDF Jar 代码调整
- 代码调整后存在的问题
- 解决 UDF Jar URL 分发的思路
环境
- Flink 1.11.2
- 部署方式:Flink on Kubernetes
- 部署模式: Session Cluster
将 UDF 写到 Job 中并打成一个 fat jar 的方式
下面是一个简单采用 FlinkSQL 编写 Job 的例子。使用 datagen 连接器作为 Source 生成数据, print 作为 Sink 将结果打印到控制台。自定义的一个简单 UDF自定义函数(returnSelf)。
public static void main(String[] args) throws Exception {
//创建流运行时环境
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//采用BlinkPlanner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//创建StreamTable环境
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.setParallelism(1);
bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");
bsTableEnv.executeSql("CREATE TABLE sourceTable (" +
" f_sequence INT," +
" f_random INT," +
" f_random_str STRING," +
" ts AS localtimestamp," +
" WATERMARK FOR ts AS ts" +
" ) WITH (" +
" 'connector' = 'datagen'," +
" 'rows-per-second'='5'," +
" 'fields.f_sequence.kind'='sequence'," +
" 'fields.f_sequence.start'='1'," +
" 'fields.f_sequence.end'='1000'," +
" 'fields.f_random.min'='1'," +
" 'fields.f_random.max'='1000'," +
" 'fields.f_random_str.length'='10'" +
")");
bsTableEnv.executeSql("CREATE TABLE sinktable (" +
" f_random_str STRING" +
") WITH (" +
" 'connector' = 'print'" +
")");
bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");
}
要将该 Job 提交给远程 Flink 集群时,我们需要将 Job(包括自定义 UDF) 打成一个 fat Jar。但这并不是我们期望的操作,由于打成 fat jar 会显得比较臃肿,同时不方便管理 UDF Jar ,有些 UDF 具有通用性,可复用。所以我们希望将自定义的UDF Jar 独立出来保存管理,并在 Job 中通过动态加载的方式使用,如下图:
动态加载 UDF Jar 代码调整
- 将 returnSelf 并独立打成一个 UDF Jar 上传到阿里云OSS。
- 在 Job 的 main() 方法中新增动态加载的代码
public static void main(String[] args) throws Exception {
//创建流运行时环境
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//采用BlinkPlanner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//创建StreamTable环境
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.setParallelism(1);
// 动态加载
String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";
loadJar(new URL(funJarPath));
bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");
bsTableEnv.executeSql("CREATE TABLE sourceTable (" +
" f_sequence INT," +
" f_random INT," +
" f_random_str STRING," +
" ts AS localtimestamp," +
" WATERMARK FOR ts AS ts" +
" ) WITH (" +
" 'connector' = 'datagen'," +
" 'rows-per-second'='5'," +
" 'fields.f_sequence.kind'='sequence'," +
" 'fields.f_sequence.start'='1'," +
" 'fields.f_sequence.end'='1000'," +
" 'fields.f_random.min'='1'," +
" 'fields.f_random.max'='1000'," +
" 'fields.f_random_str.length'='10'" +
")");
bsTableEnv.executeSql("CREATE TABLE sinktable (" +
" f_random_str STRING" +
") WITH (" +
" 'connector' = 'print'" +
")");
bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
修改后,我们将 UDF jar 存放到 OSS 中进行管理。当 Job 需要依赖某个 UDF 时,只需要通过动态加载就可以完成。动态加载使用 URLClassLoader 实现,使用被管理于 OSS 的 UDF Jar 的 URL 将 Jar 加载进 JVM 中,并取得 returnSelf 类。
代码调整后存在的问题
运行结果:代码调整后,在本地 IDEA 运行程序(即,启动了 Mini Cluster集群)是可以成功运行的。但是当发布到远程 Flink 集群上时(采用 Flink on K8S , Session Cluster 部署模式),会出现找不到 UDF 异常,如下:
Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf
分析:这是由于 Flink 的部署方式有多种。在本地运行的启动的是 MiniCluster,即 JobManager 和 TaskManager 在同一个JVM 进程中。而我们在远程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 进程。
在 Session 模式下,客户端在 main() 方法开始执行直到 env.execute() 方法之前需要完成以下三件事情
- 获取作业所需的依赖项
- 通过执行环境分析并取得逻辑计划,即StreamGraph→JobGraph
- 将依赖项和JobGraph上传到集群中
只有在这些都完成之后,才会通过env.execute() 方法触发 Flink 运行时真正地开始执行作业。所以在本地运行的 Mini Cluster,因为都处于同一个 JVM 进程,客户端运行 main() 方法进行动态加载后将依赖项和 JobGraph 提交给 JobMananger 再由 TaskManager 执行 Job。
而当在远程集群时,客户端实现动态加载 Jar 后将依赖项和 JobGraph 提交给 JobMananger,但是由于 JobMananger 和 TaskMananger 是处于不同的 JVM进程中,且没有对自定义 UDF Jar URL 进行分发,这会让 TaskMananger 在运行任务时出现 Class Not Found 异常,这是因为 TaskMananger 没有进行类加载,JVM 中没有 returnSelf 类所导致。
解决 UDF Jar 分发的思路
基于以上问题我们查阅了一些相关资料及阅读源码,以以下三点为条件
- 基于采用 Session 模式部署
- 基于 REST API 提交 Job 而不采用命令行方式
- 不改动 Flink 源码
分析:官网提供了一个 -C 参数,大致用法就是把用户自定义 Jar 放到一个 JobMananger 和 TaskMananger 都能访问到的存储地方,然后通过命令行方式启动 Job 时使用 -C 参数,后面加上自定义 Jar 的URLs 就可以实现分发。
但是我们平台由于采用 REST API,而提交 Job 的 API 并没有提供该参数,所以在不改变 Flink 源码的前提下进行源码研究,最后发现可以在 main 中将 UDF Jar 的 URL 加到配置项 pipeline.classpaths 中,也就是曲线救国实现了 -C 的效果。在 main 中增加以下代码片段:
Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configurationField.setAccessible(true);
Configuration o = (Configuration)configurationField.get(bsEnv);
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String,Object> temp = (Map<String,Object>)confData.get(o);
List<String> jarList = new ArrayList<>();
jarList.add(funJarPath);
temp.put("pipeline.classpaths",jarList);
完整代码
public static void main(String[] args) throws Exception {
//创建流运行时环境
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//采用BlinkPlanner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//创建StreamTable环境
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.setParallelism(1);
// 动态加载
String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";
loadJar(new URL(funJarPath));
Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configurationField.setAccessible(true);
Configuration o = (Configuration)configurationField.get(bsEnv);
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String,Object> temp = (Map<String,Object>)confData.get(o);
List<String> jarList = new ArrayList<>();
jarList.add(funJarPath);
temp.put("pipeline.classpaths",jarList);
bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");
bsTableEnv.executeSql("CREATE TABLE sourceTable (" +
" f_sequence INT," +
" f_random INT," +
" f_random_str STRING," +
" ts AS localtimestamp," +
" WATERMARK FOR ts AS ts" +
" ) WITH (" +
" 'connector' = 'datagen'," +
" 'rows-per-second'='5'," +
" 'fields.f_sequence.kind'='sequence'," +
" 'fields.f_sequence.start'='1'," +
" 'fields.f_sequence.end'='1000'," +
" 'fields.f_random.min'='1'," +
" 'fields.f_random.max'='1000'," +
" 'fields.f_random_str.length'='10'" +
")");
bsTableEnv.executeSql("CREATE TABLE sinktable (" +
" f_random_str STRING" +
") WITH (" +
" 'connector' = 'print'" +
")");
bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
最后
以上就是在 Flink on K8S 集群 Session 模式下, FlinkSQL 动态加载 Jar 的解决方案。由于 REST API 没有提供 -C 效果,自定义 Jar URL 没有分发到 TaskMananger,导致 TaskMananger 没有进行类加载到其 JVM 中。通过在 Job 的 main 方法中增加动态加载方法及配置 pipeline.classpaths,可以达到不改动 Flink 源码的情况下实现 -C 效果。以上方案刚实现不久,还不保证是否有其他未知的问题,如果有更好的解决方案或者该方案中存在错误或者疏漏也欢迎提出共同讨论。
感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
- 上一篇: 浅谈xaingce apk样本分析
- 下一篇: 史上最详细最清晰的SpringBoot项目打包方法
猜你喜欢
- 2025-05-30 大厂都在用的 Git 代码管理规范 !
- 2025-05-30 Linux操作系统之常用命令
- 2025-05-30 AndroidStudio_Android使用OkHttp发起Http请求
- 2025-05-30 11月1日周三周赛快速全清简单打法#保卫萝卜4周赛
- 2025-05-30 老友记 第一季 第十集 中英文台词 完整版 Part 1
- 2025-05-30 史上超全食物英文名称汇总及配图,建议珍藏!
- 2025-05-30 使用Docker快速部署Storm环境
- 2025-05-30 这样优化Spring Boot,启动速度快到飞起
- 2025-05-30 Vert.x Blueprint-教你如何玩转待办事项服务二
- 2025-05-30 SpringBoot 整合 Flink 实时同步 MySQL
- 06-13C++之类和对象(c++中类和对象的区别)
- 06-13C语言进阶教程:数据结构 - 哈希表的基本原理与实现
- 06-13C语言实现见缝插圆游戏!零基础代码思路+源码分享
- 06-13Windows 10下使用编译并使用openCV
- 06-13C语言进阶教程:栈和队列的实现与应用
- 06-13C语言这些常见标准文件该如何使用?很基础也很重要
- 06-13C语言 vs C++:谁才是编程界的“全能王者”?
- 06-13C语言无锁编程指南(c语言锁机代码)
- 最近发表
- 标签列表
-
- cmd/c (64)
- c++中::是什么意思 (83)
- 标签用于 (65)
- 主键只能有一个吗 (66)
- c#console.writeline不显示 (75)
- pythoncase语句 (81)
- es6includes (73)
- sqlset (64)
- windowsscripthost (67)
- apt-getinstall-y (86)
- node_modules怎么生成 (76)
- chromepost (65)
- c++int转char (75)
- static函数和普通函数 (76)
- el-date-picker开始日期早于结束日期 (70)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- & (66)
- java (73)
- js数组插入 (83)
- linux删除一个文件夹 (65)
- mac安装java (72)
- eacces (67)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)