本文介绍如何通过创建SLS Sink Connector,通过事件总线EventBridge将数据从云消息队列 Kafka 版实例的数据源Topic导出至日志服务SLS。

前提条件

  • 云消息队列 Kafka 版
    • 云消息队列 Kafka 版实例开启Connector。更多信息,请参见开启Connector
    • 云消息队列 Kafka 版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic
  • 事件总线EventBridge
    • 开通事件总线EventBridge并授权
    • 创建可信实体为阿里云服务的RAM角色并授权。若您需要通过系统策略获取完整的访问权限,创建的RAM角色的权限策略中可使用AliyunLogFullAccess系统策略,信任策略管理配置如下所示:
      {
        "Statement": [
          {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
              "Service": [
                "eventbridge.aliyuncs.com"
              ]
            }
          }
        ],
        "Version": "1"
      }
      说明 如需通过自定义配置对权限进行精细管理,请参见步骤二:为RAM用户授权
  • 日志服务SLS

背景信息

您可以在云消息队列 Kafka 版控制台创建数据同步任务,将云消息队列 Kafka 版Topic中的数据同步至日志服务的日志库中。该同步任务将依赖阿里云事件总线EventBridge实现,具体为事件总线EventBridge中的事件流。更多信息,请参见事件流概述

注意事项

  • 仅支持在同地域内,将数据从云消息队列 Kafka 版实例的数据源Topic导出至日志服务。Connector的限制说明,请参见使用限制
  • 创建Connector任务时,云消息队列 Kafka 版会为您自动创建服务关联角色。
    • 如果未创建服务关联角色,云消息队列 Kafka 版会为您自动创建一个服务关联角色,以便您使用云消息队列 Kafka 版导出数据至日志服务的功能。
    • 如果已创建服务关联角色,云消息队列 Kafka 版不会重复创建。
    关于服务关联角色的更多信息,请参见服务关联角色

创建并部署SLS Sink Connector

创建并部署用于将数据从云消息队列 Kafka 版同步至日志服务的SLS Sink Connector。

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
  2. 在左侧导航栏,单击Connector 任务列表,从选择实例的下拉列表选择Connector所属的实例,然后单击创建 Sink(导出kafka)
  3. 创建 Connector配置向导页面,完成以下操作。
    1. 配置基本信息页签,按需配置以下参数,然后单击下一步
      参数描述示例值
      名称Connector的名称。

      Connector的数据同步任务必须使用名称为connect-任务名称的Group。如果您未手动创建该Group,系统将为您自动创建。

      kafka-sls-sink
      实例默认配置为实例的名称与实例ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服务页签,配置以下参数,然后单击下一步
      参数说明示例值
      数据源 Topic需要同步数据的Topic。sls-test-input
      消费线程并发数数据源Topic的消费线程并发数。默认值为6。6
      消费初始位置开始消费的位置。取值说明如下:
      • 最早位点:从最初位点开始消费。
      • 最近位点:从最新位点开始消费。
      最早位点
      VPC ID数据同步任务所在的VPC。单击配置运行环境显示该参数。默认为云消息队列 Kafka 版实例所在的VPC,您无需填写。vpc-bp1xpdnd3****
      vSwitch ID数据同步任务所在的交换机。单击配置运行环境显示该参数。该交换机必须与云消息队列 Kafka 版实例处于同一VPC。默认为部署云消息队列 Kafka 版实例时填写的交换机。vsw-bp1d2jgg8****
      失败处理消息发送失败后,是否继续订阅出现错误的Topic的分区。单击配置运行环境显示该参数。取值说明如下。
      • 继续订阅:继续订阅出现错误的Topic的分区,并打印错误日志。
      • 停止订阅:停止订阅出现错误的Topic的分区,并打印错误日志。
      说明
      继续订阅
      创建资源方式选择创建Connector所依赖的Topic与Group的方式。单击配置运行环境显示该参数。自动创建
      Connector 消费组Connector的数据同步任务使用的Group。单击配置运行环境显示该参数。该Group的名称必须为connect-任务名称connect-kafka-sls-sink
      任务位点 Topic用于存储消费位点的Topic。单击配置运行环境显示该参数。
      • Topic:建议以connect-offset开头。
      • 分区数:Topic的分区数量必须大于1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-offset-kafka-sls-sink
      任务配置 Topic用于存储任务配置的Topic。单击配置运行环境显示该参数。
      • Topic:建议以connect-config开头。
      • 分区数:Topic的分区数量必须为1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-config-kafka-sls-sink
      任务状态 Topic用于存储任务状态的Topic。单击配置运行环境显示该参数。
      • Topic:建议以connect-status开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-status-kafka-sls-sink
      死信队列 Topic用于存储Connect框架的异常数据的Topic。单击配置运行环境显示该参数。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
      • Topic:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-sls-sink
      异常数据 Topic用于存储Sink的异常数据的Topic。单击配置运行环境显示该参数。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
      • Topic:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-sls-sink
    3. 配置目标服务页签,选择目标服务为日志服务 SLS,配置以下参数,然后单击创建
      参数描述示例值
      日志项目名称日志服务的日志项目的名称。k00eny67
      日志库名称存储同步数据的日志库的名称。kafka-logstore
      日志主题可在采集日志时指定主题,用于区分日志。kafka
      角色名称授权事件总线服务进行数据同步任务的RAM角色,下拉框会自动过滤用户RAM角色中受信服务为事件总线的部分。testrole
      创建完成后,在Connector 任务列表页面,查看创建的Connector。
  4. 创建完成后,在Connector 任务列表页面,找到创建的Connector ,单击其操作列的部署
    任务创建并部署完成后,将会自动在事件总线服务的同账号同区域下创建一个同名事件流任务。

发送测试消息

部署SLS Sink Connector后,您可以向云消息队列 Kafka 版的数据源Topic发送消息,测试数据能否被同步至日志服务。

  1. Connector 任务列表页面,找到目标Connector,在其右侧操作列,单击测试
  2. 发送消息面板,设置以下参数,发送测试消息。
    参数说明示例值
    消息 Key发送的测试消息的Key值。demo
    消息内容测试的消息内容。{"key": "test"}
    发送到指定分区
    • :在分区 ID文本框中输入分区的ID。如果您需查询分区的ID,请参见查看分区状态
    • :不指定分区。

查看日志

云消息队列 Kafka 版的数据源Topic发送消息后,在日志服务控制台查看日志,验证是否收到消息。

  1. 登录日志服务控制台。在Project列表区域,单击目标Project。
  2. 日志库页面,单击目标Logstore。
  3. 单击查询/分析,查看查询分析结果。
    查询分析

查看同步进度

任务部署完成并进入运行中状态后,可在Connector任务列表页面单击目标任务右侧操作列的消费进度,查看当前任务的数据同步状态。