当您希望对消息队列Kafka版实例中的数据进行清洗、转换并转存,可以通过执行ETL任务实现。本文介绍如何使用 消息队列Kafka版ETL任务将源Topic中的数据处理后发送到目标Topic。

前提条件

在使用前,请确保您已完成以下操作:
  • 消息队列Kafka版实例创建数据源Topic和目标Topic。具体操作,请参见步骤一:创建Topic
    说明 如果需要手动创建ETL任务所依赖的Topic,也需创建存储ETL任务相关信息的Topic。关于创建Topic时参数配置说明,请参见配置数据源和目标
  • 开通函数计算服务。更多信息,请参见 开通函数计算服务
  • 如果登录账号是RAM账号,需给RAM账号授权。具体操作,请参见 RAM账号授权

    对应权限策略脚本示例如下:

    {
        "Version": "1",
        "Statement": [
            {
                "Action": [
                    "kafka:CreateETLTask",
                    "kafka:ListETLTask",
                    "kafka:DeleteETLTask"
                ],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    }

背景信息

  • ETL(Extract-Transform-Load)是将数据从来源端经过抽取、转换、加载至目的端的过程。您可以编写一个函数, 消息队列Kafka版将会使用这个函数将源Topic中的数据进行处理后,发送到目标Topic中。
  • 函数处理过程中函数计算会自动创建对应的服务和函数,服务的名称自动在ETL任务名称加前缀 _FC-kafka
  • 仅支持在同地域内,将数据从 消息队列Kafka版实例的数据源Topic经过函数计算转化后发送至目标Topic。
  • 函数计算的函数调用支持日志查询,以便您迅速排查问题。具体操作步骤,请参见 配置日志
  • 消息队列Kafka版的ETL任务处于公测阶段,且独立于 消息队列Kafka版实例,因此不会在 消息队列Kafka版侧产生费用。同时,阿里云不承诺ETL,使用ETL所依赖的其他产品和费用说明请以对应产品为准。

开启ETL

首次使用消息队列Kafka版ETL任务功能前,需先进行服务授权。授权后,系统会自动创建服务关联角色AliyunServiceRoleForAlikafkaETL。通过该角色, 消息队列Kafka版可以获取与ETL相关的产品的访问权限。更多信息,请参见服务关联角色

  1. 登录消息队列Kafka版控制台,在概览页面的资源分布区域,选择地域。
  2. 在左侧导航栏,单击ETL 任务列表
  3. ETL 任务列表页面,单击 创建任务
  4. 在弹出的服务授权对话框中,单击确认

创建ETL任务

创建并部署ETL用于将数据从源Topic经过处理后发送至目标Topic。

  1. ETL 任务列表页面,单击 创建任务
  2. 配置基本信息页面,输入ETL任务名称,单击 下一步
  3. 配置数据源和目标页面,配置数据源、目标Topic和消费信息。配置完成后单击 下一步
    参数 说明 示例值
    实例 数据源Topic和目标Topic所属的实例。
    • alikafka_pre-cn-7pp2bz47****
    • alikafka_post-cn-2r42bvbm****
    Topic 数据源Topic和目标Topic。
    说明 数据源Topic和目标Topic不能相同。
    • topic****
    • test****
    消费初始位置 数据消费的开始时间位点。单击 高级选项显示该参数。默认取值说明如下:
    • 最早位点:从最初位点开始消费。
    • 最近位点:从最新位点开始消费。
    最近位点
    失败处理 消息发送失败后,是否继续发送后续数据。单击 高级选项显示该参数。取值说明如下:
    • 继续订阅:消息发送失败后,继续发送后续数据。
    • 停止订阅:消息发送失败后,停止发送后续数据。
    继续订阅
    创建资源方式 选择创建ETL任务所依赖的Topic的方式。单击 高级选项显示该参数。
    • 自动创建
    • 手动创建
    自动创建
    ETL 任务消费组 ETL任务使用的 Group。如果选择 创建资源方式手动创建,显示该参数。该 Group的名称建议以etl-cluster开头。 etl-cluster-kafka
    任务位点 Topic 用于存储消费位点的Topic。如果选择 创建资源方式手动创建,显示该参数。
    • Topic名称:建议以etl-offset开头。
    • 分区数:Topic的分区数量必须大于1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
      说明 当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    etl-offset-kafka
    任务配置 Topic 用于存储任务配置的Topic。如果选择 创建资源方式手动创建,显示该参数。
    • Topic名称:建议以etl-config开头。
    • 分区数:Topic的分区数量必须为1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
      说明 当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    etl-config-kafka
    任务状态 Topic 用于存储任务状态的Topic。如果选择 创建资源方式手动创建,显示该参数。
    • Topic:建议以etl-status开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎必须为Local存储。
      说明 当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    etl-status-kafka
    死信队列 Topic 用于存储ETL框架的异常数据的Topic。如果选择 创建资源方式手动创建,显示该参数。该Topic可以和 异常数据Topic 为同一个Topic,以节省Topic资源。
    • Topic:建议以etl-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      说明 当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
    etl-error-kafka
    异常数据 Topic 用于存储Sink的异常数据的Topic。如果选择 创建资源方式手动创建,显示该参数。该Topic可以和 死信队列Topic 为同一个Topic,以节省Topic资源。
    • Topic:建议以etl-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      说明 当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
    etl-error-kafka
  4. 配置处理函数页面,配置函数信息,并单击 创建
    单击创建前,您可以单击 测试,测试编写的处理函数是否符合预期。
    参数 说明 示例
    函数语言 处理函数的语言。仅支持Python3。 Python3
    函数模版 系统提供的处理函数模板。选择一种模板,默认提供对应的函数代码。 添加前/后缀
    函数代码 处理消息的代码。提供数据清洗和转换的模板,并提供对应的函数代码,您可以根据需要修改和编辑代码内容。
    说明
    • 您可以根据需要import python部分模块。
    • 代码中message为字典格式,您修改key和value即可,其他不需要修改。
    • 将处理完的message返回。如果要是对消息的过滤,返回None。
    def deal_message(message):
        for keyItem in message.keys():
            if (keyItem == 'key'):
                message[keyItem] = message[keyItem] + "KeySurfix"
                continue
            if (keyItem == 'value'):
                message[keyItem] = message[keyItem] + "ValueSurfix"
                continue
        return message
    消息 Key 源Topic的消息处理Key值。单击 测试代码显示该参数。 demo
    消息内容 源Topic的消息处理Value值。 {"key": "test"}
    创建完成后,在ETL 任务列表页面,查看创建的ETL任务。创建成功后,系统自动部署。

发送测试消息

部署ETL任务后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否根据函数代码处理后发送至目标Topic。

  1. ETL 任务列表页面,目标ETL任务所在行操作列,单击测试
  2. 发送消息面板,填写如下信息,并单击确定,发送测试消息。
    • 消息 Key文本框中输入消息Key值,例如demo。
    • 消息内容文本框中输入测试的消息内容,例如{"key": "test"}。
    • 设置 发送到指定分区,选择是否指定分区。
      • 单击,在 分区 ID文本框中输入分区的ID,例如0。如果您需要查询分区的ID,请参见 查看分区状态
      • 单击,不指定分区。

查看函数日志

消息队列Kafka版源Topic获取数据,经过函数处理发送至目标Topic后,查看函数日志,验证是否收到消息。更多信息,请参见 配置日志

图 1. 函数日志查询
函数日志查询

查看ETL任务详情

成功创建ETL任务后,您可以在消息队列Kafka版控制台查看详细信息。

ETL 任务列表页面,目标ETL任务所在行操作列,单击详情
任务详情页面,查看任务详细信息。

删除ETL任务

如果您不再需要某个ETL任务,或者某个ETL任务不再使用,您可以在消息队列Kafka版控制台删除该ETL任务。

  1. ETL 任务列表页面,目标ETL任务所在行操作列,单击 删除
  2. 提示对话框中,单击确认