本文为您介绍全量Elasticsearch(ES)源表DDL定义、WITH参数、类型映射和代码示例。

注意
  • 仅支持Elasticsearch 5.5及以上版本。
  • 仅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。
  • DDL中的字段均对应Elasticsearch文档中的字段,不支持将文档ID写入表中。

DDL定义

 CREATE TABLE elasticsearch_source(
   name STRING, 
   location STRING, 
   `value` FLOAT
) WIHT (
   'connector' ='elasticsearch',
   'endPoint' = '<yourEndPoint>',
   'accessId' = '<yourAccessId>',
   'accessKey' = '<yourAccessSecret>',
   'indexName' = '<yourIndexName>',
   'typeNames' = '<yourTypeName>'
);

WITH参数

参数 说明 是否必填 备注
connector 源表类型 固定值为elasticsearch
endPoint Server地址 例如:http://127.0.0.1:9200
accessId AccessKey ID
accessKey AccessKey Secret
indexName 文档索引名称
typeNames Type名称 默认值为_doc
说明 Elasticsearch 7.0以上版本不建议设置该参数。
batchSize 每个scroll请求从Elasticsearch集群获取的最大文档数 默认值为2000。
keepScrollAliveSecs scroll上下文保留的最长时间 单位为妙,默认值为3600秒。

类型映射

Flink全托管以JSON来解析Elasticsearch数据,详情请参见数据类型映射关系

代码示例

CREATE TEMPORARY TABLE elasticsearch_source (
   name STRING,
   location STRING,
   `value` FLOAT
) WITH (
   'connector' ='elasticsearch',
   'endPoint' = '<yourEndPoint>',
   'accessId' = '<yourAccessId>',
   'accessKey' = '<yourAccessSecret>',
   'indexName' = '<yourIndexName>',
   'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
   name STRING,
   location STRING,
   `value` FLOAT
) WITH (
   'connector' ='blackhole'
);

INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;