JAR作业开发

Flink DataStream提供了更灵活的编程模型和API,可以自定义各种数据转换、操作和算子,适用于复杂的业务逻辑和数据处理需求。本文为您介绍Flink JAR作业的开发方法。

支持开源Apache Flink

目前实时计算Flink支持的DataStream API完全兼容开源的Flink版本,详情请参见Apache Flink介绍Flink DataStream API开发指南

开发环境要求

  • 已安装IntelliJ IDEA等开发工具。

  • 已安装3.6.3及以上版本的Maven。

  • 作业开发仅支持JDK 8和JDK 11版本。

  • JAR作业需要您在线下完成开发,再在实时计算管理控制台上部署并运行。

开发准备

本样例涉及关于数据源连接器如何使用,请准备好相关数据源。

说明

作业开发

Maven环境配置(可选)

配置Maven的setting.xml文件。如果您在后续的操作中,对Maven中央仓库的访问存在无法拉取或速率较慢的情况,可以更换为阿里云镜像仓库。

  <mirror>
      <id>aliyunmaven</id>
        <mirrorOf>central</mirrorOf>
        <name>Aliyun Maven</name>
        <url>https://maven.aliyun.com/repository/public</url>
    </mirror>

配置Flink环境依赖

说明

为了避免JAR包依赖冲突,请您注意以下几点:

  • ${flink.version}为作业运行对应的Flink版本。请使用与作业部署页面选择的VVR引擎所使用的Flink版本一致。例如您在部署页面选择的引擎为vvr-8.0.9-flink-1.17,其对应的Flink版本为1.17.2,查看VVR引擎版本详情请参见操作指导

  • Flink相关依赖,作用域请使用provided,即在依赖中添加<scope>provided</scope>。主要包含org.apache.flink组下以flink-开头的非Connector依赖。

  • Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云实时计算Flink版只对这些方法的兼容性做出产品保证。

  • 如果是Flink服务内置的Connector支持的DataStream API,建议使用其内置的依赖。

下面是Flink的一些基本相关依赖,您可能还需要补充一些日志文件相关的依赖,完整的依赖参考请参见文末的完整示例代码

flink相关依赖
         <!-- Apache Flink 依赖项 -->
        <!-- 之所以提供这些依赖项,是因为它们不应该打包到JAR文件中。 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

连接器依赖和使用

通过DataStream的方式读写数据,需要使用对应的DataStream连接器连接实时计算Flink版。Maven中央仓库已经放置了VVR DataStream连接器,以供您在作业开发时直接使用。

重要

请使用我们在支持的连接器中指明提供DataStream API的连接器。如果某个连接器未注明提供给DataStream API,请勿自行使用,因为未来接口和参数可能会被修改。

您可以选择以下任意一种方式来使用连接器:

(推荐)上传连接器Uber JAR包作为附加依赖文件引入

  1. 在作业的Maven POM文件中添加您需要的连接器作为项目依赖,其作用域为provided。完整的依赖文件请参考文末的完整示例代码

    说明
    • ${vvr.version}是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17版本引擎上,其对应的Flink版本为1.17.2。建议您使用最新的引擎,具体版本详见引擎

    • 由于将连接器的Uber JAR包作为附加依赖文件引入,则无需将该依赖打入JAR包中,所以需要声明作用域为provided

            <!-- Kafka 连接器依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- MySQL 连接器依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
  2. 如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包flink-connector-baseververica-connector-common

            <!-- Flink 连接器公共接口基础依赖 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 阿里云连接器公共接口基础依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStream连接配置信息和代码示例需要查看对应的DataStream连接器文档。

    支持作为DataStream类型的连接器列表,请参见支持的连接器

  4. 部署作业并在附加依赖文件项中添加相应的连接器Uber JAR包,详情请参见部署JAR作业。您可以上传您自己开发的连接器,也可以上传实时计算Flink版提供的连接器(下载地址请参见Connector列表)。如图所示。

    image

直接将连接器作为项目依赖打进作业JAR包

  1. 在作业的Maven POM文件中添加您需要的连接器作为项目依赖。例如引入Kafka连接器和MySQL连接器。

    说明
    • ${vvr.version}是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17版本引擎上,其对应的Flink版本为1.17.2。建议您使用最新的引擎,具体版本详见引擎

    • 由于将连接器作为项目依赖直接打入JAR包,它们必须在默认作用域(compile)中。

            <!-- Kafka 连接器依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
            </dependency>
            <!-- MySQL 连接器依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  2. 如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包flink-connector-baseververica-connector-common

            <!-- Flink 连接器公共接口基础依赖 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 阿里云连接器公共接口基础依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStream连接配置信息和代码示例需要查看对应的DataStream连接器文档。

    支持作为DataStream类型的连接器列表,请参见支持的连接器

OSS附加依赖文件读取

