本文说明如何创建MaxCompute Sink Connector将数据从消息队列Kafka版导出至大数据计算服务MaxCompute。

前提条件

在创建MaxCompute Sink Connector前,请确保您已完成以下操作:
  1. 根据网络类型购买并部署消息队列Kafka版实例。详情请参见VPC接入公网+VPC接入
  2. 消息队列Kafka版实例开启Connector。详情请参见开启Connector
  3. 通过MaxCompute客户端创建名称为test_kafka的表。建表语句示例如下:
    CREATE TABLE IF NOT EXISTS test_kafka(topic STRING,`partition` BIGINT,`offset` BIGINT,key STRING,value STRING,dt DATETIME) STORED AS ALIORC TBLPROPERTIES ('comment'='');
    详情请参见创建和查看表

操作流程

使用MaxCompute Sink Connector将数据从消息队列Kafka版的Topic导出至MaxCompute的操作流程如下:

  1. 创建MaxCompute Sink Connector所需的消息队列Kafka版资源
    1. 创建Consumer Group
    2. 创建Topic
  2. 创建MaxCompute Sink Connector

    创建MaxCompute Sink Connector

  3. 结果验证
    1. 发送消息
    2. 查看表数据

创建Consumer Group

创建MaxCompute Sink Connector数据同步任务所需的Consumer Group。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击Consumer Group管理
  4. Consumer Group管理页面,选择实例,然后单击创建Consumer Group
  5. 创建Consumer Group对话框,创建任务消费组。
    1. Consumer Group文本框,输入connect-kafka-maxcompute-sink
    2. 备注文本框,输入connect-kafka-maxcompute-sink
    3. 单击创建
    CG
  6. 重复步骤4和步骤5,创建Connector消费组。
    表 1. Consumer Group参数说明
    Consumer Group类型 参数 描述 示例值
    Connector消费组 Consumer Group Consumer Group的名称。建议以connect-cluster开头。 connect-cluster-kafka-maxcompute-sink
    Consumer Group管理页面显示创建的Consumer Group。cg result

创建Topic

创建MaxCompute Sink Connector数据同步任务所需的Topic。

  1. 在左侧导航栏,单击Topic管理
  2. Topic管理页面,单击创建Topic
  3. 创建Topic对话框,创建数据源Topic。
    1. Topic文本框,输入maxcompute-test-input
    2. 备注文本框,输入maxcompute-test-input
    3. 单击创建
    maxcompute-test-input
  4. 重复步骤2和步骤3,创建以下Topic:
    Topic类型 参数 描述 示例值
    任务位点Topic Topic Topic的名称。建议以connect-offset开头。 connect-offset-maxcompute-sink
    分区数 Topic的分区数量。分区数量必须大于1。 12
    存储引擎 Topic的存储引擎。存储引擎必须为Local存储。 Local存储
    cleanup.policy Topic的日志清理策略。日志清理策略必须为compact。 compact
    任务配置Topic Topic Topic的名称。建议以connect-config开头。 connect-config-maxcompute-sink
    分区数 Topic的分区数量。分区数量必须为1。 1
    存储引擎 Topic的存储引擎。存储引擎必须为Local存储。 Local存储
    cleanup.policy Topic的日志清理策略。日志清理策略必须为compact。 compact
    任务状态Topic Topic Topic的名称。建议以connect-status开头。 connect-status-maxcompute-sink
    分区数 Topic的分区数量。分区数量建议为6。 6
    存储引擎 Topic的存储引擎。存储引擎必须为Local存储。 Local存储
    cleanup.policy Topic的日志清理策略。日志清理策略必须为compact。 compact
    死信队列Topic Topic Topic的名称。 maxcompute_dead_letter_error
    分区数 Topic的分区数量。分区数量建议为6。 6
    存储引擎 Topic的存储引擎。可以为Local存储或云存储。 云存储
    异常数据Topic Topic Topic的名称。 maxcompute_runtime_error
    分区数 Topic的分区数量。分区数量建议为6。 6
    存储引擎 Topic的存储引擎。可以为Local存储或云存储。 云存储
    说明 用于存储异常数据的死信队列Topic和异常数据Topic可以使用同一个Topic。
    Topic管理页面显示创建的Topic。topic result

