双流JOIN语句
Flink SQL支持对动态表进行复杂而灵活的连接操作,本文为您介绍如何使用双流JOIN语句。
背景信息
实时计算的JOIN和传统批处理JOIN的语义一致,都用于将两张表关联起来。区别为实时计算关联的是两张动态表,关联的结果也会动态更新,以保证最终结果和批处理结果一致。
语法
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:表名称。
tableExpression:表达式。
joinCondition:JOIN条件。
为左右流的状态设置不同生命周期
从实时计算引擎VVR 8.0.1 开始,您可以通过Flink SQL Hint 语法单独为双流JOIN的左右流状态设置不同生命周期 (TTL)来减少维护的状态大小。
语法
SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
注意事项
JOIN_STATE_TTL HINT仅支持在双流JOIN场景使用,不支持维表JOIN,Interval Join 或 Window Join。
若双流JOIN时JOIN_STATE_TTL_HINT仅指定某一条流的在JOIN节点的状态生命周期,则另外一条流的状态生命周期使用Flink SQL作业级别的状态生命周期,由table.exec.state.ttl控制(参见基本配置),默认为1.5天。
tableReference支持表名,视图名和别名,一旦为表名指定别名时,则需使用别名。
这是一个实验性质的特性,HINT语法未来可能会发生变化。
示例
-- HINT使用别名
SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
ON o.productid = p.productid;
-- HINT使用表名
SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
ON Orders.productid = Products.productid;
-- HINT使用视图名
CREATE TEMPORARY VIEW v AS
SELECT id, ...
FROM (
SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn
FROM src1
WHERE ...
) tmp
WHERE rn = 1;
SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.*
FROM v
LEFT JOIN src2 AS b ON v.id = b.id;
Orders JOIN Products表的数据示例
测试数据
表 1. Orders rowtime
productid
orderid
units
10:17:00
30
5
4
10:17:05
10
6
1
10:18:05
20
7
2
10:18:07
30
8
20
11:02:00
10
9
6
11:04:00
10
10
1
11:09:30
40
11
12
11:24:11
10
12
4
表 2. Products productid
name
unitprice
30
Cheese
17
10
Beer
0.25
20
Wine
6
30
Cheese
17
10
Beer
0.25
10
Beer
0.25
40
Bread
100
10
Beer
0.25
测试语句
SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice FROM Orders AS o JOIN Products AS p ON o.productid = p.productid;
测试结果
o.rowtime
o.productid
o.orderid
o.units
p.name
p.unitprice
10:17:00
30
5
4
Cheese
17.00
10:17:00
30
5
4
Cheese
17.00
10:17:05
10
6
1
Beer
0.25
10:17:05
10
6
1
Beer
0.25
10:17:05
10
6
1
Beer
0.25
10:17:05
10
6
1
Beer
0.25
10:18:05
20
7
2
Wine
6.00
10:18:07
30
8
20
Cheese
17.00
10:18:07
30
8
20
Cheese
17.00
11:02:00
10
9
6
Beer
0.25
11:02:00
10
9
6
Beer
0.25
11:02:00
10
9
6
Beer
0.25
11:02:00
10
9
6
Beer
0.25
11:04:00
10
10
1
Beer
0.25
11:04:00
10
10
1
Beer
0.25
11:04:00
10
10
1
Beer
0.25
11:04:00
10
10
1
Beer
0.25
11:09:30
40
11
12
Bread
100.00
11:24:11
10
12
4
Beer
0.25
11:24:11
10
12
4
Beer
0.25
11:24:11
10
12
4
Beer
0.25
11:24:11
10
12
4
Beer
0.25
datahub_stream1 JOIN datahub_stream2表的数据示例
测试数据
表 3. datahub_stream1 a(BIGINT)
b(BIGINT)
c(VARCHAR)
0
10
test11
1
10
test21
表 4. datahub_stream2 a(BIGINT)
b(BIGINT)
c(VARCHAR)
0
10
test11
1
10
test21
0
10
test31
1
10
test41
测试语句
SELECT s1.c,s2.c FROM datahub_stream1 AS s1 JOIN datahub_stream2 AS s2 ON s1.a = s2.a WHERE s1.a = 0;
测试结果
s1.c(VARCHAR)
s2.c(VARCHAR)
test11
test11
test11
test31