基于函数计算创建自定义规则

本文通过编写函数代码检测ECS实例中的CPU核数为例,为您介绍基于函数计算2.0创建自定义规则的完整操作流程。

前提条件

请确保您已开通函数计算服务。具体操作,请参见开通服务

说明

关于函数计算服务的收费标准,请参见计费概览

背景信息

关于自定义函数规则的概念、应用场景、运行原理等,请参见自定义函数规则定义和运行原理

操作步骤

本文通过编写函数代码检测ECS实例中的CPU核数为例,当CPU核数小于或等于2时,ECS实例合规;反之,ECS实例不合规。

  1. 创建服务。

    1. 登录函数计算控制台

    2. 在左侧导航栏,单击服务及函数

    3. 在顶部菜单栏,选择地域,例如:华东2(上海)

    4. 服务列表页面,单击创建服务

    5. 创建服务面板,输入服务的名称,其他参数均保持默认值。

    6. 单击确定

  2. 创建函数。

    1. 步骤 1创建的服务中,单击创建函数

    2. 创建函数页面,输入函数名称请求处理程序类型选择处理事件请求运行环境选择Python 3.10,其他参数保持默认值。

    3. 单击创建

  3. 配置函数代码。

    1. 拷贝并粘贴如下代码至文件index.py

      # # !/usr/bin/env python
      # # -*- encoding: utf-8 -*-
      import json
      import logging
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.request import CommonRequest
      
      
      logger = logging.getLogger()
      # 合规类型
      COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT'
      COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
      COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
      # 资源配置推送类型
      CONFIGURATION_TYPE_COMMON = 'COMMON'
      CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE'
      CONFIGURATION_TYPE_NONE = 'NONE'
      
      
      # 入口函数,完成业务逻辑编排和处理。
      def handler(event, context):
          """
          处理函数
          :param event:事件
          :param context:上下文
          :return:评估结果
          """
          # 函数入参
          logger.info(f'打印函数入参:{event}')
      
          # 校验Event,代码可直接复制。
          evt = validate_event(event)
          if not evt:
              return None
          creds = context.credentials
          rule_parameters = evt.get('ruleParameters')
          result_token = evt.get('resultToken')
          invoking_event = evt.get('invokingEvent')
          ordering_timestamp = evt.get('orderingTimestamp')
      
          # 资源配置信息。当规则触发机制设置为配置变更时,该入参有值。当您新建规则或手动执行规则时,配置审计会逐个调用函数触发对所有资源的评估。如果资源配置变更,配置审计根据变更的资源信息自动调用函数触发一次资源评估。
          configuration_item = invoking_event.get('configurationItem')
          account_id = configuration_item.get('accountId')
          resource_id = configuration_item.get('resourceId')
          resource_type = configuration_item.get('resourceType')
          region_id = configuration_item.get('regionId')
          resource_name = configuration_item.get('resourceName')
      
          # 判断当前推送的资源配置信息是否大于配置(100 KB),如果是,则需要调用资源详情API进行完整数据查询。
          configuration_type = invoking_event.get('configurationType')
          if configuration_type and configuration_type == CONFIGURATION_TYPE_OVERSIZE:
              resource_result = get_discovered_resource(creds, resource_id, resource_type, region_id)
              resource_json = json.loads(resource_result)
              configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"]
      
          # 对资源进行评估,需要根据实际业务自行实现评估逻辑,以下代码仅供参考。
          compliance_type, annotation = evaluate_configuration_item(
              rule_parameters, configuration_item)
      
          # 设置评估结果,格式需符合以下示例要求。
          evaluations = [
              {
                  'accountId': account_id,
                  'complianceResourceId': resource_id,
                  'complianceResourceName': resource_name,
                  'complianceResourceType': resource_type,
                  'complianceRegionId': region_id,
                  'orderingTimestamp': ordering_timestamp,
                  'complianceType': compliance_type,
                  'annotation': annotation
              }
          ]
      
          # 将评估结果返回并写入配置审计,代码可直接复制。
          put_evaluations(creds, result_token, evaluations)
          return evaluations
      
      
      # 评估ECS实例的CPU核数。
      def evaluate_configuration_item(rule_parameters, configuration_item):
          """
          评估逻辑
          :param rule_parameters:规则参数
          :param configuration_item:配置项
          :return:评估类型
          """
          # 初始化返回值
          compliance_type = COMPLIANCE_TYPE_COMPLIANT
          annotation = None
      
          # 获取资源配置完整信息
          full_configuration = configuration_item['configuration']
          if not full_configuration:
              annotation = 'Configuration is empty.'
              return compliance_type, annotation
      
          # 转换为JSON
          configuration = parse_json(full_configuration)
          cpu_count = configuration.get('Cpu')
          eq_count = rule_parameters.get('CpuCount')
          if cpu_count and cpu_count <= int(eq_count):
            annotation = json.dumps({"configuration":cpu_count,"desiredValue":eq_count,"operator":"Greater","property":"$.Cpu"})
            compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT
            return compliance_type, annotation
          return compliance_type, annotation
      
      
      def validate_event(event):
          """
          校验Event
          :param event:Event
          :return:JSON对象
          """
          if not event:
              logger.error('Event is empty.')
          evt = parse_json(event)
          logger.info('Loading event: %s .' % json.dumps(evt))
      
          if 'resultToken' not in evt:
              logger.error('ResultToken is empty.')
              return None
          if 'ruleParameters' not in evt:
              logger.error('RuleParameters is empty.')
              return None
          if 'invokingEvent' not in evt:
              logger.error('InvokingEvent is empty.')
              return None
          return evt
      
      
      def parse_json(content):
          """
          JSON类型转换
          :param content:JSON字符串
          :return:JSON对象
          """
          try:
              return json.loads(content)
          except Exception as e:
              logger.error('Parse content:{} to json error:{}.'.format(content, e))
              return None
      
      
      # 评估结果返回,并写入配置审计,代码可直接复制。
      def put_evaluations(creds, result_token, evaluations):
          """
          调用API返回并写入评估结果
          :param context:函数计算上下文
          :param result_token:回调令牌
          :param evaluations:评估结果
          :return: None
          """
          # 需具备权限AliyunConfigFullAccess的函数计算FC的服务角色。
          client = AcsClient(creds.access_key_id, creds.access_key_secret, region_id='cn-shanghai')
      
          # 新建Request,并设置参数,Domain为config.cn-shanghai.aliyuncs.com。
          request = CommonRequest()
          request.set_domain('config.cn-shanghai.aliyuncs.com')
          request.set_version('2019-01-08')
          request.set_action_name('PutEvaluations')
          request.add_body_params('ResultToken', result_token)
          request.add_body_params('Evaluations', evaluations)
          request.add_body_params('SecurityToken', creds.security_token)
          request.set_method('POST')
      
          try:
              response = client.do_action_with_exception(request)
              logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
          except Exception as e:
              logger.error('PutEvaluations error: %s' % e)
      说明

      以上代码用于检测CPU核数。推荐您通过配置审计控制台查看代码中资源配置(configuration)的核心参数,获取待设置规则的参数名称。具体操作,请参见查看资源信息中的步骤 6。例如:在ECS实例的资源核心配置信息区域,单击查看JSON,获取CPU核数的参数为Cpu,从rule_parameters中获取CpuCount的期望值。

    2. 单击左上角的部署代码

  4. 基于函数计算创建自定义规则。

    1. 登录配置审计控制台

    2. (可选)在左上角选择目标账号组。

      仅资源目录下的管理账号需要执行该操作。单个阿里云账号不涉及。

    3. 在左侧导航栏,选择合规审计 > 规则

    4. 规则页面,单击新建规则

    5. 选择创建方式页面,先选择基于函数计算自定义,然后选择函数ARN,再单击下一步

      函数ARN的地域选择华东2(上海)服务选择步骤 1中创建的服务,函数选择步骤 2中创建的函数。

    6. 设置基本属性页面,先输入规则名称,再单击添加规则入参规则入参名称输入CpuCount期望值输入2,然后触发机制选择配置变更,最后单击下一步

      说明
      • 规则入参名称需要与步骤 3代码中的rule_parameters保持一致。

      • 当您在创建规则、修改规则和规则重新审计资源时,配置审计会将目标资源类型的所有资源配置信息逐条推送至函数计算,并触发函数对其进行评估。

    7. 设置生效范围页面,资源类型选择ECS实例,单击下一步

      说明
      • 请您选择实际待评估的资源类型,避免选择所有资源类型触发无效评估。

      • 选择规则关联的资源后,规则将检测您账号下该资源类型的所有资源。

    8. 设置修正页面,单击提交

      说明

      您可以打开设置修正开关,根据控制台提示,设置自定义修正。具体操作,请参见设置自定义修正

  5. 查看规则对ECS实例中CPU核数的检测结果。

    规则列表中,您可以看到目标规则检测出的不合规资源数。

    说明

    单击规则ID操作详情,您可以查看最新检测数据列表

    当目标规则的不合规资源列显示无数据,但您确认当前账号下有该规则中设置的资源时,说明函数调用失败或函数计算提交评估结果到配置审计失败。您可以在目标函数的调用日志页签,单击操作列的请求日志,定位失败原因,并逐步修改。具体操作,请参见查看调用日志

相关操作