由于Flink无法提前预知您要使用的上下游系统,当作业需要访问不同的上下游系统来读写数据时,可能需要使用您的AccessKey信息作为访问凭证。主账号的AccessKey具备云账号下资源的全部权限,一旦泄露可能会造成严重后果。本文通过对RAM用户授予相关上下游更小化权限,并结合Flink变量功能对AccessKey进行加密,以进一步提升访问安全性。
方案说明
本方案通过减少暴露面积原则来实现更安全的AccessKey访问。即不使用主账号的AccessKey,通过主账号授权子账号的方式,让子账号具备访问某些上下游的权限或更小化权限,然后利用Flink变量功能使用AccessKey,降低AccessKey明文泄漏的风险,减少意外情况下AccessKey的泄漏导致您主账号下所有的云资源受到的相关安全风险。
本文创建了一个RAM用户,并为RAM用户授予日志服务SLS Project下指定Logstore权限,并利用变量功能,实现Flink作业读写SLS数据。
示例教程
使用阿里云账号(主账号)或RAM管理员创建RAM用户,具体操作请参见创建RAM用户。
访问方式必须勾选OpenAPI调用访问,启用后,会自动为RAM用户生成一个AccessKey ID和AccessKey Secret。
重要RAM用户的AccessKey Secret只在创建时显示,不支持查看,请务必妥善保管。
为RAM用户授予SLS相关权限。
创建自定义权限策略。
在左侧导航栏,选择 。
在权限策略页面,单击创建权限策略。
在脚本编辑页签,将配置框中的原有脚本替换为如下内容,然后单击继续编辑基本信息。
Project和Logstore需要根据您的实际情况替换,更多权限策略请参见RAM自定义授权示例。
指定Logstore只读权限
{ "Version": "1", "Statement": [ { "Action": "log:ListProject", "Resource": "acs:log:*:*:project/*", "Effect": "Allow" }, { "Action": "log:List*", "Resource": "acs:log:*:*:project/<指定的Project名称>/logstore/*", "Effect": "Allow" }, { "Action": [ "log:Get*", "log:List*" ], "Resource": "acs:log:*:*:project/<指定的Project名称>/logstore/<指定的Logstore名称>", "Effect": "Allow" } ] }
指定Logstore写入权限
输入权限策略名称和备注,单击确定。
为RAM用户添加上一步创建的自定义权限策略。具体操作,请参见为RAM用户授权。
配置变量,降低AccessKey明文泄漏的风险。
为步骤1创建RAM用户时得到的AccessKey ID和AccessKey Secret创建变量,创建后只需调用变量名使用,无需填写具体值,具体操作步骤请参见新增变量。本文针对AccessKey ID和AccessKey Secret分别创建了名为slslak和slsaks的变量。
创建Flink作业,读取SLS数据。
在SQL作业开发时以
${secret_values.变量名}
格式使用变量,避免明文AccessKey带来的安全风险,示例如下。CREATE TEMPORARY TABLE sls_input( `__source__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, `__topic__` STRING METADATA VIRTUAL, deploymentName STRING, `level`STRING, `location` STRING, message STRING, thread STRING, `time`STRING ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-beijing-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.slsak}', 'accessKey' = '${secret_values.slsaks}', 'starttime' = '2024-08-30 15:39:00', 'project' ='test', 'logstore' ='flinktest' ); CREATE TEMPORARY TABLE blackhole_sink( `__source__` STRING, `__topic__` STRING, deploymentName STRING, `level` STRING, `location` STRING, message STRING, thread STRING, `time` STRING, receive_time BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT `__source__`, `__topic__`, deploymentName, `level`, `location`, message, thread, `time`, cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;
相关文档
Flink读写OSS时,您也可以为RAM用户配置指定读写Bucket等权限策略,详情请参见RAM Policy。
SQL作业开发具体操作请参见SQL作业开发。
Flink提供了作业变量和项目变量功能,可以避免明文AccessKey、密码等信息带来的安全风险,支持在多个场景使用。详情请参见新增变量。