全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
流计算

TOP_N语句

更新时间:2017-12-20 18:39:19

TopN 常用于计算流数据中某个指标的最大/最小的前N个数据。Blink SQL 可以基于 OVER 窗口操作灵活地完成 TopN 的工作。

语法

  1. SELECT *
  2. FROM (
  3. SELECT *,
  4. ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
  5. ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  6. FROM table_name)
  7. WHERE rownum <= N [AND conditions]

参数说明:

  • ROW_NUMBER(): 是一个计算行号的OVER窗口函数,行号计算从1开始。
  • PARTITION BY col1[, col2..] : 指定分区的列,可以不指定。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: 指定排序的列,可以多列不同排序方向。

如上语法所示,TopN 需要两层query,子查询中使用 ROW_NUMBER() 窗口函数来对数据根据排序列进行排序并标上排名。外层查询中,对排名进行过滤,只取前N条,如N=10,那么就是取 Top 10 的数据。

在物理执行过程中,Blink SQL 会对输入的数据流根据排序键进行排序,如果某个分区的前N条记录发生了改变,则会将改变的那几名数据以更新流的形式发给下游。所以如果需要将TopN的数据输出到外部存储,那么接的结果表必须是一个带primary key的表。

WHERE 条件的一些限制:为了Blink SQL能识别出这是一个TopN的query,外层循环中必须要指定 rownum <= N的格式来指定前N条记录,不能 rownum - 5 <= N 这种将rownum至于某个表达式中。当然,WHERE条件中,可以额外带上其他条件,但是必须是以 AND 连接。

示例一

如下示例是先统计查询流中每小时,每个城市,每个关键字被查询的次数,然后输出每小时、每个城市被查询最多的前100个关键字。所以输出表中,小时、城市、排名三者可以唯一确定一条记录,所以需要将这三列声明成联合主键(需要在外部存储中也有同样的主键设置)。

  1. CREATE TABLE rds_output (
  2. rownum int,
  3. start_time bigint,
  4. city varchar,
  5. keyword varchar,
  6. pv bigint,
  7. PRIMARY KEY (rownum, start_time, city)
  8. ) with (
  9. type = 'rds',
  10. ...
  11. )
  12. INSERT INTO rds_output
  13. SELECT rownum, start_time, city, keyword, pv
  14. FROM (
  15. SELECT *,
  16. ROW_NUMBER() OVER (PARTITION BY start_time, city ORDER BY pv desc) AS rownum
  17. FROM (
  18. select substr(time_str,1,12) as start_time,
  19. keyword,
  20. count(1) as pv,
  21. city
  22. from tmp_search
  23. group by substr(time_str,1,12), keyword, city
  24. ) a
  25. )
  26. WHERE rownum <= 100

示例二

测试数据

ip(varchar) time(varchar)
192.168.1.1 100000000
192.168.1.2 100000000
192.168.1.2 100000000
192.168.1.3 100030000
192.168.1.3 100000000
192.168.1.3 100000000

测试case

  1. create table source_table (
  2. IP VARCHAR,
  3. `TIME` VARCHAR
  4. )with(
  5. type='datahub',
  6. endPoint='http://dh-cn-hangzhou.aliyuncs.com',
  7. project='blink_test',
  8. topic='ip_count01',
  9. accessId='LTXXXx',
  10. accessKey='gUqXXXxxx'
  11. );
  12. create table result_table (
  13. rownum int,
  14. start_time VARCHAR,
  15. IP VARCHAR,
  16. cc BIGINT,
  17. PRIMARY KEY (start_time, IP)
  18. ) with (
  19. type = 'rds',
  20. url='jdbc:mysql://rm-bp1gz4k202t8XXXXXXs.com:3306/blink_test',
  21. tableName='blink_rds_test',
  22. userName='xxx',
  23. password='xxx'
  24. );
  25. INSERT INTO result_table
  26. SELECT rownum,start_time,IP,cc
  27. FROM (
  28. SELECT *,
  29. ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY cc desc) AS rownum
  30. FROM (
  31. SELECT SUBSTRING(`TIME`,1,2) AS start_time,--可以根据真实时间来去相应的数值,这里取得是测试数据
  32. COUNT(IP) AS cc,
  33. IP
  34. FROM source_table
  35. GROUP BY SUBSTRING(`TIME`,1,2), IP
  36. )a
  37. )
  38. WHERE rownum <= 3 --可以根据真实top值来去相应的数值,这里取得是测试数据

测试结果

rownum(int) start_time(varchar) ip(varchar) cc(bigint)
1 10 192.168.1.3 6
2 10 192.168.1.2 4
3 10 192.168.1.1 2
本文导读目录