创建MaxCompute Sink Connector

创建MaxCompute Sink Connector数据同步任务所需的Consumer Group和Topic后,创建MaxCompute Sink Connector将数据从消息队列Kafka版的Topic导出至MaxCompute。

  1. Connector页面,单击创建Connector
  2. 创建Connector页面,填写Connector信息,然后单击预检查并创建
    信息类型 参数 描述 示例值
    任务信息 任务名称 数据同步任务的名称。实例内保持唯一。Connector的同步任务会使用名称为connector-任务名称的Consumer Group,因此您需要在创建Connector前,创建名称为connector-任务名称的Consumer Group的格式。 kafka-maxcompute-sink
    通用信息 任务类型 数据同步任务的类型。消息队列Kafka版支持的任务类型,请参见Connector类型 KAFKA2ODPS
    用户VPC 数据同步任务所在的VPC。默认为消息队列Kafka版实例所在的VPC,您无需填写。 vpc-bp1xpdnd3l***
    用户交换机 数据同步任务所在的交换机。用户交换机必须与消息队列Kafka版处于同一VPC。默认为部署消息队列Kafka版时填写的VPC的可用区H的交换机 vsw-bp1d2jgg81***
    源实例信息 数据源Topic 需要同步数据的Topic。 maxcompute-test-input
    消费初始位置 开始消费的位置。取值:
    • latest:从最新位点开始消费。
    • earliest:从最初位点开始消费。
    latest
    Connector消费组 用于同步数据的Consumer Group。 connect-cluster-kafka-maxcompute-sink
    任务位点Topic 用于存储消费位点的Topic。 connect-offset-maxcompute-sink
    任务配置Topic 用于存储任务配置的Topic。 connect-config-maxcompute-sink
    任务状态Topic 用于存储任务状态的Topic。 connect-status-maxcompute-sink
    死信队列Topic 用于存储Connect框架的异常数据的Topic。 maxcompute_dead_letter_error
    异常数据Topic 用于存储Sink的异常数据的Topic。 maxcompute_runtime_error
    目标实例信息 MaxCompute连接地址 MaxCompute的服务接入点。详情请参见配置Endpoint
    • VPC网络Endpoint:低延迟,推荐。适用于消息队列Kafka版实例和MaxCompute处于同一地域场景。
    • 外网Endpoint:高延迟,不推荐。适用于消息队列Kafka版实例和MaxCompute处于不同地域的场景。如需使用公网Endpoint,您需要为Connector开启公网访问。详情请参见为Connector开启公网访问
    http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
    MaxCompute工作空间 MaxCompute的工作空间。 connector_test
    MaxCompute表 MaxCompute的表。 test_kafka
    阿里云账号ID MaxCompute所属阿里云账号的AccessKey ID。 LTAI4F***
    阿里云账号Key MaxCompute所属阿里云账号的AccessKey Secret。 wvDxjjR***
    Connector页面显示创建的MaxCompute Sink Connector。

发送消息

创建MaxCompute Sink Connector将消息队列Kafka版与MaxCompute连接后,向消息队列Kafka版的数据源Topic发送测试消息。

  1. 在左侧导航栏,单击Topic管理
  2. Topic管理页面,找到maxcompute-test-input,在其右侧操作列,单击发送消息
  3. 发送消息对话框,发送测试消息。
    1. 分区文本框,输入0
    2. Message Key文本框,输入1
    3. Message Value文本框,输入1
    4. 单击发送

查看表数据

消息队列Kafka版的数据源Topic发送消息后,在MaxCompute客户端查看表数据,验证是否收到消息。

  1. 安装并配置客户端
  2. 执行以下命令查看表test_kafka的数据。
    SELECT COUNT(*) FROM test_kafka;

    返回的表数据中显示发送的测试消息。

    table_result

后续步骤