本文介绍如何使用DataWorks数据同步功能,将消息队列for Apache Kafka集群上的数据迁移至阿里云大数据计算服务 MaxCompute,方便您对离线数据进行分析加工。
前提条件
在开始本教程前,确保您已完成以下操作:
- 确保消息队列 for Apache Kafka集群运行正常。本文以部署在华东1(杭州)地域(Region)的集群为例。
- 开通 MaxCompute。
- 开通 DataWorks 。
- 创建项目。本文以在华东1(杭州)地域创建名为bigdata_DOC的项目为例。
背景信息
MaxCompute是一种大数据计算服务,能提供快速、完全托管免运维的 EB 级云数据仓库解决方案。
DataWorks是基于MaxCompute计算和存储,提供工作流可视化开发、调度运维托管的一站式海量数据离线加工分析平台。在数加(一站式大数据平台)中,DataWorks 控制台即为 MaxCompute 控制台。MaxCompute 和 DataWorks 一起向用户提供完善的 ETL 和数仓管理能力,以及 SQL、MR、Graph 等多种经典的分布式计算模型,能够更快速地解决用户海量数据计算问题,有效降低企业成本,保障数据安全。
本教程旨在帮助您使用 DataWorks,将消息队列for Apache Kafka中的数据导入至 MaxCompute,来进一步探索大数据的价值。
步骤一:准备 Kafka 数据
- 登录消息队列 for Apache Kafka 控制台创建 Topic 和 Consumer Group,分别命名为 testkafka 和 console-consumer。具体步骤参见步骤三:创建资源。本示例中,Consumer Group console-consumer 将用于消费 Topic testkafka 中的数据。
- 向 Topic testkafka 中写入数据。由于消息队列 for Apache Kafka 用于处理流式数据,您可以持续不断地向其中写入数据。为保证测试结果,建议您写入 10 条以上的数据。您可以直接在控制台使用发送消息功能来写入数据,也可以使用消息队列 for Apache Kafka 的 SDK 收发消息。详情参见使用 SDK 收发消息。
- 为验证写入数据生效,您可以在控制台查询消息,看到之前写入 Topic 中的数据。
步骤二:创建 DataWorks 表
您需创建 DataWorks 表,以保证大数据计算服务 MaxCompute 可以顺利接收消息队列 for Apache Kafka 数据。本例中为测试便利,使用非分区表。
步骤三:同步数据
执行结果

后续步骤
您可以新建一个数据开发任务运行 SQL 语句,查看当前表中是否已存在从 Kafka 同步过的数据。本文以select * from testkafka
为例,具体步骤如下:
- 在左侧导航栏,选择 。
- 右键选择 。
- 在新建节点对话框,输入节点名称,然后单击提交。
- 在创建的节点页面,输入
select * from testkafka
,然后单击运行图标。

在文档使用中是否遇到以下问题
更多建议
匿名提交