创建AnalyticDB Sink Connector

本文介绍如何创建使用AnalyticDB Sink Connector,您可通过AnalyticDB Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至AnalyticDB的表中。

前提条件

详细步骤,请参见创建前提

步骤一:创建目标服务资源

创建云原生数据仓库 AnalyticDB MySQL 版资源或云原生数据仓库AnalyticDB PostgreSQL版资源。

本文以AnalyticDB PostgreSQL版为例,创建数据库名为abd_sink_database和数据表名为abd_sink_table的资源。

步骤二:创建AnalyticDB Sink Connector并启动

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择Connector 生态集成 > 任务列表

  3. 任务列表页面,单击创建任务

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 Kafka 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择云消息队列 Kafka 版源实例所在的地域。

        华北2(北京)

        kafka 实例

        选择生产云消息队列 Kafka 版消息的源实例。

        alikafka_post-cn-jte3****

        Topic

        选择生产云消息队列 Kafka 版消息的Topic。

        topic

        Group ID

        数据源所在的云消息队列 Kafka 版实例的Group ID。

        • 快速创建:新建一个Group ID资源,

        • 使用已有:填写一个已经的创建好的GroupID资源。

        GID_http_1

        消费位点

        选择开始消费消息的位点。

        最新位点

        网络配置

        选择路由消息的网络类型。

        基础网络

        专有网络VPC

        选择VPC ID。当网络配置设置为自建公网时需要设置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。当网络配置设置为自建公网时需要设置此参数。

        alikafka_pre-cn-7mz2****

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤

      3. Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见数据清洗

      4. Sink(目标)配置向导,选择服务类型云原生数据仓库 AnalyticDB,配置以下参数,单击保存

        参数

        说明

        示例

        实例类型

        选择已创建的实例的类型,本文以AnalyticDB PostgreSQL版为例。

        • MySQL版

        • PostgreSQL版

        PostgreSQL版

        AnalyticDB实例ID

        选择已创建的实例ID。

        gp-bp10uo5n536wd****

        数据库名

        选择创建的数据库。

        abd_sink_database

        表名

        选择创建的数据表。

        abd_sink_table

        数据库用户名

        输入设置的用户名。

        user

        数据库密码

        输入设置的密码。

        ******

        网络配置

        • 专有网络:通过专有网络VPC将Kafka消息投递到AnalyticDB。

        • 公网:通过公网将Kafka消息投递到AnalyticDB。

        公网

        VPC

        选择VPC ID。仅当网络配置专有网络时需配置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。仅当网络配置专有网络时需配置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。仅当网络配置专有网络时需配置此参数。

        test_group

  4. 返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用

  5. 提示对话框,阅读提示信息,然后单击确认

    启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。

步骤三:测试AnalyticDB Sink Connector

  1. 任务列表页面,在AnalyticDB Sink Connector任务的事件源列单击源Topic。

  2. 在Topic详情页面,单击体验发送消息

  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定

    说明

    此处的消息内容为JSON格式,JSON中需包含已创建的数据表的所有列,系统会将JSON中同名字段对应的值写入到数据表对应的列中。

    发送消息

  4. 任务列表页面,在AnalyticDB Sink Connector任务的事件目标列单击目标实例。

  5. 在实例基本信息页面,单击右上角的登录数据库

  6. 在DMS数据管理服务页面,执行以下语句,查询表中全量数据。

    SELECT * FROM  abd_sink_table;

    查询结果如下所示:结果