本文介绍如何通过创建SLS Source Connector,通过事件总线EventBridge将数据从日志服务SLS导出至消息队列Kafka版实例的Topic。
前提条件
- 消息队列Kafka版
- 事件总线EventBridge
- 开通事件总线EventBridge并授权
- 创建可信实体为阿里云服务的RAM角色并授权。若您需要通过系统策略获取完整的访问权限,创建的RAM角色的权限策略中可使用
AliyunLogFullAccess
系统策略,信任策略管理配置如下所示:{
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": [
"eventbridge.aliyuncs.com"
]
}
}
],
"Version": "1"
}
- 日志服务SLS
背景信息
您可以在消息队列Kafka版控制台创建数据同步任务,将日志服务的日志库中的数据同步至消息队列Kafka版的Topic中。该同步任务将依赖阿里云事件总线EventBridge实现,具体为事件总线EventBridge中的事件流。更多信息,请参见事件流概述。
注意事项
- 仅支持在同地域内,将数据从日志服务导出至消息队列Kafka版实例的数据源Topic。Connector的限制说明,请参见使用限制。
- 创建Connector任务时,消息队列Kafka版会为您自动创建服务关联角色。
- 如果未创建服务关联角色,消息队列Kafka版会为您自动创建一个服务关联角色,以便您使用消息队列Kafka版接收来自日志服务的数据的功能。
- 如果已创建服务关联角色,消息队列Kafka版不会重复创建。
关于服务关联角色的更多信息,请参见服务关联角色。
创建并部署SLS Source Connector
创建并部署用于将数据从日志服务同步至消息队列Kafka版的SLS Source Connector。
- 登录消息队列Kafka版控制台,在概览页面的资源分布区域,选择地域。
- 在左侧导航栏,单击Connector 任务列表,从选择实例的下拉列表选择Connector所属的实例,然后单击创建 Source(写入kafka)。
- 在创建 Connector配置向导页面,完成以下操作。
- 在配置基本信息页签,配置以下参数,然后单击下一步。
参数 |
描述 |
示例值 |
名称 |
Connector的名称。同一个消息队列Kafka版实例内保持唯一。
|
kafka-sls-sink |
实例 |
默认配置为实例的名称与实例ID。 |
demo alikafka_post-cn-st21**** |
- 在配置源服务页签,选择源服务为日志服务SLS,配置以下参数,然后单击下一步。
参数 |
描述 |
示例值 |
日志项目名称 |
日志服务的日志项目的名称。 |
test |
日志库名称 |
存储同步数据的日志库的名称。 |
kafka-logstore |
消费初始位置 |
可选择数据同步的起始位置。
- 从指定时间开始同步:手动选择时间点作为数据同步的最早时间点,将从此时间开始出现的第一条日志开始同步。
- 从最早位置开始同步:从日志库中的第一条日志开始同步。
- 从最晚位置开始同步:当任务部署完成后,从最新的日志开始同步。
|
从最晚位置开始同步 |
时间点 |
可选择时间点,精确到秒。当消费初始位置参数配置为从指定时间开始同步时,需配置该参数。
|
2022-09-15 10:00:00 |
角色名称 |
授权事件总线服务进行数据同步任务的RAM角色,下拉框会自动过滤用户RAM角色中受信服务为事件总线的部分。 |
role |
- 在配置目标服务页签,显示数据将同步到目标消息队列Kafka版实例,配置以下参数,然后单击创建。
参数 |
描述 |
示例值 |
目标Topic |
数据同步的目标位置,消息队列Kafka版中的Topic。
|
sls-source |
确认模式 |
数据同步至消息队列Kafka版时的确认模式。等同于生产者Acks配置。
- None:无需确认,等同于
acks=0 。
- LeaderOnly:主节点确认,等同于
acks=1 。
- All:全部节点确认,等同于
acks=all 。
关于各个模式的性能及安全性,请参见Acks 。
|
LeaderOnly |
容错策略 |
当错误发生时的处理方式:
- 允许:允许异常容错,当异常发生时不会阻塞执行。
- 禁止:不允许容错,异常发生时会阻塞运行。
|
允许 |
- 创建完成后,在Connector 任务列表页面,找到创建的Connector ,单击其操作列的部署。
在
Connector 任务列表页面,您可以看到创建的任务
状态为
运行中,则说明任务创建成功。
说明 如果创建失败,请再次检查本文前提条件中的操作是否已全部完成。
验证结果
- 向阿里云日志服务SLS导入数据,例如MySQL数据。更多信息,请参见导入MySQL数据。
- 使用消息队列Kafka版提供的消息查询功能,验证数据能否被导出至消息队列Kafka版目标Topic。