本文介绍如何使用DataWorks数据同步功能,将消息队列Kafka版集群上的数据迁移至阿里云大数据计算服务MaxCompute,方便您对离线数据进行分析加工。
前提条件
在开始本教程前,确保您在同一地域中已完成以下操作:
- 消息队列Kafka版
- 购买并部署消息队列Kafka版。具体操作,请参见购买并部署实例。 本文以部署在华东1(杭州)地域(Region)的集群为例。
说明 消息队列Kafka版实例支持的部署版本(0.10.x版本~2.x版本)、提供的规格类型(标准版和专业版)、支持的网络属性(VPC实例和公网/VPC实例)均支持数据同步。您可以根据业务需要选择。
- 创建Topic和Group,具体操作,请参见步骤三:创建资源。本文以Topic名称为testkafka,Group名称为console-consumer为例,Group console-consumer将用于消费Topic testkafka中的数据。
- DataWorks
- MaxCompute
背景信息
大数据计算服务MaxCompute(原ODPS)是一种大数据计算服务,能提供快速、完全托管免运维的EB级云数据仓库解决方案。
DataWorks基于MaxCompute计算和存储,提供工作流可视化开发、调度运维托管的一站式海量数据离线加工分析平台。在数加(一站式大数据平台)中,DataWorks控制台即为MaxCompute控制台。MaxCompute和DataWorks一起向用户提供完善的ETL和数仓管理能力,以及SQL、MR、Graph等多种经典的分布式计算模型,能够更快速地解决用户海量数据计算问题,有效降低企业成本,保障数据安全。
本教程旨在帮助您使用DataWorks,将消息队列Kafka版中的数据导入至MaxCompute,来进一步探索大数据的价值。
步骤一:准备消息队列Kafka版数据
向Topic testkafka中写入数据,以作为迁移至MaxCompute中的数据。由于消息队列Kafka版用于处理流式数据,您可以持续不断地向其中写入数据。为保证测试结果,建议您写入10条以上的数据。
- 登录消息队列Kafka版控制台。
- 在概览页面的资源分布区域,选择地域。
- 在实例列表页面,单击目标实例名称。
- 在左侧导航栏,单击Topic 管理。
- 在Topic 管理页面,找到目标Topic,在其操作列中,选择。
- 在快速体验消息收发面板,发送测试消息。
- 发送方式选择控制台。
- 在消息 Key文本框中输入消息的Key值,例如demo。
- 在消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
- 设置发送到指定分区,选择是否指定分区。
- 单击是,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态。
- 单击否,不指定分区。
- 根据界面提示信息,通过SDK订阅消息,或者执行Docker命令订阅消息。
- 发送方式选择Docker,运行Docker容器。
- 执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。
- 执行发送后如何消费消息?区域的Docker命令,订阅消息。
- 发送方式选择 SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK体验消息收发。
- 在左侧导航栏,单击消息查询,然后在消息查询页面,选择查询方式、所属的Topic、分区等信息,单击查询,查看之前写入的Topic的数据。
关于消息查询的更多信息,请参见
查询消息。以按时间查询为例,查询的一部分消息如下截图:

