优秀的编程知识分享平台

网站首页 > 技术文章 正文

第一个flink datastream程序(flink发展史)

nanyue 2024-08-09 07:01:20 技术文章 9 ℃

代码

创建maven项目

flink提供了项目模板,可以根据模板生成maven java项目。

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.13.1

stream 程序

写一个简单的stream程序,提供一个自定义source,经过一次map操作,使用自定义sink输出。并打印执行计划。

public class StreamingJob {
    private static Logger logger = LoggerFactory.getLogger(StreamingJob.class);

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream ds = env.addSource(new SourceFunction<Object>() {
            @Override
            public void run(SourceContext<Object> sourceContext) throws Exception {
                while (true) {
                    sourceContext.collect(System.currentTimeMillis());
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {

            }
        }).setParallelism(1);

        DataStream dsMap = ds.map( e-> "prefix_" + e).setParallelism(2).startNewChain();
        dsMap.print().disableChaining();
        dsMap.addSink(new SinkFunction() {
            @Override
            public void invoke(Object value, Context context) throws Exception {
                SinkFunction.super.invoke(value, context);
                logger.info("sink, value = {}", value);
            }
        }).disableChaining();

        logger.info("getExecutionPlan = \n{}", env.getExecutionPlan());
        env.execute("Flink Streaming Java API Skeleton");
    }
}

调试与运行

在IDE里调试flink程序

模板生成的pom中有个plugin,org.eclipse.m2e:lifecycle-mapping是给eclipse用的, 可以删除掉。

如果想在IDE里运行,需要在pom文件里加上一些本地运行的依赖。在pom.xml的project里加入一个profile,加入以下依赖。并在IDE里选中这个profile。这样就可以在IDE里直接调试和运行main所在的类了。

<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    </profile>
</profiles>

提交到flink集群运行

如果已经有一个运行的flink集群(假设jobmanager webui为http://localhost:18081/),就可以提交到flink集群去运行。

先把程序打包,运行mvn命令,将java程序以及依赖库打包成一个shade jar。

mvn clean package

用命令行提交

运行flink命令行,向flink集群提交任务。用"-m"参数指定集群机器和端口。

bin/flink run -d -m localhost:18081 ~/datastream-demo/target/datastream-demo-1.0-SNAPSHOT.jar

在flink webui提交

打开webui,点击“Submit New Job”,将生成的shade jar上传,然后提交任务。这里可以查看执行计划,并设置并行度和启动类等参数。

调整执行计划

由于默认的flink webui不能编辑执行计划,只能在java代码中通过setParallelism/startNewChain/disableChaining来调整执行计划,并将getExecutionPlan得到的json执行计划放到可视化页面查看(https://flink.apache.org/visualizer/),查看执行计划是否符合预期。

pom文件内容

最终的pom.xml文件内容如下,供参考。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.luozhucheng.flink</groupId>
    <artifactId>datastream-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Flink Quickstart Job</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.1</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.luozhucheng.flink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
            </plugins>
        </pluginManagement>
    </build>
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>

参考

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/project-configuration



最近发表
标签列表