TopN 常用于计算流数据中某个指标的最大/最小的前N个数据。Blink SQL 可以基于 OVER 窗口操作灵活地完成 TopN 的工作。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
参数说明:
ROW_NUMBER()
: 是一个计算行号的OVER窗口函数,行号计算从1开始。PARTITION BY col1[, col2..]
: 指定分区的列,可以不指定。ORDER BY col1 [asc|desc][, col2 [asc|desc]...]
: 指定排序的列,可以多列不同排序方向。如上语法所示,TopN 需要两层query,子查询中使用 ROW_NUMBER()
窗口函数来对数据根据排序列进行排序并标上排名。外层查询中,对排名进行过滤,只取前N条,如N=10,那么就是取 Top 10 的数据。
在物理执行过程中,Blink SQL 会对输入的数据流根据排序键进行排序,如果某个分区的前N条记录发生了改变,则会将改变的那几名数据以更新流的形式发给下游。所以如果需要将TopN的数据输出到外部存储,那么接的结果表必须是一个带primary key的表。
WHERE 条件的一些限制:为了Blink SQL能识别出这是一个TopN的query,外层循环中必须要指定 rownum <= N
的格式来指定前N条记录,不能 rownum - 5 <= N
这种将rownum至于某个表达式中。当然,WHERE条件中,可以额外带上其他条件,但是必须是以 AND 连接。
如下示例是先统计查询流中每小时,每个城市,每个关键字被查询的次数,然后输出每小时、每个城市被查询最多的前100个关键字。所以输出表中,小时、城市、排名三者可以唯一确定一条记录,所以需要将这三列声明成联合主键(需要在外部存储中也有同样的主键设置)。
CREATE TABLE rds_output (
rownum int,
start_time bigint,
city varchar,
keyword varchar,
pv bigint,
PRIMARY KEY (rownum, start_time, city)
) with (
type = 'rds',
...
)
INSERT INTO rds_output
SELECT rownum, start_time, city, keyword, pv
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY start_time, city ORDER BY pv desc) AS rownum
FROM (
select substr(time_str,1,12) as start_time,
keyword,
count(1) as pv,
city
from tmp_search
group by substr(time_str,1,12), keyword, city
) a
)
WHERE rownum <= 100
测试数据
ip(varchar) | time(varchar) |
---|---|
192.168.1.1 | 100000000 |
192.168.1.2 | 100000000 |
192.168.1.2 | 100000000 |
192.168.1.3 | 100030000 |
192.168.1.3 | 100000000 |
192.168.1.3 | 100000000 |
测试case
create table source_table (
IP VARCHAR,
`TIME` VARCHAR
)with(
type='datahub',
endPoint='http://dh-cn-hangzhou.aliyuncs.com',
project='blink_test',
topic='ip_count01',
accessId='LTXXXx',
accessKey='gUqXXXxxx'
);
create table result_table (
rownum int,
start_time VARCHAR,
IP VARCHAR,
cc BIGINT,
PRIMARY KEY (start_time, IP)
) with (
type = 'rds',
url='jdbc:mysql://rm-bp1gz4k202t8XXXXXXs.com:3306/blink_test',
tableName='blink_rds_test',
userName='xxx',
password='xxx'
);
INSERT INTO result_table
SELECT rownum,start_time,IP,cc
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY cc desc) AS rownum
FROM (
SELECT SUBSTRING(`TIME`,1,2) AS start_time,--可以根据真实时间来去相应的数值,这里取得是测试数据
COUNT(IP) AS cc,
IP
FROM source_table
GROUP BY SUBSTRING(`TIME`,1,2), IP
)a
)
WHERE rownum <= 3 --可以根据真实top值来去相应的数值,这里取得是测试数据
测试结果
rownum(int) | start_time(varchar) | ip(varchar) | cc(bigint) |
---|---|---|---|
1 | 10 | 192.168.1.3 | 6 |
2 | 10 | 192.168.1.2 | 4 |
3 | 10 | 192.168.1.1 | 2 |
如上所述的语法,rownum 字段会作为结果表的主键字段之一,写入结果表。但是这会导致数据膨胀的问题,比如说收到一条原排名9的更新数据,更新后排名上升到1,那么从1到9的数据其排名都发生变化了,需要将这些数据作为更新都写入结果表。所以会有数据膨胀,导致结果表收到了太多的数据而成为瓶颈。其优化方法是,结果表中不保存 rownum,最终的 rownum 由前端计算,因为 TopN 的数据量一般都不会很大,前端排序100个数据是很快的。这样当收到一条原排名9的更新数据,更新后排名上升到1,也只需要发送这一条数据,而不用把排名 1到9的数据都发送下去。极大地降低了结果表的压力。
SELECT col1, col2, col3
FROM (
SELECT col1, col2, col3
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
语法与上文的类似,只是在外层查询中将 rownum 字段裁剪掉即可。
在无 rownum 场景,结果表主键的定义一定要特别注意。若定义有误,会直接导致TopN结果的不正确。无 rownum 场景的主键应为 TopN 上游 group by 节点的 keys 列表。
该示例来自优酷的真实业务(精简后),优酷上每个视频在分发时会产生大量流量,依据视频产生的流量我们可以分析出最热门的视频。如下,就是统计出每分钟流量最大的 top5 的视频。
--从SLS读取数据原始存储表
CREATE TABLE sls_cdnlog_stream (
vid VARCHAR, -- video id
rowtime Timestamp, -- 观看视频发生的时间
response_size BIGINT, -- 观看产生的流量
WATERMARK FOR rowtime as withOffset(rowtime, 0)
) WITH (
type='sls',
...
);
--1分钟窗口统计vid带宽数
CREATE VIEW cdnvid_group_view AS
SELECT vid,
TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS start_time,
SUM(response_size) AS rss
FROM sls_cdnlog_stream
GROUP BY vid, TUMBLE(rowtime, INTERVAL '1' MINUTE);
--存储表
CREATE TABLE hbase_out_cdnvidtoplog (
vid VARCHAR,
rss BIGINT,
start_time VARCHAR,
-- 注意结果表中不存储 rownum 字段
-- 特别注意该主键的定义,为 TopN 上游 group by keys
PRIMARY KEY(start_time, vid)
) WITH (
type='RDS',
...
);
-- 统计每分钟 top5 消耗流量的 vid,并输出
INSERT INTO hbase_out_cdnvidtoplog
-- 注意次外层查询,不选出 rownum 字段
SELECT vid, rss, start_time FROM
(
SELECT
vid, start_time, rss,
ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY rss DESC) as rownum,
FROM
cdnvid_group_view
)
WHERE rownum <= 5;
vid(VARCHAR) | rowtime(Timestamp) | response_size(BIGINT) |
---|---|---|
10000 | 2017-12-18 15:00:10 | 2000 |
10000 | 2017-12-18 15:00:15 | 4000 |
10000 | 2017-12-18 15:00:20 | 3000 |
10001 | 2017-12-18 15:00:20 | 3000 |
10002 | 2017-12-18 15:00:20 | 4000 |
10003 | 2017-12-18 15:00:20 | 1000 |
10004 | 2017-12-18 15:00:30 | 1000 |
10005 | 2017-12-18 15:00:30 | 5000 |
10006 | 2017-12-18 15:00:40 | 6000 |
10007 | 2017-12-18 15:00:50 | 8000 |
start_time(VARCHAR) | vid(VARCHAR) | rss(BIGINT) |
---|---|---|
2017-12-18 15:00:00 | 10000 | 9000 |
2017-12-18 15:00:00 | 10007 | 8000 |
2017-12-18 15:00:00 | 10006 | 6000 |
2017-12-18 15:00:00 | 10005 | 5000 |
2017-12-18 15:00:00 | 10002 | 4000 |
在文档使用中是否遇到以下问题
更多建议
匿名提交