本文为您介绍全量Elasticsearch(ES)源表DDL定义、WITH参数、类型映射和代码示例。
注意
- 仅支持Elasticsearch 5.5及以上版本。
- 仅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。
- DDL中的字段均对应Elasticsearch文档中的字段,不支持将文档ID写入表中。
DDL定义
CREATE TABLE elasticsearch_source(
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' ='elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 源表类型 | 是 | 固定值为elasticsearch 。
|
endPoint | Server地址 | 是 | 例如:http://127.0.0.1:9200。 |
accessId | Elasticsearch实例的用户名 | 否 | 无 |
accessKey | Elasticsearch实例的密码 | 否 | 无 |
indexName | 文档索引名称 | 是 | 无 |
typeNames | Type名称 | 否 | 默认值为_doc 。
说明 Elasticsearch 7.0以上版本不建议设置该参数。
|
batchSize | 每个scroll请求从Elasticsearch集群获取的最大文档数 | 否 | 默认值为2000。 |
keepScrollAliveSecs | scroll上下文保留的最长时间 | 否 | 单位为秒,默认值为3600。 |
类型映射
Flink以JSON来解析Elasticsearch数据,详情请参见数据类型映射关系。
代码示例
CREATE TEMPORARY TABLE elasticsearch_source (
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' ='elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
CREATE TEMPORARY TABLE blackhole_sink (
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' ='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
在文档使用中是否遇到以下问题
更多建议
匿名提交