消息检索

云消息队列 Kafka 版控制台提供的按位点查询和按时间查询消息的功能无法满足您搜索消息的需求时,您可以使用云消息队列 Kafka 版消息检索功能。消息检索支持按Topic分区、位点范围、时间范围以及消息KeyValue关键字检索。本文介绍如何开通消息检索、添加检索条件以及进行暂停、启用、删除管理操作。

前提条件

  • 已为云消息队列 Kafka 版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic

    说明

    Serverless实例暂不支持消息检索功能。

  • 云消息队列 Kafka 版实例所在地域已支持消息检索功能。查看地域支持情况,请参见开服地域

背景信息

云消息队列 Kafka 版消息检索借助云消息队列 Kafka 版Connector功能及表格存储(Tablestore)实现,通过ConnectorTopic中的消息进行转储,发送到表格存储中的数据表中,由表格存储引功能提供消息检索的能力。

  • 开通消息检索相当于自动化创建了一个云消息队列 Kafka 版同步数据至表格存储Connector任务,任务名称格式为:ots-ms-{Topic名称}-{6位随机字符}。任务详情查看操作,请参见查看消息检索任务详情

  • 首次开通消息检索后,云消息队列 Kafka 版自动为您开通同地域下的表格存储服务,并创建表格存储实例和对应的数据表。每个开通消息检索的Topic会在表格存储中对应创建一张数据表。自动创建的实例和数据表名称格式如下:

    • 实例名称:kfk-{云消息队列 Kafka 版实例名称后12位字符}

    • 数据表表名:{Topic名称}:kafka_topic_{Topic名称}_{6位随机字符}

  • 每个开通消息检索的Topic会在云消息队列 Kafka 版实例中自动创建4Topic2Group,用于记录任务配置和任务状态。名称格式如下:

    • 任务位点Topic:connect-offset-{任务名称}

    • 任务配置Topic:connect-config-{任务名称}

    • 任务状态Topic:connect-status-{任务名称}

    • 死信队列Topic/异常数据Topic:connect-error-{任务名称}

    • Connector消费组:connect-{任务名称}

    • 任务集群Group:connect-cluster-{任务名称}

计费说明

  • 云消息队列 Kafka 版消息检索处于公测阶段,暂时不会在云消息队列 Kafka 版侧产生费用。

  • 消息检索公测阶段,使用消息检索时,自动在表格存储创建的实例和数据表,暂时不收费。

重要

阿里云不承诺消息检索的SLA,使用消息检索所依赖的其他产品的SLA和费用说明请以对应产品为准。

注意事项

  • 单个云消息队列 Kafka 版实例默认最多同时支持3Topic的消息检索。

  • 表格存储单一属性列String类型的列值大小上限为2 MB,超过2 MB的消息不会被同步,也不会被检索出来。

  • 表格存储中的数据保留时长与云消息队列 Kafka 版实例消息保留时长具有相同的数据生命周期(TTL),当数据超过消息保留时长时,将会自动清除并移除相关索引。关于云消息队列 Kafka 版消息保留时长相关配置和说明信息,请参见变更消息配置

    由于表格存储数据过期策略与云消息队列 Kafka 版并不完全一致,故最终检索到的数据请以实际获取的为准。

消息检索操作步骤

步骤一:开通消息检索

开通某个实例下Topic的消息检索功能,以便于您根据需要对其Topic中消息进行检索。

说明
  • 首次开通消息检索时,仅会自动开通同地域下的表格存储服务。

  • 首次开通消息检索时,云消息队列 Kafka 版会为您自动创建服务关联角色AliyunServiceRoleForAlikafkaConnector,以便使用Connector功能。如果已创建服务关联角色,云消息队列 Kafka 版不会重复创建。关于服务关联角色的更多信息,请参见服务关联角色

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 在左侧导航栏,单击实例列表

  4. 实例列表页面,单击目标实例名称。

  5. 在左侧导航栏,单击消息检索,然后单击开通消息检索

  6. 开通消息检索面板,填写开通参数,然后单击确定

    说明

    如果您是首次在当前实例下开通消息检索功能,单击开通消息检索后,需在您尚未开通当前实例的 Connector 功能提示对话框,单击确认,然后再在开通消息检索面板填写参数。

    image

    开通参数说明

    参数

    描述

    示例值

    数据源 Topic

    需要开通消息检索的Topic。

    test

    消费初始位置

    开始消费的位置。默认取值为最近位点。取值说明如下:

    • 最早位点:从最初位点开始消费。

    • 最近位点:从最新位点开始消费。

    最近位点

    记录时间

    记录消息的时间,即检索消息时的时间范围。默认取值为即时时间。取值说明如下:

    • 原生时间云消息队列 Kafka 版中记录的消息创建时间,即发送消息时,客户端自带的或是您指定的ProducerRecord中的消息创建时间。

    • 即时时间:数据同步至表格存储的时间。

      该值仅表示消息被同步至表格存储的时间,并不是消息的生产时间,因此搜索结果中的消息时间与消息的生产时间可能会不一致。

    原生时间

    消息检索页面您可以查看到刚开通搜索功能的Topic。

