管理数据处理任务

当您希望对云消息队列 Kafka 版实例中的数据进行清洗、转换并转存,可以通过执行数据处理任务实现。本文介绍如何使用 云消息队列 Kafka 版数据处理任务将源Topic中的数据处理后发送到目标Topic。

前提条件

在使用前,请确保您已完成以下操作:

  • 云消息队列 Kafka 版实例创建数据源Topic和目标Topic。具体操作,请参见步骤一:创建Topic

    说明

    如果需要手动创建数据处理任务所依赖的Topic,也需创建存储数据处理任务相关信息的Topic。关于创建Topic时参数配置说明,请参见配置数据源和目标

  • 开通函数计算服务。更多信息,请参见 开通函数计算服务

  • 如果登录账号是RAM账号,需给RAM账号授权。具体操作,请参见 RAM账号授权

    对应权限策略脚本示例如下:

    {
        "Version": "1",
        "Statement": [
            {
                "Action": [
                    "kafka:CreateETLTask",
                    "kafka:ListETLTask",
                    "kafka:DeleteETLTask"
                ],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    }

背景信息

  • 数据处理是将数据从来源端经过抽取、转换、加载至目的端的过程。您可以编写一个函数, 云消息队列 Kafka 版将会使用这个函数将源Topic中的数据进行处理后,发送到目标Topic中。

  • 函数处理过程中函数计算会自动创建对应的服务和函数,服务的名称自动在数据处理任务名称加前缀 _FC-kafka

  • 仅支持在同地域内,将数据从 云消息队列 Kafka 版实例的数据源Topic经过函数计算转化后发送至目标Topic。

  • 函数计算的函数调用支持日志查询,以便您迅速排查问题。具体操作步骤,请参见 配置日志

  • 云消息队列 Kafka 版的数据处理任务处于公测阶段,且独立于 云消息队列 Kafka 版实例,因此不会在 云消息队列 Kafka 版侧产生费用。如在使用数据处理任务时依赖其他产品,费用说明请以对应产品为准。

开启数据处理

重要

新建数据处理任务入口已变更至Connector 生态集成,Connector 生态集成提供数据过滤、转换能力。更多信息,请参见Connector概述

首次使用云消息队列 Kafka 版数据处理任务功能前,需先进行服务授权。授权后,系统会自动创建服务关联角色AliyunServiceRoleForAlikafkaETL。通过该角色, 云消息队列 Kafka 版可以获取与数据处理相关的产品的访问权限。更多信息,请参见服务关联角色

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择生态集成 > 数据处理任务列表

  3. 数据处理任务列表页面,单击创建任务

  4. 在弹出的服务授权对话框中,单击确认

创建数据处理任务

创建并部署数据处理任务用于将数据从源Topic经过处理后发送至目标Topic。

  1. 数据处理任务列表页面,单击 创建任务

  2. 配置基本信息页面,输入数据处理任务名称,单击 下一步

  3. 配置数据源和目标页面,配置数据源、目标Topic和消费信息。配置完成后单击 下一步

    参数

    说明

    示例值

    实例

    数据源Topic和目标Topic所属的实例。

    • alikafka_pre-cn-7pp2bz47****

    • alikafka_post-cn-2r42bvbm****

    Topic

    数据源Topic和目标Topic。

    说明

    数据源Topic和目标Topic不能相同。

    • topic****

    • test****

    消费初始位置

    数据消费的开始时间位点。单击 高级选项显示该参数。默认取值说明如下:

    • 最早位点:从最初位点开始消费。

    • 最近位点:从最新位点开始消费。

    最近位点

    失败处理

    消息发送失败后,是否继续发送后续数据。单击 高级选项显示该参数。取值说明如下:

    • 继续订阅:消息发送失败后,继续发送后续数据。

    • 停止订阅:消息发送失败后,停止发送后续数据。

    继续订阅

    创建资源方式

    选择创建数据处理任务所依赖的Topic的方式。单击 高级选项显示该参数。

    • 自动创建

    • 手动创建

    自动创建

    数据处理任务消费组

    数据处理任务使用的 Group。如果选择 创建资源方式手动创建,显示该参数。该 Group的名称建议以etl-cluster开头。

    etl-cluster-kafka

    任务位点 Topic

    用于存储消费位点的Topic。如果选择 创建资源方式手动创建,显示该参数。

    • Topic名称:建议以etl-offset开头。

    • 分区数:Topic的分区数量必须大于1。

    • 存储引擎:Topic的存储引擎必须为Local存储。

      说明

      当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。

    • cleanup.policy:Topic的日志清理策略必须为compact。

    etl-offset-kafka

    任务配置 Topic

    用于存储任务配置的Topic。如果选择 创建资源方式手动创建,显示该参数。

    • Topic名称:建议以etl-config开头。

    • 分区数:Topic的分区数量必须为1。

    • 存储引擎:Topic的存储引擎必须为Local存储。

      说明

      当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。

    • cleanup.policy:Topic的日志清理策略必须为compact。

    etl-config-kafka

    任务状态 Topic

    用于存储任务状态的Topic。如果选择 创建资源方式手动创建,显示该参数。

    • Topic:建议以etl-status开头。

    • 分区数:Topic的分区数量建议为6。

    • 存储引擎:Topic的存储引擎必须为Local存储。

      说明

      当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。

    • cleanup.policy:Topic的日志清理策略必须为compact。

    etl-status-kafka

    死信队列 Topic

    用于存储数据处理框架的异常数据的Topic。如果选择 创建资源方式手动创建,显示该参数。该Topic可以和 异常数据Topic 为同一个Topic,以节省Topic资源。

    • Topic:建议以etl-error开头。

    • 分区数:Topic的分区数量建议为6。

    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。

      说明

      当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。

    etl-error-kafka

    异常数据 Topic

    用于存储Sink的异常数据的Topic。如果选择 创建资源方式手动创建,显示该参数。该Topic可以和 死信队列Topic 为同一个Topic,以节省Topic资源。

    • Topic:建议以etl-error开头。

    • 分区数:Topic的分区数量建议为6。

    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。

      说明

      当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。

    etl-error-kafka

  4. 配置处理函数页面,配置函数信息,并单击 创建

    单击创建前,您可以单击 测试,测试编写的处理函数是否符合预期。

    参数

    说明

    示例

    函数语言

    处理函数的语言。仅支持Python3。

    Python3

    函数模版

    系统提供的处理函数模板。选择一种模板,默认提供对应的函数代码。

    添加前/后缀

    函数代码

    处理消息的代码。提供数据清洗和转换的模板,并提供对应的函数代码,您可以根据需要修改和编辑代码内容。

    说明
    • 您可以根据需要import python部分模块。

    • 代码中message为字典格式,您修改key和value即可,其他不需要修改。

    • 将处理完的message返回。如果要是对消息的过滤,返回None。

    def deal_message(message):
        for keyItem in message.keys():
            if (keyItem == 'key'):
                message[keyItem] = message[keyItem] + "KeySurfix"
                continue
            if (keyItem == 'value'):
                message[keyItem] = message[keyItem] + "ValueSurfix"
                continue
        return message

    消息 Key

    源Topic的消息处理Key值。单击 测试代码显示该参数。

    demo

    消息内容

    源Topic的消息处理Value值。

    {"key": "test"}

    创建完成后,在数据处理任务列表页面,查看创建的数据处理任务。创建成功后,系统自动部署。

发送测试消息

部署数据处理任务后,您可以向云消息队列 Kafka 版的数据源Topic发送消息,测试数据能否根据函数代码处理后发送至目标Topic。

  1. 数据处理任务列表页面,目标数据处理任务所在行操作列,单击测试

  2. 发送消息面板,填写如下信息,并单击确定,发送测试消息。

    • 消息 Key文本框中输入消息Key值,例如demo。

    • 消息内容文本框中输入测试的消息内容,例如{"key": "test"}。

    • 设置 发送到指定分区,选择是否指定分区。

      • 单击,在 分区 ID文本框中输入分区的ID,例如0。如果您需要查询分区的ID,请参见 查看分区状态

      • 单击,不指定分区。

查看函数日志

云消息队列 Kafka 版源Topic获取数据,经过函数处理发送至目标Topic后,查看函数日志,验证是否收到消息。更多信息,请参见 配置日志

图 1. 函数日志查询函数日志查询

查看数据处理任务详情

成功创建数据处理任务后,您可以在云消息队列 Kafka 版控制台查看详细信息。

数据处理任务列表页面,目标数据处理任务所在行操作列,单击详情

任务详情页面,查看任务详细信息。

删除数据处理任务

如果您不再需要某个数据处理任务,或者某个数据处理任务不再使用,您可以在云消息队列 Kafka 版控制台删除该数据处理任务。

  1. 数据处理任务列表页面,目标数据处理任务所在行操作列,单击 删除

  2. 提示对话框中,单击确认