基于阿里云KMS实现Flink作业数据库密码的加密与动态解密

更新时间:2025-03-10 10:01:55

实时计算Flink版支持通过密钥管理服务(KMS)来对作业中的敏感配置(如数据库密码)进行加密与动态解密,保障密钥安全。本文以Flink JAR作业从RDS MySQL数据读取并写入实时计算开发控制台为例,详细介绍如何通过KMSMySQL连接密码进行加密,并在Flink作业运行时动态解密使用。

背景信息

阿里云密钥管理服务(KMS) 是一站式密钥管理和数据加密服务平台、提供简单、可靠、安全、合规的数据加密保护和凭据管理能力。它提供简单的密码运算API,简化和抽象了密码学概念,让您可以轻松地使用API完成数据的加解密。此外KMS还提供自动密钥轮转功能,无需手动更新密钥,提高了安全性并减少了管理负担。更多优势请参见产品优势

在实时计算场景中,Flink作业常需对接Kafka、MySQL等含敏感数据的数据源。传统硬编码或静态文件配置方式面临泄露隐患。通过集成KMS,Flink作业可以在运行时动态从KMS获取加密凭据并实时解密,确保无明文暴露。

本文以具体的场景为您介绍如何在Flink JAR作业中使用KMS,方案架构示意图如下。

image

前提条件

(可选)步骤一:准备工作

准备RDS MySQL数据源

  1. 创建RDS MySQL数据库和账号。

    为目标实例创建名称为school的数据库和具有对应数据库读写权限的普通账号(flink_rds_user)。具体操作请参见第一步:快捷创建RDS MySQL实例与配置数据库

  2. 准备RDS MySQL数据源。

    1. 在目标实例详情页面,单击上方的登录数据库

    2. 在弹出的DMS页面中,填写创建的数据库账号名和密码,然后单击登录

    3. 登录成功后,在左侧双击school数据库,切换数据库。

    4. SQL Console区域编写三张业务表的建表DDL以及插入的数据语句。

      CREATE TABLE `student` (
        id INT not null primary key,
        username VARCHAR(255),
        age BIGINT
      );
        
      INSERT INTO student VALUES
      (001, 'lily', 15),
      (002, 'leilei', 18),
      (003, 'xiaoming', 17),
      (004, 'huahua', 15);
      
      SELECT * FROM student;
  3. 单击执行,单击直接执行

运行未加密的JAR作业

