文档

将云消息队列 Kafka 版的数据迁移至MaxCompute

更新时间:

本文介绍如何使用DataWorks数据同步功能,将云消息队列 Kafka 版集群上的数据迁移至阿里云大数据计算服务MaxCompute,方便您对离线数据进行分析加工。

背景信息

大数据计算服务MaxCompute(原ODPS)是一种大数据计算服务,能提供快速、完全托管免运维的EB级云数据仓库解决方案。

DataWorks基于MaxCompute计算和存储,提供工作流可视化开发、调度运维托管的一站式海量数据离线加工分析平台。在数加(一站式大数据平台)中,DataWorks控制台即为MaxCompute控制台。MaxCompute和DataWorks一起向用户提供完善的数据处理和数仓管理能力,以及SQL、MR、Graph等多种经典的分布式计算模型,能够更快速地解决用户海量数据计算问题,有效降低企业成本,保障数据安全。

本教程旨在帮助您使用DataWorks,将云消息队列 Kafka 版中的数据导入至MaxCompute,来进一步探索大数据的价值。

前提条件

云消息队列 Kafka 版实例版本需要大于等于0.10.2小于等于2.2.x。

  • 购买并部署云消息队列 Kafka 版。具体操作,请参见购买并部署实例。 本文以部署在华东1(杭州)地域(Region)的集群为例。

  • 创建Topic和Group,具体操作,请参见创建资源。本文以Topic名称为testkafka,Group名称为console-consumer为例。

1.准备云消息队列 Kafka 版数据

向Topic testkafka中写入数据,以作为迁移至MaxCompute中的数据。由于云消息队列 Kafka 版用于处理流式数据,您可以持续不断地向其中写入数据。为保证测试结果,建议您写入10条以上的数据。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Topic 管理

  5. Topic 管理页面,找到目标Topic,在其操作列中,选择更多 > 体验发送消息

  6. 快速体验消息收发面板,发送如下的测试消息。

    image

  7. 在左侧导航栏,单击消息查询,然后在消息查询页面,选择查询方式、所属的Topic、分区等信息,单击查询,查看之前写入的Topic的数据。

    关于消息查询的更多信息,请参见消息查询。以按时间查询为例,查询的一部分消息如下截图:查询Kafka消息

2.创建MaxCompute项目

2.1.开通MaxCompute(可选)

只有开通了MaxCompute,才可以在MaxCompute中执行创建项目等操作。具体操作,请参见开通MaxCompute

2.2.创建MaxCompute项目

本文以在华东1(杭州)地域创建名为kafka_bigdata_doc的项目为例。具体操作,请参见通过MaxCompute控制台创建项目

3.创建DataWorks工作空间

3.1.开通DataWorks(可选)

当前所在地域首次开通DataWorks服务时,必须购买DataWorks任意产品版本和按量付费新版资源组,才能开通并使用DataWorks。具体操作,请参见开通DataWorks服务

3.2.创建工作空间

本文以在华东1(杭州)地域创建名为kafka_workspace的工作空间为例。具体操作,请参见创建工作空间

4.添加数据源

4.1.创建独享数据集成资源组

  1. 创建一个名为kafka_dx的独享数据集成资源组。具体操作,请参见购买资源组

  2. 绑定3.创建DataWorks工作空间步骤中创建的名为kafka_workspace的工作空间。具体操作,请参见绑定归属工作空间

4.2.创建MaxCompute数据源

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心

  2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。

  3. 单击新增数据源,选择MaxCompute,根据界面指引创建数据源。如下图所示,创建一个名为MaxCompute_data的数据源:

    image

4.3.创建Kafka数据源

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心

  2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。

  3. 单击新增数据源,选择Kafka,根据界面指引创建数据源。如下图所示,创建一个名为kafka_data的数据源:

    image

    说明
    • 实例ID填写已部署的云消息队列 Kafka 版的实例ID。

    • 测试连通性时,如果出现无法连通的情况,单击自助排查解决,在连通性诊断工具面板中,按照指引完成测试即可。

5.创建DataWorks表

您需创建DataWorks表,以保证大数据计算服务MaxCompute可以顺利接收云消息队列 Kafka 版数据。为测试便利,本文以使用非分区表为例。

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在左侧导航栏,单击工作空间列表

    3. 在目标工作空间的操作列中,单击快速进入,选择数据开发

  2. 数据开发页面,右键单击目标业务名称,选择新建表 > MaxCompute >

  3. 新建表页面,选择引擎类型并输入表名为testkafka。

  4. DDL对话框中,输入如下建表语句,单击生成表结构

    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition       string,
     headers         string,
     offset          string,
     timestamp       string
    ) ;

    image

  5. 单击提交到生产环境确认

6.创建并启动离线同步任务

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在左侧导航栏,单击工作空间列表

    3. 在目标工作空间的操作列中,单击快速进入,选择数据开发

  2. 数据开发页面,右键单击业务名称,选择新建节点 > 数据集成 > 离线同步

  3. 新建节点对话框,输入节点名称(即数据同步任务名称),然后单击确认

  4. 在创建的节点页面,填写网络与资源配置信息。

    image

  5. 单击下一步,填写配置任务信息,单击运行图标,运行任务。

    image

7.结果验证

7.1验证离线同步任务运行结果

完成运行后,运行日志中显示运行成功。

image

7.2验证数据同步结果

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在左侧导航栏,单击工作空间列表

    3. 在目标工作空间的操作列中,单击快速进入,选择数据开发

  2. 临时查询面板,右键单击临时查询,选择新建节点 > ODPS SQL

  3. 新建节点对话框中,输入名称

  4. 单击确认

  5. 在创建的节点页面,输入select * from testkafka,单击图标,运行完成后,查看运行日志。

    image