当SIEM(如Splunk,QRadar)位于组织内部环境(on-premise)而非云端,且出于安全考虑,对外界未开放访问端口时,可通过部署一个可以连接日志服务与SIEM的应用程序,利用 SLS 消费组拉取日志,并使用 HTTPS + Splunk HEC 或 Syslog 将数据推送到内部 SIEM,实现云上日志与本地安全分析平台的集成。
业务背景
企业通常将安全信息和事件管理(SIEM)平台(如 Splunk、QRadar)部署在本地数据中心,且不向公网暴露接收端口以保证安全。当业务上云后,云上资源产生的日志数据又需要纳入本地 SIEM 进行统一的监控、审计和威胁分析。因此在不降低本地系统安全性的前提下,需要建立一条从日志服务到本地 SIEM 的日志投递管道,以实现云上日志的投递。
投递流程
在数据投递场景中,建议采用日志服务消费组来实现实时消费,并利用 Splunk 的 API(HTTP事件收集,HEC)或 Syslog over TCP/TLS 将日志传输至 SIEM。
核心逻辑
日志拉取:基于消费组构建程序,从日志服务拉取数据。此机制支持并发消费和故障转移。
并发与吞吐
并发度:可通过多次启动程序来实现并发效果。多个消费者属于同一消费组,且名称均不相同(消费者名以进程ID为后缀)。由于一个分区(Shard)只能被一个消费者消费,因此并发上限为Shard数量。例如一个日志库有10个分区,那么最多有10个消费者同时消费。
nohup python3 sync_data.py & nohup python3 sync_data.py & nohup python3 sync_data.py & ...
吞吐量:在理想网络条件下,用python3运行示例,单个消费者(约占用20%单核CPU)可达10 MB/s原始日志消费速率。因此10个消费者理论上可以消费100 MB/s原始日志。
高可用:消费组将检测点(Checkpoint)存储于服务端。当某一消费者实例终止运行,另一个消费者实例将自动接管并从断点继续消费。因此只需在不同机器上启动消费者,当一台机器故障的情况下,其他机器上的消费者便可以自动从断点继续消费。同时也可以在不同机器启动大于Shard数量的消费者以作备用。
数据转发:程序收到日志后,根据配置进行格式化,并发送到本地 SIEM。
程序代码示例
以下将介绍 HTTPS + Splunk HEC 与 Syslog 两种投递方式的区别,代码示例与含义,注意事项等内容。
HTTPS + Splunk HEC :HTTPS自动加密+Token认证满足安全性,支持多种数据格式。但仅支持Splunk。
Syslog:常见的日志通道,兼容大多数SIEM,支持文本类型。
HTTPS + Splunk HEC
当需要投递多源日志库(多个Project,多个Logstore)中日志数据至SIEM时,可以参考sync_data.py进行配置,代码主要由三部分内容组成:
单源日志库情况可参考python-sdk,与示例代码主要区别在于main()方法。
main()方法:主程序控制逻辑,针对多源日志库,共用一个executor以避免进程过多。
get_option() 方法:消费配置项。
基本配置项:包括日志服务连接配置和消费组配置。
消费组的高级选项:性能调参,不推荐修改。
SIEM(Splunk)相关参数与选项。
若在数据投递过程中涉及数据清洗(如行过滤、列裁剪和数据规整等)时,可以通过添加SPL语句添加规则,参考如下:
# SPL 语句 query = "* | where instance_id in ('instance-1', 'instance-2')" # 基于规则消费配置,相比普通消费在列表最后增加了参数 query option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval, query=query)
SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据并投递到Splunk,请仔细阅读代码中相关注释并根据需求调整格式。
完整代码如下:
Syslog
Syslog主要基于RFC5424和RFC3164定义相关日志格式规范,推荐使用RFC5424协议。理论上TCP和UDP都支持Syslog,可以较好的保证数据传输稳定性,RFC5424协议也定义了TLS的安全传输层,当SIEM支持TCP通道或者TLS通道时建议优先使用。
当需要投递日志数据至SIEM时可以参考sync_data.py进行配置,代码主要由三部分内容组成:
main()方法:主程序控制逻辑。
get_monitor_option() 方法:消费配置项。
SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据投递到SIEM Syslog服务器,请仔细阅读代码中相关注释并根据需求调整格式。
完整代码如下:
启动投递与查看状态
程序配置完成后,参考如下内容设置环境变量,并启动两个消费者进行并发消费:
export SLS_ENDPOINT=<Endpoint of your region> export SLS_PROJECT=<SLS Project Name> export SLS_LOGSTORE=<SLS Logstore Name> export SLS_AK_ID=<YOUR AK ID> export SLS_AK_KEY=<YOUR AK KEY> export SLS_CG=<CUSTOMER_GROUP> nohup python3 sync_data.py & nohup python3 sync_data.py &
环境变量名
取值
SLS_ENDPOINT
登录日志服务控制台,在Project列表中,单击目标Project。
单击Project名称右侧的
进入项目概览页面。
在访问域名中复制公网域名。
替换
<Endpoint of your region>
为https://
+公网域名。
若Endpoint前缀配置为
https://
,如https://cn-beijing.log.aliyuncs.com
,则程序自动使用HTTPS加密与日志服务连接。服务器证书*.aliyuncs.com
由GlobalSign签发,一般机器会自动信任此证书。若机器不信任此证书,通过Certificate installation下载并安装。SLS_PROJECT
在日志服务控制台,复制目标Project名称,替换
<SLS Project Name>
。SLS_LOGSTORE
在日志服务控制台,目标Project下复制目标Logstore名称,替换
<SLS Logstore Name>
。SLS_AK_ID
建议使用RAM账号的AccessKey ID替换
<YOUR AK ID>
。重要阿里云账号的AccessKey拥有所有API的访问权限,建议使用RAM用户的AccessKey进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码中,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
SLS_AK_KEY
建议使用RAM账号的AccessKey Secret替换
<YOUR AK KEY>
。SLS_CG
消费组名,可以简单命名为"syc_data",替换
<CUSTOMER_GROUP>
。在控制台查看消费组状态。
通过云监控查看消费组延迟情况并配置告警。
常见问题
出现ConsumerGroupQuotaExceed
错误
此错误表示超出限制,单个日志库(Logstore)配置消费组上限为30个,请在日志服务控制台删除无用消费组。