Processing Time Temporal Join语句

对于每条流式数据,可以关联一个外部维表数据源,为实时计算Flink版提供数据关联查询。

背景信息

Processing Time Temporal Join使用处理时间(Processing Time)属性,将事实表中的每条数据与维表的的最新数据进行关联处理。与事件时间(Event Time)不同,处理时间并不关注事件实际发生的时刻,而是依据数据到达处理系统的时间点。

使用限制

  • 仅实时计算引擎VVR 8.0.10及以上版本支持。

  • 仅MySQL维表支持使用Processing Time Temporal Join。

注意事项

  • 不支持全量同步阶段下的Checkpoint,需要配置execution.checkpointing.interval-during-backlog: 0参数关闭Checkpoint,而增量同步阶段不受影响。

  • 使用Processing Time Temporal Join时,需配置table.optimizer.proctime-temporal-join-strategy: TEMPORAL_JOIN参数。

语法格式

Processing Time Temporal Join语法与维表Join语法一样:

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

使用示例

  • 测试数据

    • 表1 kafka_input

      id(bigint)

      name(varchar)

      age(bigint)

      1

      lilei

      22

      2

      hanmeimei

      20

      3

      libai

      28

    • 表2 phoneNumber

      name(varchar)

      phoneNumber(bigint)

      dufu

      1390000111

      baijuyi

      1390000222

      libai

      1390000333

      lilei

      1390000444

  • 测试语句

    SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';  -- 使用Processing Time Temporal Join。
    SET 'execution.checkpointing.interval-during-backlog' = '0';              -- 关闭全量阶段下的Checkpoint。
    
    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT,
      proc_time AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    -- 维表
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w
    ON t.name = w.name;
  • 测试结果

    id(bigint)

    phoneNumber(bigint)

    name(varchar)

    1

    1390000444

    lilei

    3

    1390000333

    libai