本文说明如何创建MaxCompute Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至MaxCompute的表。

前提条件

在创建MaxCompute Sink Connector前,请确保您已完成以下操作:
  1. 消息队列Kafka版实例开启Connector。更多信息,请参见开启Connector
  2. 消息队列Kafka版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic

    本文以名称为maxcompute-test-input的Topic为例。

  3. 通过MaxCompute客户端创建表。更多信息,请参见创建和查看表

    本文以名称为connector_test的项目下名称为test_kafka的表为例。该表的建表语句如下:

    CREATE TABLE IF NOT EXISTS test_kafka(topic STRING,partition BIGINT,offset BIGINT,key STRING,value STRING) PARTITIONED by (pt STRING);

操作流程

使用MaxCompute Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至MaxCompute的表操作流程如下:

  1. 授予消息队列Kafka版访问MaxCompute的权限
  2. 可选:创建MaxCompute Sink Connector依赖的Topic和Consumer Group

    如果您不需要自定义Topic和Consumer Group的名称,您可以直接跳过该步骤,在下一步骤选择自动创建。

    注意 部分MaxCompute Sink Connector依赖的Topic的存储引擎必须为Local存储,大版本为0.10.2的消息队列Kafka版实例不支持手动创建Local存储的Topic,只支持自动创建。
    1. 可选:创建MaxCompute Sink Connector依赖的Topic
    2. 可选:创建MaxCompute Sink Connector依赖的Consumer Group
  3. 创建并部署MaxCompute Sink Connector
    1. 创建MaxCompute Sink Connector
    2. 部署MaxCompute Sink Connector
  4. 结果验证
    1. 发送消息
    2. 查看表数据

创建RAM角色

由于RAM角色不支持直接选择消息队列Kafka版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手工修改信任策略。

  1. 登录访问控制控制台
  2. 在左侧导航栏,单击RAM角色管理
  3. RAM角色管理页面,单击创建RAM角色
  4. 创建RAM角色面板,执行以下操作。
    1. 当前可信实体类型区域,选择阿里云服务,然后单击下一步
    2. 角色类型区域,选择普通服务角色,在角色名称文本框,输入AliyunKafkaMaxComputeUser1,从选择受信服务列表,选择函数计算,然后单击完成
  5. RAM角色管理页面,找到AliyunKafkaMaxComputeUser1,单击AliyunKafkaMaxComputeUser1
  6. AliyunKafkaMaxComputeUser1页面,单击信任策略管理页签,单击修改信任策略
  7. 修改信任策略面板,将脚本中fc替换为alikafka,单击确定
    pg_ram

添加权限

为使Connector将消息同步到MaxCompute表,您需要为创建的RAM角色至少授予以下权限:

客体 操作 描述
Project CreateInstance 在项目中创建实例。
Table Describe 读取表的元信息。
Table Alter 修改表的元信息或添加删除分区。
Table Update 覆盖或添加表的数据。

关于以上权限的详细说明以及授权操作,请参见授权

为本文创建的AliyunKafkaMaxComputeUser1添加权限的示例步骤如下:

  1. 登录MaxCompute客户端。
  2. 执行以下命令添加RAM角色为用户。
    add user `RAM$:$<accountid>:role/aliyunkafkamaxcomputeuser1`;
    说明 将<accountid>替换为您自己的阿里云账号ID。
  3. 为RAM用户授予访问MaxCompute所需的最小权限。
    1. 执行以下命令为RAM用户授予项目相关权限。
      grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      说明 将<accountid>替换为您自己的阿里云账号ID。
    2. 执行以下命令为RAM用户授予表相关权限。
      grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      说明 将<accountid>替换为您自己的阿里云账号ID。

创建MaxCompute Sink Connector依赖的Topic

