本文介绍如何使用DataWorks数据同步功能,将云消息队列 Kafka 版集群上的数据迁移至阿里云大数据计算服务MaxCompute,方便您对离线数据进行分析加工。
背景信息
大数据计算服务MaxCompute(原ODPS)是一种大数据计算服务,能提供快速、完全托管免运维的EB级云数据仓库解决方案。
DataWorks基于MaxCompute计算和存储,提供工作流可视化开发、调度运维托管的一站式海量数据离线加工分析平台。在数加(一站式大数据平台)中,DataWorks控制台即为MaxCompute控制台。MaxCompute和DataWorks一起向用户提供完善的数据处理和数仓管理能力,以及SQL、MR、Graph等多种经典的分布式计算模型,能够更快速地解决用户海量数据计算问题,有效降低企业成本,保障数据安全。
本教程旨在帮助您使用DataWorks,将云消息队列 Kafka 版中的数据导入至MaxCompute,来进一步探索大数据的价值。
前提条件
云消息队列 Kafka 版实例版本需要大于等于0.10.2小于等于2.2.x。
1.准备云消息队列 Kafka 版数据
向Topic testkafka中写入数据,以作为迁移至MaxCompute中的数据。由于云消息队列 Kafka 版用于处理流式数据,您可以持续不断地向其中写入数据。为保证测试结果,建议您写入10条以上的数据。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理。
在Topic 管理页面,找到目标Topic,在其操作列中,选择 。
在快速体验消息收发面板,发送如下的测试消息。
在左侧导航栏,单击消息查询,然后在消息查询页面,选择查询方式、所属的Topic、分区等信息,单击查询,查看之前写入的Topic的数据。
关于消息查询的更多信息,请参见消息查询。以按时间查询为例,查询的一部分消息如下截图:
2.创建MaxCompute项目
2.1.开通MaxCompute(可选)
只有开通了MaxCompute,才可以在MaxCompute中执行创建项目等操作。具体操作,请参见开通MaxCompute。
2.2.创建MaxCompute项目
本文以在华东1(杭州)地域创建名为kafka_bigdata_doc的项目为例。具体操作,请参见通过MaxCompute控制台创建项目。
3.创建DataWorks工作空间
3.1.开通DataWorks(可选)
当前所在地域首次开通DataWorks服务时,必须购买DataWorks任意产品版本和按量付费新版资源组,才能开通并使用DataWorks。具体操作,请参见开通DataWorks服务。
3.2.创建工作空间
本文以在华东1(杭州)地域创建名为kafka_workspace的工作空间为例。具体操作,请参见创建工作空间。
4.添加数据源
4.1.创建独享数据集成资源组
创建一个名为kafka_dx的独享数据集成资源组。具体操作,请参见购买资源组。
绑定3.创建DataWorks工作空间步骤中创建的名为kafka_workspace的工作空间。具体操作,请参见绑定归属工作空间。
4.2.创建MaxCompute数据源
登录DataWorks控制台,切换至目标地域后,选择左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入管理中心。
进入工作空间管理中心页面后,选择左侧导航栏的
,进入数据源页面。单击新增数据源,选择MaxCompute,根据界面指引创建数据源。如下图所示,创建一个名为MaxCompute_data的数据源:
4.3.创建Kafka数据源
登录DataWorks控制台,切换至目标地域后,选择左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入管理中心。
进入工作空间管理中心页面后,选择左侧导航栏的
,进入数据源页面。单击新增数据源,选择Kafka,根据界面指引创建数据源。如下图所示,创建一个名为kafka_data的数据源:
说明实例ID填写已部署的云消息队列 Kafka 版的实例ID。
测试连通性时,如果出现无法连通的情况,单击自助排查解决,在连通性诊断工具面板中,按照指引完成测试即可。
5.创建DataWorks表
您需创建DataWorks表,以保证大数据计算服务MaxCompute可以顺利接收云消息队列 Kafka 版数据。为测试便利,本文以使用非分区表为例。
进入数据开发页面。
登录DataWorks控制台。
在左侧导航栏,单击工作空间。
在目标工作空间的操作列中,单击快速进入,选择数据开发。
在数据开发页面,右键单击目标业务名称,选择
。在新建表页面,选择引擎类型并输入表名为testkafka。
在DDL对话框中,输入如下建表语句,单击生成表结构。
CREATE TABLE testkafka ( key string, value string, partition string, headers string, offset string, timestamp string ) ;
单击提交到生产环境并确认。
6.创建并启动离线同步任务
进入数据开发页面。
登录DataWorks控制台。
在左侧导航栏,单击工作空间。
在目标工作空间的操作列中,单击快速进入,选择数据开发。
在数据开发页面,右键单击业务名称,选择
。在新建节点对话框,输入节点名称(即数据同步任务名称),然后单击确认。
在创建的节点页面,填写网络与资源配置信息。
单击下一步,填写配置任务信息,单击图标,运行任务。
7.结果验证
7.1验证离线同步任务运行结果
完成运行后,运行日志中显示运行成功。
7.2验证数据同步结果
进入数据开发页面。
登录DataWorks控制台。
在左侧导航栏,单击工作空间。
在目标工作空间的操作列中,单击快速进入,选择数据开发。
在临时查询面板,右键单击临时查询,选择
。在新建节点对话框中,输入名称。
单击确认。
在创建的节点页面,输入
select * from testkafka
,单击图标,运行完成后,查看运行日志。