当配置审计提供的托管规则不能满足您的需求时,您可以使用函数计算新建自定义规则。当您的自定义规则被触发时,配置审计会运行对应的规则函数,并给出规则合规结果。

前提条件

请确保您已开通函数计算服务,操作方法请参见开通函数计算服务

背景信息

自定义规则与预设规则差异的差异如下:
  • 托管规则

    托管规则是配置审计已在函数计算中构建的规则函数,使用时您可以直接在配置审计控制台选择所需函数。配置审计支持的托管规则,请参见托管规则列表

  • 自定义规则

    自定义规则需要您提前在函数计算中定义规则函数,使用时在配置审计控制台选择规则函数的ARN。

新建自定义规则

  1. 在函数计算控制台新建函数。
    新建函数的操作方法,请参见新建函数
    说明
    • 新建函数时,创建方式选择事件函数运行环境选择python3函数入口使用默认值index.handler,入口函数为handler。
    • 如果您想深入了解函数计算,请参见函数计算
  2. 在函数计算控制台查看新建函数ARN。
    新建函数后,自动生成一个函数ARN,您可以在该函数的概览页面查看。函数属性
  3. 在配置审计控制台新建规则时,选择新建函数ARN。
    新建规则的操作方法,请参见通过函数计算新建规则

规则函数的代码构建

规则的本质是一段逻辑判断代码,这段代码存放在新建的函数中。在实际持续审计中,通过触发该函数的执行来实现资源评估。本函数代码主要有两个函数,具体如下:

  • handler

    handler为入口函数,即自定义规则触发时调用的函数。handler在新建函数时进行设置。

  • put_evaluations
    put_evaluations在handler中调用,返回合规结果。JSON样例如下:
    #!/usr/bin/env python
    # -*- encoding: utf-8 -*-
    import logging
    import json
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.acs_exception.exceptions import ClientException
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.request import CommonRequest
    
    logger = logging.getLogger()
    
    # 用于筛选资源类型,以英文逗号(,)分隔多个资源类型。
    MATCH_RESOURCE_TYPES = ''
    
    # 合规类型
    COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT'
    COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
    COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
    COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
    
    
    def handler(event, context):
        """
        处理函数
        :param event: 事件
        :param context: 上下文
        :return: 评估结果
        """
        # 校验Event
        evt = validate_event(event)
        if not evt:
            return None
    
        rule_parameters = evt.get('ruleParameters')
        result_token = evt.get('resultToken')
        invoking_event = evt.get('invokingEvent')
    
        # 初始化返回值
        compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
        annotation = None
    
        # 获取ConfigurationItem
        configuration_item = invoking_event.get('configurationItem')
        if not configuration_item:
            logger.error('Configuration item is empty.')
            return None
    
        ordering_timestamp = configuration_item.get('captureTime')
        resource_id = configuration_item.get('resourceId')
        resource_type = configuration_item.get('resourceType')
    
        # 获取评估结果
        compliance_type, annotation = evaluate_configuration_item(
            rule_parameters, configuration_item)
    
        # 评估结果
        evaluations = [
            {
                'complianceResourceId': resource_id,
                'complianceResourceType': resource_type,
                'orderingTimestamp': ordering_timestamp,
                'complianceType': compliance_type,
                'annotation': annotation
            }
        ]
    
        # 回写评估结果
        put_evaluations(context, result_token, evaluations)
    
        return evaluations
    
    
    def evaluate_configuration_item(rule_parameters, configuration_item):
        """
        评估逻辑
        :param rule_parameters: 规则参数
        :param configuration_item: 配置项
        :return: 评估类型,注解
        """
        # 初始化返回值
        compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
        annotation = None
    
        # 获取资源类型和资源ID
        resource_type = configuration_item['resourceType']
        full_configuration = configuration_item['configuration']
    
        # 判断资源类型
        if MATCH_RESOURCE_TYPES and resource_type not in MATCH_RESOURCE_TYPES.split(','):
            annotation = 'Resource type is {}, not in {}.'.format(
                resource_type, MATCH_RESOURCE_TYPES)
            return compliance_type, annotation
    
        # 判断配置信息
        if not full_configuration:
            annotation = 'Configuration is empty.'
            return compliance_type, annotation
    
        # 转换为JSON
        configuration = parse_json(full_configuration)
        if not configuration:
            annotation = 'Configuration:{} in invald.'.format(full_configuration)
            return compliance_type, annotation
    
        # =========== 用户代码 start =========== #
    
    
        # =========== 用户代码 end =========== #
    
        return compliance_type, annotation
    
    
    def validate_event(event):
        """
        校验Event
        :param event: Event
        :return: JSON对象
        """
        if not event:
            logger.error('Event is empty.')
    
        evt = parse_json(event)
        logger.info('Loading event: %s .' % evt)
    
        if 'resultToken' not in evt:
            logger.error('ResultToken is empty.')
            return None
    
        if 'ruleParameters' not in evt:
            logger.error('RuleParameters is empty.')
            return None
    
        if 'invokingEvent' not in evt:
            logger.error('InvokingEvent is empty.')
            return None
    
        return evt
    
    
    def parse_json(content):
        """
        JSON类型转换
        :param content: JSON字符串
        :return: JSON对象
        """
        try:
            return json.loads(content)
        except Exception as e:
            logger.error('Parse content:{} to json error:{}.'.format(content, e))
            return None
    
    
    def put_evaluations(context, result_token, evaluations):
        """
        回调配置审计OpenAPI回写评估结果
        :param context: 函数计算上下文
        :param result_token: 回调令牌
        :param evaluations: 评估结果
        :return: None
        """
        # 需要输入当前阿里云账号的AccessKey ID和AccessKey Secret,需要具备AliyunConfigFullAccess权限。
        client = AcsClient(
            'XXXX',
            'XXXXXXX',
            'cn-shanghai',
        )
    
        # 新建request,并设置参数,domain为config.cn-shanghai.aliyuncs.com。
        request = CommonRequest()
        request.set_domain('config.cn-shanghai.aliyuncs.com')
        request.set_version('2019-01-08')
        request.set_action_name('PutEvaluations')
        request.add_body_params('ResultToken', result_token)
        request.add_body_params('Evaluations', evaluations)
        request.set_method('POST')
    
        try:
            response = client.do_action_with_exception(request)
            logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
        except Exception as e:
            logger.error('PutEvaluations error: %s' % e)
    
                

