您可以通过多种方式实现去重需求,例如FIRST_VALUE、LAST_VALUE和DISTINCT等。本文为您介绍如何使用TopN方法实现去重,以及在使用过程中需要注意的事项。

去重的方案通常有两种:
  • 保留第一条
  • 保留最后一条
说明 ORDER BY后的时间属性字段必须在源表中定义。

语法

由于SQL上没有直接支持去重的语法,因此我们使用了SQL的ROW_NUMBER OVER WINDOW功能实现去重。ROW_NUMBER OVER WINDOWTopN语句方法类似,可以理解为一种特殊的TopN。
SELECT *
FROM (
   SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]
     ORDER BY timeAttributeCol [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1
参数说明:
  • ROW_NUMBER() : 一个计算行号的OVER窗口函数,行号计算从1开始。
  • PARTITION BY col1[, col2..] :指定分区的列,可以不指定。即去重的Keys。
  • ORDER BY timeAttributeCol [asc|desc]) :指定排序的列,必须是时间属性字段(Processing Time或Event Time)。可以指定为顺序(Keep First Row)或倒序(Keep Last Row)。
  • 外层查询rownum必须为= 1或者<= 1。条件必须是AND,并且不能存在Undeterministic的UDF的条件。
如上语法所示,去重需要两层Query:
  • 子查询中:使用ROW_NUMBER() 窗口函数,按照时间属性列对数据进行排序编号。
  • 外层查询中:对排名进行过滤,只取第一条,达到去重的目的。时间列排序方向可以是顺序或倒序,顺序并保留第一条也就是deduplicate keep first row,倒序并保留第一条也就是deduplicate keep last row
当排序字段是Processing Time列时,Flink会按系统时间去重,其每次运行结果不确定。当排序字段是Event Time列时,Flink会按业务时间去重,其每次运行结果是确定的。

Deduplicate Keep FirstRow

保留首行的去重策略,即保留指定Key下第一条出现的数据,之后出现该Key下的数据会被丢弃掉。因为其State中只存储了Key数据,因此性能较优。示例如下。
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
  FROM T
)
WHERE rowNum = 1

本例中,将T表按照b字段进行去重,并按照系统时间保留第一条数据。proctime在以上示例中是源表T中的一个具有Processing Time属性的字段。如果您按照系统时间去重,也可以将proctime字段简化成PROCTIME()函数进行调用,可以省略proctime字段的声明。

说明 blink-3.3.1版本后,FirstRow支持使用Event Time进行开窗,并且不会产生Retraction。

Deduplicate Keep LastRow

注意 LastRow目前不支持使用Event Time进行开窗。
LastRow的作用是也是去重,且只保留该主键下最后一条出现的数据。其性能略胜于使用LAST_VALUE函数,示例如下。
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY proctime DESC) as rowNum
  FROM T
)
WHERE rowNum = 1

FAQ

Q:执行ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY now() as time DESC)语句时,产生如下报错,应该如何处理?
java.lang.RuntimeException: Can not retract a non-existent record:
    38c30001,1b800000008,1c000000013,85000035343a3731,1d80000008d,2680000000c,8400000073616173,8500003130303333,8600a6bae9a7a4e5,0,0,27800000011,2900000001c,85000069616d6164,2b00000000c,0,0,8100000000000059,2c00000000a,2d000000016,0,0,0,2e800000016,3000000000c,3100000000c,32000000011,8765636976726573,3380000000c,8500004554554f52,3480000000c,3580000000a,8600a6bae9a7a4e5,2,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5d304013.
    This should never happen.
A:以下两种原因可以导致以上报错:
  • 由代码中now()导致

    TopN现在不支持非确定性的排序字段,这里now()是非确定的,每次输出的值不一样,所以导致Retraction会找不到之前的值。请使用确定性的时间字段:Event Time或者是源表中一个具有Processing Time属性的字段。

  • blink.state.ttl.msstate.backend.niagara.ttl.ms参数设置过小

    将设置过小的TTL参数注释使用默认配置,或调大TTL参数。