云消息队列 Kafka 版数据迁移至MaxCompute

本文介绍如何使用DataWorks数据同步功能,将云消息队列 Kafka 版集群上的数据迁移至阿里云大数据计算服务MaxCompute,方便您对离线数据进行分析加工。

前提条件

在开始本教程前,确保您在同一地域中已完成以下操作:

  • 云消息队列 Kafka 版

    • 购买并部署云消息队列 Kafka 版。具体操作,请参见购买并部署实例。 本文以部署在华东1(杭州)地域(Region)的集群为例。

      说明

      云消息队列 Kafka 版实例支持的部署版本(0.10.x版本~2.x版本)、提供的规格类型(标准版和专业版)、支持的网络属性(VPC实例和公网/VPC实例)均支持数据同步。您可以根据业务需要选择。

    • 创建Topic和Group,具体操作,请参见步骤三:创建资源。本文以Topic名称为testkafka,Group名称为console-consumer为例,Group console-consumer将用于消费Topic testkafka中的数据。

  • 开通MaxCompute和DataWorks,本文以在华东1(杭州)地域创建名为bigdata_DOC的项目为例。

背景信息

大数据计算服务MaxCompute(原ODPS)是一种大数据计算服务,能提供快速、完全托管免运维的EB级云数据仓库解决方案。

DataWorks基于MaxCompute计算和存储,提供工作流可视化开发、调度运维托管的一站式海量数据离线加工分析平台。在数加(一站式大数据平台)中,DataWorks控制台即为MaxCompute控制台。MaxCompute和DataWorks一起向用户提供完善的数据处理和数仓管理能力,以及SQL、MR、Graph等多种经典的分布式计算模型,能够更快速地解决用户海量数据计算问题,有效降低企业成本,保障数据安全。

本教程旨在帮助您使用DataWorks,将云消息队列 Kafka 版中的数据导入至MaxCompute,来进一步探索大数据的价值。

步骤一:准备云消息队列 Kafka 版数据

向Topic testkafka中写入数据,以作为迁移至MaxCompute中的数据。由于云消息队列 Kafka 版用于处理流式数据,您可以持续不断地向其中写入数据。为保证测试结果,建议您写入10条以上的数据。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Topic 管理

  5. Topic 管理页面,找到目标Topic,在其操作列中,选择更多 > 体验发送消息

  6. 快速体验消息收发面板,发送测试消息。

    • 发送方式选择控制台

      1. 消息 Key文本框中输入消息的Key值,例如demo。

      2. 消息内容文本框输入测试的消息内容,例如 {"key": "test"}。

      3. 设置发送到指定分区,选择是否指定分区。

        • 单击,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态

        • 单击,不指定分区。

      4. 根据界面提示信息,通过SDK订阅消息,或者执行Docker命令订阅消息。

    • 发送方式选择Docker,运行Docker容器。

      1. 执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。

      2. 执行发送后如何消费消息?区域的Docker命令,订阅消息。

    • 发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK体验消息收发。

  7. 在左侧导航栏,单击消息查询,然后在消息查询页面,选择查询方式、所属的Topic、分区等信息,单击查询,查看之前写入的Topic的数据。

    关于消息查询的更多信息,请参见消息查询。以按时间查询为例,查询的一部分消息如下截图:查询Kafka消息

步骤二:创建DataWorks表

您需创建DataWorks表,以保证大数据计算服务MaxCompute可以顺利接收云消息队列 Kafka 版数据。为测试便利,本文以使用非分区表为例。

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在左侧导航栏,单击工作空间列表

    3. 在目标工作空间的操作列中,单击快速进入,选择数据开发

  2. 跳转到数据开发页面,在已有业务流程的情况下,可右键单击目标业务流程,选择新建表 > MaxCompute >

  3. 新建表页面,选择引擎类型并输入表名为testkafka,单击新建

  4. 在表编辑页面左上角,单击image图标。

  5. DDL对话框中,输入如下建表语句,单击生成表结构

    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;

    其中的每一列,对应于DataWorks数据集成Kafka Reader的默认列:

    • __key__表示消息的key。

    • __value__表示消息的完整内容 。

    • __partition__表示当前消息所在分区。

    • __headers__表示当前消息headers信息。

    • __offset__表示当前消息的偏移量。

    • __timestamp__表示当前消息的时间戳。

    您还可以自主命名,详情参见Kafka Reader

  6. 单击提交到开发环境

    具体信息,请参见表管理

