Flink JAR提供了更灵活的编程模型和API,可以自定义各种数据转换、操作和算子,适用于复杂的业务逻辑和数据处理需求。本文为您介绍Flink JAR作业的开发方法和调试方法。
背景信息
Flink Datastream提供了阿里云Flink全托管产品的底层API调用功能,方便您灵活地使用Flink全托管。目前实时计算Flink支持的Datastream API完全兼容开源的Flink版本,详情请参见Flink DataStream API开发指南。
Table API作业的开发方式与Datastream作业相同。
使用限制
受部署环境、网络环境等因素的影响,开发DataStream作业,需要注意以下限制:
仅支持JAR形式的作业提交和运行,支持一个主JAR包和多个附加依赖JAR包。
不支持在Main函数中读取本地配置。
产品运行环境使用的是JDK 1.8,作业开发也需要使用JDK 1.8。
支持开源Scala V2.12版本。
注意事项
Flink全托管产品开发界面上配置的参数优先级均低于作业代码中的优先级。为了保证作业的正常运行,建议优先在开发界面配置Checkpoint相关参数,请勿在作业代码中配置Checkpoint相关参数。
如果Datastream作业访问的上下游存储提供了白名单机制,则您需要进行白名单配置。配置方法请参见如何设置白名单?。
为了避免JAR包依赖冲突,您需要注意以下几点:
作业开发页面选择的Flink版本,请在Pom文件中使用相同版本的Apache Flink依赖库。如何查看Flink版本,详情请参见如何查看当前作业的Flink版本?
Flink相关依赖,scope请使用provided,即在依赖中添加
<scope>provided</scope>
。Flink相关依赖主要包括org.apache.flink
组下以flink-
开头的,非Connector的依赖。例如flink-streaming-java、flink-streaming-java_2.11和flink-java,flink-runtime等。说明建议您在作业开发时先进行依赖配置,以避免Flink依赖冲突问题,配置详情请参见如何解决Flink依赖冲突问题?
其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Maven-Shade-Plugin插件打包。
Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云实时计算Flink版只对这些方法的兼容性做出产品保证。
如果是Flink服务内置的Connector支持的datastream api,建议使用其内置的依赖。
作业开发
JAR作业需要您在线下完成开发,再在Flink全托管控制台上部署并运行。
开发参考
Apache Flink简介,以及它的体系架构、应用程序和特性功能等,请参见Apache Flink介绍。
Apache Flink V1.17业务代码开发,请参见Flink DataStream API开发指南和Flink Table API & SQL开发指南。
Apache Flink的编码、Java语言、Scala语言、组件和格式等指南,请参见代码风格和质量指南。
Apache Flink作业的源代码项目配置,请参见Project Configuration和Datastream开发打包问题。
Flink Datastream Connector依赖,请参见Connector依赖。
Apache Flink编码过程中遇到的问题及解决方法,请参见常见问题。
OSS数据读取
如果您需要读取存储在OSS上的配置文件,可以使用以下任何一种方式。
方式 | 方法 | 具体步骤 |
方式一 | 在JAR作业中使用代码去读取存储在本地的配置文件。 |
|
方式二 | 在JAR作业中通过OSSClient直接读取OSS上的配置文件。 | 如何通过OSSClient直接读取OSS上的存储文件,详情请参见流式传输。读取OSS上的存储文件代码示例如下。
|
连接器使用
通过DataStream的方式读写数据,需要使用对应的DataStream连接器连接Flink全托管。Maven中央库中已经放置了VVR DataStream连接器,以供您在作业开发时直接使用。您可以使用以下任意一种方式来使用连接器:
请使用我们在支持的连接器中指明提供DataStream API的连接器。如果某个连接器未注明提供DataStream API,请勿自行使用,因为未来接口和参数可能会被修改。
(推荐)上传连接器Uber JAR包到Flink开发控制台,DataStream作业通过provided引用
准备DataStream作业开发环境
在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
<repositories> <repository> <id>oss.sonatype.org-snapshot</id> <name>OSS Sonatype Snapshot Repository</name> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
检查您的settings.xml配置文件中是否存在
<mirrorOf>*</mirrorOf>
配置。<mirrorOf>
中包含*,表示当前mirror已经包含了所有仓库,Maven不会从上述指定的两个SNAPSHOT仓库中下载,这会导致Maven工程无法下载这两个仓库中的SNAPSHOT依赖。因此如果 settings.xml文件<mirrorOf>*</mirrorOf>
配置中包含*,您可以根据如下情况进行相应的修改:存在
<mirrorOf>*</mirrorOf>
配置,请将配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。存在
<mirrorOf>external:*</mirrorOf>
配置,请将配置改为<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。存在
<mirrorOf>external:http:*</mirrorOf>
配置,请将配置改为<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。
在作业的Maven POM文件中添加您需要的连接器和公共包作为项目依赖。
每个连接器版本对应的Connector类型可能不同,建议您使用最新版本。完整的依赖信息请参见MaxCompute-Demo、DataHub-Demo、Kafka-Demo或RocketMQ-Demo示例中的pom.xml文件。MaxCompute增量源表的项目依赖代码示例如下。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-continuous-odps</artifactId> <version>${connector.version}</version> <scope>provided</scope> </dependency>
除连接器外,项目还需要依赖连接器的公共包
flink-connector-base
:<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency>
其中,
${flink.version}
是作业运行环境对应的Flink版本,如您的作业运行在1.17-vvr-8.0.4-1
版本引擎上,其对应的Flink版本为1.17.0
。重要您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本, 在Maven中央库(search.maven.org)上会查找不到。
在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
开发DataStream作业
DataStream连接配置信息和代码示例需要去查看对应的DataStream连接器文档,详情请参见:
打包并提交DataStream作业
使用Maven工具打包工程项目,并将生成的JAR包上传和提交到Flink全托管平台上,详细请参见部署JAR作业。
上传连接器Uber JAR包到Flink开发控制台
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击资源管理。
单击上传资源,选择您要上传的目标连接器的Uber JAR包。
您可以上传您自己开发的连接器,也可以上传实时计算Flink版提供的连接器(下载地址请参见Connector列表)。
重要因近期Maven中心仓库目录更新问题,最新版本(例如Flink 1.17-vvr-8.0.4-1)的连接器JAR包可能没有在一层目录里,您可以直接拼写URL地址下载需要依赖,例如您可以使用类似https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-continuous-odps/1.17-vvr-8.0.4-1/ververica-connector-continuous-odps-1.17-vvr-8.0.4-1-uber.jar这样的URL地址来下载所需的依赖。
在部署作业页面附加依赖文件项,选择目标连接器的JAR包。
直接将连接器作为项目依赖打进作业JAR包
准备DataStream作业开发环境
在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
<repositories> <repository> <id>oss.sonatype.org-snapshot</id> <name>OSS Sonatype Snapshot Repository</name> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
检查您的settings.xml配置文件中是否存在
<mirrorOf>*</mirrorOf>
配置。<mirrorOf>
中包含*,表示当前mirror已经包含了所有仓库,Maven不会从上述指定的两个SNAPSHOT仓库中下载,这会导致Maven工程无法下载这两个仓库中的SNAPSHOT依赖。因此如果 settings.xml文件<mirrorOf>*</mirrorOf>
配置中包含*,您可以根据如下情况进行相应的修改:存在
<mirrorOf>*</mirrorOf>
配置,请将配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。存在
<mirrorOf>external:*</mirrorOf>
配置,请将配置改为<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。存在
<mirrorOf>external:http:*</mirrorOf>
配置,请将配置改为<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。
在作业的Maven POM文件中添加您需要的连接器和公共包作为项目依赖。
每个连接器版本对应的Connector类型可能不同,建议您使用最新版本。完整的依赖信息请参见MaxCompute-Demo、DataHub-Demo、Kafka-Demo或RocketMQ-Demo示例中的pom.xml文件。MaxCompute增量源表的项目依赖代码示例如下。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-continuous-odps</artifactId> <version>${connector.version}</version> </dependency>
除连接器外,项目还需要依赖连接器的公共包
flink-connector-base
:<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency>
其中,
${flink.version}
是作业运行环境对应的Flink版本,如您的作业运行在1.17-vvr-8.0.4-1
版本引擎上,其对应的Flink版本为1.17.0
。重要您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本, 在Maven中央库(search.maven.org)上会查找不到。
在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
开发DataStream作业
DataStream连接配置信息和代码示例需要去查看对应的DataStream连接器文档,详情请参见:
打包并提交DataStream作业
使用Maven工具打包工程项目,并将生成的JAR包上传和提交到Flink全托管平台上,详细请参见部署JAR作业。
Maven-Shade-Plugin插件打包
通过maven-shade-plugin生成一个uber-jar,它包含所有的依赖JAR包,示例如下:
<project>
<!-- ... 其他配置 ... -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- 改变打包后类的包路径 -->
<relocations>
<relocation>
<pattern>com.example.oldpackage</pattern>
<shadedPattern>com.example.newpackage</shadedPattern>
</relocation>
</relocations>
<!-- 排除不必要的依赖 -->
<artifactSet>
<excludes>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<!-- 过滤排除不必要的文件 -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
</excludes>
</filter>
</filters>
<!-- 通过MainClass 设置一个可执行的jar包 -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.alibaba.ververica.connector.demo.flink.flinkDemoJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- ... 其他配置 ... -->
</project>
Shade打包更多详情请参见Apache Maven Shade Plugin。
作业调试
您可以使用DataGeneratorSource输入模拟数据,使用示例请参见DataGeneratorSourceTest.java。您也可以单独创建一个Flink SQL作业,使用Datagen连接器产生模拟数据写入JAR作业读取的数据源,然后由JAR作业进行消费处理。
您可以使用Print调试结果,使用示例请参见WindowWordCount.java。您需要提前配置日志输出,详情请参见配置作业日志输出。
相关文档
支持作为DataStream类型的连接器列表,请参见支持的连接器。
MaxCompute连接器的本地运行和调试方法,请参见本地运行和调试包含连接器的作业。
Flink JAR作业的完整开发流程示例,请参见Flink JAR作业快速入门。
Flink全托管还支持运行SQL和Python作业,开发方法请参见SQL作业开发和Python作业开发。
- 本页导读 (1)