本文介绍将阿里云实时计算Flink处理的数据,同步到阿里云Elasticsearch(简称ES)中的方法。

前提条件

背景信息

阿里云实时计算Flink是阿里云官方支持的Flink产品,支持包括Kafka/Elasticsearch等多种输入输出系统。实时计算Flink与Elasticsearch结合,能够满足典型的日志检索场景,原理如下。

Kafka/LOG等系统中的日志,经过Flink进行简单或者复杂计算之后,输出到ES进行搜索。结合Flink的强大计算能力与ES的强大搜索能力,可为业务提供实时数据加工及查询,助力业务实时化转型。且实时计算Flink为您提供了非常简单的对接ES的方式,以下示例来说明实时计算Flink产品如何与ES对接。

假设目前业务的日志或者数据被写入了LOG中,并且需要对LOG中的数据进行计算之后再写到ES进行搜索。整条链路示意如下。Flink+ES数据链路

创建实时计算作业

  1. 登录实时计算控制台创建作业
  2. 编写Flink SQL。
    1. 创建日志服务LOG源表。
      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );
      WITH参数说明如下表。
      变量 说明
      <yourEndpoint> 阿里云日志服务的公网服务入口,即访问对应LOG项目及其内部日志数据的URL。详情请参见服务入口

      例如杭州区域的日志服务入口为:http://cn-hangzhou.log.aliyuncs.com。需要在对应的服务入口前加http://

      <yourAccessId> LogService的Access Key ID。
      <yourAccessKey> LogService的密钥。
      <yourStartTime> 消费日志开始的时间点。运行Flink作业时所选时间要大于此处设置的时间。
      <yourProjectName> LogService的项目名称。
      <yourLogStoreName> LogService项目下具体的LogStore名称。
      <yourConsumerGroupName> 日志服务的消费组名称。

      更多WITH参数及其说明请参见创建日志服务LOG源表

    2. 创建Elasticsearch结果表。
      注意
      • 实时计算3.2.2及以上版本增加了Elasticsearch结果表功能,创建Flink作业时,请注意所选的版本。
      • Elasticsearch结果表的实现使用REST API,理论上兼容Elasticsearch的各个版本。
      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );
      WITH参数说明如下。
      变量 说明
      <instanceid> 阿里云Elasticsearch实例的ID,可在实例的基本信息页面获取。

      例如es-cn-45xxxxxxxxxxxxk1q

      <port> 阿里云Elasticsearch实例的公网端口,可在实例的基本信息页面获取。

      一般为9200

      <yourAccessId> 访问阿里云ES实例的账号,即登录Kibana控制台的用户名。一般为elastic。
      <yourAccessKey> 访问阿里云ES实例的密码,即登录Kibana控制台的密码,在创建实例时设置。
      <yourIndex> ES的文档索引,类似于数据库Database的名称。如果您还未创建过文档索引,需要首先创建一个文档索引,详情请参见创建索引
      <yourTypeName> ES的索引类型,类似于数据库的Table名称。如果您还未创建过索引类型,需要首先创建一个索引类型,详情请参见创建索引

      更多WITH参数及其说明请参见创建ElasticSearch结果表

      • ES支持根据PRIMARY KEY进行文档更新,且PRIMARY KEY只能为1个字段。指定PRIMARY KEY后,文档的ID为PRIMARY KEY字段的值。未指定PRIMARY KEY的文档对应的ID为随机,详情请参见Index API
      • ES支持多种更新模式,对应WITH中的参数为updateMode
        • updateMode=full时,新增的文档会完全覆盖已存在的文档。
        • updateMode=inc时,ES会根据输入的字段值更新对应的字段。
      • ES所有的更新默认为UPSERT语义,即INSERT或UPDATE。
    3. 处理业务逻辑并同步数据。
      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a
  3. 提交运行。
    将以上作业文件提交运行,即可将LOG中的数据进行简单聚合后写入Elasticsearch。实时计算Flink还支持更多的计算操作,包括创建、UDX等,详情请参见Flink SQL概述

总结

使用实时计算Flink+Elasticsearch,可帮助您快速创建实时搜索链路。如果您有更复杂的Elasticsearch写入逻辑,可以使用实时计算Flink的自定义Sink功能来实现,详情请参见创建自定义结果表