Code
Creating the Maven project
Flink provides project templates (Maven archetypes) that generate a Maven Java project:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.13.1
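The command above prompts interactively for the project coordinates. To script the generation, the coordinates can be passed explicitly; the groupId/artifactId/package values below just mirror the pom used later in this article:

```shell
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.13.1 \
  -DgroupId=com.luozhucheng.flink \
  -DartifactId=datastream-demo \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.luozhucheng.flink \
  -DinteractiveMode=false
```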
The stream program
Below is a simple stream program: a custom source feeds one map operation, and a custom sink writes the output. It also prints the execution plan.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingJob {
    private static final Logger logger = LoggerFactory.getLogger(StreamingJob.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Custom source: emits the current timestamp once per second until cancelled.
        DataStream<Long> ds = env.addSource(new SourceFunction<Long>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Long> sourceContext) throws Exception {
                while (running) {
                    sourceContext.collect(System.currentTimeMillis());
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).setParallelism(1);
        DataStream<String> dsMap = ds.map(e -> "prefix_" + e).setParallelism(2).startNewChain();
        dsMap.print().disableChaining();
        // Custom sink: logs each record.
        dsMap.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                logger.info("sink, value = {}", value);
            }
        }).disableChaining();
        logger.info("getExecutionPlan = \n{}", env.getExecutionPlan());
        env.execute("Flink Streaming Java API Skeleton");
    }
}
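The logger calls above only produce console output if Log4j is configured. The archetype generates a src/main/resources/log4j2.properties for this; a minimal version, closely following the quickstart template, looks like:

```properties
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```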
Debugging and running
Debugging the Flink program in an IDE
The pom generated from the template contains a plugin, org.eclipse.m2e:lifecycle-mapping, which is only used by Eclipse and can be deleted.
To run the program in an IDE, a few extra dependencies are needed for local execution, because the Flink dependencies in the generated pom use provided scope and are therefore missing from the IDE's runtime classpath. Add the following profile inside <project> in pom.xml and select it in the IDE. The class containing main can then be debugged and run directly in the IDE.
<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    </profile>
</profiles>
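With this profile in place, the job can also be launched straight from the command line. The sketch below assumes the exec-maven-plugin (which Maven can resolve by its exec: prefix without declaring it in the pom); the main class is the one configured for the shade plugin:

```shell
mvn compile exec:java \
  -Padd-dependencies-for-IDEA \
  -Dexec.mainClass=com.luozhucheng.flink.StreamingJob
```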
Submitting to a Flink cluster
If a Flink cluster is already running (assume its JobManager web UI is at http://localhost:18081/), the job can be submitted to it.
First package the program: run the mvn command to bundle the Java program and its dependencies into a single shaded jar.
mvn clean package
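To confirm the shaded jar was built correctly, one way is to check that its manifest names the entry class (the path assumes Maven's default target directory):

```shell
unzip -p target/datastream-demo-1.0-SNAPSHOT.jar META-INF/MANIFEST.MF | grep Main-Class
```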
Submitting from the command line
Run the flink CLI to submit the job to the cluster. The "-m" option specifies the JobManager host and port.
bin/flink run -d -m localhost:18081 ~/datastream-demo/target/datastream-demo-1.0-SNAPSHOT.jar
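Once submitted, the job can be inspected or stopped from the same CLI; <jobId> below is a placeholder for the JobID printed by flink run and flink list:

```shell
# List running and scheduled jobs on the cluster
bin/flink list -m localhost:18081
# Cancel a job by its JobID
bin/flink cancel -m localhost:18081 <jobId>
```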
Submitting from the Flink web UI
Open the web UI, click "Submit New Job", upload the generated shaded jar, and submit the job. Here the execution plan can be inspected, and parameters such as the parallelism and the entry class can be set.
Adjusting the execution plan
Since the default Flink web UI cannot edit the execution plan, adjustments can only be made in the Java code via setParallelism/startNewChain/disableChaining. The JSON plan returned by getExecutionPlan can be pasted into the plan visualizer (https://flink.apache.org/visualizer/) to check whether it matches expectations.
The pom file
For reference, the final pom.xml is shown below.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.luozhucheng.flink</groupId>
    <artifactId>datastream-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.1</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.luozhucheng.flink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
            </plugins>
        </pluginManagement>
    </build>

    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>
References
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/project-configuration