步骤二:创建DataWorks表
您需创建DataWorks表,以保证大数据计算服务MaxCompute可以顺利接收消息队列Kafka版数据。为测试便利,本文以使用非分区表为例。
- 进入数据开发页面。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 单击相应工作空间后的进入数据开发。
- 在数据开发页面,右键单击目标业务名称,选择。
- 在新建表页面,选择引擎类型并输入表名为testkafka。
- 单击DDL模式。在DDL模式对话框中,输入如下建表语句,单击生成表结构。
CREATE TABLE testkafka
(
key string,
value string,
partition1 string,
timestamp1 string,
offset string,
t123 string,
event_id string,
tag string
) ;
其中的每一列,对应于DataWorks数据集成Kafka Reader的默认列:
- __key__表示消息的key。
- __value__表示消息的完整内容 。
- __partition__表示当前消息所在分区。
- __headers__表示当前消息headers信息。
- __offset__表示当前消息的偏移量。
- __timestamp__表示当前消息的时间戳。
您还可以自主命名,详情参见Kafka Reader。
- 单击提交到开发环境。
步骤三:新增数据源
将已经写入数据的消息队列Kafka版添加至DataWorks,作为迁移数据源,并添加MaxCompute作为数据迁移的目标源。
- 新建独享数据集成资源组。
由于当前DataWorks的默认资源组无法完美支持Kafka插件,您需要使用独享数据集成资源组完成数据同步。详情请参见新增和使用独享数据集成资源组。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 在目标工作空间所在行,单击配置工作空间,单击左侧导航栏中的数据源管理,即可进入数据源管理页面。
- 单击页面右上角的新增数据源,即可新增相应的数据源。具体操作,请参见数据源配置。
- 新增数据源消息队列Kafka版
- 在新增数据源面板,选择Kafka。
- 填写数据源Kafka信息。
- 数据源类型:选择阿里云实例模式。
- 数据源名称:输入新增的数据源名称。
- 适用环境:选择开发。
- 地区:选择华东1(杭州)。
- 实例ID:在消息队列Kafka版控制台创建的实例ID。
- 特殊认证方式:保持默认。
- 资源组连通性:选择数据集成。
- 在独享集群集成资源组列表,目标资源组所在行,单击测试连通性。
- 测试成功后,单击完成。
- 新增数据源MaxCompute
- 在新增数据源面板,选择MaxCompute。
- 填写数据源MaxCompute信息。
- 数据源名称:输入新增的数据源名称。
- 适用环境:选择开发。
- ODPS Endpoint:保持默认。
- ODPS项目名称:输入ODPS项目名称为bigdata_DOC。
- AccessKey ID:MaxCompute访问用户的AccessKey ID。更多信息,请参见获取AccessKey。
- AccessKey Secret:MaxCompute访问用户的AccessKey ID的密码。更多信息,请参见获取AccessKey。
- 资源组连通性:选择数据集成。
- 在独享集群集成资源组列表,目标资源组所在行,单击测试连通性。
- 测试成功后,单击完成。
步骤四:同步数据
- 进入数据开发页面。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 单击相应工作空间后的进入数据开发。
- 在数据开发页面,右键单击业务名称,选择。
- 在新建节点对话框,输入节点名称(即数据同步任务名称),然后单击提交。
- 在创建的节点页面,选择数据源信息。
您也可以单击配置区域上方的

图标,转换为脚本模式,通过脚本配置。示例如下:
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "localhost:9093",
"fetchMaxWaitMs": "500",
"kafkaConfig": {
"group.id": "datax_consumer_group"
},
"endType": "specific",
"column": [
"__key__",
"__value__",
"__partition__",
"__headers__",
"__offset__",
"__timestamp__"
],
"timeZone": "Asia/Shanghai",
"fetchMinBytes": "1",
"endDateTime": "${endDateTime}",
"encoding": "UTF-8",
"version": "10",
"stopWhenPollEmpty": "false",
"beginType": "specific",
"autoOffsetReset": "none",
"envType": 0,
"datasource": "kafka_001",
"valueType": "string",
"topic": "topic_c",
"beginDateTime": "${beginDateTime}",
"keyType": "string",
"sessionTimeoutMs": "30000",
"waitTime": "10"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "",
"truncate": true,
"datasource": "odps_001",
"envType": 0,
"column": [
"key",
"value",
"partition1",
"timestamp1",
"offset",
"t123"
],
"emptyAsNull": false,
"table": "testkafka"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"executeMode": null,
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
- 单击数据集成资源出配置,选择步骤三:新增数据源中第一步创建的独享资源组,单击
图标,运行任务。
执行结果
完成运行后,运行日志中显示运行成功。
后续步骤
您可以新建一个数据开发任务运行SQL语句,查看当前表中是否已存在从消息队列Kafka版同步过来的数据。本文以select * from testkafka
为例,具体步骤如下:
- 在左侧导航栏,单击临时查询。
- 在临时查询面板,右键单击临时查询,选择。
- 在新建节点对话框中,输入节点名称,并选择目标文件夹。
- 单击提交。
- 在创建的节点页面,输入
select * from testkafka
,单击
图标,运行完成后,查看运行日志。