文档

创建RocketMQ Sink Connector

更新时间:

本文介绍如何在云消息队列 Kafka 版控制台创建消息流出任务,将云消息队列 Kafka 版的数据导出至云消息队列 RocketMQ 版

前提条件

创建消息流出任务

  1. 登录云消息队列 Kafka 版控制台,在左侧导航栏选择Connector生态集成 > 任务列表

  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在任务列表页面中,单击创建任务列表

  3. 创建任务面板,设置任务名称描述,配置以下参数,单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 Kafka 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择云消息队列 Kafka 版源实例所在的地域。

        华北2(北京)

        kafka 实例

        选择生产云消息队列 Kafka 版消息的源实例。

        MQ_INST_115964845466****_ByBeUp3p

        Topic

        选择生产云消息队列 Kafka 版消息的Topic。

        topic

        Group ID

        选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。

        GID_http_1

        并发配额(消费者数)

        选择源实例的消费者数。

        1

        消费位点

        选择开始消费消息的位点。

        最新位点

        网络配置

        选择路由消息的网络类型。

        默认网络

        专有网络VPC

        选择VPC ID。当网络配置设置为自建公网时需要设置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。当网络配置设置为自建公网时需要设置此参数。

        alikafka_pre-cn-7mz2****

        批量推送

        批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。

        例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。

        开启

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤

      3. Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见数据清洗

      4. Sink(目标)配置向导,选择服务类型消息队列 RocketMQ 版,配置以下参数。

        参数

        说明

        示例

        版本

        选择云消息队列 RocketMQ 版实例版本。

        RocketMQ 4.x

        实例ID

        选择已创建的云消息队列 RocketMQ 版实例。

        test

        Topic

        选择已创建的Topic。

        test

        消息体(body)

        事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

        部分事件

        $.data.body

        自定义属性(Properties)

        选择模板。您可以自定义一个模板,定义模板里需要的变量,事件总线EventBridge可以提取事件中的字段,按照模板定义的形式进行转换。

        说明

        如果需要全量传递源端的RocketMQ消息的属性,推荐使用示例中的配置。

        变量

        {
          "userProperties":"$.data.userProperties",
          "msgId":"$.data.systemProperties.UNIQ_KEY"
        }

        模板

        {
          "EB_SYS_EMBED_OBJECT":"${userProperties}",
          "UNIQ_KEY":"${msgId}"
        }

        消息索引(Keys)

        事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

        部分事件

        $.data.systemProperties.KEYS

        消息标签(Tags)

        事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

        部分事件

        $.data.systemProperties.TAGS
    • 任务属性

      配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

  4. 返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用

  5. 提示对话框,阅读提示信息,然后单击确认

    启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。

其他操作

任务列表页面,找到目标任务,在其右侧操作列,执行其他操作。

  • 查看任务详情:单击详情,在任务页面,查看任务的基础信息、任务属性及监控指标。

  • 编辑任务配置:单击编辑,在编辑任务面板,修改任务详情及属性。

  • 启停任务:单击启用或者停用,然后在提示对话框,单击确认

  • 删除任务:单击删除,然后在提示对话框,单击确认

  • 本页导读 (1)
文档反馈