本文介绍如何通过创建SLS Sink Connector,通过事件总线EventBridge将数据从云消息队列 Kafka 版实例的数据源Topic导出至日志服务SLS。
前提条件
- 云消息队列 Kafka 版
- 为云消息队列 Kafka 版实例开启Connector。更多信息,请参见开启Connector。
- 为云消息队列 Kafka 版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic。
- 事件总线EventBridge
- 开通事件总线EventBridge并授权
- 创建可信实体为阿里云服务的RAM角色并授权。若您需要通过系统策略获取完整的访问权限,创建的RAM角色的权限策略中可使用
AliyunLogFullAccess系统策略,信任策略管理配置如下所示:{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "eventbridge.aliyuncs.com" ] } } ], "Version": "1" }说明 如需通过自定义配置对权限进行精细管理,请参见创建RAM用户及授权。
- 日志服务SLS
- 创建日志项目。详细步骤,请参见管理Project。
- 创建日志库。详细步骤,请参见创建基础LogStore。
背景信息
您可以在云消息队列 Kafka 版控制台创建数据同步任务,将云消息队列 Kafka 版Topic中的数据同步至日志服务的日志库中。该同步任务将依赖阿里云事件总线EventBridge实现,具体为事件总线EventBridge中的事件流。更多信息,请参见事件流概述。注意事项
创建并部署SLS Sink Connector
创建并部署用于将数据从云消息队列 Kafka 版同步至日志服务的SLS Sink Connector。
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,单击Connector 任务列表,从选择实例的下拉列表选择Connector所属的实例,然后单击创建 Sink(导出kafka)。
- 在创建 Connector配置向导页面,完成以下操作。
- 在配置基本信息页签,按需配置以下参数,然后单击下一步。
参数 描述 示例值 名称 Connector的名称。 Connector的数据同步任务必须使用名称为connect-任务名称的Group。如果您未手动创建该Group,系统将为您自动创建。
kafka-sls-sink 实例 默认配置为实例的名称与实例ID。 demo alikafka_post-cn-st21p8vj**** - 在配置源服务页签,配置以下参数,然后单击下一步。
参数 说明 示例值 数据源 Topic 需要同步数据的Topic。 sls-test-input 消费线程并发数 数据源Topic的消费线程并发数。默认值为6。 6 消费初始位置 开始消费的位置。取值说明如下: - 最早位点:从最初位点开始消费。
- 最近位点:从最新位点开始消费。
最早位点 VPC ID 数据同步任务所在的VPC。单击配置运行环境显示该参数。默认为云消息队列 Kafka 版实例所在的VPC,您无需填写。 vpc-bp1xpdnd3**** vSwitch ID 数据同步任务所在的交换机。单击配置运行环境显示该参数。该交换机必须与云消息队列 Kafka 版实例处于同一VPC。默认为部署云消息队列 Kafka 版实例时填写的交换机。 vsw-bp1d2jgg8**** 失败处理 消息发送失败后,是否继续订阅出现错误的Topic的分区。单击配置运行环境显示该参数。取值说明如下。 - 继续订阅:继续订阅出现错误的Topic的分区,并打印错误日志。
- 停止订阅:停止订阅出现错误的Topic的分区,并打印错误日志。
说明- 如何查看日志,请参见Connector相关操作。
- 如何根据错误码查找解决方案,请参见错误码。
继续订阅 创建资源方式 选择创建Connector所依赖的Topic与Group的方式。单击配置运行环境显示该参数。 自动创建 Connector 消费组 Connector的数据同步任务使用的Group。单击配置运行环境显示该参数。该Group的名称必须为connect-任务名称。 connect-kafka-sls-sink 任务位点 Topic 用于存储消费位点的Topic。单击配置运行环境显示该参数。 - Topic:建议以connect-offset开头。
- 分区数:Topic的分区数量必须大于1。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
connect-offset-kafka-sls-sink 任务配置 Topic 用于存储任务配置的Topic。单击配置运行环境显示该参数。 - Topic:建议以connect-config开头。
- 分区数:Topic的分区数量必须为1。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
connect-config-kafka-sls-sink 任务状态 Topic 用于存储任务状态的Topic。单击配置运行环境显示该参数。 - Topic:建议以connect-status开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎必须为Local存储。
- cleanup.policy:Topic的日志清理策略必须为compact。
connect-status-kafka-sls-sink 死信队列 Topic 用于存储Connect框架的异常数据的Topic。单击配置运行环境显示该参数。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。 - Topic:建议以connect-error开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎可以为Local存储或云存储。
connect-error-kafka-sls-sink 异常数据 Topic 用于存储Sink的异常数据的Topic。单击配置运行环境显示该参数。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。 - Topic:建议以connect-error开头。
- 分区数:Topic的分区数量建议为6。
- 存储引擎:Topic的存储引擎可以为Local存储或云存储。
connect-error-kafka-sls-sink - 在配置目标服务页签,选择目标服务为日志服务 SLS,配置以下参数,然后单击创建。
参数 描述 示例值 日志项目名称 日志服务的日志项目的名称。 k00eny67 日志库名称 存储同步数据的日志库的名称。 kafka-logstore 日志主题 可在采集日志时指定主题,用于区分日志。 kafka 角色名称 授权事件总线服务进行数据同步任务的RAM角色,下拉框会自动过滤用户RAM角色中受信服务为事件总线的部分。 testrole 创建完成后,在Connector 任务列表页面,查看创建的Connector。
- 在配置基本信息页签,按需配置以下参数,然后单击下一步。
- 创建完成后,在Connector 任务列表页面,找到创建的Connector ,单击其操作列的部署。任务创建并部署完成后,将会自动在事件总线服务的同账号同区域下创建一个同名事件流任务。
发送测试消息
部署SLS Sink Connector后,您可以向云消息队列 Kafka 版的数据源Topic发送消息,测试数据能否被同步至日志服务。
在Connector 任务列表页面,找到目标Connector,在其右侧操作列,单击测试。
- 在发送消息面板,设置以下参数,发送测试消息。
参数 说明 示例值 消息 Key 发送的测试消息的Key值。 demo 消息内容 测试的消息内容。 {"key": "test"} 发送到指定分区 - 是:在分区 ID文本框中输入分区的ID。如果您需查询分区的ID,请参见查看分区状态。
- 否:不指定分区。
否
查看日志
向云消息队列 Kafka 版的数据源Topic发送消息后,在日志服务控制台查看日志,验证是否收到消息。
- 登录日志服务控制台。在Project列表区域,单击目标Project。
- 在日志库页面,单击目标Logstore。
- 单击查询 / 分析,查看查询分析结果。

查看同步进度
任务部署完成并进入运行中状态后,可在Connector任务列表页面单击目标任务右侧操作列的消费进度,查看当前任务的数据同步状态。
该文章对您有帮助吗?