当您希望对消息队列Kafka版实例中的数据进行清洗、转换并转存,可以通过执行ETL任务实现。本文介绍如何使用 消息队列Kafka版ETL任务将源Topic中的数据处理后发送到目标Topic。
前提条件
- 为消息队列Kafka版实例创建数据源Topic和目标Topic。具体操作,请参见步骤一:创建Topic。
说明 如果需要手动创建ETL任务所依赖的Topic,也需创建存储ETL任务相关信息的Topic。关于创建Topic时参数配置说明,请参见配置数据源和目标。
- 开通函数计算服务。更多信息,请参见 开通函数计算服务 。
- 如果登录账号是RAM账号,需给RAM账号授权。具体操作,请参见 RAM账号授权 。
对应权限策略脚本示例如下:
{ "Version": "1", "Statement": [ { "Action": [ "kafka:CreateETLTask", "kafka:ListETLTask", "kafka:DeleteETLTask" ], "Resource": "*", "Effect": "Allow" } ] }
背景信息
- ETL(Extract-Transform-Load)是将数据从来源端经过抽取、转换、加载至目的端的过程。您可以编写一个函数, 消息队列Kafka版将会使用这个函数将源Topic中的数据进行处理后,发送到目标Topic中。
- 函数处理过程中函数计算会自动创建对应的服务和函数,服务的名称自动在ETL任务名称加前缀
_FC-kafka
。 - 仅支持在同地域内,将数据从 消息队列Kafka版实例的数据源Topic经过函数计算转化后发送至目标Topic。
- 函数计算的函数调用支持日志查询,以便您迅速排查问题。具体操作步骤,请参见 配置日志。
- 消息队列Kafka版的ETL任务处于公测阶段,且独立于 消息队列Kafka版实例,因此不会在 消息队列Kafka版侧产生费用。同时,阿里云不承诺ETL,使用ETL所依赖的其他产品和费用说明请以对应产品为准。
开启ETL
首次使用消息队列Kafka版ETL任务功能前,需先进行服务授权。授权后,系统会自动创建服务关联角色AliyunServiceRoleForAlikafkaETL。通过该角色, 消息队列Kafka版可以获取与ETL相关的产品的访问权限。更多信息,请参见服务关联角色。
- 登录消息队列Kafka版控制台,在概览页面的资源分布区域,选择地域。
- 在左侧导航栏,单击ETL 任务列表。
- 在ETL 任务列表页面,单击 创建任务。
- 在弹出的服务授权对话框中,单击确认。
创建ETL任务
创建并部署ETL用于将数据从源Topic经过处理后发送至目标Topic。
发送测试消息
部署ETL任务后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否根据函数代码处理后发送至目标Topic。
查看函数日志
从消息队列Kafka版源Topic获取数据,经过函数处理发送至目标Topic后,查看函数日志,验证是否收到消息。更多信息,请参见 配置日志。

查看ETL任务详情
成功创建ETL任务后,您可以在消息队列Kafka版控制台查看详细信息。
在ETL 任务列表页面,目标ETL任务所在行操作列,单击详情。
在任务详情页面,查看任务详细信息。
删除ETL任务
如果您不再需要某个ETL任务,或者某个ETL任务不再使用,您可以在消息队列Kafka版控制台删除该ETL任务。
- 在ETL 任务列表页面,目标ETL任务所在行操作列,单击 删除。
- 在提示对话框中,单击确认。