首页 级联资源转资源组同步方案

级联资源转资源组同步方案

更新时间: 2025-08-21 13:34:19

现代化治理方案。像ECS这样的产品,其关联资源如云盘、安全组、快照资源,当ECS发生转组时需要把这些关联资源也一并转组。需求来源:中兴通讯

方案概述

当企业有多个云账号,并且使用了资源组来做资源分类,用于对资源的分权与分账。当客户变更ECS的资源组之后,这台ECS所关联的资源(如磁盘快照)目前是无法跟随转组的。

本文档介绍了一种通过操作审计(ActionTrail)结合事件总线(EventBridge)和函数计算为企业提供一种实现方案,文中会以特定的一个场景来演示操作步骤。客户可以参考相关配置来适配自身的业务需求。

方案优势

统一管理资源变更

当企业有多个云账号内的ECS资源都需要处理级联资源转组操作。只需要在一个账号(推荐:运维账号)中配置。统一维护代码与配置。

可扩展处理转组规则

当前1.0版本的方案只支持ECS资源对应磁盘的快照实现同步转组操作。后续随着企业业务发展,还可以扩展更多的级联资源同步转组。

客户场景

ECS转资源组关联的云盘快照一并转组

场景描述

企业会使用资源组来做分账分权,当把某台ECS做了一次资源组转移之后,企业也希望能够把这个ECS所关联的云盘快照也一并转组,便于成本分摊与权限管理。

适用客户

  • 使用资源目录管理云上多个账号的企业客户。

  • 使用了资源组的企业客户。

方案架构

架构说明

  • 通过资源目录将企业多个云账号统一纳管,建立账号树。便于在资源管理账号中统一变更各成员账号内的资源分组。

  • 在各个成员账号中配置事件总线,选择事件投递目标为“运维账号”,事件模式选择特定的过滤规则(只过滤ECS类型的事件)。

  • 在“运维账号”内配置事件总线,将对应的事件投递到函数计算;函数计算解析相应的事件对象,并做相应处理,调用各成员账号内OpenApi完成对资源转组。

  • 在本方案中提供函数计算里面的代码为Python3.X。

注意:本方案推荐将函数计算集群部署在运维账号。但有些客户可能还没有运维账号,那也可以放在其他非业务账号(比如日志账号或共享服务账号等)。

产品费用及名词

产品费用

产品名称

产品说明

产品费用

资源目录RD

资源目录RD(Resource Directory)是阿里云面向企业客户提供的一套多级账号和资源关系管理服务。

免费,详情参见产品定价

操作审计

ActionTrail

操作审计(ActionTrail)帮助您监控并记录阿里云账号的活动,包括通过阿里云控制台、OpenAPI、开发者工具对云上产品和服务的访问和使用行为。您可以将这些行为事件下载或保存到日志服务SLS或对象存储OSS,然后进行行为分析、安全分析、资源变更行为追踪和行为合规性审计等操作

操作审计目前不收取任何费用。后续将针对部分功能陆续收费。详情见官网

事件总线

EventBridge

事件总线EventBridge是阿里云提供的一款无服务器事件总线服务,能够以标准化的CloudEvents 1.0协议在应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。

公测期间产品免费。

函数计算

阿里云函数计算是事件驱动的全托管计算服务。通过函数计算,您无需管理服务器等基础设施,只需编写代码并上传。函数计算会为您准备好计算资源,以弹性、可靠的方式运行您的代码,并提供日志查询、性能监控、报警等功能。

收费,详情参见计费概述

名词解释

名称

说明

企业管理主账号

在企业拥有多个阿里云账号时,特指拥有管理其他账号资源权限的管理员账号。用于管理多账号,统一配置多账号身份权限,统一查看各云账号账单,统一配置审计规则并下发到各成员账号。

CloudEvents 1.0

用标准方式描述事件数据的开源规范,旨在简化事件声明以及跨服务、跨平台的消息投递。

事件模式