在对密钥进行加密之前,我们需要确保尚未对密钥进行加密的JAR文件能够正常运行。

  1. 本地开发Flink JAR作业。

    1. IDEA上创建一个新项目。

    2. 将下面代码拷贝到名称为JavaDemo的类文件和POM文件中,并根据实际业务修改配置项取值。

      JavaDemo
      POM
      package org.example;
      
      import com.ververica.cdc.connectors.mysql.source.MySqlSource;
      import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
      
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
      import org.apache.flink.table.types.DataType;
      import org.apache.flink.table.types.logical.LogicalType;
      import org.apache.flink.table.types.logical.RowType;
      import org.apache.flink.table.types.utils.TypeConversions;
      
      
      public class JavaDemo {
      
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              // 构建反序列化器
              DataType dataType =
                      DataTypes.ROW(
                              DataTypes.FIELD("id", DataTypes.INT()),
                              DataTypes.FIELD("username", DataTypes.STRING()),
                              DataTypes.FIELD("age", DataTypes.INT()));
      
              LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
              InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
              RowDataDebeziumDeserializeSchema deserializer =
                      RowDataDebeziumDeserializeSchema.newBuilder()
                              .setPhysicalRowType((RowType) dataType.getLogicalType())
                              .setResultTypeInfo(typeInfo)
                              .build();
      
              // 数据源(com.ververica.cdc.connectors.mysql.source.MySqlSource)
              MySqlSource<RowData> mySqlSource =
                      MySqlSource.<RowData>builder()
                              .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                              .port(3306)
                              .databaseList("school") // 设置数据库
                              .tableList("school.student") // 设置数据表
                              .username("flink_rds_user")
                              .password("flink_rds_password@123")
                              // 将数据初始化为RowData结构
                              .deserializer(deserializer)
                              .build();
      
              // 将外部数据源集成到flink数据流程序
              // WatermarkStrategy.noWatermarks() 指没有使用水印策略
              DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
      
              // 输出到日志stdout
              mySQLSource.print();
      
              //运行
              env.execute("MySQL CDC Test");
          }
      }
      
      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
      
        <groupId>com.aliyun</groupId>
        <artifactId>JavaDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>Flink MySQL CDC Demo</name>
      
        <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
          <flink.version>1.17.1</flink.version>
          <flink-cdc.version>2.4.2</flink-cdc.version>
          <log4j.version>2.17.1</log4j.version>
        </properties>
      
        <dependencies>
          <!-- Flink 核心依赖 (打包时用 provided) -->
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
      
          <!-- Flink MySQL CDC 连接器 -->
          <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <!-- 本地运行时注释掉下一行 -->
            <!-- <scope>provided</scope> -->
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <!-- 与核心版本严格一致 -->
            <scope>provided</scope>
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <!-- 必须与核心版本严格一致 -->
            <scope>provided</scope>
          </dependency>
      
      
          <!-- 日志依赖 -->
          <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
          </dependency>
          
        </dependencies>
      
        <build>
          <plugins>
            <!-- 编译器插件 -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.13.0</version> <!-- 修正版本 -->
              <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <encoding>UTF-8</encoding>
              </configuration>
            </plugin>
      
            <!-- 打包 Fat JAR -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-shade-plugin</artifactId>
              <version>3.5.1</version>
              <executions>
                <execution>
                  <phase>package</phase>
                  <goals>
                    <goal>shade</goal>
                  </goals>
                  <configuration>
                    <artifactSet>
                      <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <!-- 保留日志依赖 -->
                        <!-- <exclude>org.slf4j:*</exclude> -->
                        <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                      </excludes>
                    </artifactSet>
                    <filters>
                      <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                          <exclude>META-INF/MANIFEST.MF</exclude> <!-- 新增关键过滤 -->
                        </excludes>
                      </filter>
                    </filters>
                    <transformers>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.example.JavaDemo</mainClass>
                      </transformer>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- 新增必要转换器 -->
                    </transformers>
                  </configuration>
      
                </execution>
              </executions>
            </plugin>
          </plugins>
        </build>
      </project>
      
    3. 修改JavaDemo代码中的RDS MySQL实例连接信息。

      配置项

      说明

      示例值

      hostname

      RDS MySQLendpoint。

      rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com

      port

      RDS MySQL实例的端口号。

      3306

      databaseList

      RDS MySQL数据库名称。

      school

      tableList

      RDS MySQL表名称。

      school.student

      username

      RDS MySQL的用户名。

      flink_rds_user

      password

      RDS MySQL的用户密码。

      flink_rds_password@123

    4. IDEA项目中进行打包。

      IDEA项目的target目录查看已经打包好的JavaDemo-1.0-SNAPSHOT.jar文件。

  2. 实时计算开发控制台上部署、启动并查看运行结果。

    1. JavaDemo-1.0-SNAPSHOT.jar文件上传到实时计算开发控制台文件管理

    2. 部署JAR作业。

      1. 运维中心 > 作业运维页面,单击部署作业,选择JAR作业

      2. 填写部署信息。

        参数

        说明

        示例

        部署模式

        请选择部署为流模式。

        流模式

        部署名称

        填写对应的JAR作业名称。

        javademo

        引擎版本

        当前作业使用的Flink引擎版本。

        建议使用带有推荐稳定标签的版本,这些版本具有更高的可靠性和性能表现,详情请参见功能发布记录引擎版本介绍

        vvr-8.0.11-flink-1.17

        JAR URI

        选择步骤4中上传的JAR包。

        说明

        实时计算引擎VVR 8.0.6及以上版本仅支持访问开通Flink工作空间时绑定的Bucket,不支持访问其他Bucket。

        JavaDemo-1.0-SNAPSHOT.jar

        Entry Point Class

        程序的入口类。如果您的JAR包未指定主类,请在此处输入您的Endpoint Class类的标准路径。

        -

        Entry Point Main Arguments

        填写传入参数信息,在主方法中调用该参数。

        -

        部署目标

        在下拉列表中,选择目标资源队列或者Session集群(请勿生产环境中使用)。详情请参见管理资源队列步骤一:创建Session集群

        重要

        部署到Session集群的作业不支持显示监控告警、配置监控告警和开启自动调优功能。请勿将Session集群用于正式生产环境,Session集群可以作为开发测试环境。详情请参见作业调试

        default-queue

        更多配置参数详情请参见部署作业

      3. 单击部署

    3. 作业运维页面,单击javademo作业旁的启动,选择无状态启动后,单击启动

    4. 在作业日志页面,查看Flink作业结果。

      作业状态变为运行中后,在TaskManager中以.out结尾的日志文件中,搜索lily查看Flink计算结果。

      image

