对于每条流式数据,可以关联一个外部维表数据源,为实时计算Flink版提供数据关联查询。
背景信息
Processing Time Temporal Join使用处理时间(Processing Time)属性,将事实表中的每条数据与维表的的最新数据进行关联处理。与事件时间(Event Time)不同,处理时间并不关注事件实际发生的时刻,而是依据数据到达处理系统的时间点。
使用限制
仅实时计算引擎VVR 8.0.10及以上版本支持。
仅MySQL维表支持使用Processing Time Temporal Join。
注意事项
不支持全量同步阶段下的Checkpoint,需要配置
execution.checkpointing.interval-during-backlog: 0
参数关闭Checkpoint,而增量同步阶段不受影响。使用Processing Time Temporal Join时,需配置
table.optimizer.proctime-temporal-join-strategy: TEMPORAL_JOIN
参数。
语法格式
Processing Time Temporal Join语法与维表Join语法一样:
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
使用示例
测试数据
表1 kafka_input
id(bigint)
name(varchar)
age(bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
表2 phoneNumber
name(varchar)
phoneNumber(bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
测试语句
SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN'; -- 使用Processing Time Temporal Join。 SET 'execution.checkpointing.interval-during-backlog' = '0'; -- 关闭全量阶段下的Checkpoint。 CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); -- 维表 CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w ON t.name = w.name;
测试结果
id(bigint)
phoneNumber(bigint)
name(varchar)
1
1390000444
lilei
3
1390000333
libai