Syslog是一个常见的日志通道,几乎所有的SIEM(例如IBM Qradar, HP Arcsight)都支持通过Syslog渠道接收日志。本文主要介绍如何使用函数计算通过Syslog协议投递日志服务(SLS)中的日志到SIEM平台。
背景信息
Syslog主要是基于RFC5424和RFC3164定义相关格式规范,RFC3164协议是2001年发布的,RFC5424协议是2009年发布的升级版本。因为新版兼容旧版,且新版本解决了很多问题,因此推荐使用RFC5424协议。更多信息,请参见RFC5424和RFC3164。
Syslog over TCP/TLS:Syslog只规定日志格式,理论上TCP和UDP都支持Syslog,可以较好的保证数据传输稳定性。RFC5425协议也定义了TLS的安全传输层,如果您的SIEM支持TCP通道或者TLS通道,则建议优先使用TCP或者TLS。更多信息,请参见RFC5425。
日志服务:日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。日志服务一站式提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。
函数计算:阿里云函数计算是事件驱动的全托管计算服务。通过函数计算,您无需管理服务器等基础设施,只需编写代码并上传。函数计算会为您准备好计算资源,以弹性、可靠的方式运行您的代码,并提供日志查询、性能监控、报警等功能。
SLS触发器:日志服务ETL Job对应于函数计算的一个触发器,当创建日志服务ETL Job后,日志服务会根据该ETL Job的配置启动定时器,定时器轮询Logstore中的Shard信息,当发现有新的数据写入时,即生成
<shard_id,begin_cursor,end_cursor >
三元组信息作为函数Event,并触发函数执行。
投递流程
推荐使用日志服务消费组构建程序来进行实时消费,消费程序可以托管在函数计算,然后日志服务触发器通过Syslog over TCP/TLS来发送日志给SIEM,流程如下。
投递示例
本文以SLS的Logstore为数据源,通过阿里云函数计算,并使用Syslog协议投递到SIEM平台。
本文主要参考SLS触发器。
本文所用函数计算为函数计算2.0版本。
前提条件
投递过程
步骤一:在函数计算中创建函数
登录函数计算控制台,单击右上角返回函数计算2.0。
单击服务及函数。
在顶部菜单栏,选择地域,然后在服务列表页面,单击创建服务。
输入名称后,高级选项配置服务角色,管理日志服务的角色需要为新建角色增加消费Logstore的权限,其他均保持默认。更多信息,请参见授予函数计算访问其他云服务的权限和指定Logstore的消费权限。
创建服务完成后,在函数管理页面,单击创建函数。
函数配置如下:
创建函数方式选择使用内置运行时创建。
单击创建。
步骤二:配置函数代码
在函数代码页签中,替换index.py代码如下:
""" 本代码样例主要实现以下功能: * 从 event 中解析出 SLS 事件触发相关信息 * 根据以上获取的信息,初始化 SLS 客户端 * 从源 log store 获取实时日志数据 * 将日志数据通过syslog协议输出 This sample code is mainly doing the following things: * Get SLS processing related information from event * Initiate SLS client * Pull logs from source log store * Send logs with syslog """ # !/usr/bin/env python # -*- coding: utf-8 -*- import json from aliyun.log import LogClient import os import logging import six from datetime import datetime from aliyun.log import PullLogResponse from aliyun.log.ext import syslogclient from pysyslogclient import SyslogClientRFC5424 as SyslogClient logger = logging.getLogger() def get_syslog_config(): config = { 'host': os.environ.get('SYSLOG_HOST', ''), 'port': int(os.environ.get('SYSLOG_PORT', '514')), 'protocol': os.environ.get('SYSLOG_PROTOCOL', 'tcp'), "facility": syslogclient.FAC_USER, # 可选,可以参考其他syslogclient.FAC_*的值。 "severity": syslogclient.SEV_INFO, # 可选,可以参考其他syslogclient.SEV_*的值。 "hostname": "aliyun.example.com", # 可选,机器名,默认选择本机机器名。 "tag": "tag" # 可选,标签,默认是短划线(-)。 } return config def init_syslog_client(config): client = SyslogClient(config.get('host'), config.get('port'), config.get('protocol')) return client def process(shard_id, log_groups, config): logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True) logger.info("Get data from shard {0}, log count: {1}".format(shard_id, len(logs))) try: client = init_syslog_client(config) for log in logs: # suppose we only care about audit log timestamp = datetime.fromtimestamp(int(log[u'__time__'])) del log['__time__'] io = six.StringIO() # 可以根据需要修改格式化内容,这里使用Key=Value传输,并使用默认的双竖线(||)进行分割。 for k, v in six.iteritems(log): io.write("{0}{1}={2}".format('||', k, v)) data = io.getvalue() # 可以根据需要修改facility或者severity。 client.log(data, facility=config.get("facility", None), severity=config.get("severity", None), timestamp=timestamp, program=config.get("tag", None), hostname=config.get("hostname", None)) except Exception as err: logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(config, err)) # 需要添加一些错误处理的代码,例如重试或者通知等。 raise err logger.info("Complete send data to remote") def handler(event, context): # 可以通过 context.credentials 获取密钥信息 # Access keys can be fetched through context.credentials print("The content in context entity is: \n") print(context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # 解析 event 参数至 object 格式 # parse event in object event_obj = json.loads(event.decode()) print("The content in event entity is: \n") print(event_obj) # 从 event.source 中获取日志项目名称、日志仓库名称、日志服务访问 endpoint、日志起始游标、日志终点游标以及分区 id # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # 初始化 sls 客户端 # Initialize client of sls client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) syslog_config = get_syslog_config() # 基于日志的游标从源日志库中读取日志,本示例中的游标范围包含了触发本次执行的所有日志内容 # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) process(shard_id, response.get_loggroup_list(), syslog_config) begin_cursor = response.get_next_cursor() return 'success'
步骤三:添加自定义层
此文档通过Syslog导入,需要引入Syslog的依赖库,添加自定义层导入Syslog的依赖库。
单击编辑层。
在弹出的面板中,选择
在编辑函数面板,单击创建自定义层,并配置如下参数。
兼容运行时选择Python 3.9。
层上传方式选择在线构建依赖层。
构建环境选择Python 3.9。
requirements.txt 文件输入
pysyslogclient
。单击创建。
返回编辑函数面板,在第一层选择3创建的自定义层,单击确定。
步骤四:测试并部署
单击测试函数,返回执行成功,表示投递成功。
单击部署代码,完成发布。
常见问题
测试函数报错UnhandleInvocationError
后续操作
通过syslog服务器查看日志接收情况。
通过调用日志页签查看函数触发、调用情况。
相关文档
本文介绍的SLS触发器中,使用日志服务的消费者模式对日志进行消费,消费者使用方法可以参考通过API消费。
将日志服务的日志投递到SIEM平台,也可以通过日志服务的消费组对日志进行消费,具体可以参考通过HTTPS投递日志到SIEM和通过Syslog投递日志到SIEM。
日志服务SLS的消费组的介绍可以参考通过消费组消费数据。