步骤三:新增数据源

将已经写入数据的云消息队列 Kafka 版添加至DataWorks,作为迁移数据源,并添加MaxCompute作为数据迁移的目标源。

  1. 新建独享数据集成资源组。

    由于当前DataWorks的公共资源组无法完美支持Kafka插件,您需要使用独享数据集成资源组完成数据同步。详情请参见新增和使用独享数据集成资源组

  2. 登录DataWorks控制台

  3. 在左侧导航栏,单击工作空间列表

  4. 在工作空间列表,单击目标工作空间操作列中的管理

  5. 管理中心页面左侧导航栏,选择数据源 > 数据源列表 > 新增数据源,即可新增相应的数据源。具体操作,请参见数据源配置

    • 新增数据源云消息队列 Kafka 版

      1. 新增数据源面板,选择Kafka

      2. 填写数据源Kafka信息。

        • 数据源类型:选择阿里云实例模式

        • 数据源名称:输入新增的数据源名称。

        • 适用环境:选择开发

        • 地区:选择华东1(杭州)

        • 实例ID:在云消息队列 Kafka 版控制台创建的实例ID。

        • 特殊认证方式:保持默认。

        • 资源组连通性:选择数据集成

        • 在独享集群集成资源组列表,目标资源组所在行,单击测试连通性

      3. 测试成功后,单击完成

    • 新增数据源MaxCompute

      1. 新增数据源面板,选择MaxCompute

      2. 填写数据源MaxCompute信息。

        • 数据源名称:输入新增的数据源名称。

        • 适用环境:选择开发

        • ODPS Endpoint:保持默认。

        • ODPS项目名称:输入ODPS项目名称为bigdata_DOC。

        • AccessKey ID:MaxCompute访问用户的AccessKey ID。更多信息,请参见创建AccessKey

        • AccessKey Secret:MaxCompute访问用户的AccessKey ID的密码。更多信息,请参见创建AccessKey

        • 资源组连通性:选择数据集成

        • 在独享集群集成资源组列表,目标资源组所在行,单击测试连通性

      3. 测试成功后,单击完成

步骤四:同步数据

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 数据开发页面,右键单击业务名称,选择新建节点 > 数据集成 > 离线同步

  3. 新建节点对话框,输入节点名称(即数据同步任务名称),然后单击确认

  4. 在创建的节点页面,选择数据源信息。

    切换脚本模式

    您也可以单击配置区域上方的切换代码图标,转换为脚本模式,通过脚本配置。示例如下:

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "localhost:9093",
                    "fetchMaxWaitMs": "500",
                    "kafkaConfig": {
                        "group.id": "datax_consumer_group"
                    },
                    "endType": "specific",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__headers__",
                        "__offset__",
                        "__timestamp__"
                    ],
                    "timeZone": "Asia/Shanghai",
                    "fetchMinBytes": "1",
                    "endDateTime": "${endDateTime}",
                    "encoding": "UTF-8",
                    "version": "10",
                    "stopWhenPollEmpty": "false",
                    "beginType": "specific",
                    "autoOffsetReset": "none",
                    "envType": 0,
                    "datasource": "kafka_001",
                    "valueType": "string",
                    "topic": "topic_c",
                    "beginDateTime": "${beginDateTime}",
                    "keyType": "string",
                    "sessionTimeoutMs": "30000",
                    "waitTime": "10"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "datasource": "odps_001",
                    "envType": 0,
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "executeMode": null,
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false
            }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  5. 单击数据集成资源出配置,选择步骤三:新增数据源中第一步创建的独享资源组,单击运行图标,运行任务。

    配置任务资源

执行结果

完成运行后,运行日志中显示运行成功。运行日志

后续步骤

您可以新建一个数据开发任务运行SQL语句,查看当前表中是否已存在从云消息队列 Kafka 版同步过来的数据。本文以select * from testkafka为例,具体步骤如下:

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 临时查询面板,右键单击临时查询,选择新建节点 > ODPS SQL

  3. 新建节点对话框中,输入名称

    说明

    节点名称的长度不能超过128个字符。

  4. 单击确认

  5. 在创建的节点页面,输入select * from testkafka,单击运行图标,运行完成后,查看运行日志。

运行