文档

JAR作业开发

更新时间:

Flink JAR提供了更灵活的编程模型和API,可以自定义各种数据转换、操作和算子,适用于复杂的业务逻辑和数据处理需求。本文为您介绍Flink JAR作业的开发方法和调试方法。

背景信息

Flink Datastream提供了阿里云Flink全托管产品的底层API调用功能,方便您灵活地使用Flink全托管。目前实时计算Flink支持的Datastream API完全兼容开源的Flink版本,详情请参见Flink DataStream API开发指南

说明

Table API作业的开发方式与Datastream作业相同。

使用限制

受部署环境、网络环境等因素的影响,开发DataStream作业,需要注意以下限制:

  • 仅支持JAR形式的作业提交和运行,支持一个主JAR包和多个附加依赖JAR包。

  • 不支持在Main函数中读取本地配置。

  • 产品运行环境使用的是JDK 1.8,作业开发也需要使用JDK 1.8。

  • 支持开源Scala V2.12版本。

注意事项

  • Flink全托管产品开发界面上配置的参数优先级均低于作业代码中的优先级。为了保证作业的正常运行,建议优先在开发界面配置Checkpoint相关参数,请勿在作业代码中配置Checkpoint相关参数。

  • 如果Datastream作业访问的上下游存储提供了白名单机制,则您需要进行白名单配置。配置方法请参见如何设置白名单?

  • 为了避免JAR包依赖冲突,您需要注意以下几点:

    • 作业开发页面选择的Flink版本,请在Pom文件中使用相同版本的Apache Flink依赖库。如何查看Flink版本,详情请参见如何查看当前作业的Flink版本?

    • Flink相关依赖,scope请使用provided,即在依赖中添加<scope>provided</scope>。Flink相关依赖主要包括org.apache.flink组下以flink-开头的,非Connector的依赖。例如flink-streaming-java、flink-streaming-java_2.11和flink-java,flink-runtime等。

      说明

      建议您在作业开发时先进行依赖配置,以避免Flink依赖冲突问题,配置详情请参见如何解决Flink依赖冲突问题?

    • 其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Maven-Shade-Plugin插件打包

    • Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云实时计算Flink版只对这些方法的兼容性做出产品保证。

    • 如果是Flink服务内置的Connector支持的datastream api,建议使用其内置的依赖。

作业开发

说明

JAR作业需要您在线下完成开发,再在Flink全托管控制台上部署并运行。

开发参考

OSS数据读取

如果您需要读取存储在OSS Bucket上的配置文件,可以使用以下任何一种方式。

方式

方法

适用范围

具体步骤

方式一

在JAR作业中使用代码去读取存储在本地的配置文件。

仅支持读取您工作空间绑定的OSS Bucket。

  1. 实时计算开发控制台左侧资源管理页面,上传该文件。

    在作业运行时,资源管理中的文件会被下载到目标机器,加载到JM和TM所在Pod的/flink/usrlib目录下。

  2. 在JAR作业中使用如下代码去读取存储在本地的配置文件。

    try (BufferedReader reader = new BufferedReader(new FileReader("/flink/usrlib/yourFile"))) {
        // read file and process ...
    }
  3. 部署作业并配置附加依赖文件,详情请参见部署JAR作业

方式二

在JAR作业中通过OSSClient直接读取OSS上的配置文件。

支持读取Flink有权限访问的OSS Bucket。

如何通过OSSClient直接读取OSS上的存储文件,详情请参见流式传输。读取OSS上的存储文件代码示例如下。

OSS ossClient = new OSSClientBuilder().build("yourEndpoint", "yourAccessKeyId", "yourAccessKeySecret");
try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/exampleobject.txt");
     BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
    // read file and process ...
} finally {
    if (ossClient != null) {
        ossClient.shutdown();
    }
}

连接器使用

通过DataStream的方式读写数据,需要使用对应的DataStream连接器连接Flink全托管。Maven中央库中已经放置了VVR DataStream连接器,以供您在作业开发时直接使用。您可以使用以下任意一种方式来使用连接器:

重要

请使用我们在支持的连接器中指明提供DataStream API的连接器。如果某个连接器未注明提供DataStream API,请勿自行使用,因为未来接口和参数可能会被修改。

