创建OSS Sink Connector(旧版)

本文介绍如何创建使用OSS Sink Connector,您可以通过OSS Sink Connector将数据从云消息队列 Kafka 版的数据源Topic导出至对象存储OSS的Object中。

前提条件

详细步骤,请参见创建前提

步骤一:创建目标服务资源

在对象存储OSS控制台创建一个存储空间(Bucket)。详细步骤,请参见控制台创建存储空间

本文以oss-sink-connector-bucket Bucket为例。

步骤二:创建OSS Sink Connector并启动

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择Connector生态集成 > 消息流出(Sink)

  3. 消息流出(Sink)页面,单击创建任务

  4. 消息流出创建面板,配置以下参数,单击确定

    1. 基础信息区域,设置任务名称,将流出类型选择为对象存储OSS

    2. 资源配置区域,设置以下参数。

      表 1. 源(云消息队列 Kafka 版

      参数

      说明

      示例

      地域

      源Kafka实例所在的地域。

      华东1(杭州)

      kafka实例

      数据源所在的Kafka实例ID。

      alikafka_post-cn-9hdsbdhd****

      Topic

      数据源所在的Kafka实例Topic。

      guide-sink-topic

      Group ID

      数据源所在的Kafka实例中的Group ID。

      • 快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。

      • 使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。

      使用已有

      并发配额(消费者数)

      消费Topic数据的并发线程数,线程和Topic分区的对应关系如下:

      • Topic分区数=并发消费数:一个线程消费一个Topic分区。建议使用。

      • Topic分区数>并发消费数:多个并发消费会均摊所有分区消费。

      • Topic分区数<并发消费数:一个线程消费一个Topic分区,多出的消费数无效。

      2

      消费位点

      • 最新位点:从最新位点开始消费。

      • 最早位点:从最初位点开始消费。

      最新位点

      网络配置

      有跨境传输数据需求时选择自建公网,其他情况可选择默认网络

      默认网络

      表 1. 目标(对象存储OSS)

      参数

      说明

      示例

      OSS Bucket

      已创建的对象存储OSS Bucket。

      oss-sink-connector-bucket

      保存路径

      • 无需分区:数据保存路径为{Kafka Instance ID}/{Topic Name}

      • 时间分区

        • YYYY/MM/dd/HH:生成的OSS文件目录为{Kafka Instance ID}/{Topic Name}/YYYY/MM/dd/HH

        • YYYY/MM/dd:生成的OSS文件目录为{Kafka Instance ID}/{Topic Name}/YYYY/MM/dd

        • YYYYMMddHH:生成的OSS文件目录为{Kafka Instance ID}/{Topic Name}/YYYYMMddHH

        • YYYYMMdd:生成的OSS文件目录为{Kafka Instance ID}/{Topic Name}/YYYYMMdd

      说明

      其中,YYYY、MM、dd、HH分别代表年、月、日、时。

      alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH

      高级配置

      当积攒的消息满足批量聚合文件大小批量聚合事件窗口两个条件中的任意一个时,新的消息将会写入新的文件。

      批量聚合文件大小

      配置需要聚合的文件大小,取值范围为[1,128],单位:MiB。

      5

      批量聚合事件窗口

      配置需要聚合的时间窗口。单位:分钟。

      1

    完成上述配置后,在消息流出(Sink)页面,找到刚创建的OSS Sink Connector任务,单击其右侧操作列的启动。当状态栏由启动中变为运行中时,Connector创建成功。

步骤三:测试OSS Sink Connector

  1. 消息流出(Sink)页面,在OSS Sink Connector任务的事件源列单击源Topic。

  2. 在Topic详情页面,单击体验发送消息

  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定

    发送消息

  4. 消息流出(Sink)页面,在OSS Sink Connector任务的事件目标列单击目标Bucket。

  5. 在Bucket页面,选择左侧导航栏的文件管理 > 文件列表,然后进入Bucket的最深层路径。

    最深层路径

    可以看到此路径中有如下两类Object:

    • 系统meta文件:格式为.oss_meta_file_partition_{partitionID},文件数量和上游Topic的Partition数量相同,用于记录攒批信息,您无需关注。

    • 数据文件:格式为partition_{partitionID}_offset_{offset}_{8位Random字符串},如果一个Object中聚合了一个Partition的多条消息,Object名称中的Offset为这批消息中的最小Offset值。

  6. 在对应Object右侧操作列,选择图标 > 下载

  7. 打开下载的文件,查看消息内容。

    消息

    如图所示,多条消息之间通过换行分隔。