维表JOIN语句
对于每条流式数据,可以关联一个外部维表数据源,为实时计算Flink版提供数据关联查询。
使用限制
维表JOIN仅支持对当前时刻维表快照的关联。
维表支持INNER JOIN和LEFT JOIN,不支持RIGHT JOIN或FULL JOIN。
注意事项
如果您有一对一JOIN需求,请确保连接条件中包含了维表中具有唯一性字段的等值连接条件。
对每条流式数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。具体的维表的行为请参见对应连接器行为。
维表JOIN语法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
说明
必须加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN维表当前时刻所看到的每条数据。
ON条件中必须包含维表实际能支持随机查找的字段的等值条件。
ON条件中维表字段不能使用CAST等类型转换函数。如果您有类型转换需求,请在源表字段进行操作。
使用示例
测试数据
表1 kafka_input
id(bigint)
name(varchar)
age(bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
表2 phoneNumber
name(varchar)
phoneNumber(bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
测试语句
CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.name = w.name;
测试结果
id(bigint)
phoneNumber(bigint)
name(varchar)
1
1390000444
lilei
3
1390000333
libai