步骤二:通过KMS加密明文密码

本文对RDS MySQL的用户密码flink_rds_password@123进行加密,加密后的密文值为a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==

您可以通过以下任何一种方式获取KMS加密密钥。

方式一:线上通过KMS Encrypt接口获取
方式二:本地通过KMS SDK获取
  1. 线上调试界面,选择服务地址。

  2. 填写KeyId(密钥ID)和Plaintext(待加密明文)参数值。

  3. 单击发起调用

  4. 查看加密后的取值。

详情请参见Encrypt

  1. KMS实例中的密钥开启公网访问,详情请参见开启公网访问

    KMS实例中的密钥默认仅允许VPC网络访问,因此您需要手动开启公网访问。

  2. 本地配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。参数值获取方法请参见如何查看AccessKey IDAccessKey Secret信息?

  3. 在目标IDEA项目中创建名称为EncryptFlink的类文件。

  4. 将下面内容拷贝到EncryptFlink的类文件,并根据实际业务修改配置项取值。

    package org.example;
    
    import com.aliyun.kms20160120.models.EncryptResponse;
    import com.aliyun.kms20160120.models.EncryptResponseBody;
    import com.aliyun.tea.*;
    
    public class EncryptFlink {
    
        /**
         * <b>description</b> :
         * <p>使用AK&amp;SK初始化账号Client</p>
         * @return Client
         *
         * @throws Exception
         */
        public static com.aliyun.kms20160120.Client createClient() throws Exception {
            // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                    .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                    .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // Endpoint 请参考 https://api.aliyun.com/product/Kms
            config.endpoint = "kms.cn-hangzhou.aliyuncs.com";
            return new com.aliyun.kms20160120.Client(config);
        }
    
        public static void main(String[] args_) throws Exception {
            java.util.List<String> args = java.util.Arrays.asList(args_);
            com.aliyun.kms20160120.Client client = EncryptFlink.createClient();
            com.aliyun.kms20160120.models.EncryptRequest encryptRequest = new com.aliyun.kms20160120.models.EncryptRequest()
                    .setPlaintext("flink_rds_password@123")
                    .setKeyId("key-hzz67ab1ff4e750h****");
            com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
            try {
                // 复制代码运行请自行打印 API 的返回值
                EncryptResponse encryptResponse = client.encryptWithOptions(encryptRequest, runtime);
                EncryptResponseBody body = encryptResponse.getBody();
                System.out.println(body.getCiphertextBlob());
            } catch (TeaException error) {
                // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                // 错误 message
                System.out.println(error.getMessage());
                // 诊断地址
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            } catch (Exception _error) {
                TeaException error = new TeaException(_error.getMessage(), _error);
                // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                // 错误 message
                System.out.println(error.getMessage());
                // 诊断地址
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            }
        }
    }

    配置项

    说明

    示例

    config.endpoint

    KMS实例的公网地址。

    kms.cn-hangzhou.aliyuncs.com

    Plaintext

    您要加密的明文密码。

    flink_rds_password@123

    KeyId

    密钥的ID。

    key-hzz67ab1ff4e750h****

  5. POM文件添加如下依赖信息。

        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>kms20160120</artifactId>
          <version>1.2.3</version>
        </dependency>
    
        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>tea</artifactId>
          <version>1.3.2</version>
        </dependency>
  6. 本地运行EncryptFlink类文件,获取加密结果。

