本文介绍如何使用DataWorks数据同步功能,将云消息队列 Kafka 版集群上的数据迁移至阿里云大数据计算服务MaxCompute,便于分析加工离线数据。
前提条件
在开始本教程前,确保您在同一地域中已完成以下操作:
云消息队列 Kafka 版
已创建MaxCompute项目,本文以在华东1(杭州)地域创建名为bigdata_DOC的项目为例。
已创建DataWorks工作空间,并绑定MaxCompute计算资源。
背景信息
大数据计算服务MaxCompute(原ODPS)是一种大数据计算服务,能提供快速、完全托管免运维的EB级云数据仓库解决方案。
DataWorks基于MaxCompute计算和存储,提供工作流可视化开发、调度运维托管的一站式海量数据离线加工分析平台。在数加(一站式大数据平台)中,DataWorks控制台即为MaxCompute控制台。MaxCompute和DataWorks一起向用户提供完善的数据处理和数仓管理能力,以及SQL、MR、Graph等多种经典的分布式计算模型,能够更快速地解决用户海量数据计算问题,有效降低企业成本,保障数据安全。
本教程旨在帮助您使用DataWorks,将云消息队列 Kafka 版中的数据导入至MaxCompute,来进一步探索大数据的价值。
步骤一:准备云消息队列 Kafka 版数据
向Topic testkafka中写入数据,以作为迁移至MaxCompute中的数据。由于云消息队列 Kafka 版用于处理流式数据,您可以持续不断地向其中写入数据。为保证测试结果,建议您写入10条以上的数据。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理。
在Topic 管理页面,单击目标Topic名称进入Topic 详情页面,然后单击体验发送消息。
在快速体验消息收发面板,发送测试消息。
发送方式选择控制台。
在消息 Key文本框中输入消息的Key值,例如demo。
在消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
设置发送到指定分区,选择是否指定分区。
单击是,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态。
单击否,不指定分区。
根据界面提示信息,通过SDK订阅消息,或者执行Docker命令订阅消息。
发送方式选择Docker,运行Docker容器。
执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。
执行发送后如何消费消息?区域的Docker命令,订阅消息。
发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK体验消息收发。
在左侧导航栏,单击消息查询,然后在消息查询页面,选择查询方式、所属的Topic、分区等信息,单击查询,查看之前写入的Topic的数据。
关于消息查询的更多信息,请参见消息查询。以按时间查询为例,查询的一部分消息如下截图:

步骤二:在DataWorks上创建表
创建DataWorks表,以保证大数据计算服务MaxCompute可以顺利接收云消息队列 Kafka 版数据。为测试便利,本文以使用非分区表为例。
进入Data Studio
登录DataWorks控制台,在左上角选择地域。
在左侧导航栏选择工作空间。
在工作空间列表页面,单击目标工作空间对应的操作列。
在Data Studio页面,新建MaxCompute SQL节点。按照如下语句新建表
testkafka。CREATE TABLE testkafka ( key STRING, value STRING, partition1 STRING, timestamp1 STRING, offset STRING, t123 STRING, event_id STRING, tag STRING ) ;每一列对应于DataWorks数据集成Kafka Reader的默认列:
key表示消息的key。
value表示消息的完整内容 。
partition表示当前消息所在分区。
headers表示当前消息headers信息。
offset表示当前消息的偏移量。
timestamp表示当前消息的时间戳。
也可以选择自主命名,详情参见Kafka Reader。
具体信息,请参见表管理。
步骤三:新增数据源
将已经写入数据的云消息队列 Kafka 版添加至DataWorks,作为迁移数据源,并添加MaxCompute作为数据迁移的目标源。
新建独享数据集成资源组。
由于当前DataWorks的公共资源组无法完美支持Kafka插件,您需要使用独享数据集成资源组完成数据同步。详情请参见使用独享数据集成资源组。
登录DataWorks控制台,在左上角选择地域。
在左侧导航栏选择工作空间。
在工作空间列表页面,单击目标工作空间名称。
在空间详情页面,单击左侧导航栏数据源。
新增数据源Kafka
在数据源页签,单击添加数据源,选择Kafka。
在新增Kafka数据源页面填写如下信息。
数据源类型:选择阿里云实例模式。
数据源名称:输入新增的数据源名称。
地区:根据实际情况选择。
实例ID:在云消息队列 Kafka 版控制台创建的实例ID。
特殊认证方式:保持默认。
在数据集成资源组列表,绑定目标资源组,然后在目标资源组所在行,单击批量测试连通性。测试成功后,单击完成。
具体操作,请参见数据源配置。
新增数据源MaxCompute
在数据源页签,单击添加数据源,选择MaxCompute。
在添加MaxCompute数据源页面,填写基本信息。
步骤四:同步数据
登录DataWorks控制台,在左上角选择地域。
在左侧导航栏选择。
选择工作空间,单击进入Data Studio。
在Data Studio左侧,单击
,选择。数据来源选择Kafka。
数据去向选择MaxCompute(ODPS)。
在节点配置页面,填写如下信息。
数据源-来源:选择新增的数据源Kafka名称。
数据源-去向:选择新增的数据源MaxCompute名称。
数据来源-主题:选择写入Kafka数据的Topic。
数据去向-表:已创建MaxCompute表。
运行资源:选择步骤三:新增数据源中第一步创建的独享资源组。
其他保持默认即可。
也可以单击配置区域上方的
图标,转换为脚本模式,通过脚本配置。示例如下:{ "type": "job", "version": "2.0", "steps": [ { "stepType": "kafka", "parameter": { "server": "localhost:9093", "fetchMaxWaitMs": "500", "kafkaConfig": { "group.id": "datax_consumer_group" }, "endType": "specific", "column": [ "key", "value", "partition", "headers", "offset", "timestamp" ], "timeZone": "Asia/Shanghai", "fetchMinBytes": "1", "endDateTime": "${endDateTime}", "encoding": "UTF-8", "version": "10", "stopWhenPollEmpty": "false", "beginType": "specific", "autoOffsetReset": "none", "envType": 0, "datasource": "kafka_001", "valueType": "string", "topic": "topic_c", "beginDateTime": "${beginDateTime}", "keyType": "string", "sessionTimeoutMs": "30000", "waitTime": "10" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": true, "datasource": "odps_001", "envType": 0, "column": [ "key", "value", "partition1", "timestamp1", "offset", "t123" ], "emptyAsNull": false, "table": "testkafka" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }单击运行。
后续步骤
新建数据开发任务运行SQL语句,查看当前表中是否已存在从云消息队列 Kafka 版同步过来的数据。本文以select * from testkafka为例,具体步骤如下:
登录DataWorks控制台,在左上角选择地域。
在左侧导航栏选择工作空间。
在工作空间列表页面,单击目标工作空间对应的操作列。
新建MaxCompute SQL节点,输入
SELECT * FROM testkafka在调试配置页面,计算资源选择绑定的MaxCompute项目资源,计算配额选择交互式配置组。
单击运行,查看运行结果。