Elasticsearch

This topic explains how to use the Elasticsearch connector.

Background

Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch and includes commercial features such as Security, Machine Learning, Graph, and APM for data analysis and data search. It provides enterprise-grade services, including access control, security monitoring and alerting, and automated report generation.

The following table describes the capabilities of the Elasticsearch connector.

Table type: Source table, dimension table, and sink table

Running mode: Batch mode and streaming mode

Data format: JSON

Metrics:

  • Source table

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • Dimension table

    None

  • Sink table (for Ververica Runtime (VVR) 6.0.6 and later)

    • numRecordsOut

    • numRecordsOutPerSecond

  Note

  For more information about these metrics, see Metrics.

API type: DataStream API and SQL

Data update or deletion in a sink table: Supported

Prerequisites

  • You have created an Elasticsearch index. For more information, see Getting started.

  • You have configured a public or private IP address whitelist for the Elasticsearch instance. For more information, see Manage IP address whitelists.

Limitations

  • Source tables and dimension tables support Elasticsearch 6.8.x or later.

    Note

    Using Elasticsearch 8.x with source and dimension tables requires VVR 11.6 or later.

  • Sink tables support only Elasticsearch 6.x, 7.x, and 8.x.

  • Source tables support only full reads of an Elasticsearch index. Incremental reads are not supported.

Syntax

  • Source table

    Elasticsearch 8.x

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      `value` FLOAT -- VALUE is a reserved keyword in Flink SQL, so escape it with backticks.
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );

    Other versions

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      `value` FLOAT -- VALUE is a reserved keyword in Flink SQL, so escape it with backticks.
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );

  • Dimension table

    Elasticsearch 8.x

    CREATE TABLE es_dim(
      field1 STRING, -- Must be of the STRING type when used as a key for a JOIN.
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch-8',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );

    Other versions

    CREATE TABLE es_dim(
      field1 STRING, -- Must be of the STRING type when used as a key for a JOIN.
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    Note
    • If a primary key is specified, only one field can be the join key, and it must be the document ID in the corresponding Elasticsearch index.

    • If no primary key is specified, you can use one or more fields as join keys. These keys must be fields in the corresponding Elasticsearch documents.

    • For STRING fields, the connector appends the .keyword suffix to field names by default for compatibility. If this prevents matching with TEXT fields in Elasticsearch, set the ignoreKeywordSuffix option to true.

  • Sink table

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- Use 'elasticsearch-6' for Elasticsearch 6.x (which also requires the document-type option) or 'elasticsearch-8' for Elasticsearch 8.x.
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    Note
    • An Elasticsearch sink table operates in either upsert mode or append mode, depending on whether a primary key is defined.

      • If a primary key is defined, its value is used as the document ID. The sink table then operates in upsert mode and can process UPDATE and DELETE operations.

      • If no primary key is defined, Elasticsearch automatically generates a random document ID. The sink table then operates in append mode and can only consume INSERT messages.

    • Data types such as BYTES, ROW, ARRAY, and MAP do not have a corresponding string representation. Therefore, you cannot use fields of these data types as a primary key.

    • Fields in the DDL correspond to fields in an Elasticsearch document. You cannot write metadata, such as the document ID, to the sink table because the Elasticsearch cluster maintains this metadata.
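
To contrast with the upsert-mode DDL in the sink table syntax, the following is a minimal sketch of an append-mode sink table. The table name es_append_sink and its columns are hypothetical and used only for illustration. Because no primary key is declared, Elasticsearch generates the document IDs, and the table can consume only INSERT messages.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_append_sink (
  log_id STRING,
  message STRING,
  log_ts TIMESTAMP(3)
  -- No PRIMARY KEY clause: the sink runs in append mode,
  -- and Elasticsearch assigns a random document ID to each record.
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
```

If the upstream query produces UPDATE or DELETE messages, declare a primary key instead so that the sink runs in upsert mode.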

WITH options

Source table

Parameter

Description

Type

Required

Default

Remarks

connector

The type of the source table.

String

Yes

None

Valid values: elasticsearch or elasticsearch-8.

Note

Only VVR 11.6 or later supports the elasticsearch-8 value.

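endPoint (elasticsearch) or hosts (elasticsearch-8)

The server address of the Elasticsearch cluster.

String

Yes

None

Use endPoint, the legacy option name, with the elasticsearch connector. Use hosts with the elasticsearch-8 connector.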

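indexName (elasticsearch) or index (elasticsearch-8)

The name of the index.

String

Yes

None

Use indexName, the legacy option name, with the elasticsearch connector. Use index with the elasticsearch-8 connector.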

accessId (elasticsearch) or username (elasticsearch-8)

The username for authentication.

String

No

None

By default, this option is empty and authentication is not performed. If you specify accessId, you must also specify a non-empty accessKey.

Note

With the elasticsearch-8 connector, use the username and password options instead of accessId and accessKey.

Important

To avoid exposing your username and password, we recommend that you use project variables. For more information, see project variables.

accessKey (elasticsearch) or password (elasticsearch-8)

The password for authentication.

String

No

None

Use accessKey with the elasticsearch connector. Use password with the elasticsearch-8 connector.

typeNames

The name of the type.

String

No

_doc

We recommend that you do not configure this option for Elasticsearch 7.0 or later.

batchSize

The maximum number of documents to retrieve from the Elasticsearch cluster per scroll request.

Int

No

2000

None

keepScrollAliveSecs

The maximum time to keep the scroll context alive.

Int

No

3600

Unit: seconds.
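
As an illustration of the scroll-related options, the following sketch declares a source table that fetches smaller batches and keeps the scroll context alive for a shorter time. The values 500 and 600 are arbitrary examples, not recommendations.

```sql
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  'batchSize' = '500',           -- At most 500 documents per scroll request (default: 2000).
  'keepScrollAliveSecs' = '600'  -- Keep the scroll context alive for 600 seconds (default: 3600).
);
```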

Sink table

Parameter

Description

Type

Required

Default

Remarks

connector

The type of the sink table.

String

Yes

None

The value must be elasticsearch-6, elasticsearch-7, or elasticsearch-8.

Note

Only VVR 8.0.5 or later supports the elasticsearch-8 value.

hosts

The server address of the Elasticsearch cluster.

String

Yes

None

Example: 127.0.0.1:XXXX.

index

The name of the index.

String

Yes

None

The sink table supports both static and dynamic indexes:

  • For a static index, the value must be a plain string, such as myusers. All records are written to the myusers index.

  • For a dynamic index, you can use {field_name} to reference field values from the record to dynamically generate the target index. You can also use {field_name|date_format_string} to convert field values of the TIMESTAMP, DATE, and TIME data types to the format specified by date_format_string. The date_format_string is compatible with Java's DateTimeFormatter. For example, if you set the index to myusers-{log_ts|yyyy-MM-dd}, a record with the log_ts field value 2020-03-27 12:25:55 is written to the myusers-2020-03-27 index.
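
The dynamic index pattern described above can be sketched as follows. The table name es_daily_sink and its columns are hypothetical. With this definition, a record whose log_ts value is 2020-03-27 12:25:55 would be written to the myusers-2020-03-27 index.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_daily_sink (
  user_id STRING,
  user_name STRING,
  log_ts TIMESTAMP(3),
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  -- One index per day, derived from the log_ts field of each record.
  'index' = 'myusers-{log_ts|yyyy-MM-dd}'
);
```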

document-type

The document type.

String

  • elasticsearch-6: Yes

  • elasticsearch-7: Not supported

None

When the connector type is elasticsearch-6, the value of this parameter must be consistent with the value of the type parameter in Elasticsearch.

username

The username for authentication.

String

No

None

By default, authentication is disabled. If you specify username, you must also specify a non-empty password.

Important

To prevent exposure of your username and password, we recommend that you use project variables. For more information, see project variables.

password

The password for authentication.

String

No

None

document-id.key-delimiter

The delimiter for the document ID.

String

No

_

The connector uses the primary key to generate the document ID. The connector concatenates all primary key fields in the order defined in the DDL, using the delimiter specified by document-id.key-delimiter, to create a document ID string for each row.

Note

A document ID is a string of up to 512 bytes that does not contain spaces.
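
To make the concatenation rule concrete, the following sketch declares a composite primary key and a custom delimiter. The table name, the column names, and the choice of $ as the delimiter are assumptions for illustration. Under the rule described above, a row with user_id u1 and region cn would get the document ID u1$cn.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_composite_sink (
  user_id STRING,
  region STRING,
  pv BIGINT,
  -- Both key fields are joined, in this order, to form the document ID.
  PRIMARY KEY (user_id, region) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-id.key-delimiter' = '$'  -- Default is '_'.
);
```

Pick a delimiter that cannot appear in the key values themselves. Otherwise, different key combinations could produce the same document ID.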

failure-handler

The failure handling policy for failed Elasticsearch requests.

String

No

fail

Valid policies:

  • fail (default): Fails the job if a request fails.

  • ignore: Ignores the failure and drops the request.

  • retry-rejected: Re-adds requests that failed due to a full queue.

  • A custom class name: Uses an ActionRequestFailureHandler subclass for failure handling.

sink.flush-on-checkpoint

Specifies whether to flush on checkpoint.

Boolean

No

true

  • true (default): The connector waits for all pending requests to be acknowledged during a checkpoint, which provides an at-least-once delivery guarantee.

  • false: The connector does not wait for pending requests to be acknowledged during a checkpoint, so it does not provide an at-least-once delivery guarantee.

sink.bulk-flush.backoff.strategy

The retry strategy to apply when a flush operation fails because of a temporary request error.

Enum

No

DISABLED

  • DISABLED (default): No retries are performed. The job fails on the first request error.

  • CONSTANT: A constant backoff strategy where the wait time between retries is always the same.

  • EXPONENTIAL: An exponential backoff strategy where the wait time between retries increases exponentially.

sink.bulk-flush.backoff.max-retries

The maximum number of retries.

Int

No

None

None

sink.bulk-flush.backoff.delay

The delay between retry attempts.

Duration

No

None

  • For a constant backoff strategy, this value is the delay between each retry.

  • For an exponential backoff strategy, this value is the initial baseline delay.
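
The three backoff options are typically set together. The following sketch enables exponential backoff. The table definition is hypothetical, and the retry count and delay are arbitrary example values.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_retry_sink (
  user_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.backoff.max-retries' = '5',  -- Give up after 5 retries.
  'sink.bulk-flush.backoff.delay' = '1s'        -- Initial delay; it grows with each retry.
);
```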

sink.bulk-flush.max-actions

The maximum number of buffered actions for each bulk request.

Int

No

1000

A value of 0 disables this feature.

sink.bulk-flush.max-size

The maximum memory size of the request buffer.

String

No

2 MB

The unit is MB. The default value is 2 MB. A value of 0 disables this feature.

sink.bulk-flush.interval

The flush interval.

Duration

No

1s

The unit is seconds. The default value is 1s. A value of 0s disables this feature.
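
The three flush thresholds (action count, buffer size, and interval) can be tuned together. The following sketch uses arbitrary example values and a hypothetical table definition. The '5mb' spelling assumes that the option accepts Flink's usual memory-size notation, which this page does not state explicitly.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_bulk_sink (
  user_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'sink.bulk-flush.max-actions' = '500',  -- Up to 500 buffered actions per bulk request.
  'sink.bulk-flush.max-size' = '5mb',     -- Assumed notation; the default is 2 MB.
  'sink.bulk-flush.interval' = '2s'       -- Flush interval of 2 seconds.
);
```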

connection.path-prefix

A string to be prepended to every REST communication path.

String

No

None

None

retry-on-conflict

The maximum number of retries for an update operation if a version conflict occurs. If the number of retries exceeds this value, the job fails with an exception.

Int

No

0

Note
  • This option is supported only in VVR 4.0.13 or later.

  • This option takes effect only when a primary key is defined.

routing-fields

Specifies one or more Elasticsearch field names used to route a document to a specific shard.

String

No

None

Separate multiple field names with a semicolon (;). If a field's data is empty, the field is set to null.

Note

This option is supported only in VVR 8.0.6 or later, for elasticsearch-7 and elasticsearch-8.
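
As a sketch of the semicolon-separated syntax, the following definition routes documents by two fields. The table and column names are hypothetical.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_routed_sink (
  user_id STRING,
  region STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',  -- routing-fields applies to elasticsearch-7 and elasticsearch-8.
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'routing-fields' = 'user_id;region'
);
```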

sink.delete-strategy

Configures how the sink handles a retraction message (-D for DELETE or -U for UPDATE_BEFORE).

Enum

No

DELETE_ROW_ON_PK

Valid strategies:

  • DELETE_ROW_ON_PK (default): Ignores -U messages but deletes the row (document) corresponding to the primary key when it receives a -D message.

  • IGNORE_DELETE: Ignores both -U and -D messages. No retraction occurs in the Elasticsearch sink.

  • NON_PK_FIELD_TO_NULL: Ignores -U messages. However, when a -D message is received, this setting modifies the row (document) for the primary key: the primary key value remains the same, and all other non-primary key values in the table schema are set to NULL. This is mainly used for partial updates when multiple sinks write to the same Elasticsearch table simultaneously.

  • CHANGELOG_STANDARD: Similar to DELETE_ROW_ON_PK, but it also deletes the row (document) corresponding to the primary key when a -U message is received.

    Note

    This option is supported only in VVR 8.0.8 or later.
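
For example, a sink that should keep documents even when the upstream query retracts rows could be declared as in the following sketch. The table and column names are hypothetical.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_keep_sink (
  user_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  -- Ignore both -U and -D messages, so nothing is deleted in Elasticsearch.
  'sink.delete-strategy' = 'IGNORE_DELETE'
);
```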

sink.ignore-null-when-update

When updating data, specifies whether to update a field to null or leave it unchanged if the incoming field value is null.

Boolean

No

false

Valid values:

  • true: The field is not updated. This value is supported only when a primary key is set for the Flink table and the Elasticsearch data format is JSON.

  • false: The field is updated to null.

Note

This option is supported only in VVR 11.1 or later.
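
The following sketch shows how the option might be enabled for partial updates. The table and column names are hypothetical. A primary key is declared because the true setting requires one.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TEMPORARY TABLE es_partial_sink (
  user_id STRING,
  user_name STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- Required for 'true' to be valid.
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  -- Incoming null values leave the existing field unchanged.
  'sink.ignore-null-when-update' = 'true'
);
```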

connection.request-timeout

The timeout for requesting a connection from the connection manager.

Duration

No

None

Example:

'connection.request-timeout' = '1 min' -- 1 minute
'connection.request-timeout' = '500ms' -- 500 milliseconds
Note

This option is supported only in VVR 11.7 or later.

connect.timeout

The timeout for establishing a connection.

Duration

No

None

Example:

'connect.timeout' = '1 min' -- 1 minute
'connect.timeout' = '500ms' -- 500 milliseconds
Note

This option is supported only in VVR 11.7 or later.

socket.timeout

The timeout for waiting for data, which is the maximum period of inactivity between two consecutive data packets.

Duration

No

None

Example:

'socket.timeout' = '1 min' -- 1 minute
'socket.timeout' = '500ms' -- 500 milliseconds
Note

This option is supported only in VVR 11.7 or later.

connection.keep-alive

The maximum duration a connection can remain idle before the system closes it. If this option is not set, the server's Keep-Alive response header determines the duration. If the server does not send a Keep-Alive header, the connection remains active indefinitely.

Duration

No

None

Example:

'connection.keep-alive' = '1 min' -- 1 minute
'connection.keep-alive' = '500ms' -- 500 milliseconds
Note

This option is supported only in VVR 11.7 or later.

sink.bulk-flush.update.doc_as_upsert

Specifies whether to treat the document as an upsert document in an update request.

Boolean

No

false

Valid values:

  • true: Sets the doc_as_upsert field of the update request to true.

  • false: Populates the upsert field of the update request with the document.

According to https://github.com/elastic/elasticsearch/issues/105804, Elasticsearch ingest pipelines do not support partial updates for bulk update requests. If you want to use an ingest pipeline, set this option to true.

Note

This option is supported only in VVR 11.5 or later.

Dimension table

Parameter

Description

Type

Required

Default

Remarks

connector

The type of the dimension table.

String

Yes

None

Valid values: elasticsearch or elasticsearch-8.

Note

Only VVR 11.6 or later supports the elasticsearch-8 value.

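endPoint (elasticsearch) or hosts (elasticsearch-8)

The server address of the Elasticsearch cluster.

String

Yes

None

Use endPoint, the legacy option name, with the elasticsearch connector. Use hosts with the elasticsearch-8 connector.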

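indexName (elasticsearch) or index (elasticsearch-8)

The name of the index.

String

Yes

None

Use indexName, the legacy option name, with the elasticsearch connector. Use index with the elasticsearch-8 connector.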

accessId (elasticsearch) or username (elasticsearch-8)

The username for authentication.

String

No

None

By default, this option is empty and authentication is not performed. If you specify accessId, you must also specify a non-empty accessKey.

Note

With the elasticsearch-8 connector, use the username and password options instead of accessId and accessKey.

Important

To avoid exposing your username and password, we recommend that you use project variables. For more information, see project variables.

accessKey (elasticsearch) or password (elasticsearch-8)

The password for authentication.

String

No

None

Use accessKey with the elasticsearch connector. Use password with the elasticsearch-8 connector.

typeNames

The name of the type.

String

No

_doc

We recommend that you do not configure this option for Elasticsearch 7.0 or later.

maxJoinRows

The maximum number of rows to join for a single lookup.

Integer

No

1024

None

cache

The caching strategy.

String

No

None

Valid values:

  • ALL: Caches all data from the dimension table. Before the job starts, the system loads all data from the dimension table into the cache. Subsequent lookups are served directly from the cache. If a key is not found, the system considers it non-existent. The system reloads the entire cache when the TTL expires.

  • LRU: Caches a portion of the data from the dimension table. When a record from the source table arrives, the system first looks up the data in the cache. If it is a cache miss, it queries the physical dimension table.

  • None: No caching.

cacheSize

The size of the cache, specified as the number of rows.

Long

No

100000

The cacheSize option takes effect only when cache is set to LRU.

cacheTTLMs

The time-to-live (TTL) for the cache.

Long

No

Long.MAX_VALUE

Unit: milliseconds. The behavior of cacheTTLMs depends on the cache setting:

  • When cache is LRU, cacheTTLMs is the TTL for cache entries. By default, entries do not expire.

  • When cache is ALL, cacheTTLMs is the interval for reloading the cache. By default, the cache is not reloaded.
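
As an illustration of how the cache options combine, the following sketch enables an LRU cache on a dimension table. The cache size and TTL are arbitrary example values.

```sql
CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  'cache' = 'LRU',
  'cacheSize' = '50000',    -- Keep at most 50,000 rows in the cache.
  'cacheTTLMs' = '600000'   -- Cache entries expire after 10 minutes.
);
```

With cache set to ALL instead, the same cacheTTLMs value would act as the reload interval for the whole cache.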

ignoreKeywordSuffix

Specifies whether to ignore the .keyword suffix that is automatically appended to STRING fields.

Boolean

No

false

For compatibility, Flink converts Text types from Elasticsearch to STRING and appends a .keyword suffix to the field name by default.

Valid values:

  • true: Ignores the suffix.

    If the suffix prevents matching with Text type fields in Elasticsearch, set this option to true.

  • false: Does not ignore the suffix.

cacheEmpty

Specifies whether to cache empty results from lookups in the physical dimension table.

Boolean

No

true

The cacheEmpty option takes effect only when cache is set to LRU.

queryMaxDocs

For non-primary key dimension tables, this is the maximum number of documents that the Elasticsearch server returns for each lookup query.

Integer

No

10000

The default value of 10,000 matches the maximum number of documents an Elasticsearch server can return per query. This value cannot exceed this limit.

Note
  • This option is supported only in VVR 8.0.8 or later.

  • This option takes effect only for non-primary key dimension tables, as data in primary key tables is unique.

  • A large default value helps ensure query correctness but increases memory usage during Elasticsearch queries. If you encounter memory issues, you can reduce this value to optimize memory usage.
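
The following sketch shows a dimension table without a primary key that is joined on two document fields, with queryMaxDocs lowered to limit memory usage. All table and column names (orders, city, category, score) are hypothetical, and 1000 is an arbitrary example value.

```sql
-- Hypothetical tables and columns, for illustration only.
CREATE TEMPORARY TABLE orders (
  order_id STRING,
  city STRING,
  category STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- No primary key: any document fields can serve as join keys.
CREATE TEMPORARY TABLE es_dim (
  city STRING,
  category STRING,
  score FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  'queryMaxDocs' = '1000'  -- Return at most 1,000 documents per lookup (default: 10000).
);

CREATE TEMPORARY TABLE blackhole_sink (
  order_id STRING,
  score FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT o.order_id, d.score
FROM orders AS o
JOIN es_dim FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.city = d.city AND o.category = d.category;
```

If the join fields are mapped as text fields in Elasticsearch and lookups return no matches, adding 'ignoreKeywordSuffix' = 'true' to the dimension table options may help.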

Type mapping

Flink parses Elasticsearch data as JSON. For details, see data type mapping.

Examples

  • Source table example

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • Dimension table example

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.id, e.data, w.`value`
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • Sink table example 1

    This example writes text content to Elasticsearch after text vectorization.

    Note

    Create an index mapping in Elasticsearch in advance. Set the data type of the embedding field to dense_vector and specify the dimensions. Otherwise, Elasticsearch may infer it as a regular array type.

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      content STRING,
      embedding ARRAY<FLOAT>
    ) WITH (
      'connector' = 'datagen'
    );

    CREATE TEMPORARY TABLE es_sink (
      id STRING,
      content STRING,
      embedding ARRAY<FLOAT>,
      -- The primary key is optional. If you define a primary key, its value becomes the document ID.
      -- Otherwise, a random document ID is generated.
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}'
    );

    INSERT INTO es_sink
    SELECT id, content, embedding
    FROM datagen_source;

  • Sink table example 2

    The connector supports writing complex types such as ROW, ARRAY, and MAP to Elasticsearch.

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      details ROW<
        name STRING,
        ages ARRAY<INT>,
        attributes MAP<STRING, STRING>
      >
    ) WITH (
      'connector' = 'datagen'
    );

    CREATE TEMPORARY TABLE es_sink (
      id STRING,
      details ROW<
        name STRING,
        ages ARRAY<INT>,
        attributes MAP<STRING, STRING>
      >,
      -- The primary key is optional. If you define a primary key, its value becomes the document ID.
      -- Otherwise, a random document ID is generated.
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}'
    );

    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;