步骤三:Flink JAR作业中添加KMS解密代码并打包

操作步骤

  1. 创建解密工具类文件。

    1. 在目标IDEA项目中创建名称为KmsUtil的工具类文件。

    2. 将下面代码拷贝到KmsUtil的类文件中,并根据实际业务修改配置项取值。

      package org.example;
      
      import com.aliyun.kms20160120.Client;
      import com.aliyun.kms20160120.models.DecryptRequest;
      import com.aliyun.teaopenapi.models.Config;
      
      public class KmsUtil {
          public static String decrypt(String ak, String sk, String ciphertext) throws Exception {
              Client client = new Client(new Config()
                      .setAccessKeyId(ak)
                      .setAccessKeySecret(sk)
                      .setEndpoint("kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com")
                      .setCa("-----BEGIN CERTIFICATE-----\n" +
                              "MIIDuzCCAqOgAwIBAgIJA*****--\n"));
              return client.decryptWithOptions(
                      new DecryptRequest().setCiphertextBlob(ciphertext),
                      new com.aliyun.teautil.models.RuntimeOptions()
              ).getBody().getPlaintext();
          }
      }
      

      配置项

      说明

      示例值

      配置项

      说明

      示例值

      Endpoint

      kms实例的VPC地址。

      kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com

      Ca

      CA证书内容。

      您可以在密钥管理服务控制台下载目标KMS实例的CA证书到本地,详情请参见KMS实例CA证书

      -----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n

  2. 修改JavaDemo文件。

    1. 添加AccessKey参数解析代码和KMS解密代码。

      encryptedPassword取值需要替换为您步骤二获取的加密后的密文。

       // 参数解析
       final ParameterTool params = ParameterTool.fromArgs(args);
       String ak = params.get("akid");
       String sk = params.get("aksecret");
       
       // 对密码进行解密
       String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
       String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
    2. JavaDemo文件中,修改明文密码值为新增的变量。

      .password("flink_rds_password@123")修改为.password(decryptedPassword)

  3. 修改POM文件。

    1. 指定程序的入口点,将mainClass设置为org.example.JavaDemo

    2. 引入KMS相关依赖。

          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
          </dependency>
      
          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
          </dependency>
    3. (可选)在POM文件中修改artifactIdKmsJavaDemo。

      为方便区分两个JAR包名称,您可以修改artifactId取值。

  4. IDEA上打包。

    IDEA项目的target目录查看已经打包好的KmsJavaDemo-1.0-SNAPSHOT.jar文件。

完整的代码示例

KmsJavaDemo
POM
package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.api.java.utils.ParameterTool;


public class JavaDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       
        // 参数解析
        final ParameterTool params = ParameterTool.fromArgs(args);
        String ak = params.get("akid");
        String sk = params.get("aksecret");
        
        // 对密码进行解密
        String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
        String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);

        // 构建反序列化器
        DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("username", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT()));

        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
        RowDataDebeziumDeserializeSchema deserializer =
                RowDataDebeziumDeserializeSchema.newBuilder()
                        .setPhysicalRowType((RowType) dataType.getLogicalType())
                        .setResultTypeInfo(typeInfo)
                        .build();

        // 数据源(com.ververica.cdc.connectors.mysql.source.MySqlSource)
        MySqlSource<RowData> mySqlSource =
                MySqlSource.<RowData>builder()
                        .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                        .port(3306)
                        .databaseList("school") // 设置数据库
                        .tableList("school.student") // 设置数据表
                        .username("flink_rds_user")
                        .password(decryptedPassword)
                        // 将数据初始化为RowData结构
                        .deserializer(deserializer)
                        .build();

        // 将外部数据源集成到flink数据流程序
        // WatermarkStrategy.noWatermarks() 指没有使用水印策略
        DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

        // 输出到日志stdout
        mySQLSource.print();

        //运行
        env.execute("MySQL CDC Test");
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aliyun</groupId>
    <artifactId>KmsJavaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Flink MySQL CDC Demo</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink-cdc.version>2.4.2</flink-cdc.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Flink 核心依赖 (打包时用 provided) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink MySQL CDC 连接器 -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <!-- 本地运行时注释掉下一行 -->
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <!-- 与核心版本严格一致 -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <!-- 必须与核心版本严格一致 -->
            <scope>provided</scope>
        </dependency>


        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 编译器插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version> <!-- 修正版本 -->
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <!-- 打包 Fat JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <!-- 保留日志依赖 -->
                                    <!-- <exclude>org.slf4j:*</exclude> -->
                                    <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/MANIFEST.MF</exclude> <!-- 新增关键过滤 -->
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.JavaDemo</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- 新增必要转换器 -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