(推荐)上传连接器Uber JAR包到Flink开发控制台,DataStream作业通过provided引用

  1. 准备DataStream作业开发环境

    1. 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。

      <repositories>
        <repository>
          <id>oss.sonatype.org-snapshot</id>
          <name>OSS Sonatype Snapshot Repository</name>
          <url>http://oss.sonatype.org/content/repositories/snapshots</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    2. 检查您的settings.xml配置文件中是否存在<mirrorOf>*</mirrorOf>配置。

      <mirrorOf>中包含*,表示当前mirror已经包含了所有仓库,Maven不会从上述指定的两个SNAPSHOT仓库中下载,这会导致Maven工程无法下载这两个仓库中的SNAPSHOT依赖。因此如果 settings.xml文件<mirrorOf>*</mirrorOf>配置中包含*,您可以根据如下情况进行相应的修改:

      • 存在<mirrorOf>*</mirrorOf>配置,请将配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

      • 存在<mirrorOf>external:*</mirrorOf>配置,请将配置改为<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

      • 存在<mirrorOf>external:http:*</mirrorOf>配置,请将配置改为<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    3. 在作业的Maven POM文件中添加您需要的连接器和公共包作为项目依赖。

      每个连接器版本对应的Connector类型可能不同,建议您使用最新版本。完整的依赖信息请参见MaxCompute-DemoDataHub-DemoKafka-DemoRocketMQ-Demo示例中的pom.xml文件。MaxCompute增量源表的项目依赖代码示例如下。

      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
          <scope>provided</scope>
      </dependency>

      除连接器外,项目还需要依赖连接器的公共包flink-connector-base

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      其中,${flink.version}是作业运行环境对应的Flink版本,如您的作业运行在1.17-vvr-8.0.4-1版本引擎上,其对应的Flink版本为1.17.0

      重要
      • 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本, 在Maven中央库(search.maven.org)上会查找不到。

      • 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。

        <transformers>
            <!-- The service transformer is needed to merge META-INF/services files -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                <projectName>Apache Flink</projectName>
                <encoding>UTF-8</encoding>
            </transformer>
        </transformers>
  2. 开发DataStream作业

    DataStream连接配置信息和代码示例需要去查看对应的DataStream连接器文档,详情请参见:

  3. 打包并提交DataStream作业

    使用Maven工具打包工程项目,并将生成的JAR包上传和提交到Flink全托管平台上,详细请参见部署JAR作业

  4. 上传连接器Uber JAR包到Flink开发控制台

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击资源管理

    4. 单击上传资源,选择您要上传的目标连接器的Uber JAR包。

      您可以上传您自己开发的连接器,也可以上传实时计算Flink版提供的连接器(下载地址请参见Connector列表)。

      重要

      因近期Maven中心仓库目录更新问题,最新版本(例如Flink 1.17-vvr-8.0.4-1)的连接器JAR包可能没有在一层目录里,您可以直接拼写URL地址下载需要依赖,例如您可以使用类似https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-continuous-odps/1.17-vvr-8.0.4-1/ververica-connector-continuous-odps-1.17-vvr-8.0.4-1-uber.jar这样的URL地址来下载所需的依赖。

    5. 部署作业页面附加依赖文件项,选择目标连接器的JAR包。

直接将连接器作为项目依赖打进作业JAR包

  1. 准备DataStream作业开发环境

    1. 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。

      <repositories>
        <repository>
          <id>oss.sonatype.org-snapshot</id>
          <name>OSS Sonatype Snapshot Repository</name>
          <url>http://oss.sonatype.org/content/repositories/snapshots</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    2. 检查您的settings.xml配置文件中是否存在<mirrorOf>*</mirrorOf>配置。

      <mirrorOf>中包含*,表示当前mirror已经包含了所有仓库,Maven不会从上述指定的两个SNAPSHOT仓库中下载,这会导致Maven工程无法下载这两个仓库中的SNAPSHOT依赖。因此如果 settings.xml文件<mirrorOf>*</mirrorOf>配置中包含*,您可以根据如下情况进行相应的修改:

      • 存在<mirrorOf>*</mirrorOf>配置,请将配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

      • 存在<mirrorOf>external:*</mirrorOf>配置,请将配置改为<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

      • 存在<mirrorOf>external:http:*</mirrorOf>配置,请将配置改为<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    3. 在作业的Maven POM文件中添加您需要的连接器和公共包作为项目依赖。

      每个连接器版本对应的Connector类型可能不同,建议您使用最新版本。完整的依赖信息请参见MaxCompute-DemoDataHub-DemoKafka-DemoRocketMQ-Demo示例中的pom.xml文件。MaxCompute增量源表的项目依赖代码示例如下。

      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
      </dependency>

      除连接器外,项目还需要依赖连接器的公共包flink-connector-base

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      其中,${flink.version}是作业运行环境对应的Flink版本,如您的作业运行在1.17-vvr-8.0.4-1版本引擎上,其对应的Flink版本为1.17.0

      重要
      • 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本, 在Maven中央库(search.maven.org)上会查找不到。

      • 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。

        <transformers>
            <!-- The service transformer is needed to merge META-INF/services files -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                <projectName>Apache Flink</projectName>
                <encoding>UTF-8</encoding>
            </transformer>
        </transformers>
  2. 开发DataStream作业

    DataStream连接配置信息和代码示例需要去查看对应的DataStream连接器文档,详情请参见:

  3. 打包并提交DataStream作业

    使用Maven工具打包工程项目,并将生成的JAR包上传和提交到Flink全托管平台上,详细请参见部署JAR作业

Maven-Shade-Plugin插件打包

通过maven-shade-plugin生成一个uber-jar,它包含所有的依赖JAR包,示例如下:

<project>
    <!-- ... 其他配置 ... -->

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <!-- 改变打包后类的包路径 -->
                        <relocations>
                            <relocation>
                                <pattern>com.example.oldpackage</pattern>
                                <shadedPattern>com.example.newpackage</shadedPattern>
                            </relocation>
                        </relocations>

                        <!-- 排除不必要的依赖 -->
                        <artifactSet>
                            <excludes>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>

                        <!-- 过滤排除不必要的文件 -->
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
 
                        <!-- 通过MainClass 设置一个可执行的jar包 -->
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.alibaba.ververica.connector.demo.flink.flinkDemoJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    </build>

    <!-- ... 其他配置 ... -->
</project>

Shade打包更多详情请参见Apache Maven Shade Plugin

作业调试

相关文档