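1. Requirements
Publish a jar packaged from SpringBoot on Flink that syncs a MySQL table in real time, so that inserts, updates, and deletes on the source table are mirrored in the target table.
2. Design
- Write the business code in Java with SpringBoot;
- Use FlinkSQL on top of Flink CDC to handle the real-time MySQL sync;
- Package the project as a jar, then run and manage it with Flink.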
3. Environment
Component | Version |
MySQL | 8.* |
Flink | 1.16.2 |
Flink CDC | 2.3.0 |
Java | 8 |
SpringBoot | 2.7.12 |
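3. Implementation
3.1 The pom file
The pom file below can be copied in full. Assembling the dependencies and the packaging configuration yourself tends to produce all sorts of build and runtime errors, so copying the whole file is the easier route.
Be sure to replace the mainClass in the pom with your own.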
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.usoten</groupId>
<artifactId>processminingreport</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ProcessMiningReport</name>
<description>ProcessMiningReport</description>
<properties>
<java.version>1.8</java.version>
<flink.version>1.16.2</flink.version>
<flink-cdc.version>2.3.0</flink-cdc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- mysql-cdc fat jar -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink-cdc.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring/org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration.imports</resource>
</transformer>
<transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- main class -->
<mainClass>com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
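3.2 Code
The source table needs to be synced to the target table. Three SQL statements have to be executed for this, so the endpoint takes a List as its parameter. The actual SQL is shown in the Postman request example in section 7.
Main class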
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProcessMiningCdcManagerApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ProcessMiningCdcManagerApplication.class, args);
while (true){
Thread.sleep(30000);
}
}
}
Controller layer
import com.usoten.processminingcdcmanager.service.CdcBaseService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/datasource")
public class CdcBaseController {
private CdcBaseService cdcBaseService;
public CdcBaseController(CdcBaseService cdcBaseService) {
this.cdcBaseService = cdcBaseService;
}
@PostMapping("/cdc/executeSql")
public void getColumnMetadata(@RequestBody List<String> sqlList) {
cdcBaseService.executeSql(sqlList);
}
}
Service layer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class CdcBaseService {
    // @RequestBody belongs on the controller method only, so it is not repeated here
    public void executeSql(List<String> sqlList) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 3000 ms
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Run the statements in order: source DDL, sink DDL, then the INSERT that starts the sync job
        sqlList.forEach(tEnv::executeSql);
    }
}
4. Packaging
Run mvn clean package to build the jar, then put the jar into Flink's lib directory.
5. Run the jar
nohup ./bin/flink run --class com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication ./lib/processminingreport-0.0.1-SNAPSHOT.jar &
- Error 1
20:30:59.960 [http-nio-8100-exec-1] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - [log,175] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.IllegalStateException: Unable to instantiate java compiler] with root cause
java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)
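This happens because Flink changed the client-side ClassLoader resolution order to child-first, which means the user jar must not contain Flink framework classes such as the common Calcite, Flink planner, or Hive dependencies. Either put the jars with the conflicting classes under flink-home/lib, or switch the strategy to parent-first. Here we switch directly to parent-first by adding the following to $FLINK_HOME/conf/flink-conf.yaml: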
classloader.resolve-order: parent-first
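- Error 2
If the job fails because the main method has exited, add an infinite loop to the main class as shown in the main class earlier.
Restart Flink and run the jar again, then check the log to confirm that the application has started.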
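As an alternative to the sleep loop in the main class, the main thread can be parked on a latch. The following is only a minimal sketch of that idea: the class name KeepAlive and its methods are hypothetical and not part of the original project, and the demo in main uses a background thread to stand in for a real shutdown signal.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: keeps a thread parked until release() is called.
public class KeepAlive {
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Blocks the calling thread until release() is called. */
    public void await() throws InterruptedException {
        latch.await();
    }

    /** Waits at most the given time; returns true if release() was called in time. */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    /** Lets every waiting thread continue. */
    public void release() {
        latch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        KeepAlive keepAlive = new KeepAlive();
        // Release the latch when the JVM is asked to shut down.
        Runtime.getRuntime().addShutdownHook(new Thread(keepAlive::release));
        // Demo only: release after 200 ms so this example terminates on its own.
        Thread demo = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            keepAlive.release();
        });
        demo.start();
        keepAlive.await();
        System.out.println("main thread released");
    }
}
```

In the article's main class, the call to await() would go right after SpringApplication.run(...), without the demo thread, so the main method stays alive until the process is stopped.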
6. Prepare the data
- Create a database named test
- Create two tables: the source table test and the target table test1
- Insert a few rows into the test table
- MySQL must have binlog enabled, and both tables need a primary key
CREATE TABLE `test` (
  `id` int NOT NULL,
  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
CREATE TABLE `test1` (
  `id` int NOT NULL,
  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
7. Send the request
POST localhost:8100/datasource/cdc/executeSql
Body:
[
"CREATE TABLE mysql_source ( id INT, username STRING, password STRING,PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'table-name' = 'test', 'hostname' = 'localhost', 'database-name' = 'test', 'port' = '3306', 'username' = 'root', 'password' = '12345678', 'scan.startup.mode' = 'initial')",
"CREATE TABLE oceanbase_sink ( id INT NOT NULL, username STRING, password STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'test1', 'username' = 'root', 'password' = '12345678')",
"insert into oceanbase_sink select id,username,password from mysql_source"
]
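The body above is a plain JSON array of strings, one per SQL statement. If the request is built from Java rather than Postman, the array could be assembled roughly as follows. This is a sketch only: the class SqlRequestBody and its methods are hypothetical helpers, not part of the original project, and a JSON library such as the Jackson that ships with spring-boot-starter-web would do the same job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that renders SQL statements as the JSON array the endpoint expects.
public class SqlRequestBody {

    /** Escapes quotes, backslashes, and control characters for a JSON string literal. */
    public static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Joins the statements into a JSON array of strings, keeping their order. */
    public static String toJsonArray(List<String> statements) {
        return statements.stream()
                .map(s -> "\"" + escape(s) + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        // Only the INSERT statement from the article is used here to keep the demo short.
        List<String> statements = Arrays.asList(
                "insert into oceanbase_sink select id,username,password from mysql_source");
        System.out.println(toJsonArray(statements));
    }
}
```

The resulting string can be sent as the body of a POST to /datasource/cdc/executeSql with Content-Type application/json. Order matters, since the two CREATE TABLE statements must run before the INSERT.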
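Once the request returns successfully, the running job shows up in the Flink web UI. The job keeps running, listening for changes and syncing MySQL.
Verification
The data in the test table is now synced into test1, and any insert, update, or delete on test is reflected in test1.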
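The check can also be done programmatically by comparing the two tables row by row. The sketch below assumes the test and test1 tables from section 6. The class SyncCheck and its methods are hypothetical, and the JDBC part needs a MySQL driver on the classpath and a reachable database, so the demo in main only exercises the comparison on in-memory data.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper for comparing the source table with the target table.
public class SyncCheck {

    /** Reads id -> "username|password" for every row of the given table. */
    public static Map<Integer, String> snapshot(Connection conn, String table) throws SQLException {
        Map<Integer, String> rows = new TreeMap<>();
        // The table name is concatenated into the query, so pass only trusted names.
        String sql = "SELECT id, username, password FROM `" + table + "`";
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                rows.put(rs.getInt("id"), rs.getString("username") + "|" + rs.getString("password"));
            }
        }
        return rows;
    }

    /** Returns the ids whose rows are missing on one side or differ between the two sides. */
    public static Set<Integer> mismatchedIds(Map<Integer, String> source, Map<Integer, String> target) {
        Set<Integer> ids = new TreeSet<>(source.keySet());
        ids.addAll(target.keySet());
        ids.removeIf(id -> Objects.equals(source.get(id), target.get(id)));
        return ids;
    }

    public static void main(String[] args) {
        // In-memory stand-ins for snapshot(conn, "test") and snapshot(conn, "test1").
        Map<Integer, String> source = new TreeMap<>();
        source.put(1, "alice|secret");
        source.put(2, "bob|hunter2");
        Map<Integer, String> target = new TreeMap<>();
        target.put(1, "alice|secret");
        System.out.println("ids not yet in sync: " + mismatchedIds(source, target));
    }
}
```

Against a real database, a Connection from DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "12345678") would be passed to snapshot for both tables. Since the sync is asynchronous, an empty result may only appear a short while after a change is made.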