本文为您介绍Elasticsearch(ES)维表DDL定义、WITH参数、CACHE参数和代码示例。
说明 该Connector支持的功能是和阿里云Elasticsearch产品对齐的,暂不支持HTTPS加密访问形式。
什么是阿里云Elasticsearch
阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。为您提供企业级权限管控、安全监控告警、自动报表生成等场景服务。
前提条件
- 已创建Elasticsearch索引,详情请参见步骤一:创建实例。
- 已配置Elasticsearch公网或私网访问白名单,详情请参见配置实例公网或私网访问白名单。
使用限制
仅Flink计算引擎VVR 2.0.0及以上版本支持Elasticsearch Connector。
DDL定义
CREATE TABLE es_dim(
field1 STRING, --作为JOIN时的key,必须为STRING类型。
field2 FLOAT,
field3 BIGINT,
PRIMARY KEY (field1) NOT ENFORCED
) WITH (
'connector' ='elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
说明
- 如果指定主键,则维表JOIN时的key有且只能有1个,且为Elasticsearch对应索引的文档ID。
- 如果不指定主键,则维表JOIN时的key可以有1个或多个,且为Elasticsearch对应索引的文档中的字段。
- 支持Elasticsearch 5.5及以上版本。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 维表类型。 | 是 | 固定值为elasticsearch 。
|
endPoint | 服务地址。 | 是 | 例如:http://127.0.XX.XX:9200。 |
accessId | Elasticsearch实例的用户名。 | 否 | 无。 |
accessKey | Elasticsearch实例的密码。 | 否 | 无。 |
indexName | 索引名称。 | 是 | 无。 |
typeNames | Type名称。 | 否 | 默认值为_doc 。
说明 Elasticsearch 7.0以上版本不建议设置该参数。
|
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
cache | 缓存策略。 | 否 | Elasticsearch仅支持以下两种缓存策略:
|
cacheSize | 缓存大小。 | 否 | 选择LRU 缓存策略后,可以设置缓存大小,默认值为10000行。
|
cacheTTLMs | 缓存超时时间,单位为毫秒。 | 否 | cacheTTLMs配置和cache有关:
|
cacheEmpty | 是否缓存空结果。 | 否 | 默认值为true。 |
类型映射
Flink以JSON来解析Elasticsearch数据,详情请参见数据类型映射关系。
代码示例
CREATE TEMPORARY TABLE datagen_source (
id STRING,
data STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE es_dim (
id STRING,
`value` FLOAT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' ='elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
CREATE TEMPORARY TABLE blackhole_sink (
id STRING,
data STRING,
`value` FLOAT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT e.*, w.*
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;