本文为您介绍如何使用Elasticsearch连接器。
背景信息
阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。为您提供企业级权限管控、安全监控告警、自动报表生成等场景服务。
Elasticsearch连接器支持的信息如下:
| 类别 | 详情 | 
| 支持类型 | 源表、维表和结果表 | 
| 运行模式 | 批模式和流模式 | 
| 数据格式 | JSON | 
| 特有监控指标 | 
 说明  指标含义详情,请参见监控指标说明。 | 
| API种类 | Datastream和SQL | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
- 已创建Elasticsearch索引,详情请参见创建示例。 
- 已配置Elasticsearch公网或私网访问白名单,详情请参见配置实例公网或私网访问白名单。 
使用限制
- 源表和维表支持大于等于6.8.x,但小于8.x版本的Elasticsearch。 
- 结果表仅支持Elasticsearch 6.x、7.x和8.x版本。 
- 仅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。 
语法结构
- 源表 - CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );
- 维表 - CREATE TABLE es_dim( field1 STRING, --作为JOIN时的Key,必须为STRING类型。 field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );说明- 如果指定主键,则维表JOIN时的Key(字段)有且只能有一个,且必须为Elasticsearch对应索引的文档ID。 
- 如果不指定主键,则维表JOIN时的Key可以有一个或多个,需要为Elasticsearch对应索引的文档中的字段。 
- 对于String类型,为了保持兼容性,默认会对表中字段名增加.keyword后缀。如果因此无法匹配到Elasticsearch中的Text字段,可以将配置项ignoreKeywordSuffix配置为true。 
 
- 结果表 - CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填写elasticsearch-6 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );说明- Elasticsearch结果表会根据是否定义了主键,确定是在upsert模式或append模式下工作。 - 如果定义了主键,则主键必须为文档ID,Elasticsearch结果表将在upsert模式下工作,该模式可以处理包含UPDATE和DELETE的消息。 
- 如果未定义主键,Elasticsearch将自动生成随机的文档ID,Elasticsearch结果表将在append模式工作,该模式只能消费INSERT消息。 
 
- 某些类型(例如BYTES、ROW、ARRAY和MAP等)由于没有对应的字符串表示形式,所以不允许其作为主键字段。 
- DDL中的字段均对应Elasticsearch文档中的字段,不支持将文档ID等Meta信息写入Elasticsearch结果表中,因为文档ID等Meta信息由Elasticsearch实例侧维护。 
 
WITH参数
源表
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 源表类型。 | String | 是 | 无 | 固定值为elasticsearch。 | 
| endPoint | Server地址。 | String | 是 | 无 | 例如: | 
| indexName | 索引名称。 | String | 是 | 无 | 无。 | 
| accessId | Elasticsearch实例的用户名。 | String | 否 | 无 | 默认为空,不进行权限验证。如果定义了accessId,则必须定义非空的accessKey。 重要  为了避免您的用户名和密码信息泄露,建议您使用变量的方式填写,详情请参见项目变量。 | 
| accessKey | Elasticsearch实例的密码。 | String | 否 | 无 | |
| typeNames | Type名称。 | String | 否 | _doc | Elasticsearch 7.0以上版本不建议设置该参数。 | 
| batchSize | 每个scroll请求从Elasticsearch集群获取的最大文档数。 | Int | 否 | 2000 | 无。 | 
| keepScrollAliveSecs | scroll上下文保留的最长时间。 | Int | 否 | 3600 | 单位为秒。 | 
结果表
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 结果表类型。 | String | 是 | 无 | 固定值为 说明  仅实时计算引擎VVR 8.0.5及以上版本支持配置为 | 
| hosts | Server地址。 | String | 是 | 无 | 例如: | 
| index | 索引名称。 | String | 是 | 无 | Elasticsearch结果表同时支持静态索引和动态索引。在使用静态和动态索引时,您需要注意以下几点: 
 | 
| document-type | 文档类型。 | String | 
 | 无 | 当连接器类型为 | 
| username | 用户名。 | String | 否 | 空 | 默认为空,不进行权限验证。如果定义了username,则必须定义非空的password。 重要  为了避免您的用户名和密码信息泄露,建议您使用变量的方式填写,详情请参见项目变量。 | 
| password | 密码。 | String | 否 | 空 | |
| document-id.key-delimiter | 文档ID的分隔符。 | String | 否 | _ | 在Elasticsearch结果表中,主键用于计算Elasticsearch的文档ID。Elasticsearch结果表通过使用document-id.key-delimiter指定的键分隔符,按照DDL中定义的顺序连接所有主键字段,从而为每一行生成一个文档ID字符串。 说明  文档ID为最多512个字节但不包含空格的字符串。 | 
| failure-handler | Elasticsearch请求失败时的故障处理策略。 | String | 否 | fail | 可选策略如下: 
 | 
