投递日志到SIEM

SIEM(如Splunk,QRadar)位于组织内部环境(on-premise)而非云端,且出于安全考虑,对外界未开放访问端口时,可通过部署一个可以连接日志服务与SIEM的应用程序,利用 SLS 消费组拉取日志,并使用 HTTPS + Splunk HEC 或 Syslog 将数据推送到内部 SIEM,实现云上日志与本地安全分析平台的集成。

业务背景

企业通常将安全信息和事件管理(SIEM)平台(如 Splunk、QRadar)部署在本地数据中心,且不向公网暴露接收端口以保证安全。当业务上云后,云上资源产生的日志数据又需要纳入本地 SIEM 进行统一的监控、审计和威胁分析。因此在不降低本地系统安全性的前提下,需要建立一条从日志服务到本地 SIEM 的日志投递管道,以实现云上日志的投递。

投递流程

在数据投递场景中,建议采用日志服务消费组来实现实时消费,并利用 Splunk 的 API(HTTP事件收集,HEC)或 Syslog over TCP/TLS 将日志传输至 SIEM。

image

核心逻辑

  1. 日志拉取:基于消费组构建程序,从日志服务拉取数据。此机制支持并发消费和故障转移。

    • 并发与吞吐

      • 并发度:可通过多次启动程序来实现并发效果。多个消费者属于同一消费组,且名称均不相同(消费者名以进程ID为后缀)。由于一个分区(Shard)只能被一个消费者消费,因此并发上限为Shard数量。例如一个日志库有10个分区,那么最多有10个消费者同时消费。

        nohup python3 sync_data.py &
        nohup python3 sync_data.py &
        nohup python3 sync_data.py &
        ...
      • 吞吐量:在理想网络条件下,用python3运行示例,单个消费者(约占用20%单核CPU)可达10 MB/s原始日志消费速率。因此10个消费者理论上可以消费100 MB/s原始日志。

      • 高可用:消费组将检测点(Checkpoint)存储于服务端。当某一消费者实例终止运行,另一个消费者实例将自动接管并从断点继续消费。因此只需在不同机器上启动消费者,当一台机器故障的情况下,其他机器上的消费者便可以自动从断点继续消费。同时也可以在不同机器启动大于Shard数量的消费者以作备用。

  2. 数据转发:程序收到日志后,根据配置进行格式化,并发送到本地 SIEM。

程序代码示例

以下将介绍 HTTPS + Splunk HEC 与 Syslog 两种投递方式的区别,代码示例与含义,注意事项等内容。

  • HTTPS + Splunk HEC :HTTPS自动加密+Token认证满足安全性,支持多种数据格式。但仅支持Splunk。

  • Syslog:常见的日志通道,兼容大多数SIEM,支持文本类型。

HTTPS + Splunk HEC

当需要投递多源日志库(多个Project,多个Logstore)中日志数据至SIEM时,可以参考sync_data.py进行配置,代码主要由三部分内容组成:

单源日志库情况可参考python-sdk,与示例代码主要区别在于main()方法。
  • main()方法:主程序控制逻辑,针对多源日志库,共用一个executor以避免进程过多。

  • get_option() 方法:消费配置项。

    • 基本配置项:包括日志服务连接配置和消费组配置。

    • 消费组的高级选项:性能调参,不推荐修改。

    • SIEM(Splunk)相关参数与选项。

    • 若在数据投递过程中涉及数据清洗(如行过滤、列裁剪和数据规整等)时,可以通过添加SPL语句添加规则,参考如下:

      # SPL 语句
          query = "* | where instance_id in ('instance-1', 'instance-2')"
      # 基于规则消费配置,相比普通消费在列表最后增加了参数 query
          option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                                cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                cursor_start_time=cursor_start_time,
                                heartbeat_interval=heartbeat_interval,
                                data_fetch_interval=data_fetch_interval,
                                query=query)
  • SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据并投递到Splunk,请仔细阅读代码中相关注释并根据需求调整格式。

完整代码如下:

sync_data.py

python依赖请参考requirements-py3

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests
from concurrent.futures import ThreadPoolExecutor

