使用函数计算(FC)通过Syslog协议投递日志到SIEM平台

Syslog是一个常见的日志通道,几乎所有的SIEM(例如IBM Qradar, HP Arcsight)都支持通过Syslog渠道接收日志。本文主要介绍如何使用函数计算通过Syslog协议投递日志服务(SLS)中的日志到SIEM平台。

背景信息

  • Syslog主要是基于RFC5424和RFC3164定义相关格式规范,RFC3164协议是2001年发布的,RFC5424协议是2009年发布的升级版本。因为新版兼容旧版,且新版本解决了很多问题,因此推荐使用RFC5424协议。更多信息,请参见RFC5424RFC3164

  • Syslog over TCP/TLS:Syslog只规定日志格式,理论上TCP和UDP都支持Syslog,可以较好的保证数据传输稳定性。RFC5425协议也定义了TLS的安全传输层,如果您的SIEM支持TCP通道或者TLS通道,则建议优先使用TCP或者TLS。更多信息,请参见RFC5425

  • 日志服务:日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。日志服务一站式提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。

  • 函数计算:阿里云函数计算是事件驱动的全托管计算服务。通过函数计算,您无需管理服务器等基础设施,只需编写代码并上传。函数计算会为您准备好计算资源,以弹性、可靠的方式运行您的代码,并提供日志查询、性能监控、报警等功能。

  • SLS触发器:日志服务ETL Job对应于函数计算的一个触发器,当创建日志服务ETL Job后,日志服务会根据该ETL Job的配置启动定时器,定时器轮询Logstore中的Shard信息,当发现有新的数据写入时,即生成<shard_id,begin_cursor,end_cursor >三元组信息作为函数Event,并触发函数执行。

投递流程

推荐使用日志服务消费组构建程序来进行实时消费,消费程序可以托管在函数计算,然后日志服务触发器通过Syslog over TCP/TLS来发送日志给SIEM,流程如下。

image.png

投递示例

本文以SLS的Logstore为数据源,通过阿里云函数计算,并使用Syslog协议投递到SIEM平台。

说明
  • 本文主要参考SLS触发器

  • 本文所用函数计算为函数计算2.0版本。

前提条件

投递过程

步骤一:在函数计算中创建函数

  1. 登录函数计算控制台,单击右上角返回函数计算2.0

    image

  2. 单击服务及函数

  3. 在顶部菜单栏,选择地域,然后在服务列表页面,单击创建服务

    输入名称后,高级选项配置服务角色,管理日志服务的角色需要为新建角色增加消费Logstore的权限,其他均保持默认。更多信息,请参见授予函数计算访问其他云服务的权限指定Logstore的消费权限

  4. 创建服务完成后,在函数管理页面,单击创建函数

    image

    函数配置如下:

    1. 创建函数方式选择使用内置运行时创建

    2. 基本设置

      • 输入函数名称

      • 请求处理程序类型选择处理事件请求

    3. 函数代码

      • 运行环境选择Python 3.9

      • 代码上传方式选择使用示例代码,在示例代码描述中选择日志服务SLS触发函数

    4. 环境变量

      1. 单击使用表单编辑

      2. 单击+添加变量

      3. 配置环境变量的键值对:

      • SYSLOG_HOST:Syslog服务端的Host(数据接收端),格式为hostName.日志服务Endpoint。关于Endpoint,请参见服务入口

      • SYSLOG_PORT:Syslog服务端的端口,默认为10009(数据接收端)。

      • SYSLOG_PROTOCOL:Syslog使用的传输层协议,默认为tcp。

        image.png

    5. 触发器配置

      具体配置请参见SLS触发器

      image

      1. 触发器类型选择日志服务 SLS

      2. 日志项目日志库触发器间隔重试次数触发器日志根据实际需要填写,其中触发器日志会记录触发器的触发日志。

      3. 调用参数,保持默认。

      4. 角色名称选择AliyunLogETLRole

        说明

        如果您第一次创建该类型的触发器,则需要在单击确定后,在弹出的对话框中单击立即授权,跳转到授权界面,创建日志服务触发器的默认角色 AliyunLogETLRole

  5. 单击创建

    image

