配置审计服务为您提供数十款预设规则,参见阿里云预设规则列表。同时也支持您自定义规则函数。

自定义规则与系统预设规则的运行原理相同,区别仅在于系统预设规则是配置审计服务已经在函数计算服务中构建好了规则函数,使用时您直接在配置审计控制台选择要用的函数即可;而自定义规则需要您自己提前在函数计算服务中定义好规则函数,使用时在配置审计控制台需要录入规则函数的ARN。

通过自定义规则您可以更好的支持个性化的合规场景。

自定义规则的创建

一共分为两个步骤,第一步是在函数计算服务创建函数,第二步是跟预设规则一样,在配置审计服务创建规则。本文重点讲解第一步,第二步请参见创建规则的说明。

要创建一个自定义规则,首先要在函数计算(FC,以下将使用FC指代函数计算)控制台创建一个FC函数。目前在函数计算服务中,支持的编程语言有Java8、Nodejs6、Nodejs8、Python2.7、Python3、PHP7.2、dotnetcore2.1。Java8和dotnetcore值支持代码包上传(包含oss上传),其他语言级支持代码包上传,也支持在线编辑。函数计算相关内容,请参考:https://help.aliyun.com/product/50980.html 。

接下来以一个规则样例为您讲解如何自定义规则。

  • 规则场景:评估ECS实例是否由特定镜像启动
  • 函数语言使用Python3

在函数计算服务中创建规则函数并在配置审计引用

函数计算创建内容,请参考:https://help.aliyun.com/product/50980.html

  • 在函数计算创建规则函数后,会自动生成一个函数ARN。函数ARN可以在创建好的FC函数的概览页面中查看。

  • 在配置审计服务控制台实际引用该规则函数时,需要填入上图的ARN。

规则函数的代码构建

规则的本质是一段逻辑判断代码,这段代码放在刚刚创建的规则函数中。在实际的持续审计中,通过触发该规则函数的执行来实现评估。

  • 以下为规则样例的示例代码。本函数代码主要有两个函数,handler为入口函数,即自定义合规触发时调用的函数。hander需在在构建FC函数时进行配置。

  • 另一个函数为put_evaluations,在handler中调用,返回合规结果。
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import logging
import json
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.request import CommonRequest

logger = logging.getLogger()

# 用于筛选的资源类型,以逗号分隔多个资源类型
MATCH_RESOURCE_TYPES = ''

# 合规类型
COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'


def handler(event, context):
    """
    处理函数
    :param event: 事件
    :param context: 上下文
    :return: 评估结果
    """
    # 校验Event
    evt = validate_event(event)
    if not evt:
        return None

    rule_parameters = evt.get('ruleParameters')
    result_token = evt.get('resultToken')
    invoking_event = evt.get('invokingEvent')

    # 初始化返回值
    compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
    annotation = None

    # 获取ConfigurationItem
    configuration_item = invoking_event.get('configurationItem')
    if not configuration_item:
        logger.error('Configuration item is empty.')
        return None

    ordering_timestamp = configuration_item.get('captureTime')
    resource_id = configuration_item.get('resourceId')
    resource_type = configuration_item.get('resourceType')

    # 获取评估结果
    compliance_type, annotation = evaluate_configuration_item(
        rule_parameters, configuration_item)

    # 评估结果
    evaluations = [
        {
            'complianceResourceId': resource_id,
            'complianceResourceType': resource_type,
            'orderingTimestamp': ordering_timestamp,
            'complianceType': compliance_type,
            'annotation': annotation
        }
    ]

    # 回写评估结果
    put_evaluations(context, result_token, evaluations)

    return evaluations