| sink.flush-on-checkpoint | 是否在checkpoint时执行flush。 | Boolean | 否 | true | 
 | 
| sink.bulk-flush.backoff.strategy | 如果由于临时请求错误导致flush操作失败,则设置sink.bulk-flush.backoff.strategy指定重试策略。 | Enum | 否 | DISABLED | 
 | 
| sink.bulk-flush.backoff.max-retries | 最大回退重试次数。 | Int | 否 | 无 | 无。 | 
| sink.bulk-flush.backoff.delay | 每次回退尝试之间的延迟。 | Duration | 否 | 无 | 
 | 
| sink.bulk-flush.max-actions | 每个批量请求的最大缓冲操作数。 | Int | 否 | 1000 | 0表示禁用该功能。 | 
| sink.bulk-flush.max-size | 存放请求的缓冲区内存最大值。 | String | 否 | 2 MB | 单位为MB,默认值为2 MB,0 MB表示禁用该功能。 | 
| sink.bulk-flush.interval | flush的间隔。 | Duration | 否 | 1s | 单位为秒,默认值为1s,0s表示禁用该功能。 | 
| connection.path-prefix | 要添加到每个REST通信中的前缀字符串。 | String | 否 | 空 | 无。 | 
| retry-on-conflict | 更新操作中,允许因版本冲突异常而重试的最大次数。超过该次数后将抛出异常导致作业失败。 | Int | 否 | 0 | 说明  
 | 
| routing-fields | 指定一个或多个ES字段名称,用来将文档路由到Elasticsearch的指定分片中。 | String | 否 | 无 | 多个字段名以分号(;)进行分割。如果某个字段数据为空,则该字段会被置为null。 说明  仅实时计算引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支持该参数。 | 
| sink.delete-strategy | 用来配置收到回撤(-D/-U)类型消息时的行为 | Enum | 否 | DELETE_ROW_ON_PK | 可选行为如下: 
 | 
| sink.ignore-null-when-update | 更新数据时,如果传入的数据字段值为null,判断是否更新对应字段为null,或不更新该字段。 | BOOLEAN | 否 | false | 参数取值如下: 
 说明  仅实时计算引擎VVR 11.1及以上版本支持该参数。 | 
维表
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 维表类型。 | String | 是 | 无 | 固定值为elasticsearch。 | 
| endPoint | Server地址。 | String | 是 | 无 | 例如: | 
| indexName | 索引名称。 | String | 是 | 无 | 无。 | 
| accessId | Elasticsearch实例的用户名。 | String | 否 | 无 | 默认为空,不进行权限验证。如果定义了accessId,则必须定义非空的accessKey。 重要  为了避免您的用户名和密码信息泄露,建议您使用变量的方式填写,详情请参见项目变量。 | 
| accessKey | Elasticsearch实例的密码。 | String | 否 | 无 | |
| typeNames | Type名称。 | String | 否 | _doc | Elasticsearch 7.0以上版本不建议设置该参数。 | 
| maxJoinRows | 单行数据Join的最多行数。 | Integer | 否 | 1024 | 无。 | 
| cache | 缓存策略。 | String | 否 | None | 支持以下三种缓存策略: 
 | 
| cacheSize | 缓存大小,即缓存多少行数据。 | Long | 否 | 100000 | 仅当cache选择LRU缓存策略时,cacheSize参数生效。 | 
| cacheTTLMs | 缓存失效的超时时间。 | Long | 否 | Long.MAX_VALUE | 单位为毫秒。cacheTTLMs配置和cache配置有关: 
 | 
| ignoreKeywordSuffix | 是否忽略自动为String字段添加的.keyword后缀。 | Boolean | 否 | false | 为了保证兼容性,Flink将Elasticsearch中的Text类型转换为String,并默认在String类型字段名后增加.keyword后缀。 参数取值如下: 
 | 
| cacheEmpty | 是否缓存物理维表中查找结果为空的结果。 | Boolean | 否 | true | 仅当cache选择LRU缓存策略时,cacheEmpty参数生效。 | 
| queryMaxDocs | 非主键维表的输入端每条数据到来后,查询ElasticSearch Server时返回的最大文档条数。 | Integer | 否 | 10000 | 默认值10000是ElasticSearch Server返回文档条数的最大限制,该配置项的取值不能超过这个上限。 说明  
 | 
类型映射
Flink以JSON来解析Elasticsearch数据,详情请参见数据类型映射关系。
使用示例
- 源表示例 - CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;
- 维表示例 - CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
- 结果表示例1 - CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;
- 结果表示例2 - CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;