当需要投递日志到 SIEM 时,可通过部署一个能够连接日志服务与 SIEM 的应用程序,利用 SLS 消费组拉取日志,并使用 Splunk HEC 或 Syslog 将数据推送到 SIEM,实现云上日志与本地安全分析平台的集成。
业务背景
企业通常将安全信息和事件管理(SIEM)平台(如 Splunk、QRadar)部署在本地数据中心,且不向公网暴露接收端口以保证安全。当业务上云后,云上资源产生的日志数据又需要纳入本地 SIEM 进行统一的监控、审计和威胁分析。因此在不降低本地系统安全性的前提下,需要建立一条从日志服务到本地 SIEM 的日志投递管道,以实现云上日志的投递。
投递流程
在数据投递场景中,建议采用日志服务消费组来实现实时消费,并利用 Splunk 的 API(HTTP事件收集,HEC)或 Syslog over TCP/TLS 将日志传输至 SIEM。
核心逻辑
- 日志拉取:基于消费组构建程序,从日志服务拉取数据。此机制支持并发消费和故障转移。 - 并发与吞吐 - 可通过多次启动程序来实现并发效果。多个消费者属于同一消费组,且名称均不相同(消费者名以进程ID为后缀)。 
- 一个分区(Shard)只能被一个消费者消费,因此并发上限为Shard数量。例如一个日志库有10个分区,那么最多有10个消费者同时消费。 
- 在理想网络条件下: - 单个消费者(约占用20%单核CPU)可达10 MB/s原始日志消费速率。 
- 10个消费者可消费100 MB/s原始日志。 
 
 
- 高可用性 - 消费组将检测点(Checkpoint)存储于服务端。 
- 当某一消费者实例终止运行,另一个消费者实例将自动接管并从断点继续消费。因此可在不同机器上启动消费者,当一台机器故障的情况下,其他机器上的消费者便可以自动从断点继续消费。 
- 可在不同机器启动大于Shard数量的消费者以作备用。 
 
 
- 数据转发:程序收到日志后,根据配置进行格式化,并发送到本地 SIEM。 
准备工作
- 创建RAM用户及授权:该RAM需要拥有AliyunLogFullAccess的权限。 
- 网络要求:程序所在机器需要能访问日志服务Endpoint域名,且与SIEM 处于相同网络环境。 - Endpoint域名获取方式: - 登录日志服务控制台,在Project列表中,单击目标Project。 
- 单击Project名称右侧的  进入项目概览页面。 进入项目概览页面。
- 在访问域名中复制公网域名。Endpoint域名为 - https://+公网域名。
 
 
- 环境要求:准备Python 3运行环境,并安装Python sdk。 - 安装日志服务Python sdk: - pip install -U aliyun-log-python-sdk。
- 验证安装结果: - pip show aliyun-log-python-sdk,返回如下信息表示安装成功。- Name: aliyun-log-python-sdk Version: 0.9.12 Summary: Aliyun log service Python client SDK Home-page: https://github.com/aliyun/aliyun-log-python-sdk Author: Aliyun
 
实施步骤
步骤一:应用程序准备
日志服务提供Splunk HEC 与 Syslog 两种投递方式,请选择对应程序示例进行配置。
- Splunk HEC :HTTP 事件收集器 (HEC) 基于Token,通过 HTTP 以高效安全的方式将多种数据格式的日志直接发送到 Splunk。 
- Syslog:常见的日志通道,兼容大多数SIEM,支持文本格式。 
Splunk HEC
投递日志数据至Splunk时,可以参考sync_data.py进行配置,代码主要由三部分内容组成:
- main()方法:主程序控制逻辑。 
- get_option() 方法:消费配置项。 - 基本配置项:包括日志服务连接配置和消费组配置。 
- 消费组的高级选项:性能调参,不推荐修改。 
- SIEM(Splunk)相关参数与选项。 
- 若在数据投递过程中涉及数据清洗(如行过滤、列裁剪和数据规整等)时,可以通过添加SPL语句添加规则,参考如下: - # SPL 语句 query = "* | where instance_id in ('instance-1', 'instance-2')" # 基于规则创建消费,相比普通消费在参数列表最后增加了参数 query option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval, query=query)
 
- SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据并投递到Splunk,请仔细阅读代码中相关注释并根据需求调整。 
完整代码如下:
Syslog
Syslog主要基于RFC5424和RFC3164定义相关日志格式规范,推荐使用RFC5424协议。理论上TCP和UDP都支持Syslog,可以较好的保证数据传输稳定性,RFC5424协议也定义了TLS的安全传输层,当SIEM支持TCP通道或者TLS通道时建议优先使用。
当需要投递日志数据至SIEM时可以参考sync_data.py进行配置,代码主要由三部分内容组成:
- main()方法:主程序控制逻辑。 
- get_monitor_option() 方法:消费配置项。 
- SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据投递到SIEM Syslog服务器,请仔细阅读代码中相关注释并根据需求调整。 
完整代码如下:
步骤二:配置环境变量
程序配置完成后,进行表格中系统环境变量配置。
| 环境变量名 | 取值 | 示例 | 
| SLS_ENDPOINT | 
 若Endpoint前缀配置为 | 
 | 
| SLS_PROJECT | 在日志服务控制台,复制目标Project名称。 | my-sls-project-one | 
| SLS_LOGSTORE | 在日志服务控制台,复制目标Logstore名称。 | my-sls-logstore-a1 | 
| SLS_AK_ID | 建议使用RAM账号的AccessKey ID。 重要  
 | L***ky | 
| SLS_AK_KEY | 建议使用RAM账号的AccessKey Secret。 | x***Xl | 
| SLS_CG | 消费组名,可以简单命名为"syc_data",若消费组不存在,程序会自动创建。 | syc_data | 
步骤三:启动并验证
- 启动多消费者进行并发消费,支持的最大并发数等于总 Shard 数。 - # 启动第一个消费者进程 nohup python3 sync_data.py & # 启动第二个消费者进程 nohup python3 sync_data.py &
- 在日志服务控制台查看消费组状态。 - 在Project列表区域,单击目标Project。在页签中,单击目标Logstore的  图标,然后单击数据消费的 图标,然后单击数据消费的 图标。 图标。
- 在消费组列表中,单击目标消费组,在Consumer Group状态页面,查看每个Shard消费数据的客户端和时间。 
 
- 通过云监控查看消费组延迟情况并配置告警。 
常见问题
出现ConsumerGroupQuotaExceed错误
此错误表示超出限制,单个日志库(Logstore)配置消费组上限为30个,请在日志服务控制台删除无用消费组。