对事件进行过滤的模块。事件模式支持对CloudEvents包含data在内的所有字段进行过滤。

事件目标

事件的处理终端,负责消费事件。

安全性

跨账号路由事件

同组织或关联组织下不同的阿里云账号通常需要进行事件互通,事件总线EventBridge提供跨账号路由事件的能力,您可以将多个账号的事件路由到一个账号中统一处理。授权说明及操作详情参见跨账号路由事件

资源管理账号分配AK/SK

借助资源目录在资源管理账号中对各成员账号的统一管理能力,程序中只需要分配一把AK/SK就可以实现对多个成员账号的统一管理。需要在资源管理账号中分配一把AK/SK,具备STS权限。程序里面的AK配置就是用这把。

权限策略名称

说明

AliyunSTSAssumeRoleAccess

调用STS服务AssumeRole接口的权限

运维账号操作权限

运维账号里面需要配置事件总线、函数计算。参照按需为用户分配最小权限。此方案中授权配置如下:

权限策略名称

说明

AliyunEventBridgeFullAccess

事件总线EventBridge的管理权限,等同于阿里云账号的权限,被授予该权限的RAM用户可执行发布事件和控制台所有操作。

AliyunFcDefaultRole

用于FC服务角色的授权策略。

AliyunRDSFullAccess

管理云数据库服务(RDS)的权限

代码中的敏感数据

本方案中用到了RDS数据库,AK/SK等敏感数据,对敏感数据建议不要硬编码在代码中。而是推荐放在函数计算的环境变量里面。

参数说明

环境变量

定义

SCENE

用于标识场景,有两个可选项

  • ma : 表示的是通过资源管理账号,批量管理其成员账号。

  • member:表示的是单个成员账号的管理。

AK

有两种场景

1、当SCENE的值为ma的时候,那这把AK就是资源管理账号里面的具备STS权限的AK。

2、当SCENE的值为member的时候,那这把AK就是单个成员账号的AK具备ECS管理权限。

MYSQL_XXX

以MYSQL_作前缀的参数,填写的是函数计算所依赖的幂等表所在的数据库实例信息。包括数据库连接地址、数据库名称、账号与密码等

REGION

代表资源所在的Region

WHITE_LIST

白名单列表。如果不填,那表示全部。如果填写,则只会匹配白名单里面的账号ID才会触发执行。

格式:["xxxx" , "xxxxx"]

注意事项

不同Region资源变更需要配置多个事件总线

目前事件总线只支持相同Region资源变更事件订阅。如果用户需要订阅多个Region的资源变更,那就需要配置多个事件总线。

函数计算无法访问到RDS数据库

ECS实例和RDS实例需要位于同一地域才能内网互通。可以是同一地域的相同或不同可用区。具体排查可以查看帮助文档

运维账号里面的角色跟成员账号需要匹配

每个成员账号里面的事件总线需要跨账号投递到运维账号里面。需要在运维账号里面为每个成员账号依次分配相应的角色信息策略。角色的信任策略如下:

{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "{accountid}@eventbridge.aliyuncs.com",
          "{accountid}@eventbridge.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}

实施步骤

实施准备

  • 确保已在「企业管理账号」中开通资源目录,并且将其他账号邀请到资源目录中。具体操作,请参见Landing Zone搭建概述

  • 确保已在「企业管理账号」中开启多账号的统一日志投递。具体操作,请参见云治理中心-统一投递审计日志投递审计日志。

  • 确保在各个成员账号中开通事件总线并且完成授权。具体操作,请参见开通事件总线

  • 确保在「运维账号」中已开通了日志服务,用于收集函数计算运行过程中日志。

  • 提前在「运维账号」中规划好保存RDS用到的VPC及vswitch资源,并且提前创建好安全组给到函数计算用。

  • 确保在「运维账号」中开通事件总线及函数计算、配置相应的VPC网络及安全组用于配置数据库及函数计算资源。创建相应的数据库,相应的数据表及授权。具体操作,请参见开通事件总线

实施场景

某客户多账号架构如下:

基于这张架构图,可以实现的效果:

  • 业务账号B、C里面对ECS或云磁盘做资源转组操作,会自动将磁盘对应的快照资源一并转组。

为了简化操作过程,整个操作仅在业务账号B里面进行。

在业务账号B里面有以下磁盘及磁盘快照:

ECS实例ID

磁盘ID

快照ID

当前资源组

目标资源组

i-wz96p5svu7v0su0n1klm

d-wz9im0hpk8ez3co4k7lu

s-wz934ufqe1w40mvp2of2

resource4

resource3

s-wz90kj4wuuuvckwfgy02

目前云产品已经支持部分级联资源转组,如当ECS发生转组之后,会自动将其关联的磁盘也一并完成转组。但磁盘这块目前还不具备这个能力。

以下操作会实现当把磁盘由当前资源组转移到目标资源组之后,其关联的两个快照资源也一并转移。

数据规划

本文以订阅ECS磁盘变更事件,为您介绍通过操作审计结合事件总线,函数计算实现过滤特定变更事件。相关数据规划如下表:

云服务

参数

示例

事件总线

规则名称

eb-filter-ecs-disk

函数计算

服务

eb_event_action

服务的系统模板权限

AliyunFCDefaultRolePolicy

函数

eb_event_trigger

触发器

ConfigurationItemChangeTrigger

操作流程

通过配置跨账号事件总线投递将账号内资源变更事件投递到事件总线,触发函数计算完成对事件内容处理过滤,并完成数据入库完整的操作流程如下图所示:

操作步骤

本操作步骤中提到的账号均以上面「某客户多账号架构」为指引。

  1. 登录运维账号A,新建一个RAM角色支持事件总线跨账号路由。角色名称:b-account-eb-role

具体操作,请参见 跨云账号路由事件。通过这一步操作之后,业务账号B的事件总线EventBridge就可以扮演运维账号A里面的RAM角色。

这个角色信任策略:

{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "业务账号UID@eventbridge.aliyuncs.com",
          "业务账号UID@eventbridge.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}

这个角色权限:AliyunEventBridgePutEventsPolicy

  1. 登录业务账号B,配置事件总线:

    1. 登录事件总线控制台

    2. 在左侧导航栏,单击事件总线。

    3. 在顶部菜单栏,选择地域,例如:华南1(深圳)。

    4. 选择 default 进入这个默认事件配置页。

    5. 在 事件规则 页面,单击 创建规则。

    6. 创建规则页面,完成以下操作。

  2. 配置基本信息配置向导,在名称文本框输入规则名称(eb-filter-ecs-disk),在描述文本框输入规则的描述,然后单击下一步

  3. 配置事件模式配置向导,事件源类型选择阿里云官方事件源事件源选择acs.ecs 访问控制事件类型选择ecs:ActionTrail:AliyunServiceEvent,ecs:ActionTrail:ConsoleOperation,ecs:ActionTrail:ApiCall 。在事件模式内容代码框输入事件模式如下,然后点下一步。

{
    "source": [
        "acs.ecs"
    ],
    "type": [
        "ecs:ActionTrail:AliyunServiceEvent",
        "ecs:ActionTrail:ConsoleOperation",
        "ecs:ActionTrail:ApiCall"
    ]
}
  1. 配置事件目标配置向导,配置以下参数,然后单击创建

  • 服务类型:单击事件总线

  • 目标账户类型:默认选择其他阿里云账号

  • 账号ID:输入接收账号的ID。在本场景中配置「运维账号UID」。

  • 总线名称:输入default。

  • 角色名称:输入 「b-account-eb-role」。【这一步很关键】

  • 事件:默认选择完整事件,不做转换,直接投递原生CloudEvents 1.0协议中的完整结构。

  1. 登录运维账号A,配置函数计算 (需要提前把RDS相关的VPC、安全组都配置好):

    1. 新建服务。

  2. 登录函数计算控制台

  3. 在左侧导航栏,单击服务及函数

  4. 在顶部菜单栏,选择地域,例如:华南1(深圳)。

  5. 服务列表页面,单击创建服务

  6. 创建服务面板,名称输入eb_event_action

  7. 打开高级面板,在这里可以配置VPC访问策略,允许访问RDS所在的VPC。并且选择相应的安全组。

  8. 单击确定

    1. 新建函数。

  9. 在服务eb_event_action的左侧导航栏,单击函数管理

  10. 单击创建函数

  11. 创建函数页面,名称输入eb_event_trigger,运行环境选择Python 3,配置函数触发器选择EventBridge触发器->云服务器ECS,其中触发器名称ConfigurationItemChangeTrigger,事件类型选择「自定义事件类型」,事件模式内容如下:

{
    "source": [
        "acs.ecs"
    ],
    "type": [
        "ecs:ActionTrail:AliyunServiceEvent",
        "ecs:ActionTrail:ApiCall",
        "ecs:ActionTrail:ConsoleOperation"
    ]
}
  1. 单击创建

    1. 配置函数的环境变量。

  2. 在函数eb_event_trigger函数代码页签,单击函数配置页签。

  3. 环境变量区域,单击编辑

  4. 单击添加变量,输入该环境变量的变量名称和变量值。

    1. 以下是必填项:

  • 变量AK,值为操作账号的AK。

  • 变量SK,值为操作账号的SK。

  • 变量REGION,值为操作资源所在的Region。

  • 变量SCENE,值有两个:member / ma.

    • member:表示的是不需要通过资源管理账号来统一管理。那对应的AK,SK就是当前成员账号具备操作ECS只读与操作资源转组权限(具备ECS只读与转组权限)。

    • ma: 表示需要通过资源管理账号来统一管理。那对应的AK,SK就得配置资源管理账号中具备STS权限的用户。

2. 以下是高阶选项(配置数据库实现操作幂等)

  • 变量MYSQL_ENDPOING,值为RDS内网地址。

  • 变量MYSQL_PORT,值为RDS数据库端口。

  • 变量MYSQL_USER,值为数据库用户名称。

  • 变量MYSQL_PASSWORD,值为数据库用户密码。

  • 变量MYSQL_DBNAME,值为数据库名称。

  • 变量WHITE_LIST,值为账号白名单。如果填写则必须命中才会生效。

  1. 单击保存

  1. 配置函数实例生命周期回调方法

    1. 函数管理页面,单击目标函数操作列的配置

    2. 在编辑函数配置页面的实例生命周期回调区域,设置回调程序与超时时间,然后单击保存

    3. 本示例中的配置如下截图:

  1. 函数代码

    1. 在函数eb_event_trigger触发器管理页签,单击函数代码页签。

    2. 单击文件index.py

    3. 拷贝并粘贴如下代码至文件index.py

# -*- coding:utf-8 -*-
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.auth.credentials import StsTokenCredential, AccessKeyCredential
from aliyunsdkecs.request.v20140526.DescribeSnapshotsRequest import DescribeSnapshotsRequest
from aliyunsdkresourcemanager.request.v20200331.MoveResourcesRequest import MoveResourcesRequest
import pymysql
from aliyunsdksts.request.v20150401.AssumeRoleRequest import AssumeRoleRequest
import json, logging
import os

connection = None
logger = logging.getLogger()


class ECS(object):

    def __init__(self, sts_access_key, sts_access_secret, sts_token, region_id):
        self.sts_access_key = sts_access_key
        self.sts_access_secret = sts_access_secret
        self.sts_token = sts_token
        self.region_id = region_id
        if sts_token is None:
            self.credentials = AccessKeyCredential(self.sts_access_key, self.sts_access_secret)
        else:
            self.credentials = StsTokenCredential(self.sts_access_key, self.sts_access_secret, self.sts_token)
        self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)

    def ListDiskSnapshot(self, diskid):
        request = DescribeSnapshotsRequest()
        request.set_accept_format('json')
        request.set_DiskId(diskid)
        request.set_PageSize(100)
        response = self.clt.do_action_with_exception(request)
        return response

    def ListEcsDiskSnapshot(self, instanceid):
        request = DescribeSnapshotsRequest()
        request.set_accept_format('json')
        request.set_InstanceId(instanceid)
        request.set_PageSize(100)
        response = self.clt.do_action_with_exception(request)
        return response

    def MoveResources(self, target_rg, resources):
        request = MoveResourcesRequest()
        request.set_accept_format('json')

        request.set_ResourceGroupId(target_rg)
        request.set_Resources(resources)
        response = self.clt.do_action_with_exception(request)
        logger.info("批量转组成功,转组对象:" + json.dumps(resources))


class ResourceManage(object):
    def __init__(self, sts_access_key, sts_access_secret, region_id):
        self.sts_access_key = sts_access_key
        self.sts_access_secret = sts_access_secret
        self.region_id = region_id
        self.credentials = AccessKeyCredential(self.sts_access_key, self.sts_access_secret)
        self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)

    """
    当需要通过资源管理账号操作多个成员账号时适用
    """

    def AssumeRole(self, memberUid):
        request = AssumeRoleRequest()
        request.set_accept_format('json')
        request.set_RoleArn("acs:ram::%s:role/resourcedirectoryaccountaccessrole" % (memberUid))
        request.set_RoleSessionName("rdMaster")
        response = self.clt.do_action_with_exception(request)

        response = json.loads(response).get('Credentials')
        return response.get('AccessKeyId'), response.get('AccessKeySecret'), response.get('SecurityToken')