# 配置程序日志文件,以便后续测试或者诊断问题
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    这个消费者从日志服务消费数据并发送给Splunk
    """
    def __init__(self, splunk_setting=None):
        
        """初始化并验证Splunk连通性"""
        super(SyncData, self).__init__()   # remember to call base's init

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # 测试Splunk连通性
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # 将日志发送到远端的同步代码置于此处
            # 日志格式为字典类型,示例如下(注意:所有字符串必须为Unicode编码):
            #    Python2: {u"__time__": u"12312312", u"__topic__": u"topic", u"field1": u"value1", u"field2": u"value2"}
            #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                raise err

                #根据需要,添加一些重试或者报告。

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # 基本配置项
    ##########################

    # 从环境变量中加载日志服务参数与选项
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoints = os.environ.get('SLS_ENDPOINTS', '').split(";")  # endpoints列表以;分隔
    projects = os.environ.get('SLS_PROJECTS', '').split(";")    # projects列表以;分隔,必须与endpoints列表数量相同
    logstores = os.environ.get('SLS_LOGSTORES', '').split(";")  # logstores列表以;分隔,与project匹配,单个project中的多个Logstore用英文逗号分隔
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoints and accessKeyId and accessKey and projects and logstores and consumer_group, \
        ValueError("endpoints/access_id/key/projects/logstores/consumer_group/name cannot be empty")

    assert len(endpoints) == len(projects) == len(logstores), ValueError("endpoints/projects/logstores must be paired")

    ##########################
    # 消费组的高级选项
    ##########################

    # 一般不建议修改消费者名称,尤其是需要进行并发消费时。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消费的起点。这个参数在首次运行程序的时候有效,后续再次运行时将从上一次消费的保存点继续消费。
    # 可以使用“begin”、“end”,或者特定的ISO时间格式。
    cursor_start_time = "begin"

    # 心跳时长,当服务器在2倍时间内没有收到特定Shard的心跳报告时,服务器会认为对应消费者离线并重新调配任务。
    # 当网络环境不佳时,不建议将时长设置的比较小。
    heartbeat_interval = 60

    # 消费数据的最大间隔,如果数据生成的速度很快,不需要调整这个参数
    data_fetch_interval = 1

    exeuctor = ThreadPoolExecutor(max_workers=2)

    options = []
    for i in range(len(endpoints)):
        endpoint = endpoints[i].strip()
        project = projects[i].strip()
        if not endpoint or not project:
            logger.error("project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
            continue

        logstore_list = logstores[i].split(",")
        for logstore in logstore_list:
            logstore = logstore.strip()
            if not logstore:
                logger.error("logstore for project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
                continue

            # 构建一个消费组和消费者
            option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                                  cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                  cursor_start_time=cursor_start_time,
                                  heartbeat_interval=heartbeat_interval,
                                  data_fetch_interval=data_fetch_interval)
            options.append(option)
             """
              #基于规则构建消费者时可使用如下代码:
              
              # SPL 语句
              query = "* | where instance_id in ('instance-1', 'instance-2')"
              # 基于规则消费配置,相比普通消费在列表最后增加了参数 query
              option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
             """

    # Splunk选项
    settings = {
                "host": "10.1.2.3",
                "port": 80,
                "token": "a023nsdu123123123",
                'https': False,              # 可选, bool
                'timeout': 120,             # 可选, int
                'ssl_verify': True,         # 可选, bool
                "sourcetype": "",            # 可选, sourcetype
                "index": "",                # 可选, index
                "source": "",               # 可选, source
            }

    return exeuctor, options, settings

#主程序控制逻辑,针对多源日志库,共用一个executor以避免进程过多
def main():
    exeuctor, options, settings = get_option()

    logger.info("*** start to consume data...")
    workers = []

    for option in options:
        worker = ConsumerWorker(SyncData, option, args=(settings,) )
        workers.append(worker)
        worker.start()

    try:
        for i, worker in enumerate(workers):
            while worker.is_alive():
                worker.join(timeout=60)
            logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
                options[i].project, options[i].logstore))
            worker.shutdown()
    except KeyboardInterrupt:
        logger.info("*** try to exit **** ")
        for worker in workers:
            worker.shutdown()

        # 等待所有workers结束后再停止executor
        for worker in workers:
            while worker.is_alive():
                worker.join(timeout=60)

    exeuctor.shutdown()


if __name__ == '__main__':
    main()

Syslog

Syslog主要基于RFC5424RFC3164定义相关日志格式规范,推荐使用RFC5424协议。理论上TCPUDP都支持Syslog,可以较好的保证数据传输稳定性,RFC5424协议也定义了TLS的安全传输层,当SIEM支持TCP通道或者TLS通道时建议优先使用。

当需要投递日志数据至SIEM时可以参考sync_data.py进行配置,代码主要由三部分内容组成:

  • main()方法:主程序控制逻辑。

  • get_monitor_option() 方法:消费配置项。

    • 基本配置项:包括日志服务连接配置和消费组配置。

    • 消费组的高级选项:性能调参,不推荐修改。

    • SIEMSyslog server相关参数与选项。

      • Syslog facility:程序组件,此处选择syslogclient.FAC_USER作为默认组件。

      • Syslog severity:日志级别,可根据需求设置指定内容的日志级别。此处选择syslogclient.SEV_INFO

      • SIEM支持基于TCPTLSSyslog通道,请配置protoTLS及配置正确的SSL证书。

  • SyncData(ConsumerProcessorBase):内容包含如何从日志服务获取数据投递到SIEM Syslog服务器,请仔细阅读代码中相关注释并根据需求调整格式。

完整代码如下:

sync_data.py

python依赖请参考requirements-py3

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# 配置程序日志文件,以便后续测试或者诊断问题
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
   消费者从日志服务消费数据并发送给Syslog server
    """
    def __init__(self, target_setting=None):
        """
        初始化并验证Syslog server连通性
        """

        super(SyncData, self).__init__()   # remember to call base's init

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # 测试连通性 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # 将日志发送到远端的同步代码置于此处
                    # 日志格式为字典类型,示例如下(注意:所有字符串必须为Unicode编码):
                    #    Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    first = True
                    # 可以根据需要修改格式化内容,这里使用Key=Value传输,并使用默认的双竖线(||)进行分割
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # 可以根据需要修改facility或者severity
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # 需要添加一些错误处理的代码,例如重试或者通知等 
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # 基本配置项
    ##########################

    # 从环境变量中加载日志服务参数与选项
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # 消费组的高级选项
    ##########################

    # 一般不建议修改消费者名称,尤其是需要进行并发消费时。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消费的起点。这个参数在首次运行程序的时候有效,后续再次运行时将从上一次消费的保存点继续消费。
    # 可以使用“begin”、“end”,或者特定的ISO时间格式。
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # 心跳时长,当服务器在2倍时间内没有收到特定Shard的心跳报告时,服务器会认为对应消费者离线并重新调配任务。
    # 当网络环境不佳时,不建议将时长设置的比较小。
    heartbeat_interval = 20

    # 消费数据的最大间隔,如果数据生成的速度很快,不需要调整这个参数
    data_fetch_interval = 1

    # 构建一个消费组和消费者
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Syslog server相关参数与选项
    settings = {
                "host": "1.2.3.4", # 必选
                "port": 514,       # 必选, 端口
                "protocol": "tcp", # 必选, TCP、UDP或TLS(仅Python3)
                "sep": "||",      # 必选, key=value键值对的分隔符,这里用双竖线(||)分隔
                "cert_path": None,  # 可选,TLS的证书位置
                "timeout": 120,   # 可选,超时时间,默认120秒
                "facility": syslogclient.FAC_USER,  # 可选,可以参考其他syslogclient.FAC_*的值
                "severity": syslogclient.SEV_INFO,  # 可选,可以参考其他syslogclient.SEV_*的值
                "hostname": None, # 可选,机器名,默认选择本机机器名
                "tag": None # 可选,标签,默认是短划线(-)
    }

    return option, settings

