文档

创建Elasticsearch Sink Connector

更新时间:

本文介绍如何创建Elasticsearch Sink Connector,您可以通过Elasticsearch Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至阿里云Elasticsearch中。

前提条件

详细步骤,请参见创建前提

步骤一:创建目标服务资源

步骤二:创建Elasticsearch Sink Connector

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择Connector生态集成 > 消息流出(Sink)

  3. 消息流出(Sink)页面,单击创建任务

  4. 消息流出创建面板,配置以下参数,单击确定

    1. 基础信息区域,设置任务名称,将流出类型选择为检索分析服务Elasticsearch

    2. 资源配置区域,设置以下参数。

      表 1. 源(云消息队列 Kafka 版

      参数

      说明

      示例

      地域

      源Kafka实例所在的地域。

      华东1(杭州)

      kafka实例

      数据源所在的Kafka实例ID。

      alikafka_post-cn-9hdsbdhd****

      Topic

      数据源所在的Kafka实例Topic。

      guide-sink-topic

      Group ID

      数据源所在的Kafka实例中的Group ID。

      • 快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。

      • 使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。

      使用已有

      并发配额(消费者数)

      消费Topic数据的并发线程数,线程和Topic分区的对应关系如下:

      • Topic分区数=并发消费数:一个线程消费一个Topic分区。建议使用。

      • Topic分区数>并发消费数:多个并发消费会均摊所有分区消费。

      • Topic分区数<并发消费数:一个线程消费一个Topic分区,多出的消费数无效。

      2

      消费位点

      • 最新位点:从最新位点开始消费。

      • 最早位点:从最初位点开始消费。

      最新位点

      网络配置

      有跨境传输数据需求时选择自建公网,其他情况可选择默认网络

      默认网络

      表 1. 目标(检索分析服务Elasticsearch)

      参数

      说明

      示例

      Elasticsearch实例

      已创建的Elasticsearch实例。

      es-cn-pe336j0gj001e****

      实例登录名

      创建实例时配置的登录名,默认为elastic。

      elastic

      实例登录密码

      创建实例时配置的密码。

      ******

      索引名称

      已创建的索引名称。如何创建索引,请参见步骤三:创建索引

      product_info

      是否使用 SSL

      根据自身需求选择非加密连接或SSL安全连接。

      不使用

      网络配置

      • 专有网络:通过专有网络VPC将Kafka消息投递到Elasticsearch。

      • 公网:通过公网将Kafka消息投递到Elasticsearch。

      公网

      VPC

      选择VPC ID。仅当网络配置专有网络时需配置此参数。

      vpc-bp17fapfdj0dwzjkd****

      交换机

      选择vSwitch ID。仅当网络配置专有网络时需配置此参数。

      vsw-bp1gbjhj53hdjdkg****

      安全组

      选择安全组。仅当网络配置专有网络时需配置此参数。

      test_group

      批量推送

      批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔两个条件达到任意一个时即会触发批量推送。例如:您设置的推送条数为100条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等待15 s后再推送。

      • 开启:开启批量推送功能。

      • 关闭:关闭批量推送功能。默认每次仅投递给FC一条消息。

      关闭

      批量推送条数

      一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求。取值范围为[1,10000]。仅当批量推送参数设置为开启时需要配置此参数。

      100

      批量推送间隔

      调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算。取值范围为[0,15],单位:秒。取值为0表示没有等待时间,直接投递。仅当批量推送参数设置为开启时需要配置此参数。

      10

    完成上述配置后,在消息流出(Sink)页面,找到刚创建的Elasticsearch Sink Connector任务,单击其右侧操作列的启动。当状态栏由启动中变为运行中时,Connector创建成功。

步骤三:测试Elasticsearch Sink Connector

  1. 消息流出(Sink)页面,在Elasticsearch Sink Connector任务的事件源列单击源Topic。

  2. 在Topic详情页面,单击体验发送消息

  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定

    发送消息

  4. 登录Elasticsearch管理控制台,通过Kibana访问实例。更多信息,请参见快速入门

  5. 在Kibana控制台上执行以下命令,查看数据插入结果。

    GET /{索引名称}/_doc/_search

    数据插入结果如下所示:测试结果

  • 本页导读 (1)