规则函数的入参

在函数中设置的入参信息保存在ruleParameters中,其他内容在规则触发时自动生成事件信息。JSON格式如下:

{
    version:"版本号",
    orderingTimestamp:"命令执行开始时间",
    invokingEvent:{
      messageType:"消息类型",
        configurationItem:{
          "accountId":"阿里云账号ID",
            "arn":"资源ARN",
            "availabilityZone":"可用区",
            "regionId":"地域ID",
            "configuration":"字符串形式的资源本身配置信息,各类资源有所不同",
            "configurationDiff":"配置变更内容",
            "relationship":"关系",
            "relationshipDiff":"关系内容变更",
            "captureTime":"捕获时间",
            "resourceCreationTime":"资源创建时间",
            "resourceStatus":"资源状态",
            "resourceId":"资源ID",
            "resourceName":"资源名称",
            "resourceType":"资源类型",
            "supplementaryConfiguration":"补充配置",
            "tags":"标签"
        },
        notificationCreationTimestamp:"事件消息生成时间"
    },
    ruleParameters:{
      "key":"value"
    },
    resultToken:"用户在函数计算中的回调信息" 
}

context参数是上下文信息,规则触发时自动带入。

  • context.credentials.access_key_id:"accessKey"
  • context.credentials.access_key_secret:"accessSecret"
  • context.region:"地域信息"