#主程序控制逻辑
def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()

启动投递与查看状态

  1. 程序配置完成后,参考如下内容设置环境变量,并启动两个消费者进行并发消费:

    export SLS_ENDPOINT=<Endpoint of your region> 
    export SLS_PROJECT=<SLS Project Name>
    export SLS_LOGSTORE=<SLS Logstore Name>
    export SLS_AK_ID=<YOUR AK ID>
    export SLS_AK_KEY=<YOUR AK KEY>
    export SLS_CG=<CUSTOMER_GROUP>
    
    nohup python3 sync_data.py &
    nohup python3 sync_data.py &

    环境变量名

    取值

    SLS_ENDPOINT

    1. 登录日志服务控制台,在Project列表中,单击目标Project。

    2. 单击Project名称右侧的image进入项目概览页面。

    3. 在访问域名中复制公网域名。

    4. 替换<Endpoint of your region>https://+公网域名

    Endpoint前缀配置为https://,如https://cn-beijing.log.aliyuncs.com,则程序自动使用HTTPS加密与日志服务连接。服务器证书*.aliyuncs.comGlobalSign签发,一般机器会自动信任此证书。若机器不信任此证书,通过Certificate installation下载并安装。

    SLS_PROJECT

    在日志服务控制台,复制目标Project名称,替换<SLS Project Name>

    SLS_LOGSTORE

    在日志服务控制台,目标Project下复制目标Logstore名称,替换<SLS Logstore Name>

    SLS_AK_ID

    建议使用RAM账号的AccessKey ID替换<YOUR AK ID>

    重要
    • 阿里云账号的AccessKey拥有所有API的访问权限,建议使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey IDAccessKey Secret保存到工程代码中,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

    SLS_AK_KEY

    建议使用RAM账号的AccessKey Secret替换<YOUR AK KEY>

    SLS_CG

    消费组名,可以简单命名为"syc_data",替换<CUSTOMER_GROUP>

  2. 在控制台查看消费组状态

  3. 通过云监控查看消费组延迟情况并配置告警

常见问题

出现ConsumerGroupQuotaExceed错误

此错误表示超出限制,单个日志库(Logstore)配置消费组上限为30个,请在日志服务控制台删除无用消费组。