步骤四:部署与启动KMS加密后的作业

  1. 上传作业JAR

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击文件管理

    4. 单击上传资源,上传要部署的JAR包和数据文件。

      本文上传步骤三打包好的JARKmsJavaDemo-1.0-SNAPSHOT.jar。

  2. 部署JAR作业

    1. 运维中心 > 作业运维页面,单击部署作业,选择JAR作业

    2. 填写部署信息。

      参数

      说明

      示例

      参数

      说明

      示例

      部署模式

      请选择部署为流模式。

      流模式

      部署名称

      填写对应的JAR作业名称。

      kmsjavademo

      引擎版本

      当前作业使用的Flink引擎版本。

      建议使用带有推荐稳定标签的版本,这些版本具有更高的可靠性和性能表现,详情请参见功能发布记录引擎版本介绍

      vvr-8.0.11-flink-1.17

      JAR URI

      选择步骤三中上传的JAR包。

      说明

      实时计算引擎VVR 8.0.6及以上版本仅支持访问开通Flink工作空间时绑定的Bucket,不支持访问其他Bucket。

      KmsJavaDemo-1.0-SNAPSHOT.jar

      Entry Point Class

      程序的入口类。如果您的JAR包未指定主类,请在此处输入您的Endpoint Class类的标准路径。

      -

      Entry Point Main Arguments

      填写传入参数信息,在主方法中调用该参数。

      为了避免您的AK信息泄露,建议使用变量的方式填写AccessKey取值,详情请参见变量管理。本文示例变量参数名称为akidaksecret。

      AccessKey取值获取方法请参见如何查看AccessKey IDAccessKey Secret信息?

      --akid ${secret_values.akid} --aksecret ${secret_values.aksecret}

      部署目标

      在下拉列表中,选择目标资源队列或者Session集群(请勿生产环境中使用)。详情请参见管理资源队列步骤一:创建Session集群

      重要

      部署到Session集群的作业不支持显示监控告警、配置监控告警和开启自动调优功能。请勿将Session集群用于正式生产环境,Session集群可以作为开发测试环境。详情请参见作业调试

      default-queue

      更多配置参数详情请参见部署作业

    3. 单击部署

  3. 启动作业。

    在作业运维页面,单击kmsjavademo旁边的启动,选择无状态启动后,单击启动

步骤五:在Flink作业日志中查看运行结果

作业状态变为运行中后,在TaskManager中以.out结尾的日志文件中,搜索lily查看Flink计算结果。

image

相关文档

  • 本页导读 (1)
  • 背景信息
  • 前提条件
  • (可选)步骤一:准备工作
  • 准备RDS MySQL数据源
  • 运行未加密的JAR作业
  • 步骤二:通过KMS加密明文密码
  • 步骤三:在Flink JAR作业中添加KMS解密代码并打包
  • 操作步骤
  • 完整的代码示例
  • 步骤四:部署与启动KMS加密后的作业
  • 步骤五:在Flink作业日志中查看运行结果
  • 相关文档
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等