This topic explains how to use the Elasticsearch connector.
Background
Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch and includes commercial features such as Security, Machine Learning, Graph, and APM for data analysis and data search. It provides enterprise-grade services, including access control, security monitoring and alerting, and automated report generation.
The following table describes the capabilities of the Elasticsearch connector.
| Item | Description |
| --- | --- |
| Table type | Source table, dimension table, and sink table |
| Running mode | Batch mode and streaming mode |
| Data format | JSON |
| Metric | |
| API type | DataStream API and SQL |
| Data update or deletion in a sink table | Supported |
Prerequisites
-
You have created an Elasticsearch index. For more information, see Getting started.
-
You have configured a public or private IP address whitelist for the Elasticsearch instance. For more information, see Manage IP address whitelists.
Limitations
-
Source tables and dimension tables support Elasticsearch 6.8.x or later.
Note: Using Elasticsearch 8.x with source and dimension tables requires VVR 11.6 or later.
-
Sink tables support only Elasticsearch 6.x, 7.x, and 8.x.
-
Source tables support only full reads of an Elasticsearch index. Incremental reads are not supported.
Syntax
-
Source table
Elasticsearch 8.x
CREATE TABLE elasticsearch_source(
  name STRING,
  location STRING,
  `value` FLOAT  -- VALUE is a reserved keyword in Flink SQL, so it is escaped with backticks.
) WITH (
  'connector' = 'elasticsearch-8',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
Other versions
CREATE TABLE elasticsearch_source(
  name STRING,
  location STRING,
  `value` FLOAT  -- VALUE is a reserved keyword in Flink SQL, so it is escaped with backticks.
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
-
Dimension table
Elasticsearch 8.x
CREATE TABLE es_dim(
  field1 STRING, -- Must be of the STRING type when used as a key for a JOIN.
  field2 FLOAT,
  field3 BIGINT,
  PRIMARY KEY (field1) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-8',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
Other versions
CREATE TABLE es_dim(
  field1 STRING, -- Must be of the STRING type when used as a key for a JOIN.
  field2 FLOAT,
  field3 BIGINT,
  PRIMARY KEY (field1) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
Note
-
If a primary key is specified, only one field can be the join key, and it must be the document ID in the corresponding Elasticsearch index.
-
If no primary key is specified, you can use one or more fields as join keys. These keys must be fields in the corresponding Elasticsearch documents.
-
For STRING fields, the connector appends the .keyword suffix to field names by default for compatibility. If this prevents matching with TEXT fields in Elasticsearch, set the ignoreKeywordSuffix option to true.
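The notes above can be illustrated with a minimal sketch of a dimension table that has no primary key and is joined on two fields. The table and column names (orders_source, es_dim_multi_key, region, category, discount) are hypothetical, and setting ignoreKeywordSuffix to 'true' assumes that the joined fields are mapped as TEXT in the index.

```sql
-- Hypothetical streaming source that supplies the join keys.
CREATE TEMPORARY TABLE orders_source (
  region STRING,
  category STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- No PRIMARY KEY is declared, so more than one field can serve as a join key.
CREATE TEMPORARY TABLE es_dim_multi_key (
  region STRING,
  category STRING,
  discount FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  -- Assumption: region and category are TEXT fields, so the .keyword suffix is not appended.
  'ignoreKeywordSuffix' = 'true'
);

SELECT o.region, o.category, d.discount
FROM orders_source AS o
JOIN es_dim_multi_key FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.region = d.region AND o.category = d.category;
```

If a primary key were declared instead, the join would be limited to a single key that matches the document ID.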
-
-
Sink table
CREATE TABLE es_sink(
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7', -- If you use Elasticsearch 6.x, set this to 'elasticsearch-6'.
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
Note
-
An Elasticsearch sink table operates in either upsert mode or append mode, depending on whether a primary key is defined.
-
If a primary key is defined, its value is used as the document ID. The sink table then operates in upsert mode and can process UPDATE and DELETE operations.
-
If no primary key is defined, Elasticsearch automatically generates a random document ID. The sink table then operates in append mode and can only consume INSERT messages.
-
-
Data types such as BYTES, ROW, ARRAY, and MAP do not have a corresponding string representation. Therefore, you cannot use fields of these data types as a primary key.
-
Fields in the DDL correspond to fields in an Elasticsearch document. You cannot write metadata, such as the document ID, to the sink table because the Elasticsearch cluster maintains this metadata.
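For contrast with the upsert-mode DDL above, the following is a minimal sketch of an append-mode sink. It assumes the same placeholder hosts and index, and the table name es_append_sink is hypothetical. Because no primary key is declared, Elasticsearch generates the document IDs and the table can accept only INSERT messages.

```sql
-- No PRIMARY KEY clause: the sink runs in append mode.
CREATE TABLE es_append_sink(
  user_id STRING,
  user_name STRING,
  pv BIGINT
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
```

A query that produces UPDATE or DELETE changes, such as a non-windowed aggregation, would need the primary-key form instead.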
-
WITH options
Source table
| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the source table. | String | Yes | None | Valid values: Note: Only VVR 11.6 or later supports the |
| endPoint | The server address of the Elasticsearch cluster. | String | Yes | None | Legacy option name. |
| hosts | Uses | | | | |
| indexName | The name of the index. | String | Yes | None | Legacy option name. |
| index | For use with | | | | |
| accessId | The username for authentication. | String | No | None | By default, this parameter is empty and authentication is not performed. If you specify accessId, you must specify a non-empty accessKey. Important: To prevent exposure of your username and password, we recommend that you use project variables. For more information, see project variables. |
| username | | | | | |
| accessKey | The password for authentication. | String | No | None | |
| password | | | | | |
| typeNames | The name of the type. | String | No | _doc | We recommend that you do not configure this option for Elasticsearch 7.0 or later. |
| batchSize | The maximum number of documents to retrieve from the Elasticsearch cluster per scroll request. | Int | No | 2000 | None |
| keepScrollAliveSecs | The maximum time to keep the scroll context alive. | Int | No | 3600 | Unit: seconds. |
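As a hedged illustration of the scroll-related options, the following sketch lowers batchSize and keepScrollAliveSecs from their defaults. The table name and the values 500 and 600 are arbitrary examples, not recommendations.

```sql
CREATE TEMPORARY TABLE elasticsearch_source_tuned (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  -- Fetch at most 500 documents per scroll request (default: 2000).
  'batchSize' = '500',
  -- Keep the scroll context alive for 600 seconds (default: 3600).
  'keepScrollAliveSecs' = '600'
);
```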
Sink table
| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the sink table. | String | Yes | None | The value must be Note: Only VVR 8.0.5 or later supports the |
| hosts | The server address of the Elasticsearch cluster. | String | Yes | None | Example: |
| index | The name of the index. | String | Yes | None | The sink table supports both static and dynamic indexes: |
| document-type | The document type. | String | | None | When the connector type is |
| username | The username for authentication. | String | No | None | By default, authentication is disabled. If you specify Important: To prevent exposure of your username and password, we recommend that you use project variables. For more information, see project variables. |
| password | The password for authentication. | String | No | None | |
| document-id.key-delimiter | The delimiter for the document ID. | String | No | _ | The connector uses the primary key to generate the document ID. The connector concatenates all primary key fields in the order defined in the DDL, using the delimiter specified by document-id.key-delimiter, to create a document ID string for each row. Note: A document ID is a string of up to 512 bytes that does not contain spaces. |
| failure-handler | The failure handling policy for failed Elasticsearch requests. | String | No | fail | Valid policies: |
| sink.flush-on-checkpoint | Specifies whether to flush on checkpoint. | Boolean | No | true | |
| sink.bulk-flush.backoff.strategy | If the flush operation fails due to a temporary request error, set sink.bulk-flush.backoff.strategy to specify the retry strategy. | Enum | No | DISABLED | |
| sink.bulk-flush.backoff.max-retries | The maximum number of retries. | Int | No | None | None |
| sink.bulk-flush.backoff.delay | The delay between retry attempts. | Duration | No | None | |
| sink.bulk-flush.max-actions | The maximum number of buffered actions for each bulk request. | Int | No | 1000 | A value of 0 disables this feature. |
| sink.bulk-flush.max-size | The maximum memory size of the request buffer. | String | No | 2 MB | The unit is MB. The default value is 2 MB. A value of 0 disables this feature. |
| sink.bulk-flush.interval | The flush interval. | Duration | No | 1s | The unit is seconds. The default value is 1s. A value of 0s disables this feature. |
| connection.path-prefix | A string to be prepended to every REST communication path. | String | No | None | None |
| retry-on-conflict | The maximum number of retries for an update operation if a version conflict occurs. If the number of retries exceeds this value, the job fails with an exception. | Int | No | 0 | |
| routing-fields | Specifies one or more Elasticsearch field names used to route a document to a specific shard. | String | No | None | Separate multiple field names with a semicolon (;). If a field's data is empty, the field is set to null. Note: This option is supported only in VVR 8.0.6 or later, for |
| sink.delete-strategy | Configures how the sink handles a retraction message (-D for DELETE or -U for UPDATE_BEFORE). | Enum | No | DELETE_ROW_ON_PK | Valid strategies: |
| sink.ignore-null-when-update | When updating data, specifies whether to update a field to | Boolean | No | false | Valid values: Note: This option is supported only in VVR 11.1 or later. |
| connection.request-timeout | The timeout for requesting a connection from the connection manager. | Duration | No | None | Example: Note: This option is supported only in VVR 11.7 or later. |
| connect.timeout | The timeout for establishing a connection. | Duration | No | None | Example: Note: This option is supported only in VVR 11.7 or later. |
| socket.timeout | The timeout for waiting for data, which is the maximum period of inactivity between two consecutive data packets. | Duration | No | None | Example: Note: This option is supported only in VVR 11.7 or later. |
| connection.keep-alive | The maximum duration a connection can remain idle before the system closes it. If this option is not set, the server's | Duration | No | None | Example: Note: This option is supported only in VVR 11.7 or later. |
| sink.bulk-flush.update.doc_as_upsert | Specifies whether to treat the document as an upsert document in an update request. | Boolean | No | false | Valid values: According to https://github.com/elastic/elasticsearch/issues/105804, Elasticsearch ingest pipelines do not support partial updates for bulk update requests. If you want to use an ingest pipeline, set this option to true. Note: This option is supported only in VVR 11.5 or later. |
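The following sketch shows how several of the sink options above might be combined. It is illustrative only: the table name, the composite key (user_id, region), the '$' delimiter, and the flush values are assumptions chosen for the example.

```sql
CREATE TEMPORARY TABLE es_sink_composite_key (
  user_id STRING,
  region STRING,
  pv BIGINT,
  -- Both key fields are concatenated, in this order, to form the document ID.
  PRIMARY KEY (user_id, region) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  -- Join the key fields with '$' instead of the default '_'.
  'document-id.key-delimiter' = '$',
  -- Send a bulk request after 500 buffered actions (default: 1000) ...
  'sink.bulk-flush.max-actions' = '500',
  -- ... or every 5 seconds (default: 1s), whichever comes first.
  'sink.bulk-flush.interval' = '5s'
);
```

Based on the description of document-id.key-delimiter, a row with user_id 'u1' and region 'cn' would be written under the document ID u1$cn. A non-default delimiter helps when key values themselves can contain underscores, because two different key combinations could otherwise produce the same ID.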
Dimension table
| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the dimension table. | String | Yes | None | Valid values: Note: Only VVR 11.6 or later supports the |
| endPoint | The server address of the Elasticsearch cluster. | String | Yes | None | Legacy option name. |
| hosts | For use with | | | | |
| indexName | The name of the index. | String | Yes | None | Legacy option name. |
| index | For use with | | | | |
| accessId | The username for authentication. | String | No | None | By default, this parameter is empty and authentication is not performed. If you specify accessId, you must specify a non-empty accessKey. Important: To prevent exposure of your username and password, we recommend that you use project variables. For more information, see project variables. |
| username | | | | | |
| accessKey | The password for authentication. | String | No | None | |
| password | | | | | |
| typeNames | The name of the type. | String | No | _doc | We recommend that you do not configure this option for Elasticsearch 7.0 or later. |
| maxJoinRows | The maximum number of rows to join for a single lookup. | Integer | No | 1024 | None |
| cache | The caching strategy. | String | No | None | Valid values: |
| cacheSize | The size of the cache, specified as the number of rows. | Long | No | 100000 | The cacheSize parameter takes effect only when the LRU cache policy is selected for cache. |
| cacheTTLMs | The time-to-live (TTL) for the cache. | Long | No | Long.MAX_VALUE | Unit: milliseconds. The behavior of cacheTTLMs depends on the cache setting: |
| ignoreKeywordSuffix | Specifies whether to ignore the .keyword suffix that is automatically appended to STRING fields. | Boolean | No | false | For compatibility, Flink converts Valid values: |
| cacheEmpty | Specifies whether to cache empty results from lookups in the physical dimension table. | Boolean | No | true | The cacheEmpty parameter is effective only when the cache uses the LRU cache policy. |
| queryMaxDocs | For non-primary key dimension tables, this is the maximum number of documents that the Elasticsearch server returns for each lookup query. | Integer | No | 10000 | The default value of 10,000 matches the maximum number of documents an Elasticsearch server can return per query. This value cannot exceed this limit. |
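A minimal sketch of a cached dimension table follows. The literal 'LRU' for the cache option is an assumption inferred from the references to the LRU cache policy in the cacheSize and cacheEmpty rows. The table name and the numeric values are hypothetical.

```sql
CREATE TEMPORARY TABLE es_dim_cached (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  -- Assumed literal for the LRU cache policy.
  'cache' = 'LRU',
  -- Keep at most 50,000 rows in the cache (default: 100000).
  'cacheSize' = '50000',
  -- Expire cached rows after 10 minutes; the unit is milliseconds.
  'cacheTTLMs' = '600000',
  -- Do not cache lookups that return no rows (default: true).
  'cacheEmpty' = 'false'
);
```

Disabling cacheEmpty is a trade-off: keys that are missing from the index are looked up again on every join, but documents added later become visible without waiting for the TTL to expire.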
Type mapping
Flink parses Elasticsearch data as JSON. For details, see data type mapping.
Examples
-
Source table example
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
-
Dimension table example
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  data STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

-- Select exactly the three columns that the sink declares.
-- SELECT e.*, w.* would also return proctime and a second id column.
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
-
Sink table example 1
This example writes text content to Elasticsearch after text vectorization.
Note: Create an index mapping in Elasticsearch in advance. Set the data type of the embedding field to dense_vector and specify the dimensions. Otherwise, Elasticsearch may infer it as a regular array type.
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  content STRING,
  embedding ARRAY<FLOAT>
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  id STRING,
  content STRING,
  embedding ARRAY<FLOAT>,
  -- The primary key is optional. If you define a primary key, its value becomes the document ID.
  -- Otherwise, a random document ID is generated.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-8',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}'
);

INSERT INTO es_sink
SELECT id, content, embedding
FROM datagen_source;
-
Sink table example 2
The connector supports writing complex types such as ROW, ARRAY, and MAP to Elasticsearch.
CREATE TEMPORARY TABLE datagen_source(
  id STRING,
  details ROW<
    name STRING,
    ages ARRAY<INT>,
    attributes MAP<STRING, STRING>
  >
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  id STRING,
  details ROW<
    name STRING,
    ages ARRAY<INT>,
    attributes MAP<STRING, STRING>
  >,
  -- The primary key is optional. If you define a primary key, its value becomes the document ID.
  -- Otherwise, a random document ID is generated.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-type' = '<yourElasticsearch.types>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}'
);

INSERT INTO es_sink
SELECT id, details
FROM datagen_source;