步骤二:配置函数代码

  1. 函数代码页签中,替换index.py代码如下:

    """
    本代码样例主要实现以下功能:
    * 从 event 中解析出 SLS 事件触发相关信息
    * 根据以上获取的信息,初始化 SLS 客户端
    * 从源 log store 获取实时日志数据
    * 将日志数据通过syslog协议输出
    
    
    This sample code is mainly doing the following things:
    * Get SLS processing related information from event
    * Initiate SLS client
    * Pull logs from source log store
    * Send logs with syslog
    
    """
    # !/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import json
    from aliyun.log import LogClient
    import os
    import logging
    import six
    from datetime import datetime
    
    from aliyun.log import PullLogResponse
    from aliyun.log.ext import syslogclient
    from pysyslogclient import SyslogClientRFC5424 as SyslogClient
    
    logger = logging.getLogger()
    
    
    def get_syslog_config():
        config = {
            'host': os.environ.get('SYSLOG_HOST', ''),
            'port': int(os.environ.get('SYSLOG_PORT', '514')),
            'protocol': os.environ.get('SYSLOG_PROTOCOL', 'tcp'),
            "facility": syslogclient.FAC_USER,  # 可选,可以参考其他syslogclient.FAC_*的值。
            "severity": syslogclient.SEV_INFO,  # 可选,可以参考其他syslogclient.SEV_*的值。
            "hostname": "aliyun.example.com",  # 可选,机器名,默认选择本机机器名。
            "tag": "tag"  # 可选,标签,默认是短划线(-)。
        }
        return config
    
    
    def init_syslog_client(config):
        client = SyslogClient(config.get('host'), config.get('port'), config.get('protocol'))
        return client
    
    
    def process(shard_id, log_groups, config):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(shard_id, len(logs)))
        try:
            client = init_syslog_client(config)
            for log in logs:
                # suppose we only care about audit log
                timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                del log['__time__']
    
                io = six.StringIO()
                # 可以根据需要修改格式化内容,这里使用Key=Value传输,并使用默认的双竖线(||)进行分割。
                for k, v in six.iteritems(log):
                    io.write("{0}{1}={2}".format('||', k, v))
    
                data = io.getvalue()
    
                # 可以根据需要修改facility或者severity。
                client.log(data,
                           facility=config.get("facility", None),
                           severity=config.get("severity", None),
                           timestamp=timestamp,
                           program=config.get("tag", None),
                           hostname=config.get("hostname", None))
    
        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(config, err))
            # 需要添加一些错误处理的代码,例如重试或者通知等。
            raise err
    
        logger.info("Complete send data to remote")
    
    
    def handler(event, context):
        # 可以通过 context.credentials 获取密钥信息
        # Access keys can be fetched through context.credentials
        print("The content in context entity is: \n")
        print(context)
        creds = context.credentials
        access_key_id = creds.access_key_id
        access_key_secret = creds.access_key_secret
        security_token = creds.security_token
    
        # 解析 event 参数至 object 格式
        # parse event in object
        event_obj = json.loads(event.decode())
        print("The content in event entity is: \n")
        print(event_obj)
    
        # 从 event.source 中获取日志项目名称、日志仓库名称、日志服务访问 endpoint、日志起始游标、日志终点游标以及分区 id
        # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source
        source = event_obj['source']
        log_project = source['projectName']
        log_store = source['logstoreName']
        endpoint = source['endpoint']
        begin_cursor = source['beginCursor']
        end_cursor = source['endCursor']
        shard_id = source['shardId']
    
        # 初始化 sls 客户端
        # Initialize client of sls
        client = LogClient(endpoint=endpoint,
                           accessKeyId=access_key_id,
                           accessKey=access_key_secret,
                           securityToken=security_token)
    
        syslog_config = get_syslog_config()
    
        # 基于日志的游标从源日志库中读取日志,本示例中的游标范围包含了触发本次执行的所有日志内容
        # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation
        while True:
            response = client.pull_logs(project_name=log_project,
                                        logstore_name=log_store,
                                        shard_id=shard_id,
                                        cursor=begin_cursor,
                                        count=100,
                                        end_cursor=end_cursor,
                                        compress=False)
            log_group_cnt = response.get_loggroup_count()
            if log_group_cnt == 0:
                break
            logger.info("get %d log group from %s" % (log_group_cnt, log_store))
            process(shard_id, response.get_loggroup_list(), syslog_config)
    
            begin_cursor = response.get_next_cursor()
    
        return 'success'
    

    image

步骤三:添加自定义层

此文档通过Syslog导入,需要引入Syslog的依赖库,添加自定义层导入Syslog的依赖库。

  1. 单击编辑层

  2. 在弹出的面板中,选择添加层 > 添加自定义层

  3. 编辑函数面板,单击创建自定义层,并配置如下参数。

    1. 兼容运行时选择Python 3.9

    2. 层上传方式选择在线构建依赖层

    3. 构建环境选择Python 3.9

    4. requirements.txt 文件输入pysyslogclient

    5. 单击创建。

  4. 返回编辑函数面板,在第一层选择3创建的自定义层,单击确定

步骤四:测试并部署

  1. 单击测试函数,返回执行成功,表示投递成功。

    image

  2. 单击部署代码,完成发布。

常见问题

测试函数报错UnhandleInvocationError

  1. 创建服务时需要配置角色的权限,请参见配置角色

  2. 环境变量配置问题,请参见环境变量配置

  3. 如果还有疑问,欢迎加入钉钉用户群(钉钉群号:11721331),与函数计算工程师即时沟通。

后续操作

  • 通过syslog服务器查看日志接收情况。

  • 通过调用日志页签查看函数触发、调用情况。

    image

相关文档