您可以在消息队列Kafka版控制台手动创建MaxCompute Sink Connector依赖的5个Topic。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击Topic管理
  4. Topic管理页面,选择实例,单击创建Topic
  5. 创建Topic对话框,设置Topic属性,然后单击创建
    Topic 描述
    任务位点Topic 用于存储消费位点的Topic。
    • Topic名称:建议以connect-offset开头。
    • 分区数:Topic的分区数量必须大于1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    任务配置Topic 用于存储任务配置的Topic。
    • Topic名称:建议以connect-config开头。
    • 分区数:Topic的分区数量必须为1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    任务状态Topic 用于存储任务状态的Topic。
    • Topic名称:建议以connect-status开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    死信队列Topic 用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
    • Topic名称:建议以connect-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
    异常数据Topic 用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
    • Topic名称:建议以connect-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。

创建MaxCompute Sink Connector依赖的Consumer Group

您可以在消息队列Kafka版控制台手动创建MaxCompute Sink Connector依赖的2个Consumer Group。

  1. 在左侧导航栏,单击Consumer Group管理
  2. Consumer Group管理页面,选择实例,单击创建Consumer Group
  3. 创建Consumer Group对话框,设置Topic属性,然后单击创建
    Consumer Group 描述
    Connector任务消费组 Connector的数据同步任务使用的Consumer Group。该Consumer Group的名称必须为connect-任务名称
    Connector消费组 Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。

创建MaxCompute Sink Connector

创建用于将数据从消息队列Kafka版同步至MaxCompute的MaxCompute Sink Connector。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. Connector页面,选择实例,单击创建Connector
  4. 创建Connector面板,完成以下操作。
    1. 基础信息下方的Connector名称文本框,输入Connector名称,从转储路径列表,选择消息队列Kafka版,从转储到列表,选择MaxCompute,然后单击下一步
      参数 描述 示例值
      Connector名称 Connector的名称。取值:
      • 可以包含数字、小写英文字母和短划线(-),但不能以短划线(-)开头,长度限制为48个字符。
      • 同一个消息队列Kafka版实例内保持唯一。

      Connector的数据同步任务必须使用名称为connect-任务名称的Consumer Group。如果您未手动创建该Consumer Group,系统将为您自动创建。

      kafka-maxcompute-sink
      任务类型 Connector的数据同步任务类型。本文以数据从消息队列Kafka版同步至MaxCompute为例。更多任务类型,请参见Connector类型 KAFKA2ODPS
    2. 源实例配置下方的数据源Topic文本框,输入数据源Topic名称,从消费初始位置列表,选择消费初始位置,在创建资源,选择自动创建或者选择手动创建并输入手动创建的Topic的名称,然后单击下一步
      参数 描述 示例值
      VPC ID 数据同步任务所在的VPC。默认为消息队列Kafka版实例所在的VPC,您无需填写。 vpc-bp1xpdnd3l***
      VSwitch ID 数据同步任务所在的交换机。用户交换机必须与消息队列Kafka版实例处于同一VPC。默认为部署消息队列Kafka版实例时填写的交换机。 vsw-bp1d2jgg81***
      数据源Topic 需要同步数据的Topic。 maxcompute-test-input
      消费初始位置 开始消费的位置。取值:
      • latest:从最新位点开始消费。
      • earliest:从最初位点开始消费。
      latest
      Connector消费组 Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。 connect-cluster-kafka-maxcompute-sink
      任务位点Topic 用于存储消费位点的Topic。
      • Topic名称:建议以connect-offset开头。
      • 分区数:Topic的分区数量必须大于1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-offset-kafka-maxcompute-sink
      任务配置Topic 用于存储任务配置的Topic。
      • Topic名称:建议以connect-config开头。
      • 分区数:Topic的分区数量必须为1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-config-kafka-maxcompute-sink
      任务状态Topic 用于存储任务状态的Topic。
      • Topic名称:建议以connect-status开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-status-kafka-maxcompute-sink
      死信队列Topic 用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
      • Topic名称:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-maxcompute-sink
      异常数据Topic 用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
      • Topic名称:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-maxcompute-sink
    3. 目标实例配置下方,输入MaxCompute的属性,然后单击下一步
      参数 描述 示例值
      MaxCompute连接地址 MaxCompute的服务接入点。更多信息,请参见配置Endpoint
      • VPC网络Endpoint:低延迟,推荐。适用于消息队列Kafka版实例和MaxCompute处于同一地域场景。
      • 外网Endpoint:高延迟,不推荐。适用于消息队列Kafka版实例和MaxCompute处于不同地域的场景。如需使用公网Endpoint,您需要为Connector开启公网访问。更多信息,请参见为Connector开启公网访问
      http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
      MaxCompute工作空间 MaxCompute的工作空间。 connector_test
      MaxCompute表 MaxCompute的表。 test_kafka
      MaxCompute表地域 MaxCompute表所在地域。 华东1(杭州)
      服务账号 MaxCompute的阿里云账号ID。 188***
      授权角色名 消息队列Kafka版的RAM角色的名称。更多信息,请参见创建RAM角色 AliyunKafkaMaxComputeUser1
      模式 消息同步到Connector的模式。默认为DEFAULT。取值:
      • KEY:只保留消息的Key,并将Key写入MaxCompute表的key列。
      • VALUE:只保留消息的Value,并将Value写入MaxCompute表的value列。
      • DEFAULT:同时保留消息的Key和Value,并将Key和Value分别写入MaxCompute表的key列和value列。
        注意 DEFAULT模式下,不支持选择CSV格式,只支持TEXT格式和BINARY格式。
      DEFAULT
      格式 消息同步到Connector的格式。默认为TEXT。取值:
      • TEXT:消息的格式为字符串。
      • BINARY:消息的格式为字节数组。
      • CSV:消息的格式为逗号(,)分隔的字符串。
        注意 CSV格式下,不支持DEFAULT模式,只支持KEY模式和VALUE模式:
        • KEY模式:只保留消息的Key,根据逗号(,)分隔Key字符串,并将分隔后的字符串按照索引顺序写入表。
        • VALUE模式:只保留消息的Value,根据逗号(,)分隔Value字符串,并将分隔后的字符串按照索引顺序写入表。
      TEXT
      分区 分区的粒度。默认为HOUR。取值:
      • DAY:每天将数据写入一个新分区。
      • HOUR:每小时将数据写入一个新分区。
      • MINUTE:每分钟将数据写入一个新分区。
      HOUR
      时区 向Connector的数据源Topic发送消息的消息队列Kafka版生产者客户端所在时区。默认为GMT 08:00。 GMT 08:00
  5. 预览/创建下方,确认Connector的配置,然后单击提交
    提交完成后,刷新Connector页面以显示创建的Connector。