def initialize(context):
    global connection
    try:
        connection = pymysql.connect(
            host=os.environ['MYSQL_ENDPOING'],  # 替换为您的HOST名称。
            port=int(os.environ['MYSQL_PORT']),  # 替换为您的端口号。
            user=os.environ['MYSQL_USER'],  # 替换为您的用户名。
            passwd=os.environ['MYSQL_PASSWORD'],  # 替换为您的用户名对应的密码。
            db=os.environ['MYSQL_DBNAME'],  # 替换为您的数据库名称。
            connect_timeout=5)
        logger.info('eb job connect mysql success!!!')
    except Exception as e:
        logger.error(
            "ERROR: Unexpected error: Could not connect to MySql instance.")


def pre_stop(context):
    if connection != None:
        connection.close()


"""
通过主键异常来发现是否当前这条EB事件有处理过
"""


def save_transactional(sql, params):
    if connection is None:
        return True
    try:
        cursor = connection.cursor()
        cursor.execute(sql, params)
        connection.commit()
        return True
    except Exception as e:
        logger.error(e)
    return False


def check_pk(sql):
    if connection is None:
        # 表示不做幂等验证,直接处理
        return None
    try:
        cursor = connection.cursor()
        cursor.execute(sql)
        data = cursor.fetchone()
        return data
    except Exception as e:
        logger.error(e)


