本文为您介绍如何使用Elasticsearch连接器。
背景信息
阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。为您提供企业级权限管控、安全监控告警、自动报表生成等场景服务。
Elasticsearch连接器支持的信息如下:
类别 | 详情 |
---|---|
支持类型 | 源表、维表和结果表 |
运行模式 | 批模式和流模式 |
数据格式 | JSON |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见查看监控指标。 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
- 已创建Elasticsearch索引,详情请参见创建示例。
- 已配置Elasticsearch公网或私网访问白名单,详情请参见配置实例公网或私网访问白名单。
使用限制
- 源表和维表仅支持Elasticsearch 5.5及以上版本。
- 结果表仅支持Elasticsearch 6.x和7.x版本。
- 仅Flink计算引擎VVR 2.0.0及以上版本支持Elasticsearch连接器。
- 仅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。
语法结构
- 源表
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );
说明 DDL中的字段均对应Elasticsearch文档中的字段,不支持将文档ID写入表中。 - 维表
CREATE TABLE es_dim( field1 STRING, --作为JOIN时的Key,必须为STRING类型。 field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );
说明- 如果指定主键,则维表JOIN时的Key有且只能有一个,且为Elasticsearch对应索引的文档ID。
- 如果不指定主键,则维表JOIN时的Key可以有一个或多个,且为Elasticsearch对应索引的文档中的字段。
- 结果表
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填写elasticsearch-6 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );
说明 DDL中的字段均对应Elasticsearch文档中的字段,不支持将文档ID写入表中。
WITH参数
- 源表
参数 说明 数据类型 是否必填 默认值 备注 connector 源表类型。 String 是 无 固定值为elasticsearch。 endPoint Server地址。 String 是 无 例如: http://127.0.0.1:XXXX
。indexName 索引名称。 String 是 无 无。 accessId Elasticsearch实例的用户名。 String 否 无 无。 accessKey Elasticsearch实例的密码。 String 否 无 无。 typeNames Type名称。 String 否 _doc Elasticsearch 7.0以上版本不建议设置该参数。 batchSize 每个scroll请求从Elasticsearch集群获取的最大文档数。 Int 否 2000 无。 keepScrollAliveSecs scroll上下文保留的最长时间。 Int 否 3600 单位为秒。 - 结果表
参数 说明 数据类型 是否必填 默认值 备注 connector 结果表类型。 String 是 无 固定值为 elasticsearch-6
或elasticsearch-7
。hosts Server地址。 String 是 无 例如: 127.0.0.1:XXXX
。index 索引名称。 String 是 无 无。 document-type 文档类型。 String - elasticsearch-6:必填
- elasticsearch-7:不支持
无 当连接器类型为 elasticsearch-6
时,此处参数取值需要和Elasticsearch侧的type参数取值保持一致。username 用户名。 String 否 空 默认为空,不进行权限验证。 password 密码。 String 否 空 如果定义了username,则必须定义非空的password。 document-id.key-delimiter 文档ID的分隔符。 String 否 _ 无。 failure-handler Elasticsearch请求失败时的故障处理策略。 String 否 fail 可选策略如下: - fail(默认值):如果请求失败,则作业失败。
- ignore:忽略失败并删除请求。
- retry-rejected:重新添加由于队列容量满而失败的请求。
- custom class name:用于使用ActionRequestFailureHandler子类进行故障处理。
sink.flush-on-checkpoint 是否在checkpoint时执行flush。 Boolean 否 true - true:默认值。
- false:禁用该功能后,在Elasticsearch进行Checkpoint时,连接器将不等待确认所有pending请求是否已完成,故连接器不会为请求提供At-least-once保证。
sink.bulk-flush.backoff.strategy 如果由于临时请求错误导致flush操作失败,则设置sink.bulk-flush.backoff.strategy指定重试策略。 Enum 否 DISABLED - DISABLED(默认值):不执行重试,即第一次请求错误后失败。
- CONSTANT:常量回退,即每次回退等待时间相同。
- EXPONENTIAL:指数回退,即每次回退等待时间指数递增。
sink.bulk-flush.backoff.max-retries 最大回退重试次数。 Int 否 无 无。 sink.bulk-flush.backoff.delay 每次回退尝试之间的延迟。 Duration 否 无 - 对于CONSTANT回退策略:该值为每次重试之间的延迟。
- 对于EXPONENTIAL回退策略:该值为初始基准延迟。
sink.bulk-flush.max-actions 每个批量请求的最大缓冲操作数。 Int 否 1000 0表示禁用该功能。 sink.bulk-flush.max-size 存放请求的缓冲区内存最大值。 String 否 2 MB 单位为MB,默认值为2 MB,0 MB表示禁用该功能。 sink.bulk-flush.interval flush的间隔。 Duration 否 1s 单位为秒,默认值为1s,0s表示禁用该功能。 connection.path-prefix 要添加到每个REST通信中的前缀字符串。 String 否 空 无。 retry-on-conflict 更新操作中,允许因版本冲突异常而重试的最大次数。超过该次数后将抛出异常导致作业失败。 Int 否 0 说明- 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
- 该参数仅在定义了主键的情况下生效。
- 维表
参数 说明 数据类型 是否必填 默认值 备注 connector 维表类型。 String 是 无 固定值为elasticsearch。 endPoint Server地址。 String 是 无 例如: http://127.0.0.1:XXXX
。indexName 索引名称。 String 是 无 无。 accessId Elasticsearch实例的用户名。 String 否 无 无。 accessKey Elasticsearch实例的密码。 String 否 无 无。 typeNames Type名称。 String 否 _doc Elasticsearch 7.0以上版本不建议设置该参数。 maxJoinRows 单行数据Join的最多行数。 Integer 否 1024 无。 cache 缓存策略。 String 否 ALL 支持以下三种缓存策略: - ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
- LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。
- None:无缓存。
cacheSize 缓存大小,即缓存多少行数据。 Long 否 100000 仅当cache选择LRU缓存策略时,cacheSize参数生效。 cacheTTLMs 缓存失效的超时时间。 Long 否 Long.MAX_VALUE 单位为毫秒。cacheTTLMs配置和cache配置有关: - 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间,默认不过期。
- 如果cache配置为ALL,则cacheTTLMs为设置缓存重新加载的间隔时间,默认不重新加载。
类型映射
Flink以JSON来解析Elasticsearch数据,详情请参见数据类型映射关系。
使用示例
- 源表示例
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessSecret>', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;
- 维表示例
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessSecret>', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
- 结果表示例
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='<yourElasticsearch.accessId>', 'password' ='<yourElasticsearch.accessKey>' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;
文档ID
Elasticsearch结果表会根据是否定义了主键确定是在upsert模式或append模式下工作。
- 如果定义了主键,Elasticsearch结果表将在upsert模式下工作,该模式可以消费包含UPDATE和DELETE的消息。
- 如果未定义主键,Elasticsearch结果表将在append模式工作,该模式只能消费INSERT消息。
在Elasticsearch结果表中,主键用于计算Elasticsearch的文档ID。Elasticsearch结果表通过使用document-id.key-delimiter指定的键分隔符,按照DDL中定义的顺序连接所有主键字段,从而为每一行生成一个文档ID字符串。
某些类型(例如BYTES、ROW、ARRAY和MAP等)由于没有对应的字符串表示形式,所以不允许其作为主键字段。如果未指定主键,Elasticsearch将自动生成随机的文档ID。
说明 文档ID为最多512个字节但不包含空格的字符串。
索引
Elasticsearch结果表同时支持静态索引和动态索引。在使用静态和动态索引时,您需要注意以下几点:
- 如果使用静态索引,则索引选项值应为纯字符串,例如
myusers
,所有记录都将被写入myusers
索引。 - 如果使用动态索引,可以使用
{field_name}
引用记录中的字段值以动态生成目标索引。您还可以使用{field_name|date_format_string}
将TIMESTAMP、DATE和TIME类型的字段值转换为date_format_string
指定的格式。date_format_string
与Java的DateTimeFormatter兼容。例如,如果设置为myusers-{log_ts|yyyy-MM-dd}
,则log_ts字段值为2020-03-27 12:25:55
的记录将被写入myusers-2020-03-27
索引。