步骤二:发送测试消息

开通消息检索后,您可以向云消息队列 Kafka 版的数据源Topic发送消息,测试消息检索是否创建成功。

  1. 消息检索页面,找到需要测试的目标Topic,在其操作列,单击详情

  2. 任务详情页面,单击测试

  3. 发送消息面板,填写消息参数,然后单击确定

    1. 消息 Key文本框中输入消息的Key值,例如demo。

    2. 消息内容文本框输入测试的消息内容,例如 {"key": "test"}。

    3. 设置发送到指定分区,选择是否指定分区。

      • 单击,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态

      • 单击,不指定分区。

步骤三:搜索消息

  1. 消息检索页面,找到目标Topic,在其操作列,单击搜索

  2. 搜索面板,设置搜索条件,在搜索项下拉列表中选择需要添加的搜索项,单击添加搜索项,添加搜索项并在列设置搜索信息,然后单击确定

    image

    说明
    • 搜索条件可以根据分区位点范围时间范围KeyValue自由组合,设置多个搜索条件时,将采用交集进行搜索。

    • 建议发送消息的时候使用业务的唯一标识作为Key值,以便在搜索时能够实现精准匹配。

    搜索参数说明

    搜索项

    说明

    分区

    本次搜索Topic消息内容所在的分区。

    位点范围

    本次搜索Topic消息内容的位点范围。

    时间范围

    搜索Topic消息的时间范围。您可以设置近三天内的某个时间范围和自定义时间。 此处设置的搜索时间最小粒度为分钟。

    Key

    本次搜索Topic消息内容的消息Key或消息内容,即要匹配的KeyValue。

    检索关键词说明如下:

    • 搜索方式为短语匹配搜索。例如,消息Key是“云消息队列 Kafka 版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。”,可以设置搜索关键词为“分布式”,或者“阿里云”和“分布式”组合。

    • 如果搜索关键字中包含星号(*)或问号(?),则采用通配符检索。星号(*)代表任意字符序列,问号(?)代表任意单个字符。英文字母不区分大小写。例如,消息内容是“AliKafkaTest001qaz”,可以设置关键词为“AliKafkaTe*”。

      不建议使用通配符作为起始关键字,检索效率较低,且带有通配符的字符串长度不能超过20个字符。

    关于更多分词、短语匹配检索和通配符检索的使用和限制等详细内容,请参见分词短语匹配查询通配符查询

    Value

  3. Topic搜索页面,展示检索的消息如下:

    image

    说明
    • 消息检索结果最多显示10条,建议在设定搜索条件时尽可能具体,以便在搜索过程中实现精准匹配

    • 查询到的每条消息在控制台上最多显示1 KB的内容,超过1 KB的部分将自动截断。如需查看完整的消息内容,请下载相应的消息。

    检索结果参数说明

    参数

    描述

    分区

    消息的Topic分区。

    位点

    消息的所在的位点。

    Key

    消息的键(已强制转换为String类型)。

    Value

    消息的值(已强制转换为String类型),即消息的具体内容。

    消息创建时间

    消息创建时间或消息同步至表格存储的时间。

    消息创建时间指发送消息时,客户端自带的或是您指定的ProducerRecord中的消息创建时间。

    说明
    • 如果配置了该字段,则按配置值显示。

    • 如果未配置该字段,则默认取消息发送时的系统时间。

    • 如果显示值为1970/x/x x:x:x,则说明发送时间配置为0或其他错误的值。

    操作

    • 单击下载 Key:下载消息的键值。

    • 单击下载 Value:下载消息的具体内容。

其他操作

查看消息检索任务详情

开通消息检索后,即可查看自动创建的Topic、Group表格存储实例名称、表格存储数据表表名等详细信息,您也可以在详情中直接进入表格存储数据表。

消息检索页面,找到目标Topic,在其操作列,单击详情

image

查看消费详情

您可以查看订阅当前Topic的在线GroupTopic各个分区的消费进度,了解消息同步到表格存储时的消费和堆积情况。

消息检索页面,找到需要查看消费进度的目标Topic,在其操作列,单击消费进度

在消费详情页面,您可以查看Topic各分区的消费情况。

image

暂停消息检索任务

  1. 消息检索页面,找到目标Topic,在其操作列,选择更多 > 暂停

  2. 在弹出的对话框单击确认

启用消息检索任务

您可以根据需要重新启用某个已经暂停的消息检索任务。

  1. 消息检索页面,找到目标Topic,在其操作列,选择更多 > 启用

  2. 在弹出的对话框单击确认

删除消息检索任务

删除Topic的消息检索任务后,表格存储的数据表与多元索引会同步删除,该Topic不再提供消息检索功能。如果需要继续使用该Topic消息检索,请重新创建并等待数据同步。

  1. 消息检索页面,找到需要删除的目标Topic,根据任务状态在对应位置操作。

    • 如果任务处于运行中已暂停状态以外的其他状态,则在其操作列,单击删除

    • 如果任务处于运行中已暂停状态,则在其操作列,选择更多 > 删除

  2. 在弹出的对话框单击确认

    消息检索页面,您已看不到刚才已删除的Topic。