def scene(context):
    """
    Params: context 格式
    {
        "accountId":"",
        "eventId:":"",
        "rgId":"",
        "resId":"",
        "resType":"disk"
    }
    场景一:有两个账号:
    1、日志账号,开通事件总线 + 函数计算 + RDS,用于响应事件。
    2、业务账号,开通事件总线,将事件总线消息推到日志账号。

    场景二:有三个账号:
    1、日志账号,开通事件总线 + 函数计算 + RDS,用于响应事件。
    2、资源管理账号,开通资源目录,在资源目录里面统一管理成员账号变更资源组。
    3、业务账号,开通事件总线,将事件总线消息推到日志账号。
    """
    if os.environ.get("AK") is None or os.environ.get("SK") is None or os.environ.get(
            "REGION") is None or os.environ.get("SCENE") is None:
        logger.error("没有配置程序用的AK/SK/Region,请检查环境变量")
        return
    else:
        ak = os.environ.get("AK")
        sk = os.environ.get("SK")

    region_id = os.environ.get("REGION")

    # 1. 先判断幂等
    pk_sql = "insert into pk_eventbridge(eb_id) values(%s)"
    params = (context.get("eventId"))
    if not save_transactional(pk_sql, params):
        logger.error("当前事件%s已处理过,忽略" % (context.get("eventId")))
        return

    # 2. 依据磁盘ID或实例ID查询快照列表
    resId = context.get("resId")

    # 需要判断一下是场景一还是场景二
    scene_type = os.environ.get("SCENE")
    if scene_type is not None and scene_type == "ma":
        # 需要先在MA账号里面AssumeRole到成员账号拿到STS_TOKEN
        resM = ResourceManage(ak, sk, region_id)
        sts_ak, sts_sk, sts_token = resM.AssumeRole(context.get("accountId"))
        ecs = ECS(sts_ak, sts_sk, sts_token, region_id)
    else:
        ecs = ECS(ak, sk, None, region_id)

    # 做多一步判断,如果是disk直接就查快照,如果是ECS实例则需要先查询出磁盘出来
    if context.get("resType") == "instance":
        resp = ecs.ListEcsDiskSnapshot(resId)
    if context.get("resType") == "disk":
        resp = ecs.ListDiskSnapshot(resId)
    if resp is None:
        return
    snap_object = json.loads(resp)
    if len(snap_object["Snapshots"]["Snapshot"]) == 0:
        logger.error("当前账号:%s,磁盘没有快照%s不需要处理,忽略" % (context.get("accountId"), context.get("resId")))
        return

    resources = []
    for item in snap_object["Snapshots"]["Snapshot"]:
        resources.append(
            {"ResourceId": item["SnapshotId"], "RegionId": region_id, "Service": "ecs", "ResourceType": "snapshot"})

    # 3. 批量转组
    ecs.MoveResources(context.get("rgId"), resources)


