ASM的审计功能可以帮助网格管理人员记录或追溯不同用户的日常操作,是集群安全运维中的重要环节。本文介绍如何启用网格审计功能、查看审计报表、查看日志记录以及设置告警。

前提条件

开通日志服务

背景信息

  • 本文中所提及的资源指的是Istio资源,包括VirtualService、Gateway、DestinationRule、EnvoyFilter、Sidecar、ServiceEntry等。
  • 审计功能开启后,审计日志会产生费用,计费方式请参见按量付费

启用网格审计

在创建ASM实例时,您可以在创建新网格对话框中启用网格审计功能,请参见创建ASM实例
说明 默认创建的审计LogProject名称为mesh-log-${mesh-id},同时会在该LogProject中创建名为audit-${mesh-id}的LogStore用于存储审计日志。

查看审计报表

ASM内置了四个审计报表,分别是审计中心概览、账号操作审计、资源操作概览、资源操作详细列表。

  1. 登录ASM控制台
  2. 在左侧导航栏中,选择服务网格 > 网格审计
  3. 网格审计页面,从网格下拉列表中选择需要查看的网格实例。
  4. 单击审计中心概览页签,查看网格实例的事件概览以及重要事件(如公网访问、命令执行、删除资源等)的详细信息。
    审计中心概览
  5. 单击账号操作审计页签,查看指定账号对网格实例操作的详细信息,包括对创建、修改和删除资源等操作的信息以及资源所属的命名空间分布、访问地理位置分布等信息。
    账号操作审计
  6. 单击资源操作概览页签,查看网格实例中主要资源的操作统计信息。
    说明 您可以执行以下操作来过滤筛选统计信息:
    • 自定义统计时间的范围,默认显示最近一周的统计信息。
    • 指定Namespace和子账号ID。
    • 选择一项或多项组合来筛选指定范围的事件。
    资源操作概览
  7. 单击资源操作详细列表页签,查看网格实例中指定资源的详细操作列表。
    您需要选择或输入指定的资源类型进行实时查询,包括资源操作各类事件的总数、Namespace分布、成功率、时序趋势以及详细操作列表等。
    说明 您可以执行以下操作来过滤筛选统计信息:
    • 自定义统计时间的范围,默认显示最近一周的统计信息。
    • 指定Namespace和子账号ID。
    • 选择一项或多项组合来筛选指定范围的事件。
    资源操作详细列表

查看详细日志记录

如果您有自定义查询和分析审计日志的需求,可以进入日志服务管理控制台查看详细的日志记录。

  1. 登录日志服务控制台
  2. Project列表区域,单击创建集群时设置的日志Project。
  3. 选择名称为audit-${clustered}的日志库,单击查询分析图标,查看对应的审计日志。
    查询分析图标
    说明
    • 在集群创建过程中,指定的日志Project中会自动添加一个名为audit-${clustereid}的日志库。
    • 审计日志的日志库默认已经配置索引。请不要修改索引,以免报表失效。
    常见的审计日志搜索方式如下所示。
    • 查询某一子账号的操作记录,直接在搜索框中输入子账号ID,单击查询/分析
    • 查询某一资源的操作,直接在搜索框中输入资源名,单击查询/分析
    • 过滤系统组件的操作,在搜索框中输入NOT user.username: node NOT user.username: serviceaccount NOT user.username: apiserver NOT user.username: kube-scheduler NOT user.username: kube-controller-manager,单击查询/分析

    更多查询和统计方式,请参见查询简介

设置告警

若您需要对某些资源的操作进行实时告警,可以通过日志服务的告警功能实现。告警方式支持短信、钉钉机器人、邮件、自定义WebHook和通知中心。详情请参见简介

关于审计日志的更多查询方式,您还可以通过审计报表中的查询语句来查询审计日志:

  • 示例1:对容器执行命令时触发告警。

    某公司对于网格实例的使用有严格限制,不允许用户登录容器或对容器执行命令。如果有用户执行命令时需要立即给出告警,并希望告警时能够显示用户登录的具体容器、执行的命令、操作人、事件ID、时间、操作源IP等信息。

    • 查询语句如下所示。
      verb : create and objectRef.subresource:exec and stage:  ResponseStarted | SELECT auditID as "事件ID", date_format(from_unixtime(__time__), '%Y-%m-%d %T' ) as "操作时间",  regexp_extract("requestURI", '([^\?]*)/exec\?.*', 1)as "资源",  regexp_extract("requestURI", '\?(.*)', 1)as "命令" ,"responseStatus.code" as "状态码",
       CASE 
       WHEN "user.username" != 'kubernetes-admin' then "user.username"
       WHEN "user.username" = 'kubernetes-admin' and regexp_like("annotations.authorization.k8s.io/reason", 'RoleBinding') then regexp_extract("annotations.authorization.k8s.io/reason", ' to User "(\w+)"', 1)
       ELSE 'kubernetes-admin' END  
       as "操作账号", 
      CASE WHEN json_array_length(sourceIPs) = 1 then json_format(json_array_get(sourceIPs, 0)) ELSE  sourceIPs END
      as "源地址" limit 100
    • 条件表达式如下所示。
      操作事件 =~ ".*"
  • 示例2:APIServer公网访问失败时触发告警。

    某网格实例开启了公网访问,为防止恶意攻击,需要监控公网访问的次数以及失败率。若访问次数到达一定阈值(例如10次)且失败率高于一定阈值(例如50%)则立即告警,并希望告警时能够显示用户的IP所属区域、操作源IP、是否高危IP等信息。

    • 查询语如下所示。
      * | select ip as "源地址", total as "访问次数", round(rate * 100, 2) as "失败率%", failCount as "非法访问次数", CASE when security_check_ip(ip) = 1 then 'yes' else 'no' end  as "是否高危IP",  ip_to_country(ip) as "国家", ip_to_province(ip) as "省", ip_to_city(ip) as "市", ip_to_provider(ip) as "运营商" from (select CASE WHEN json_array_length(sourceIPs) = 1 then json_format(json_array_get(sourceIPs, 0)) ELSE  sourceIPs END
      as ip, count(1) as total,
      sum(CASE WHEN "responseStatus.code" < 400 then 0 
      ELSE 1 END) * 1.0 / count(1) as rate,
      count_if("responseStatus.code" = 403) as failCount
      from log  group by ip limit 10000) where ip_to_domain(ip) != 'intranet'  having "访问次数" > 10 and "失败率%" > 50 ORDER by "访问次数" desc limit 100
    • 条件表达式如下所示。
      源地址 =~ ".*"