本文介绍如何通过创建SLS Source Connector,通过事件总线EventBridge将数据从日志服务SLS导出至云消息队列 Kafka 版实例的Topic。
前提条件
- 云消息队列 Kafka 版
- 为云消息队列 Kafka 版实例开启Connector。更多信息,请参见开启Connector。
- 为云消息队列 Kafka 版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic。
- 事件总线EventBridge
- 开通事件总线EventBridge并授权
- 创建可信实体为阿里云服务的RAM角色并授权。若您需要通过系统策略获取完整的访问权限,创建的RAM角色的权限策略中可使用
AliyunLogFullAccess系统策略,信任策略管理配置如下所示:{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "eventbridge.aliyuncs.com" ] } } ], "Version": "1" }说明 如需通过自定义配置对权限进行精细管理,请参见创建RAM用户及授权。
- 日志服务SLS
- 创建日志项目。详细步骤,请参见管理Project。
- 创建日志库。详细步骤,请参见创建基础LogStore。
背景信息
您可以在云消息队列 Kafka 版控制台创建数据同步任务,将日志服务的日志库中的数据同步至云消息队列 Kafka 版的Topic中。该同步任务将依赖阿里云事件总线EventBridge实现,具体为事件总线EventBridge中的事件流。更多信息,请参见事件流概述。注意事项
创建并部署SLS Source Connector
创建并部署用于将数据从日志服务同步至云消息队列 Kafka 版的SLS Source Connector。
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,单击Connector 任务列表,从选择实例的下拉列表选择Connector所属的实例,然后单击创建 Source(写入kafka)。
- 在创建 Connector配置向导页面,完成以下操作。
- 在配置基本信息页签,配置以下参数,然后单击下一步。
参数 描述 示例值 名称 Connector的名称。同一个云消息队列 Kafka 版实例内保持唯一。 kafka-sls-sink 实例 默认配置为实例的名称与实例ID。 demo alikafka_post-cn-st21**** - 在配置源服务页签,选择源服务为日志服务SLS,配置以下参数,然后单击下一步。
参数 描述 示例值 日志项目名称 日志服务的日志项目的名称。 test 日志库名称 存储同步数据的日志库的名称。 kafka-logstore 消费初始位置 可选择数据同步的起始位置。 - 从指定时间开始同步:手动选择时间点作为数据同步的最早时间点,将从此时间开始出现的第一条日志开始同步。
- 从最早位置开始同步:从日志库中的第一条日志开始同步。
- 从最晚位置开始同步:当任务部署完成后,从最新的日志开始同步。
从最晚位置开始同步 时间点 可选择时间点,精确到秒。当消费初始位置参数配置为从指定时间开始同步时,需配置该参数。 2022-09-15 10:00:00 角色名称 授权事件总线服务进行数据同步任务的RAM角色,下拉框会自动过滤用户RAM角色中受信服务为事件总线的部分。 role - 在配置目标服务页签,显示数据将同步到目标云消息队列 Kafka 版实例,配置以下参数,然后单击创建。
参数 描述 示例值 目标Topic 数据同步的目标位置,云消息队列 Kafka 版中的Topic。 sls-source 确认模式 数据同步至云消息队列 Kafka 版时的确认模式。等同于生产者Acks配置。 - None:无需确认,等同于
acks=0。 - LeaderOnly:主节点确认,等同于
acks=1。 - All:全部节点确认,等同于
acks=all。
关于各个模式的性能及安全性,请参见Acks
。LeaderOnly 容错策略 当错误发生时的处理方式: - 允许:允许异常容错,当异常发生时不会阻塞执行。
- 禁止:不允许容错,异常发生时会阻塞运行。
允许 - None:无需确认,等同于
- 在配置基本信息页签,配置以下参数,然后单击下一步。
- 创建完成后,在Connector 任务列表页面,找到创建的Connector ,单击其操作列的部署。在Connector 任务列表页面,您可以看到创建的任务状态为运行中,则说明任务创建成功。说明 如果创建失败,请再次检查本文前提条件中的操作是否已全部完成。
验证结果
该文章对您有帮助吗?