本文介绍如何通过创建SLS Source Connector,通过事件总线EventBridge将数据从日志服务SLS导出至云消息队列 Kafka 版实例的Topic。

前提条件

  • 云消息队列 Kafka 版
    • 云消息队列 Kafka 版实例开启Connector。更多信息,请参见开启Connector
    • 云消息队列 Kafka 版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic
  • 事件总线EventBridge
    • 开通事件总线EventBridge并授权
    • 创建可信实体为阿里云服务的RAM角色并授权。若您需要通过系统策略获取完整的访问权限,创建的RAM角色的权限策略中可使用AliyunLogFullAccess系统策略,信任策略管理配置如下所示:
      {
        "Statement": [
          {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
              "Service": [
                "eventbridge.aliyuncs.com"
              ]
            }
          }
        ],
        "Version": "1"
      }
      说明 如需通过自定义配置对权限进行精细管理,请参见步骤二:为RAM用户授权
  • 日志服务SLS

背景信息

您可以在云消息队列 Kafka 版控制台创建数据同步任务,将日志服务的日志库中的数据同步至云消息队列 Kafka 版的Topic中。该同步任务将依赖阿里云事件总线EventBridge实现,具体为事件总线EventBridge中的事件流。更多信息,请参见事件流概述

注意事项

  • 仅支持在同地域内,将数据从日志服务导出至云消息队列 Kafka 版实例的数据源Topic。Connector的限制说明,请参见使用限制
  • 创建Connector任务时,云消息队列 Kafka 版会为您自动创建服务关联角色。
    • 如果未创建服务关联角色,云消息队列 Kafka 版会为您自动创建一个服务关联角色,以便您使用云消息队列 Kafka 版接收来自日志服务的数据的功能。
    • 如果已创建服务关联角色,云消息队列 Kafka 版不会重复创建。
    关于服务关联角色的更多信息,请参见服务关联角色

创建并部署SLS Source Connector

创建并部署用于将数据从日志服务同步至云消息队列 Kafka 版的SLS Source Connector。

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
  2. 在左侧导航栏,单击Connector 任务列表,从选择实例的下拉列表选择Connector所属的实例,然后单击创建 Source(写入kafka)
  3. 创建 Connector配置向导页面,完成以下操作。
    1. 配置基本信息页签,配置以下参数,然后单击下一步
      参数描述示例值
      名称Connector的名称。同一个云消息队列 Kafka 版实例内保持唯一。kafka-sls-sink
      实例默认配置为实例的名称与实例ID。demo alikafka_post-cn-st21****
    2. 配置源服务页签,选择源服务为日志服务SLS,配置以下参数,然后单击下一步
      参数描述示例值
      日志项目名称日志服务的日志项目的名称。test
      日志库名称存储同步数据的日志库的名称。kafka-logstore
      消费初始位置可选择数据同步的起始位置。
      • 从指定时间开始同步:手动选择时间点作为数据同步的最早时间点,将从此时间开始出现的第一条日志开始同步。
      • 从最早位置开始同步:从日志库中的第一条日志开始同步。
      • 从最晚位置开始同步:当任务部署完成后,从最新的日志开始同步。
      从最晚位置开始同步
      时间点可选择时间点,精确到秒。当消费初始位置参数配置为从指定时间开始同步时,需配置该参数。2022-09-15 10:00:00
      角色名称授权事件总线服务进行数据同步任务的RAM角色,下拉框会自动过滤用户RAM角色中受信服务为事件总线的部分。role
    3. 配置目标服务页签,显示数据将同步到目标云消息队列 Kafka 版实例,配置以下参数,然后单击创建
      参数描述示例值
      目标Topic数据同步的目标位置,云消息队列 Kafka 版中的Topic。sls-source
      确认模式数据同步至云消息队列 Kafka 版时的确认模式。等同于生产者Acks配置。
      • None:无需确认,等同于acks=0
      • LeaderOnly:主节点确认,等同于acks=1
      • All:全部节点确认,等同于acks=all

      关于各个模式的性能及安全性,请参见Acks

      LeaderOnly
      容错策略当错误发生时的处理方式:
      • 允许:允许异常容错,当异常发生时不会阻塞执行。
      • 禁止:不允许容错,异常发生时会阻塞运行。
      允许
  4. 创建完成后,在Connector 任务列表页面,找到创建的Connector ,单击其操作列的部署
    Connector 任务列表页面,您可以看到创建的任务状态运行中,则说明任务创建成功。
    说明 如果创建失败,请再次检查本文前提条件中的操作是否已全部完成。

验证结果

  1. 向阿里云日志服务SLS导入数据,例如MySQL数据。更多信息,请参见导入MySQL数据
  2. 使用云消息队列 Kafka 版提供的消息查询功能,验证数据能否被导出至云消息队列 Kafka 版目标Topic。
    查询的具体步骤,请参见查询消息