当需要投递日志到 SIEM 时,可通过部署一个能够连接日志服务与 SIEM 的应用程序,利用 SLS 消费组拉取日志,并使用 Splunk HEC 或 Syslog 将数据推送到 SIEM,实现云上日志与本地安全分析平台的集成。
业务背景
企业通常将安全信息和事件管理(SIEM)平台(如 Splunk、QRadar)部署在本地数据中心,且不向公网暴露接收端口以保证安全。当业务上云后,云上资源产生的日志数据又需要纳入本地 SIEM 进行统一的监控、审计和威胁分析。因此在不降低本地系统安全性的前提下,需要建立一条从日志服务到本地 SIEM 的日志投递管道,以实现云上日志的投递。
投递流程
在数据投递场景中,建议采用日志服务消费组来实现实时消费,并利用 Splunk 的 API(HTTP事件收集,HEC)或 Syslog over TCP/TLS 将日志传输至 SIEM。
核心逻辑
日志拉取:基于消费组构建程序,从日志服务拉取数据。此机制支持并发消费和故障转移。
并发与吞吐
可通过多次启动程序来实现并发效果。多个消费者属于同一消费组,且名称均不相同(消费者名以进程ID为后缀)。
一个分区(Shard)只能被一个消费者消费,因此并发上限为Shard数量。例如一个日志库有10个分区,那么最多有10个消费者同时消费。
在理想网络条件下:
单个消费者(约占用20%单核CPU)可达10 MB/s原始日志消费速率。
10个消费者可消费100 MB/s原始日志。
高可用性
消费组将检测点(Checkpoint)存储于服务端。
当某一消费者实例终止运行,另一个消费者实例将自动接管并从断点继续消费。因此可在不同机器上启动消费者,当一台机器故障的情况下,其他机器上的消费者便可以自动从断点继续消费。
可在不同机器启动大于Shard数量的消费者以作备用。
数据转发:程序收到日志后,根据配置进行格式化,并发送到本地 SIEM。
准备工作
创建RAM用户及授权:该RAM需要拥有AliyunLogFullAccess的权限。
网络要求:程序所在机器需要能访问日志服务Endpoint域名,且与SIEM 处于相同网络环境。
Endpoint域名获取方式:
登录日志服务控制台,在Project列表中,单击目标Project。
单击Project名称右侧的
进入项目概览页面。
在访问域名中复制公网域名。Endpoint域名为
https://
+公网域名。
环境要求:准备Python 3运行环境,并安装Python sdk。
安装日志服务Python sdk:
pip install -U aliyun-log-python-sdk
。验证安装结果:
pip show aliyun-log-python-sdk
,返回如下信息表示安装成功。Name: aliyun-log-python-sdk Version: 0.9.12 Summary: Aliyun log service Python client SDK Home-page: https://github.com/aliyun/aliyun-log-python-sdk Author: Aliyun
实施步骤
步骤一:应用程序准备
日志服务提供Splunk HEC 与 Syslog 两种投递方式,请选择对应程序示例进行配置。
Splunk HEC :HTTP 事件收集器 (HEC) 基于Token,通过 HTTP 以高效安全的方式将多种数据格式的日志直接发送到 Splunk。
Syslog:常见的日志通道,兼容大多数SIEM,支持文本格式。
Splunk HEC
投递日志数据至Splunk时,可以参考sync_data.py进行配置,代码主要由三部分内容组成:
main()方法:主程序控制逻辑。
get_option() 方法:消费配置项。
基本配置项:包括日志服务连接配置和消费组配置。
消费组的高级选项:性能调参,不推荐修改。
SIEM(Splunk)相关参数与选项。
若在数据投递过程中涉及数据清洗(如行过滤、列裁剪和数据规整等)时,可以通过添加SPL语句添加规则,参考如下:
# SPL 语句 query = "* | where instance_id in ('instance-1', 'instance-2')" # 基于规则创建消费,相比普通消费在参数列表最后增加了参数 query option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval, query=query)
SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据并投递到Splunk,请仔细阅读代码中相关注释并根据需求调整。
完整代码如下:
Syslog
Syslog主要基于RFC5424和RFC3164定义相关日志格式规范,推荐使用RFC5424协议。理论上TCP和UDP都支持Syslog,可以较好的保证数据传输稳定性,RFC5424协议也定义了TLS的安全传输层,当SIEM支持TCP通道或者TLS通道时建议优先使用。
当需要投递日志数据至SIEM时可以参考sync_data.py进行配置,代码主要由三部分内容组成:
main()方法:主程序控制逻辑。
get_monitor_option() 方法:消费配置项。
SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据投递到SIEM Syslog服务器,请仔细阅读代码中相关注释并根据需求调整。
完整代码如下:
步骤二:配置环境变量
程序配置完成后,进行表格中系统环境变量配置。
环境变量名 | 取值 | 示例 |
SLS_ENDPOINT |
若Endpoint前缀配置为 |
|
SLS_PROJECT | 在日志服务控制台,复制目标Project名称。 | my-sls-project-one |
SLS_LOGSTORE | 在日志服务控制台,复制目标Logstore名称。 | my-sls-logstore-a1 |
SLS_AK_ID | 建议使用RAM账号的AccessKey ID。 重要
| L***ky |
SLS_AK_KEY | 建议使用RAM账号的AccessKey Secret。 | x***Xl |
SLS_CG | 消费组名,可以简单命名为"syc_data",若消费组不存在,程序会自动创建。 | syc_data |
步骤三:启动并验证
启动多消费者进行并发消费,支持的最大并发数等于总 Shard 数。
# 启动第一个消费者进程 nohup python3 sync_data.py & # 启动第二个消费者进程 nohup python3 sync_data.py &
在日志服务控制台查看消费组状态。
在Project列表区域,单击目标Project。在
页签中,单击目标Logstore的图标,然后单击数据消费的
图标。
在消费组列表中,单击目标消费组,在Consumer Group状态页面,查看每个Shard消费数据的客户端和时间。
通过云监控查看消费组延迟情况并配置告警。
常见问题
出现ConsumerGroupQuotaExceed
错误
此错误表示超出限制,单个日志库(Logstore)配置消费组上限为30个,请在日志服务控制台删除无用消费组。