本文介绍如何使用DataWorks数据集成,将消息队列for Apache Kafka集群上的数据迁移至MaxCompute,方便您对离线数据进行分析加工。
步骤一:准备Kafka数据
- 登录消息队列 for Apache Kafka控制台创建Topic和Consumer Group,分别命名为testkafka和console-consumer。详情请参见步骤三:创建资源。
本示例中, console-consumer将用于消费testkafka中的数据。
- 向testkafka中写入数据。由于消息队列 for Apache Kafka用于处理流式数据,您可以持续不断地向其中写入数据。为保证测试结果,建议写入10 条以上的数据。您可以直接在控制台使用发送消息功能来写入数据,也可以使用消息队列
for Apache Kafka的SDK收发消息。详情参见使用SDK收发消息。
- 为验证写入数据是否生效,您可以在控制台查询消息,查看之前写入Topic中的数据。详情请参见查询消息。
步骤二:在DataWorks上创建目标表
您需要在DataWorks上创建目标表,以保证MaxCompute可以接收消息队列for Apache Kafka的数据。
- 进入数据开发页面。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 单击相应工作空间后的数据开发。
- 右键单击业务流程,选择。
- 在新建表页面,选择引擎类型并输入表名。
- 在表的编辑页面,单击DDL模式。
- 在DDL模式对话框,输入如下建表语句,单击生成表结构。
CREATE TABLE testkafka
(
key string,
value string,
partition1 string,
timestamp1 string,
offset string,
t123 string,
event_id string,
tag string
) ;
建表语句中的每一列对应DataWorks数据集成Kafka Reader的默认列:
- key:表示消息的 Key。
- value:表示消息的完整内容 。
- partition:表示当前消息所在分区。
- headers:表示当前消息 headers 信息。
- offset:表示当前消息的偏移量。
- timestamp:表示当前消息的时间戳。
您还可以自主命名,详情参见Kafka Reader。
- 单击提交到生产环境并确认。
步骤三:同步数据
- 新增自定义数据集成资源组。此处创建的ECS实例将用以完成数据集成任务。
- 新建数据集成节点。
- 进入数据开发页面,右键单击指定业务流程,选择。
- 在新建节点对话框中,输入节点名称,并单击提交。
- 在顶部菜单栏上,单击
图标。
- 在脚本模式下,单击顶部菜单栏上的
图标。
- 在导入模板对话框中选择来源类型、数据源、目标类型及数据源。
- 配置脚本,示例代码如下。
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "47.xxx.xxx.xxx:9092",
"kafkaConfig": {
"group.id": "console-consumer"
},
"valueType": "ByteArray",
"column": [
"__key__",
"__value__",
"__partition__",
"__timestamp__",
"__offset__",
"'123'",
"event_id",
"tag.desc"
],
"topic": "testkafka",
"keyType": "ByteArray",
"waitTime": "10",
"beginOffset": "0",
"endOffset": "3"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "",
"truncate": true,
"compress": false,
"datasource": "odps_first",
"column": [
"key",
"value",
"partition1",
"timestamp1",
"offset",
"t123",
"event_id",
"tag"
],
"emptyAsNull": false,
"table": "testkafka"
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"throttle": false,
"concurrent": 1
}
}
}
- 配置调度资源组。
- 在右侧导航栏,单击调度配置。
- 在资源属性区域,选择调度资源组为步骤 1 中创建的自定义资源组。
- 单击
图标运行代码。
- 您可以在运行日志查看运行结果。
后续步骤
您可以新建一个ODPS SQL节点运行SQL语句,查看从Kafka同步数据至MaxCompute是否成功。详情请参见使用临时查询运行SQL语句(可选)。