部署MaxCompute Sink Connector

创建MaxCompute Sink Connector后,您需要部署该MaxCompute Sink Connector以使其将数据从消息队列Kafka版同步至MaxCompute。

Connector页面,找到创建的MaxCompute Sink Connector,在其右侧操作列,单击部署
部署完成后,Connector状态显示运行中。

发送消息

部署MaxCompute Sink Connector后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否被同步至MaxCompute。

  1. 在左侧导航栏,单击Topic管理
  2. Topic管理页面,选择实例,找到maxcompute-test-input,在其右侧操作列,单击发送消息
  3. 发送消息对话框,发送测试消息。
    1. 分区文本框,输入0
    2. Message Key文本框,输入1
    3. Message Value文本框,输入1
    4. 单击发送

查看表数据

消息队列Kafka版的数据源Topic发送消息后,在MaxCompute客户端查看表数据,验证是否收到消息。

查看本文写入的test_kafka的示例步骤如下:

  1. 登录MaxCompute客户端。
  2. 执行以下命令查看表的数据分区。
    show partitions test_kafka;
    返回结果示例如下:
    pt=11-17-2020 15
    
    OK
  3. 执行以下命令查看分区的数据。
    select * from test_kafka where pt ="11-17-2020 14";
    返回结果示例如下:
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | maxcompute-test-input | 0          | 0          | 1   | 1     | 11-17-2020 14 |
    +-------+------------+------------+-----+-------+----+