实时计算Flink版支持通过密钥管理服务(KMS)来对作业中的敏感配置(如数据库密码)进行加密与动态解密,保障密钥安全。本文以Flink JAR作业从RDS MySQL数据读取并写入实时计算开发控制台为例,详细介绍如何通过KMS对MySQL连接密码进行加密,并在Flink作业运行时动态解密使用。
背景信息
阿里云密钥管理服务(KMS) 是一站式密钥管理和数据加密服务平台、提供简单、可靠、安全、合规的数据加密保护和凭据管理能力。它提供简单的密码运算API,简化和抽象了密码学概念,让您可以轻松地使用API完成数据的加解密。此外KMS还提供自动密钥轮转功能,无需手动更新密钥,提高了安全性并减少了管理负担。更多优势请参见产品优势。
在实时计算场景中,Flink作业常需对接Kafka、MySQL等含敏感数据的数据源。传统硬编码或静态文件配置方式面临泄露隐患。通过集成KMS,Flink作业可以在运行时动态从KMS获取加密凭据并实时解密,确保无明文暴露。
本文以具体的场景为您介绍如何在Flink JAR作业中使用KMS,方案架构示意图如下。
前提条件
开发环境已准备。
本地已安装并配置IntelliJ IDEA等开发工具。
本地已安装3.6.3及以上版本的Maven。
已创建Flink工作空间,详情请参见开通实时计算Flink版。
已创建RDS MySQL实例,详情请参见第一步:快捷创建RDS MySQL实例与配置数据库。
已创建并启动KMS实例,并创建默认密钥,详情请参见购买和启用KMS实例和创建密钥。
RDS MySQL和KMS需要与Flink工作空间在相同地域相同VPC下,否则需要打通网络,详情请参见如何访问跨VPC的其他服务?和如何访问公网?。
(可选)步骤一:准备工作
步骤二:通过KMS加密明文密码
本文对RDS MySQL的用户密码flink_rds_password@123进行加密,加密后的密文值为a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==
。
您可以通过以下任何一种方式获取KMS加密密钥。
在线上调试界面,选择服务地址。
填写KeyId(密钥ID)和Plaintext(待加密明文)参数值。
单击发起调用。
查看加密后的取值。
详情请参见Encrypt。
对KMS实例中的密钥开启公网访问,详情请参见开启公网访问。
KMS实例中的密钥默认仅允许VPC网络访问,因此您需要手动开启公网访问。
本地配置环境变量
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。参数值获取方法请参见如何查看AccessKey ID和AccessKey Secret信息?在目标IDEA项目中创建名称为EncryptFlink的类文件。
将下面内容拷贝到EncryptFlink的类文件,并根据实际业务修改配置项取值。
package org.example; import com.aliyun.kms20160120.models.EncryptResponse; import com.aliyun.kms20160120.models.EncryptResponseBody; import com.aliyun.tea.*; public class EncryptFlink { /** * <b>description</b> : * <p>使用AK&SK初始化账号Client</p> * @return Client * * @throws Exception */ public static com.aliyun.kms20160120.Client createClient() throws Exception { // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config() // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。 .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // Endpoint 请参考 https://api.aliyun.com/product/Kms config.endpoint = "kms.cn-hangzhou.aliyuncs.com"; return new com.aliyun.kms20160120.Client(config); } public static void main(String[] args_) throws Exception { java.util.List<String> args = java.util.Arrays.asList(args_); com.aliyun.kms20160120.Client client = EncryptFlink.createClient(); com.aliyun.kms20160120.models.EncryptRequest encryptRequest = new com.aliyun.kms20160120.models.EncryptRequest() .setPlaintext("flink_rds_password@123") .setKeyId("key-hzz67ab1ff4e750h****"); com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions(); try { // 复制代码运行请自行打印 API 的返回值 EncryptResponse encryptResponse = client.encryptWithOptions(encryptRequest, runtime); EncryptResponseBody body = encryptResponse.getBody(); System.out.println(body.getCiphertextBlob()); } catch (TeaException error) { // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 // 错误 message System.out.println(error.getMessage()); // 诊断地址 System.out.println(error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } catch (Exception _error) { TeaException error = new TeaException(_error.getMessage(), _error); // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 // 错误 message System.out.println(error.getMessage()); // 诊断地址 System.out.println(error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } } }
配置项
说明
示例
config.endpoint
KMS实例的公网地址。
kms.cn-hangzhou.aliyuncs.com
Plaintext
您要加密的明文密码。
flink_rds_password@123
KeyId
密钥的ID。
key-hzz67ab1ff4e750h****
在POM文件添加如下依赖信息。
<dependency> <groupId>com.aliyun</groupId> <artifactId>kms20160120</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea</artifactId> <version>1.3.2</version> </dependency>
本地运行EncryptFlink类文件,获取加密结果。
步骤三:在Flink JAR作业中添加KMS解密代码并打包
操作步骤
创建解密工具类文件。
在目标IDEA项目中创建名称为KmsUtil的工具类文件。
将下面代码拷贝到KmsUtil的类文件中,并根据实际业务修改配置项取值。
package org.example; import com.aliyun.kms20160120.Client; import com.aliyun.kms20160120.models.DecryptRequest; import com.aliyun.teaopenapi.models.Config; public class KmsUtil { public static String decrypt(String ak, String sk, String ciphertext) throws Exception { Client client = new Client(new Config() .setAccessKeyId(ak) .setAccessKeySecret(sk) .setEndpoint("kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com") .setCa("-----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n")); return client.decryptWithOptions( new DecryptRequest().setCiphertextBlob(ciphertext), new com.aliyun.teautil.models.RuntimeOptions() ).getBody().getPlaintext(); } }
修改JavaDemo文件。
添加AccessKey参数解析代码和KMS解密代码。
encryptedPassword取值需要替换为您步骤二获取的加密后的密文。
// 参数解析 final ParameterTool params = ParameterTool.fromArgs(args); String ak = params.get("akid"); String sk = params.get("aksecret"); // 对密码进行解密 String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA=="; String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
在JavaDemo文件中,修改明文密码值为新增的变量。
.password("flink_rds_password@123")
修改为.password(decryptedPassword)
。
修改POM文件。
指定程序的入口点,将mainClass设置为
org.example.JavaDemo
。引入KMS相关依赖。
<dependency> <groupId>com.aliyun</groupId> <artifactId>kms20160120</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea</artifactId> <version>1.3.2</version> </dependency>
(可选)在POM文件中修改artifactId为KmsJavaDemo。
为方便区分两个JAR包名称,您可以修改artifactId取值。
在IDEA上打包。
在IDEA项目的target目录查看已经打包好的KmsJavaDemo-1.0-SNAPSHOT.jar文件。
完整的代码示例
package org.example;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.api.java.utils.ParameterTool;
public class JavaDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 参数解析
final ParameterTool params = ParameterTool.fromArgs(args);
String ak = params.get("akid");
String sk = params.get("aksecret");
// 对密码进行解密
String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
// 构建反序列化器
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("username", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()));
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
RowDataDebeziumDeserializeSchema deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType((RowType) dataType.getLogicalType())
.setResultTypeInfo(typeInfo)
.build();
// 数据源(com.ververica.cdc.connectors.mysql.source.MySqlSource)
MySqlSource<RowData> mySqlSource =
MySqlSource.<RowData>builder()
.hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
.port(3306)
.databaseList("school") // 设置数据库
.tableList("school.student") // 设置数据表
.username("flink_rds_user")
.password(decryptedPassword)
// 将数据初始化为RowData结构
.deserializer(deserializer)
.build();
// 将外部数据源集成到flink数据流程序
// WatermarkStrategy.noWatermarks() 指没有使用水印策略
DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
// 输出到日志stdout
mySQLSource.print();
//运行
env.execute("MySQL CDC Test");
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>KmsJavaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Flink MySQL CDC Demo</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.1</flink.version>
<flink-cdc.version>2.4.2</flink-cdc.version>
<log4j.version>2.17.1</log4j.version>
</properties>
<dependencies>
<!-- Flink 核心依赖 (打包时用 provided) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink MySQL CDC 连接器 -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>1.17-vvr-8.0.4-1</version>
<!-- 本地运行时注释掉下一行 -->
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.17.1</version> <!-- 与核心版本严格一致 -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.17.1</version> <!-- 必须与核心版本严格一致 -->
<scope>provided</scope>
</dependency>
<!-- 日志依赖 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>kms20160120</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译器插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version> <!-- 修正版本 -->
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- 打包 Fat JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<!-- 保留日志依赖 -->
<!-- <exclude>org.slf4j:*</exclude> -->
<!-- <exclude>org.apache.logging.log4j:*</exclude> -->
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/MANIFEST.MF</exclude> <!-- 新增关键过滤 -->
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.JavaDemo</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- 新增必要转换器 -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
步骤四:部署与启动KMS加密后的作业
上传作业JAR包
登录实时计算管理控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击文件管理。
单击上传资源,上传要部署的JAR包和数据文件。
本文上传步骤三打包好的JAR包KmsJavaDemo-1.0-SNAPSHOT.jar。
部署JAR作业
在 页面,单击部署作业,选择JAR作业。
填写部署信息。
参数
说明
示例
参数
说明
示例
部署模式
请选择部署为流模式。
流模式
部署名称
填写对应的JAR作业名称。
kmsjavademo
引擎版本
当前作业使用的Flink引擎版本。
vvr-8.0.11-flink-1.17
JAR URI
选择步骤三中上传的JAR包。
实时计算引擎VVR 8.0.6及以上版本仅支持访问开通Flink工作空间时绑定的Bucket,不支持访问其他Bucket。
KmsJavaDemo-1.0-SNAPSHOT.jar
Entry Point Class
程序的入口类。如果您的JAR包未指定主类,请在此处输入您的Endpoint Class类的标准路径。
-
Entry Point Main Arguments
填写传入参数信息,在主方法中调用该参数。
为了避免您的AK信息泄露,建议使用变量的方式填写AccessKey取值,详情请参见变量管理。本文示例变量参数名称为akid和aksecret。
AccessKey取值获取方法请参见如何查看AccessKey ID和AccessKey Secret信息?
--akid ${secret_values.akid} --aksecret ${secret_values.aksecret}
部署目标
在下拉列表中,选择目标资源队列或者Session集群(请勿生产环境中使用)。详情请参见管理资源队列和步骤一:创建Session集群。
部署到Session集群的作业不支持显示监控告警、配置监控告警和开启自动调优功能。请勿将Session集群用于正式生产环境,Session集群可以作为开发测试环境。详情请参见作业调试。
default-queue
更多配置参数详情请参见部署作业。
单击部署。
启动作业。
在作业运维页面,单击kmsjavademo旁边的启动,选择无状态启动后,单击启动。
步骤五:在Flink作业日志中查看运行结果
作业状态变为运行中后,在TaskManager中以.out结尾的日志文件中,搜索lily查看Flink计算结果。
相关文档
- 本页导读 (1)
- 背景信息
- 前提条件
- (可选)步骤一:准备工作
- 准备RDS MySQL数据源
- 运行未加密的JAR作业
- 步骤二:通过KMS加密明文密码
- 步骤三:在Flink JAR作业中添加KMS解密代码并打包
- 操作步骤
- 完整的代码示例
- 步骤四:部署与启动KMS加密后的作业
- 步骤五:在Flink作业日志中查看运行结果
- 相关文档