创建Elasticsearch Sink Connector

本文介绍如何创建Elasticsearch Sink Connector,您可以通过Elasticsearch Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至阿里云Elasticsearch中。

前提条件

详细步骤,请参见创建前提

步骤一:创建目标服务资源

步骤二:创建Elasticsearch Sink Connector

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择Connector生态集成 > 任务列表

  3. 任务列表页面,单击创建任务

  4. 创建任务面板,设置任务名称描述,配置以下参数。

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 Kafka 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        源Kafka实例所在的地域。

        华东1(杭州)

        kafka实例

        数据源所在的Kafka实例ID。

        alikafka_post-cn-9hdsbdhd****

        Topic

        数据源所在的Kafka实例Topic。

        guide-sink-topic

        Group ID

        数据源所在的Kafka实例中的Group ID。

        • 快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。

        • 使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。

        使用已有

        消费位点

        • 最新位点:从最新位点开始消费。

        • 最早位点:从最初位点开始消费。

        最新位点

        网络配置

        有跨境传输数据需求时选择自建公网,其他情况可选择基础网络

        基础网络

        数据格式

        数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。

        • Json(默认Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)

        • Text(文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)

        • Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)

        Json

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        2000

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,定义数据模式过滤发送的请求。更多信息,请参见事件模式

      3. Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗

      4. Sink(目标)配置向导,选择服务类型阿里云Elasticsearch acs.elasticSearch,配置以下参数。

        参数

        说明

        示例

        Elasticsearch实例

        已创建的Elasticsearch实例。

        es-cn-pe336j0gj001e****

        实例登录名

        创建实例时配置的登录名,默认为elastic。

        elastic

        实例登录密码

        创建实例时配置的密码。

        ******

        索引名称

        已创建的索引名称。如何创建索引,请参见步骤三:创建索引。索引名称支持字符串常量或 JsonPath 规则提示变量,如:product_info、$.data.key。

        product_info

        数据文档类型

        填写数据文档类型,支持字符串常量或 JsonPath 规则提取变量。

        如:_doc、$.data.key。

        说明

        仅当 Elasticsearch实例版本小于 7 时才可配置,默认为常量_doc。

        _doc

        文档 document

        选择将完整事件或部分事件投递到Elasticsearch,如选择部分事件,需配置JsonPath 提取规则。

        完整事件

        网络配置

        • 专有网络:通过专有网络VPC将Kafka消息投递到Elasticsearch。

        • 公网:通过公网将Kafka消息投递到Elasticsearch。

        公网

        VPC

        选择Elasticsearch实例所属的专有网络。仅当网络配置专有网络时需配置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择Elasticsearch实例所属的专有交换机。仅当网络配置专有网络时需配置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。仅当网络配置专有网络时需配置此参数。

        test_group

    • 任务属性

      配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

  5. 完成上述配置后,单击保存。在任务列表页面,找到刚创建的Elasticsearch Sink Connector任务,此时状态栏为启动中,当状态变为运行中时,Connector创建成功。

步骤三:测试Elasticsearch Sink Connector

  1. 任务列表页面,在Elasticsearch Sink Connector任务的事件源列单击源Topic。

  2. 在Topic详情页面,单击体验发送消息

  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定

    发送消息

  4. 登录Elasticsearch管理控制台,通过Kibana访问实例。更多信息,请参见快速入门

  5. 在Kibana控制台上执行以下命令,查看数据插入结果。

    GET /{索引名称}/_search

    数据插入结果如下所示:测试结果