
Integrating Flink with Spring Boot for Real-Time MySQL Synchronization

nanyue, 2025-05-30

1. Requirements

Deploy a Spring Boot jar on Flink so that it synchronizes a MySQL table in real time: whenever rows are inserted, updated, or deleted in the source table, the target table is updated accordingly.

2. Design

  1. Develop the business code in Java with Spring Boot;
  2. Use Flink SQL on top of Flink CDC for real-time MySQL synchronization;
  3. Package everything into a jar, then run and manage it with Flink.

3. Environment Requirements


Component      Version
MySQL          8.*
Flink          1.16.2
Flink CDC      2.3.0
Java           8
Spring Boot    2.7.12

4. Implementation

4.1. The pom file

You can copy the entire pom below. Packaging and running this yourself tends to hit all sorts of dependency errors, so copying it wholesale is the safest route.

Be sure to replace the mainClass in the pom with your own main class.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.usoten</groupId>
    <artifactId>processminingreport</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ProcessMiningReport</name>
    <description>ProcessMiningReport</description>
    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.16.2</flink.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
        </dependency>

        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql-cdc fat jar -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring/org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration.imports</resource>
                                </transformer>
                                <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- main class -->
                                    <mainClass>com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

4.2. Code

The source table needs to be synced to the target table, which takes three SQL statements, so the endpoint accepts a List as its input. The actual SQL appears later in the Postman example.

Main class

package com.usoten.processminingcdcmanager;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProcessMiningCdcManagerApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ProcessMiningCdcManagerApplication.class, args);

        // Keep the main thread alive: when this jar is submitted via `flink run`,
        // the job is torn down as soon as main() returns (see Error 2 below).
        while (true) {
            Thread.sleep(30000);
        }
    }

}
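As a side note, the timed loop can be replaced by a single blocking call with the same keep-alive effect. This variant is a sketch, not part of the original code:

// A hedged alternative to the timed loop, identical in effect: park the main
// thread forever (the current thread never terminates, so join() never returns).
Thread.currentThread().join();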

Controller layer

import com.usoten.processminingcdcmanager.service.CdcBaseService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/datasource")
public class CdcBaseController {

    private final CdcBaseService cdcBaseService;

    public CdcBaseController(CdcBaseService cdcBaseService) {
        this.cdcBaseService = cdcBaseService;
    }

    @PostMapping("/cdc/executeSql")
    public void executeSql(@RequestBody List<String> sqlList) {
        cdcBaseService.executeSql(sqlList);
    }
}

Service layer

package com.usoten.processminingcdcmanager.service;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CdcBaseService {

    public void executeSql(List<String> sqlList) {
        // Inside `flink run`, getExecutionEnvironment() returns an environment
        // that submits the job to the cluster the client is attached to.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The two CREATE TABLE statements only register tables;
        // the INSERT submits the long-running sync job.
        sqlList.forEach(tEnv::executeSql);
    }
}
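Optionally, the loop in executeSql can log which statement actually launched the streaming job. This is a hedged sketch, not part of the original code; TableResult.getJobClient() is only populated for statements that submit a job (here, the INSERT):

import org.apache.flink.table.api.TableResult;

// Drop-in replacement for the forEach in executeSql: log the job ID
sqlList.forEach(sql -> {
    TableResult result = tEnv.executeSql(sql);
    // Present only for job-submitting statements such as INSERT
    result.getJobClient().ifPresent(client ->
            System.out.println("Submitted Flink job: " + client.getJobID()));
});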

5. Packaging

Run mvn clean package, then copy the resulting jar into Flink's lib directory.
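Concretely, the two steps look like this (the jar name follows from the pom's artifactId and version; $FLINK_HOME is wherever Flink is installed):

mvn clean package
cp target/processminingreport-0.0.1-SNAPSHOT.jar $FLINK_HOME/lib/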

6. Running the jar

nohup ./bin/flink run --class com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication ./lib/processminingreport-0.0.1-SNAPSHOT.jar &
  • Error 1
20:30:59.960 [http-nio-8100-exec-1] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - [log,175] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.IllegalStateException: Unable to instantiate java compiler] with root cause
java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
	at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
	at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)

This is because Flink resolves the client ClassLoader child-first, which means the user jar must not contain Flink framework classes (commonly Calcite, the Flink planner, or Hive dependencies). You can either put the jars with conflicting classes into $FLINK_HOME/lib, or switch the resolution order to parent-first. Here we take the simple route and switch to parent-first by adding the following to $FLINK_HOME/conf/flink-conf.yaml:

classloader.resolve-order: parent-first
  • Error 2

If the job exits with an error right after startup, it is because the main method returned; the fix is the keep-alive loop added to the main class shown earlier.

Restart Flink and run the jar again; once the application starts up cleanly, you are good to go.

7. Preparing data

  • Create a test database
  • Create two tables, source table test and target table test1 (DDL below)
  • Insert a few sample rows into test (see the sketch after this list)
  • CREATE TABLE `test` (
    `id` int NOT NULL,
    `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
    `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    CREATE TABLE `test1` (
    `id` int NOT NULL,
    `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
    `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  • MySQL must have binlog enabled, and the tables must have primary keys
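For example, a few illustrative seed rows plus a binlog sanity check (the values are arbitrary):

-- Seed the source table with a few sample rows (arbitrary values)
INSERT INTO `test` (`id`, `username`, `password`) VALUES
  (1, 'alice', 'pass1'),
  (2, 'bob',   'pass2');

-- Binlog must be enabled (should return ON) ...
SHOW VARIABLES LIKE 'log_bin';
-- ... and row-based, which MySQL CDC requires (should return ROW)
SHOW VARIABLES LIKE 'binlog_format';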

8. Sending the request

POST localhost:8100/datasource/cdc/executeSql

Body

[
    "CREATE TABLE mysql_source ( id INT, username STRING,  password STRING,PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'table-name' = 'test', 'hostname' = 'localhost', 'database-name' = 'test', 'port' = '3306', 'username' = 'root', 'password' = '12345678', 'scan.startup.mode' = 'initial')",
    "CREATE TABLE oceanbase_sink ( id INT NOT NULL, username STRING,  password STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'test1', 'username' = 'root', 'password' = '12345678')",
    "insert into oceanbase_sink select id,username,password from mysql_source"
]
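If you prefer the command line to Postman, an equivalent curl call might look like this (body.json is a hypothetical local file containing the JSON array above):

curl -X POST http://localhost:8100/datasource/cdc/executeSql \
  -H 'Content-Type: application/json' \
  -d @body.json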

Once the request returns successfully, the Flink web UI shows the running job. It keeps running indefinitely, listening to MySQL and synchronizing changes.
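The same check works from the command line with Flink's standard CLI:

./bin/flink list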

Verification

At this point the existing data in test has been copied into test1, and any subsequent insert, update, or delete on test is reflected in test1.
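For instance, a quick check from the MySQL client:

-- After inserting, updating, or deleting rows in `test`,
-- the same rows should show up here
SELECT * FROM `test1`;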