def transfer(event):
    eb = json.loads(event).get("data")
    # 添加一道过滤,只处理事件类型为:JoinResourceGroup 的订阅
    if eb.get("eventName") != "JoinResourceGroup":
        return
    context = {
        "accountId": eb.get("recipientAccountId"),
        "eventId": eb.get("eventId"),
        "rgId": eb.get("requestParameters").get("ResourceGroupId"),
        "resId": eb.get("requestParameters").get("ResourceId"),
        "resType": eb.get("requestParameters").get("ResourceType")
    }

    # 1、优先判断一下当前这个账号ID是否在白名单
    if os.environ.get("WHITE_LIST") is not None:
        white_list = json.loads(os.environ.get("WHITE_LIST"))
        if context.get("accountId") not in white_list:
            logger.info("当前账号:" + context.get("accountId") + "不在白名单里面,不需要处理")
            return None

    if context.get("resType") == "disk" or context.get("resType") == "instance":
        logger.info("当前事件需要订阅处理" + eb.get("eventId"))
        return context
    return None


def handler(event, context):
    transfer_context = transfer(event)
    if transfer_context is None:
        return
    scene(transfer_context)

说明

幂等表:pk_eventbridge , 结构如下:

CREATE TABLE `pk_eventbridge` (
  `eb_id` varchar(128) NOT NULL,
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_eb_id` (`eb_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='去重,确保幂等'
;
  1. 登录业务账号B,做两种场景变更。

  • 将一台ECS从当前资源组转到目标资源组。

  • 将一块上述磁盘ID从当前资源组转到目标资源组。

可以看到对应的磁盘快照资源也一并发生转组。

故障排除

函数计算里面执行抛错,找不到依赖的包

本方案运行的是python代码,是需要安装三方包。具体安装过程可以参考官网

函数计算中的程序无法访问后端数据库?

  • 确认当前函数计算是否有配置网络选项,需要确保函数计算所在的网络环境跟RDS在同一个VPC内。

相关内容