JOIN statements for dimension tables

You can join each record of a data stream with an external dimension table. Realtime Compute for Apache Flink uses this capability to enrich streams with dimension data.

Overview of dimension table JOIN

A dimension table JOIN (also called a Lookup Join) queries an external dimension table at processing time for each piece of stream data, and enriches the main stream with dimension columns. It is commonly used to add dictionary or dimension information to event streams.

Key aspects of a dimension table JOIN:

  • Syntax: Use FOR SYSTEM_TIME AS OF PROCTIME() to indicate that each row of the main stream queries the dimension table as it exists at processing time, not a historical version.

  • Cache strategy: Cache dimension data at the connector layer to reduce pressure on the external system.

  • Lookup behavior: Use a LOOKUP hint to configure sync/async mode, buffer capacity, retry strategy, and so on.

  • Physical join strategy: Use hints such as SHUFFLE_HASH to control shuffle behavior and mitigate data skew.

  • Job-level configuration: Use the SET command to adjust global parameters in the table.exec.* family.

Syntax of dimension table JOIN

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
Note
  • FOR SYSTEM_TIME AS OF PROCTIME() is required. It instructs the engine to join each row of the main stream against the dimension table snapshot as of the current processing time.

  • The ON clause must contain an equality condition on a dimension table column that supports random (point) lookups by key.

  • In the ON clause, you can apply type-cast functions such as CAST to source table columns. When the source and dimension column types do not match, cast the source column to align with the dimension column type.
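
As a minimal sketch of the type-cast point above, assume a hypothetical source table orders whose user_id column is VARCHAR and a hypothetical dimension table users whose key user_id is BIGINT. None of these names come from the original; they only illustrate where the CAST goes.

```sql
-- Hypothetical tables: orders (stream, user_id VARCHAR) and users (dimension, user_id BIGINT).
SELECT o.order_id, u.user_name
FROM orders AS o
LEFT JOIN users FOR SYSTEM_TIME AS OF PROCTIME() AS u
  -- Cast the source column so that it matches the dimension key type.
  ON CAST(o.user_id AS BIGINT) = u.user_id;
```

The cast is applied to the source column rather than the dimension column, so the dimension side stays a plain key lookup.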

Limits and considerations

  • A dimension table JOIN only supports joining against the current snapshot of the dimension table.

  • Dimension tables support INNER JOIN and LEFT JOIN only. RIGHT JOIN and FULL JOIN are not supported.

  • For a one-to-one join, make sure the join condition contains an equality predicate on a column that is unique in the dimension table.

  • For each piece of streaming data, the join only sees the latest version of the dimension table at processing time. After the join has been performed, any subsequent changes (insertion, update, or deletion) to the dimension data are not propagated to rows that have already been joined. For per-connector behavior, see the corresponding connector documentation.

Dimension table cache strategies

Most connectors support cache strategies for dimension table JOINs. Support varies slightly across connectors—refer to the corresponding connector documentation for details. The common cache strategies are:

| Strategy | Behavior |
| --- | --- |
| None (default) | No caching. |
| LRU | Caches a subset of the dimension table. Every row from the source stream first looks up the cache; on a miss, the engine queries the physical dimension table. |
| ALL | Caches the entire dimension table. Before the job starts, the engine loads all rows into the cache; every subsequent lookup is served from the cache, and a cache miss means the key does not exist. Connector-specific options control periodic or scheduled reloads of the dimension table. This strategy fits small dimension tables that do not need frequent updates and demand maximum throughput. |

Note
  • Choose between freshness and performance based on your business requirements. If freshness is critical, you can disable caching and read directly from the dimension table.

  • When caching is enabled, combine LRU with a TTL to keep cached data reasonably fresh. The TTL can be set to a short value—for example, several seconds to tens of seconds—so the cache is periodically refreshed from the source.

  • When using the ALL strategy, watch node memory carefully to avoid OOM.

  • Under the ALL strategy, the engine loads dimension data asynchronously, so you must increase the memory of the dimension JOIN node. As a rule of thumb, raise it by twice the size of the remote dimension table data.

Tuning a dimension table JOIN

Tuning parameter categories and how to pass them

Tuning a dimension table JOIN involves three categories of parameters. Each category is passed through its own mechanism, and a parameter passed through the wrong mechanism is silently ignored.

| Category | Typical parameters | How to pass | Scope |
| --- | --- | --- | --- |
| Connector WITH options | lookup.cache, lookup.partial-cache.max-rows, lookup.partial-cache.expire-after-write | Set in the WITH (...) clause when creating the table, or override at query time with /*+ OPTIONS('key'='value') */. | Single table |
| LOOKUP hint options | table, async, capacity, timeout, output-mode, retry-predicate, retry-strategy, fixed-delay, max-attempts, shuffle | /*+ LOOKUP('table'='dim', 'async'='true', 'capacity'='100') */ | Single join operation |
| Flink job-level TableConfig | Global configurations whose keys start with table.exec.* or table.optimizer.* | Run SET 'table.exec.async-lookup.buffer-capacity' = '3072'; at the beginning of the job. | Entire job |

Important
  • An OPTIONS() hint only overrides options that the connector accepts in the WITH clause. It does not accept table.exec.* TableConfig keys—invalid keys are silently ignored.

  • A LOOKUP() hint only recognizes the fixed keys listed in the table above. It does not accept full Flink configuration keys. For example, to adjust the async lookup buffer you must write 'capacity'='100'; writing 'table.exec.async-lookup.buffer-capacity'='100' has no effect.

  • To change a global parameter that is not exposed through any hint, use SET 'xxx' = 'yyy';.

Override connector options with the OPTIONS hint

An OPTIONS() hint lets you override the WITH options of a table directly in a query, without modifying the CREATE TABLE statement. It is commonly used to tune cache behavior.

SELECT t.id, t.name, w.phoneNumber
FROM kafka_input AS t
LEFT JOIN phoneNumber /*+ OPTIONS(
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '1000'
) */ FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON t.name = w.name;

Every key inside OPTIONS must be a WITH option that the connector actually supports.

Note

The lookup.cache and lookup.partial-cache.max-rows options shown above belong to Flink's generic LookupCache interface (FLIP-221), and apply only to newer connectors that implement this interface (such as Fluss and JDBC). The None/LRU/ALL strategies described in the "Dimension table cache strategies" section correspond to legacy connectors, which expose their cache behavior through different WITH options. Refer to the documentation of the target connector to confirm which cache options and strategies it supports, and avoid using parameters that the connector does not recognize.
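
The same cache options can also be set once in the table definition instead of per query. The following is a sketch only: it assumes a connector that accepts the FLIP-221 option names as they appear in the open-source Flink JDBC connector, and the table name user_dim, its columns, and the option values are hypothetical. Confirm the supported options in the documentation of the connector you actually use.

```sql
-- Hypothetical dimension table; connection values are placeholders.
CREATE TEMPORARY TABLE user_dim (
  user_id   BIGINT,
  user_name VARCHAR,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = '<yourJdbcUrl>',
  'table-name' = '<yourTableName>',
  -- Cache only the rows that have been looked up (partial cache).
  'lookup.cache' = 'PARTIAL',
  -- Upper bound on the number of cached rows (illustrative value).
  'lookup.partial-cache.max-rows' = '10000',
  -- Expire an entry 30 seconds after it is written, which bounds staleness.
  'lookup.partial-cache.expire-after-write' = '30s'
);
```

Options set this way apply to every query that reads the table; an OPTIONS() hint on an individual query can still override them.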

Configure lookup behavior with the LOOKUP hint

The LOOKUP hint behaves the same way as in open-source Apache Flink. It configures sync/async mode, retry, and shuffle behavior for a single join operation. For details, see Apache Flink Lookup Hint. The following version requirements apply:

  • The LOOKUP hint is available only in VVR 8.0 and later.

  • The 'shuffle' = 'true' option is available only in VVR 8.0.8 and later.

  • Aliases are supported in VVR 8.0 and later. When the dimension table is referenced by an alias, the hint must use that alias.

Supported options

| Option | Meaning | Value |
| --- | --- | --- |
| table | The dimension table name or alias the hint applies to. | Table name or alias string. |
| async | Whether to enable asynchronous lookup. | true / false |
| output-mode | Output ordering of asynchronous lookups. | ordered / allow_unordered |
| capacity | Buffer queue capacity for asynchronous lookups. | Integer. |
| timeout | Timeout for an asynchronous lookup. | Duration (for example, 180s). |
| retry-predicate | Condition that triggers a retry. | Only lookup_miss is currently supported. |
| retry-strategy | Retry strategy. | Only fixed_delay is currently supported. |
| fixed-delay | Fixed interval between retries. | Duration (for example, 10s). |
| max-attempts | Maximum number of retry attempts. | Integer. |
| shuffle | Whether to shuffle before the dimension table join. | true / false |
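
A sketch of how these options combine in one hint, assuming hypothetical tables src and dim and illustrative values. It enables asynchronous lookup with unordered output and retries a lookup miss up to three times at 10-second intervals. In open-source Apache Flink the four retry options have no defaults and are set together; this sketch assumes the same holds here.

```sql
-- Hypothetical tables src and dim; the hint refers to dim by its alias D.
SELECT /*+ LOOKUP(
    'table' = 'D',
    'async' = 'true',
    'output-mode' = 'allow_unordered',
    'capacity' = '200',
    'timeout' = '180s',
    'retry-predicate' = 'lookup_miss',
    'retry-strategy' = 'fixed_delay',
    'fixed-delay' = '10s',
    'max-attempts' = '3'
  ) */
  T.a, D.b
FROM src AS T
LEFT JOIN dim FOR SYSTEM_TIME AS OF PROCTIME() AS D
ON T.a = D.a;
```

Retrying on lookup_miss is mainly useful when dimension rows may arrive slightly later than the stream records that reference them; it delays those records instead of emitting them without dimension data.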

Behavior of the shuffle option

The shuffle option affects the shuffle strategy of the dimension table join. The effective behavior in each scenario is as follows.

| Scenario | Join strategy |
| --- | --- |
| 'shuffle' = 'true' is not set. | The engine's default shuffle strategy is used. |
| 'shuffle' = 'true' is not set and the dimension connector does not provide a custom join strategy. | The engine's default shuffle strategy is used. |
| 'shuffle' = 'true' is set and the dimension connector does not provide a custom join strategy. | SHUFFLE_HASH is used by default. For details, see the SHUFFLE_HASH section below. |
| 'shuffle' = 'true' is set and the dimension connector provides a custom join strategy. | The custom shuffle strategy provided by the connector is used. |

Note

Currently only the Streaming Lakehouse Paimon connector provides a custom shuffle strategy. When the join columns cover all bucket columns, it shuffles based on the bucket.

Code examples
-- Apply the shuffle strategy only to dimension table dim1.
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- Apply the shuffle strategy to both dim1 and dim2.
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'), LOOKUP('table'='dim2', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- When dim1 is referenced by the alias D1, the hint must use the alias.
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;

-- Apply the shuffle strategy to both dim1 and dim2 using their aliases.
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'), LOOKUP('table'='D2', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;

Set job-level TableConfig with SET

table.exec.* and similar TableConfig keys are job-level global settings. They must be passed via SET; neither the LOOKUP hint nor the OPTIONS hint accepts them.

Common TableConfig keys for dimension table JOIN:

| Key | Meaning | Default |
| --- | --- | --- |
| table.exec.async-lookup.buffer-capacity | Buffer queue capacity for asynchronous lookups (job-level default). | 100 |
| table.exec.async-lookup.timeout | Timeout for asynchronous lookups (job-level default). | 3 min |
| table.exec.async-lookup.output-mode | Output ordering of asynchronous lookups (job-level default). | ordered |

-- Set job-level defaults at the beginning of the job.
SET 'table.exec.async-lookup.buffer-capacity' = '3072';
SET 'table.exec.async-lookup.timeout' = '180s';

INSERT INTO sink_table
SELECT ...
FROM src_table
LEFT JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON src_table.key = d.key;
Note

When SET and a LOOKUP hint set the same parameter, the LOOKUP hint on the individual join takes precedence.
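
A sketch of this precedence with two dimension joins, assuming the hypothetical tables src, dim1, and dim2 and illustrative capacity values. The job-level default applies to the join on dim2, while the join on dim1 overrides it.

```sql
-- Job-level default for every asynchronous lookup join in this job.
SET 'table.exec.async-lookup.buffer-capacity' = '1000';

-- D1 overrides the capacity with 3072; D2 does not set it and uses the job-level value.
SELECT /*+ LOOKUP('table' = 'D1', 'async' = 'true', 'capacity' = '3072'),
           LOOKUP('table' = 'D2', 'async' = 'true') */
  T.a, D1.x, D2.y
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;
```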

Control shuffle with join-strategy hints

Join-strategy hints control shuffle behavior for dimension table joins. The available hints are SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, and SKEW. The table below shows which join strategy fits each cache strategy.

| Cache strategy | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (equivalent to SKEW) |
| --- | --- | --- |
| None | This strategy is not recommended; the main stream incurs extra network overhead. | This strategy is not recommended; the main stream incurs extra network overhead. |
| LRU | Consider this strategy when dimension lookup IO is the bottleneck. When the main stream has temporal locality on the join key, it improves cache hit rate, reduces IO requests, and increases overall throughput. Important: The main stream incurs extra network overhead. If the main stream is skewed on the join key and you hit a performance bottleneck, consider REPLICATED_SHUFFLE_HASH instead. | Consider this strategy when dimension lookup IO is the bottleneck and the main stream is skewed on the join key. When the main stream has temporal locality on the join key, it improves cache hit rate, reduces IO requests, and increases overall throughput. |
| ALL | Recommended when dimension table memory becomes the bottleneck. Memory usage can drop to 1 / parallelism. Important: The main stream incurs extra network overhead. If the main stream is skewed on the join key and you hit a performance bottleneck, consider REPLICATED_SHUFFLE_HASH instead. | Recommended when dimension table memory becomes the bottleneck and the main stream is skewed on the join key. Memory usage drops to bucket count / parallelism. |

Important
  • The shuffle option of the LOOKUP hint already covers what SHUFFLE_HASH does. When both are present, the LOOKUP hint's shuffle option wins.

  • The shuffle option of the LOOKUP hint does not yet handle data skew. When combined with REPLICATED_SHUFFLE_HASH or SKEW, the strategy specified by REPLICATED_SHUFFLE_HASH or SKEW takes precedence.

SHUFFLE_HASH

Effect: Using a shuffle-hash strategy in a dimension table JOIN shuffles the main stream by join key before the join. Combined with the LRU cache strategy, it raises cache hit rate and reduces IO requests; combined with the ALL cache strategy, it reduces memory usage. A single SHUFFLE_HASH hint can specify multiple dimension tables.

Limits: SHUFFLE_HASH reduces memory usage, but the upstream data must be shuffled by join key, which introduces extra network overhead. Avoid it in the following two cases:

  • The main stream is heavily skewed on the join key. Using SHUFFLE_HASH would make the join node a bottleneck, causing severe backpressure for streaming jobs or long tails for batch jobs. Use REPLICATED_SHUFFLE_HASH instead.

  • The dimension table is small and the ALL cache strategy has no memory bottleneck. In this case the memory saved by SHUFFLE_HASH is not worth the extra network overhead.

Code example:

-- Enable SHUFFLE_HASH for dim1 only.
SELECT /*+ SHUFFLE_HASH(dim1) */ ...

-- Enable SHUFFLE_HASH for both dim1 and dim2.
SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ ...

-- When dim1 is referenced by the alias D1, the hint must use the alias.
SELECT /*+ SHUFFLE_HASH(D1) */ ...
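
For reference, a complete statement might look like the following sketch. The tables src and dim1 and the selected columns are hypothetical, and dim1 is assumed to use an LRU or ALL cache, since those are the cases where the shuffle pays off.

```sql
-- The main stream is shuffled by the join key T.a before the lookup.
SELECT /*+ SHUFFLE_HASH(D1) */ T.a, D1.b
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1
ON T.a = D1.a;
```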

REPLICATED_SHUFFLE_HASH

Effect: REPLICATED_SHUFFLE_HASH behaves essentially the same as SHUFFLE_HASH, except that it randomly distributes main-stream rows with the same key across N parallel instances. This addresses the performance bottleneck caused by data skew. A single REPLICATED_SHUFFLE_HASH hint can specify multiple dimension tables.

Limits:

  • Configure the bucket count for skewed data with table.exec.skew-join.replicate-num (default: 16). Its value cannot exceed the parallelism of the dimension join node. For details, see Job-level SQL tuning.

  • Update streams are not supported. Using this hint when the main stream is an update stream raises an error.

Code example:

SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ ...
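
A sketch that pairs the hint with the replicate-num setting described above. The tables src and dim1 are hypothetical, the value 8 is illustrative, and the dimension join node is assumed to run with a parallelism of at least 8. The main stream must be append-only.

```sql
-- Spread rows that share a join key across 8 parallel instances (illustrative value).
SET 'table.exec.skew-join.replicate-num' = '8';

SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ src.a, dim1.b
FROM src
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME()
ON src.a = dim1.a;
```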

SKEW

Effect: When the specified table is known to be skewed, the optimizer applies the replicated-shuffle-hash strategy. SKEW is syntactic sugar—internally it is implemented with replicated shuffle hash.

Limits:

  • Each SKEW hint can specify only one table.

  • The table name must refer to the main table that has the skew, not the dimension table.

  • Update streams are not supported. Using this hint when the main stream is an update stream raises an error.

Code example:

SELECT /*+ SKEW(src) */  ...
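
A complete statement might look like the following sketch, again with the hypothetical tables src and dim1. Note that the hint names the skewed main table src, not the dimension table.

```sql
-- src is the skewed main stream; dim1 is the dimension table being looked up.
SELECT /*+ SKEW(src) */ src.a, D1.b
FROM src
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1
ON src.a = D1.a;
```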

Examples

Example 1: A basic dimension table JOIN

The most basic form: enrich a Kafka stream with a MySQL dimension table. No tuning hints are used.

CREATE TEMPORARY TABLE kafka_input (
  id   BIGINT,
  name VARCHAR,
  age  BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE phoneNumber (
  name        VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE result_infor (
  id          BIGINT,
  phoneNumber BIGINT,
  name        VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM kafka_input AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON t.name = w.name;

Example 2: Enable cache with the OPTIONS hint and configure async lookup with the LOOKUP hint

Tuning scenario: the dimension table cannot sustain the lookup QPS generated by the job. You want to turn on partial cache on the dimension table and enable async lookup with a larger buffer queue.

INSERT INTO user_behavior_wide
SELECT /*+ LOOKUP('table' = 't2', 'async' = 'true', 'capacity' = '3072') */
  t1.member_id AS member_id,
  t2.tag AS tag
FROM user_behavior_datagen AS t1
LEFT JOIN fluss.fluss.user_active_info /*+ OPTIONS(
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '1000'
  ) */
FOR SYSTEM_TIME AS OF PROCTIME() AS t2
ON t1.member_id = t2.member_id;
  • Connector options (lookup.cache, lookup.partial-cache.max-rows) are passed through an OPTIONS() hint placed right after the dimension table reference; this overrides the corresponding WITH options at query time.

  • Lookup behavior (async, capacity) is configured through a LOOKUP() hint placed after SELECT. OPTIONS and LOOKUP are two different hint types and must be written separately; combining them silently disables the hint.

Example 3: Tune the job-level async lookup buffer with SET

Tuning scenario: multiple dimension table JOINs in the same job need a larger asynchronous lookup buffer. You want the change to apply globally, and to enable async via the LOOKUP hint.

SET 'table.exec.async-lookup.buffer-capacity' = '3072';
SET 'table.exec.async-lookup.timeout' = '180s';

INSERT INTO user_behavior_wide
SELECT /*+ LOOKUP('table' = 't2', 'async' = 'true') */
  t1.member_id,
  t2.tag
FROM user_behavior_datagen AS t1
LEFT JOIN fluss.fluss.user_active_info
FOR SYSTEM_TIME AS OF PROCTIME() AS t2
ON t1.member_id = t2.member_id;
  • table.exec.async-lookup.buffer-capacity and similar parameters are job-level TableConfig keys and can be set only with SET. Putting them inside OPTIONS or LOOKUP has no effect.

  • When the LOOKUP hint does not specify capacity, the global value from SET is used. If both are specified, the LOOKUP hint takes precedence on that individual join.