当您希望对消息队列Kafka版实例中的数据进行清洗、转换并转存,可以通过执行ETL任务实现。本文介绍如何使用 消息队列Kafka版ETL任务将源Topic中的数据处理后发送到目标Topic。

前提条件

在使用前,请确保您已完成以下操作:
  • 消息队列Kafka版实例创建数据源Topic和目标Topic。具体操作,请参见步骤一:创建Topic
    说明 如果需要手动创建ETL任务所依赖的Topic,也需创建存储ETL任务相关信息的Topic。关于创建Topic时参数配置说明,请参见配置数据源和目标
  • 开通函数计算服务。更多信息,请参见开通函数计算服务
  • 如果登录账号是RAM账号,需给RAM账号授权。具体操作,请参见RAM账号授权

    对应权限策略脚本示例如下:

    {
        "Version": "1",
        "Statement": [
            {
                "Action": [
                    "kafka:CreateETLTask",
                    "kafka:ListETLTask",
                    "kafka:DeleteETLTask"
                ],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    }

背景信息

  • ETL(Extract-Transform-Load)是将数据从来源端经过抽取、转换、加载至目的端的过程。您可以编写一个函数, 消息队列Kafka版将会使用这个函数将源Topic中的数据进行处理后,发送到目标Topic中。
  • 函数处理过程中函数计算会自动创建对应的服务和函数,服务的名称自动在ETL任务名称加前缀 _FC-kafka
  • 仅支持在同地域内,将数据从 消息队列Kafka版实例的数据源Topic经过函数计算转化后发送至目标Topic。
  • 函数计算的函数调用支持日志查询,以便您迅速排查问题。具体操作步骤,请参见 配置日志
  • 消息队列Kafka版的ETL任务处于公测阶段,且独立于 消息队列Kafka版实例,因此不会在 消息队列Kafka版侧产生费用。同时,阿里云不承诺ETL,使用ETL所依赖的其他产品和费用说明请以对应产品为准。

开启ETL

首次使用消息队列Kafka版ETL任务功能前,需先进行服务授权。授权后,系统会自动创建服务关联角色AliyunServiceRoleForAlikafkaETL。通过该角色, 消息队列Kafka版可以获取与ETL相关的产品的访问权限。更多信息,请参见服务关联角色

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击ETL 任务列表
  4. ETL 任务列表页面,单击 创建任务
  5. 在弹出的服务授权对话框中,单击确认

创建ETL任务

创建并部署ETL用于将数据从源Topic经过处理后发送至目标Topic。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击ETL 任务列表
  4. ETL 任务列表页面,单击 创建任务
  5. 配置基本信息页面,输入ETL任务名称,单击 下一步
  6. 配置数据源和目标页面,配置数据源、目标Topic和消费信息。配置完成后单击 下一步
    参数 说明 示例值
    实例 数据源Topic和目标Topic所属的实例。
    • alikafka_pre-cn-7pp2bz47****
    • alikafka_post-cn-2r42bvbm****
    Topic 数据源Topic和目标Topic。
    说明 数据源Topic和目标Topic不能相同。
    • topic****
    • test****
    消费初始位置 数据消费的开始时间位点。单击 高级选项显示该参数。默认取值说明如下:
    • 最早位点:从最初位点开始消费。
    • 最近位点:从最新位点开始消费。
    最近位点
    失败处理 消息发送失败后,是否继续发送后续数据。单击 高级选项显示该参数。取值说明如下:
    • 继续订阅:消息发送失败后,继续发送后续数据。
    • 停止订阅:消息发送失败后,停止发送后续数据。
    继续订阅
    创建资源方式 选择创建ETL任务所依赖的Topic的方式。单击 高级选项显示该参数。
    • 自动创建
    • 手动创建
    自动创建
    ETL 任务消费组 ETL任务使用的 Group。如果选择 创建资源方式手动创建,显示该参数。该 Group的名称建议以etl-cluster开头。 etl-cluster-kafka
    任务位点 Topic 用于存储消费位点的Topic。如果选择 创建资源方式手动创建,显示该参数。
    • Topic名称:建议以etl-offset开头。
    • 分区数:Topic的分区数量必须大于1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    etl-offset-kafka
    任务配置 Topic 用于存储任务配置的Topic。如果选择 创建资源方式手动创建,显示该参数。
    • Topic名称:建议以etl-config开头。
    • 分区数:Topic的分区数量必须为1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    etl-config-kafka
    任务状态 Topic 用于存储任务状态的Topic。如果选择 创建资源方式手动创建,显示该参数。
    • Topic:建议以etl-status开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    etl-status-kafka
    死信队列 Topic 用于存储ETL框架的异常数据的Topic。如果选择 创建资源方式手动创建,显示该参数。该Topic可以和 异常数据Topic 为同一个Topic,以节省Topic资源。
    • Topic:建议以etl-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
    etl-error-kafka
    异常数据 Topic 用于存储Sink的异常数据的Topic。如果选择 创建资源方式手动创建,显示该参数。该Topic可以和 死信队列Topic 为同一个Topic,以节省Topic资源。
    • Topic:建议以etl-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
    etl-error-kafka
  7. 配置处理函数页面,配置函数信息,并单击 创建
    单击创建前,您可以单击 测试,测试编写的处理函数是否符合预期。
    参数 说明 示例
    函数语言 处理函数的语言。仅支持Python3。 Python3
    函数模版 系统提供的处理函数模版。选择一种模版,默认提供对应的函数代码。 添加前/后缀
    函数代码 处理消息的代码。提供数据清洗和转换的模版,并提供对应的函数代码,您可以根据需要修改和编辑代码内容。
    说明
    • 您可以根据需要import python部分模块。
    • 代码中message为字典格式,您修改key和value即可,其他不需要修改。
    • 将处理完的message返回。如果要是对消息的过滤,返回None。
    def deal_message(message):
        for keyItem in message.keys():
            if (keyItem == 'key'):
                message[keyItem] = message[keyItem] + "KeySurfix"
                continue
            if (keyItem == 'value'):
                message[keyItem] = message[keyItem] + "ValueSurfix"
                continue
        return message
    消息 Key 源Topic的消息处理Key值。单击 测试代码显示该参数。 demo
    消息内容 源Topic的消息处理Value值。 {"key": "test"}
    创建完成后,在ETL 任务列表页面,查看创建的ETL任务。创建成功后,系统自动部署。
    说明 如果函数计算不支持当前目标实例所在的可用区,创建的ETL任务无法运行,提示目标可用区不支持。如需运行该ETL任务,请提交工单联系消息队列Kafka版技术人员。

发送测试消息

部署ETL任务后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否根据函数代码处理后发送至目标Topic。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击ETL 任务列表
  4. ETL 任务列表页面,目标ETL任务所在行操作列,单击测试
  5. 发送消息面板,填写如下信息,并单击确定,发送测试消息。
    • 消息 Key文本框中输入消息Key值,例如demo。
    • 消息内容文本框中输入测试的消息内容,例如{"key": "test"}。
    • 设置 发送到指定分区,选择是否指定分区。
      • 单击,在 分区 ID文本框中输入分区的ID,例如0。如果您需要查询分区的ID,请参见 查看分区状态
      • 单击,不指定分区。

查看函数日志

消息队列Kafka版源Topic获取数据,经过函数处理发送至目标Topic后,查看函数日志,验证是否收到消息。更多信息,请参见 配置日志

图 1. 函数日志查询
函数日志查询

查看ETL任务详情

成功创建ETL任务后,您可以在消息队列Kafka版控制台查看详细信息。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击ETL 任务列表
  4. ETL 任务列表页面,目标ETL任务所在行操作列,单击详情
    任务详情页面,查看任务详细信息。

删除ETL任务

如果您不再需要某个ETL任务,或者某个ETL任务不再使用,您可以在消息队列Kafka版控制台删除该ETL任务。

  1. 登录消息队列Kafka版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 在左侧导航栏,单击ETL 任务列表
  4. ETL 任务列表页面,目标ETL任务所在行操作列,单击 删除
  5. 提示对话框中,单击确认