因为Flink JAR作业不支持在Main函数中读取本地配置,您可以将配置文件上传到Flink工作空间下的OSS Bucket,在部署JAR作业时,通过添加附加配置文件的方式进行读取。示例如下。

  1. 创建配置文件config.properties,避免在代码中出现明文代码。

    # Kafka 
    bootstrapServers=host1:9092,host2:9092,host3:9092
    inputTopic=topic
    groupId=groupId
    # MySQL
    database.url=jdbc:mysql://localhost:3306/my_database
    database.username=username
    database.password=password
  2. 在JAR作业中使用代码读取存储在OSS Bucket上的配置文件config.properties。

    方式一:读取工作空间绑定的OSS Bucket

    1. 实时计算开发控制台左侧导航栏资源管理页面,上传该文件。

    2. 在作业运行时,部署作业所添加附加依赖文件将会加载到作业所运行Pod的/flink/usrlib目录下。

    3. 读取该配置文件代码示例如下。

                  Properties properties = new Properties();
                  Map<String,String> configMap = new HashMap<>();
      
                  try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
                      // 加载属性文件
                      properties.load(input);
                      // 获取属性值
                      configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
                      configMap.put("inputTopic",properties.getProperty("inputTopic"));
                      configMap.put("groupId",properties.getProperty("groupId"));
                      configMap.put("url",properties.getProperty("database.url")) ;
                      configMap.put("username",properties.getProperty("database.username"));
                      configMap.put("password",properties.getProperty("database.password"));
                  } catch (IOException ex) {
                      ex.printStackTrace();
                  }

    方式二:读取工作空间有权限访问的OSS Bucket

    1. 将配置文件上传目标OSS Bucket。

    2. 通过OSSClient直接读取OSS上的存储文件详情,请参见流式传输管理访问凭据。代码示例如下。

      OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
      try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
           BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
          // read file and process ...
      } finally {
          if (ossClient != null) {
              ossClient.shutdown();
          }
      }

业务代码编写

  1. 将外部数据源集成到Flink数据流程序。Watermark是Flink一种基于时间语义的计算策略,往往伴随着时间戳一起使用,所以本示例不使用水印策略。详情请参考水印策略

             // 将外部数据源集成到flink数据流程序
            // WatermarkStrategy.noWatermarks() 指没有使用水印策略
            DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
  2. 算子转换处理。示例中将DataStream<String>转换成DataStream<Student>,更多复杂的算子转化和处理方式请参考Flink算子

              // 转换数据结构为student的算子
              DataStream<student> source = stream
                    .map(new MapFunction<String, student>() {
                        @Override
                        public student map(String s) throws Exception {
                            // 数据由逗号分隔
                            String[] data = s.split(",");
                            return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                        }
                    }).filter(student -> student.score >=60); // 筛选出分数大于60分的数据

作业打包

通过maven-shade-plugin插件打包。

重要
  • 如果选择作为附加依赖文件引入使用连接器,打包作业时,确认连接器相关依赖的作用域为provided

  • 如果选择连接器作为依赖一起打包,作用域默认(compile)即可。

maven-shade-plugin插件依赖参考

<build>
        <plugins>
            <!-- Java 编译器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- 我们使用maven-shade-plugin创建一个包含所有必须依赖的fat jar -->
            <!-- 修改<mainClass>的值.如果您的程序入口点发生了改变 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 去掉一些不必要的依赖性 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- 不要复制META-INF文件夹中的签名。否则,这可能会在使用JAR文件时导致安全异常 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

作业测试及部署

  • 由于实时计算Flink版默认不具备访问公网的能力,可能您的代码无法在本地进行直接测试。建议您分开进行单元测试,详情请参见本地运行和调试包含连接器的作业

  • JAR作业部署请参见部署JAR作业

    说明
    • 部署时,如果选择方式一使用连接器打包的作业,切记需要上传添加连接器相关的Uber JAR包。

    • 如果需要读取配置文件,也需要在附加依赖文件中上传添加。

    image

完整示例代码

本示例代码中,将Kafka数据源的数据进行处理后写入MySQL。此示例仅供参考,更多的代码风格和质量指南请参见代码风格和质量指南

FlinkDemo.java

package com.aliyun;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkDemo {
    // 定义数据结构
    public static class Student {
        public int id;
        public String name;
        public int score;

        public Student(int id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    public static void main(String[] args) throws Exception {
        // 创建Flink执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        Map<String,String> configMap = new HashMap<>();

        try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
            // 加载属性文件
            properties.load(input);
            // 获取属性值
            configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
            configMap.put("inputTopic",properties.getProperty("inputTopic"));
            configMap.put("groupId",properties.getProperty("groupId"));
            configMap.put("url",properties.getProperty("database.url")) ;
            configMap.put("username",properties.getProperty("database.username"));
            configMap.put("password",properties.getProperty("database.password"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        // Build Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(configMap.get("bootstrapServers"))
                        .setTopics(configMap.get("inputTopic"))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(configMap.get("groupId"))
                        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                        .build();

        // 将外部数据源集成到flink数据流程序
        // WatermarkStrategy.noWatermarks() 指没有使用水印策略
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

        // 筛选出分数大于60分的数据
        DataStream<Student> source = stream
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                    }
                }).filter(Student -> Student.score >=60);

        source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
                new JdbcStatementBuilder<Student>() {
                    public void accept(PreparedStatement ps, Student data) {
                        try {
                            ps.setInt(1, data.id);
                            ps.setString(2, data.name);
                            ps.setInt(3, data.score);
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5) // 每次批量写入的记录数
                        .withBatchIntervalMs(2000) // 重试时的最大延迟时间(毫秒)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(configMap.get("url"))
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(configMap.get("username"))
                        .withPassword(configMap.get("password"))
                        .build()
        )).name("Sink MySQL");

        env.execute("Flink Demo");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>FlinkDemo</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.14.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Apache Flink 依赖项 -->
        <!-- 之所以提供这些依赖项,是因为它们不应该打包到JAR文件中。 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- 在这里添加连接器依赖项。它们必须在默认作用域(compile)中。 -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <!-- 添加日志框架,以便在运行时生成控制台输出 -->
        <!-- 默认情况下,这些依赖项从应用程序JAR中排除 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java 编译器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- 我们使用maven-shade-plugin创建一个包含所有必须依赖的 fat jar -->
            <!-- 修改<mainClass>的值.如果您的程序入口点发生了改变 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 去掉一些不必要的依赖性 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- 不要复制META-INF文件夹中的签名。否则,这可能会在使用JAR文件时导致安全异常 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

相关文档