def evaluate_configuration_item(rule_parameters, configuration_item):
    """
    评估逻辑
    :param rule_parameters: 规则参数
    :param configuration_item: 配置项
    :return: 评估类型,注解
    """
    # 初始化返回值
    compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
    annotation = None

    # 获取资源类型和ID
    resource_type = configuration_item['resourceType']
    full_configuration = configuration_item['configuration']

    # 判断资源类型
    if MATCH_RESOURCE_TYPES and resource_type not in MATCH_RESOURCE_TYPES.split(','):
        annotation = 'Resource type is {}, not in {}.'.format(
            resource_type, MATCH_RESOURCE_TYPES)
        return compliance_type, annotation

    # 判断配置信息
    if not full_configuration:
        annotation = 'Configuration is empty.'
        return compliance_type, annotation

    # 转换为JSON
    configuration = parse_json(full_configuration)
    if not configuration:
        annotation = 'Configuration:{} in invald.'.format(full_configuration)
        return compliance_type, annotation

    # =========== 用户代码 start =========== #


    # =========== 用户代码 end =========== #

    return compliance_type, annotation


def validate_event(event):
    """
    校验Event
    :param event: Event
    :return: Json对象
    """
    if not event:
        logger.error('Event is empty.')

    evt = parse_json(event)
    logger.info('Loading event: %s .' % evt)

    if 'resultToken' not in evt:
        logger.error('ResultToken is empty.')
        return None

    if 'ruleParameters' not in evt:
        logger.error('RuleParameters is empty.')
        return None

    if 'invokingEvent' not in evt:
        logger.error('InvokingEvent is empty.')
        return None

    return evt


def parse_json(content):
    """
    Json类型转换
    :param content: json字符串
    :return: Json对象
    """
    try:
        return json.loads(content)
    except Exception as e:
        logger.error('Parse content:{} to json error:{}.'.format(content, e))
        return None


def put_evaluations(context, result_token, evaluations):
    """
    回调Config Open API 回写评估结果
    :param context: 函数计算上下文
    :param result_token: 回调令牌
    :param evaluations: 评估结果
    :return: None
    """
    # 需要输入用户自己的ak/sk,需要具备 AliyunConfigFullAccess 权限
    client = AcsClient(
        'XXXX',
        'XXXXXXX',
        'cn-shanghai',
    )

    # 创建request,并设置参数
    request = CommonRequest()
    request.set_domain('config.cn-shanghai.aliyuncs.com')
    request.set_version('2018-12-24')
    request.set_action_name('PutEvaluations')
    request.add_body_params('ResultToken', result_token)
    request.add_body_params('Evaluations', evaluations)
    request.set_method('POST')

    try:
        response = client.do_action_with_exception(request)
        logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
    except Exception as e:
        logger.error('PutEvaluations error: %s' % e)

			

规则函数的入参

event参数。在规则函数中输入的入参信息保存在ruleParameters中,其他内容为在规则触发时自动生成事件信息。 JSON格式如下所示:

{
    version:"版本号",
    orderingTimestamp:"命令执行开始时间",
    invokingEvent:{
      messageType:"消息类型",
        configurationItem:{
          "accountId":"用户id",
            "arn":"资源ARN",
            "availabilityZone":"可用区",
            "regionId":"区域id",
            "configuration":"字符串形式的资源本身配置信息,各类资源有所不同",
            "configurationDiff":"配置变更内容",
            "relationship":"关系",
            "relationshipDiff":"关系内容变更",
            "captureTime":"捕获时间",
            "resourceCreationTime":"资源创建时间",
            "resourceStatus":"资源状态",
            "resourceId":"资源ID",
      "resourceName":"资源名称",
            "resourceType":"资源类型",
            "supplementaryConfiguration":"补充配置",
            "tags":"标签"
        },
        notificationCreationTimestamp:"事件消息生成时间"
    },
    ruleParameters:{
      "key":"value"
    },
    resultToken:"用户在FC中的回调信息" 
}
			

context参数。上下文信息,规则触发时自动带入

  • context.credentials.access_key_id:"accessKey值"
  • context.credentials.access_key_secret:"accessSecret值"
  • context.region:"区域信息"

以上完成自定义规则在函数计算服务的创建后,copy函数的ARN,就可以在配置审计服务继续进行规则的创建了。