JAR作业开发
本文为您介绍Flink全托管DataStream API开发的限制说明和开发方法。
背景信息
Flink Datastream提供了阿里云Flink全托管产品的底层API调用功能,方便您灵活地使用Flink全托管。目前实时计算Flink支持的Datastream API完全兼容开源的Flink版本,详情请参见Flink DataStream API开发指南。
Table API作业的开发方式与Datastream作业相同。
使用限制
由于Flink全托管产品受部署环境、网络环境等因素的影响,所以开发Flink全托管DataStream作业,需要注意以下限制:
仅支持JAR形式的作业提交和运行。
支持一个主JAR包和多个附加依赖JAR包。
不支持在Main函数中读取本地配置。
Flink全托管产品开发界面上配置的参数优先级均低于作业代码中的优先级。为了保证作业的正常运行,建议优先在开发界面配置Checkpoint相关参数,请勿在作业代码中配置Checkpoint相关参数。
产品运行环境使用的是JDK 1.8,作业开发也需要使用JDK 1.8。
支持开源Scala V2.11版本。
注意事项
如果Datastream作业访问的上下游存储提供了白名单机制,则您需要进行白名单配置。配置方法请参见如何设置白名单?。
为了避免JAR包依赖冲突,您需要注意以下几点:
作业开发页面选择的Flink版本,请在Pom文件中使用相同版本的Apache Flink依赖库。如何查看Flink版本,详情请参见如何查看当前作业的Flink版本?
Flink相关依赖,scope请使用provided,即在依赖中添加
<scope>provided</scope>
。Flink相关依赖主要包括org.apache.flink
组下以flink-
开头的,非Connector的依赖。例如flink-streaming-java、flink-streaming-java_2.11和flink-java,flink-runtime等。说明建议您在作业开发时先进行依赖配置,以避免Flink依赖冲突问题,配置详情请参见如何解决Flink依赖冲突问题?
其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Apache Maven Shade Plugin。
Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云只对这些方法的兼容性做出产品保证。
作业开发
您需要在线下完成作业开发后,再在Flink全托管控制台上提交作业到集群上运行。您在编写Flink全托管产品业务代码时,可以参见以下文档:
Apache Flink作业的源代码项目配置,请参见Project Configuration和Datastream开发打包问题。
Flink Datastream Connector依赖,请参见Connector依赖。
Connector的使用方法,请参见Connector使用。
Apache Flink简介,以及它的体系架构、应用程序和特性功能等,请参见Apache Flink介绍。
Apache Flink V1.10业务代码开发,请参见Flink DataStream API开发指南和Flink Table API & SQL开发指南。
Apache Flink的编码、Java语言、Scala语言、组件和格式等指南,请参见代码风格和质量指南。
Apache Flink编码过程中遇到的问题及解决方法,请参见常见问题。
如果读取存储在OSS上的配置文件,可以使用以下任何一种方式。
方式
方法
具体步骤
方式一
在JAR作业中使用代码去读取存储在本地的配置文件。
在Flink全托管开发控制台左侧资源管理页面,上传该文件。
上传的附件依赖文件会固定被保存在oss://ossBucketName/artifacts/namespaces/namespaceName/<file> 目录,也会被下载到目标机器,在作业运行时,加载到JM和TM所在Pod的/flink/usrlib目录下。
在JAR作业中使用如下代码去读取存储在本地的配置文件。
try (BufferedReader reader = new BufferedReader(new FileReader("/flink/usrlib/yourFile"))) { // read file and process ... }
部署作业并配置附加依赖文件,详情请参见部署JAR作业。
方式二
在JAR作业中通过OSSClient直接读取OSS上的存储文件。
如何通过OSSClient直接读取OSS上的存储文件,详情请参见流式传输。读取OSS上的存储文件代码示例如下。
OSS ossClient = new OSSClientBuilder().build("yourEndpoint", "yourAccessKeyId", "yourAccessKeySecret"); try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/exampleobject.txt"); BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) { // read file and process ... } finally { if (ossClient != null) { ossClient.shutdown(); } }
作业调试
您可以使用DataGeneratorSource输入模拟数据,使用示例请参见DataGeneratorSourceTest.java。您也可以单独创建一个Flink SQL作业,使用Datagen连接器产生模拟数据写入JAR作业读取的数据源,然后由JAR作业进行消费处理。
您可以使用Print调试结果,使用示例请参见WindowWordCount.java。您需要提前配置日志输出,详情请参见配置作业日志输出。
Connector使用
Maven中央库中已经放置了VVR Connector,以供您在作业开发时直接使用。您可以使用以下任意一种方式来使用Connector:
目前中央仓库暂未提供RocketMQ Connector,您可以直接在此下载以下版本的RocketMQ Connector JAR包:
ververica-connector-mq-1.13-vvr-4.0.15.jar
您需要将VVR 4.x版本升级到最新小版本VVR 4.0.15。
(推荐)直接将Connector作为项目依赖打进作业JAR包。
在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
<repositories> <repository> <id>oss.sonatype.org-snapshot</id> <name>OSS Sonatype Snapshot Repository</name> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
检查您的settings.xml配置文件中是否存在
<mirrorOf>*</mirrorOf>
配置。如果存在
<mirrorOf>*</mirrorOf>
配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。在作业的Maven POM文件中添加您需要的Connector作为项目依赖,示例如下。
<dependencies> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>${connector.type}</artifactId> <version>${connector.version}</version> </dependency> </dependencies>
每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。
重要您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本, 在Maven中央库(search.maven.org)上会查找不到。
在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
上传Connector JAR包到Flink全托管控制台后,填写配置信息。
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击资源管理。
单击上传资源,选择您要上传的目标Connector的JAR包。
您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。
在部署作业页面附加依赖文件项,选择目标Connector的JAR包。