Flink SQL Join快速入门

更新时间:2025-02-08 03:53:44

Flink SQL支持在动态表上进行复杂的连接操作,提供多种查询语义和join类型。使用时需避免引发笛卡尔积,因为Flink SQL不支持这种操作,会导致查询失败。默认情况下,join顺序未优化。为提高性能,可在FROM子句中调整表顺序,将更新频率最低的表放在前面,最高的放在后面。

Joins概览

连接类型

类型说明

语法差异

连接类型

类型说明

语法差异

Regular Joins

Regular Join是最通用的join类型。在这种join下,join两侧表的任何新记录或变更都是可见的,并会影响整个join的结果

流式查询中的regular join语法非常灵活,支持对输入表进行插入、更新和删除操作。然而,regular join需要将两边的输入数据永久保存在状态中,这可能导致状态无限增长,具体取决于输入的数据量。为了防止状态过大,可以设置状态的time-to-live (TTL),但这可能影响结果的准确性。

Interval Joins

Interval Join是返回一个符合join条件和时间限制的简单笛卡尔积。

需要至少一个等值连接条件和一个在两边均包含的时间限定连接条件。时间范围的判断可以定义为一个条件(如 <, <=, >=, >),也可以使用一个BETWEEN 条件,或者对两边表中相同类型的时间属性(即:处理时间或事件时间)进行等式判断。

Temporal Joins

Temporal Join允许对版本表基于事件时间或处理时间进行join。 这意味着可以基于某个时间点连接版本表的特定时间版本数据。

要求两边表具备相同类型的时间处理语义(即:处理时间或事件时间)。且注意连接结果的生命周期,连接条件通常为某个特定的时间戳。

Lookup Joins

Lookup Join通常用于使用从外部系统查询的数据来丰富表。Join要求一个表具有处理时间属性,另一个表提供维表的支持。

要求一个表具备处理时间属性,而另一个表则需通过查找源连接器进行支持。两个表之间还需要一个强制的相等连接条件。

Lateral Joins

Lateral Join将表与表值函数的结果连接。左表的每一行都与表值函数相应调用产生的所有行相连接。

要求ON 子句中有一个固定的TRUE连接条件。如JOIN LATERAL TABLE(table_func(order_id)) t(res) ON TRUE

Regular Joins

常用的四种连接类型是:

  • INNER JOIN:返回两个表中满足连接条件的记录(交集)。

  • LEFT JOIN:返回左表中的所有记录,即使右表中没有匹配的记录(保留左表)。

  • RIGHT JOIN:返回右表中的所有记录,即使左表中没有匹配的记录(保留右表)。

  • FULL OUTER JOIN:返回两个表的并集,包含匹配和不匹配的记录。

Regular Joins图解

image

Regular Joins示例

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据开发 > ETL

  4. 单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步。

    此示例将展示如何使用连接来关联多个表之间的行,将超级英雄的昵称和他们的真实姓名关联起来。
    CREATE TEMPORARY TABLE NOC (
      agent_id STRING,
      codename STRING
    )
    WITH (
      'connector' = 'faker',   --faker模拟数据生成连接器
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',  --从五个数字中选择生成一个数
      'fields.codename.expression' = '#{superhero.name}',   --fake内置函数,将随机生成一个超级英雄的名字
      'number-of-rows' = '10'   --指定生产的数据为十行  
    );
    
    CREATE TEMPORARY TABLE RealNames (
      agent_id STRING,
      name     STRING
    )
    WITH (
      'connector' = 'faker',
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
      'fields.name.expression' = '#{Name.full_name}',  --fake内置函数,将随机生成一个名字
      'number-of-rows' = '10'
    );
    
    SELECT
        name,
        codename
    FROM NOC
    INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;  --如果两张表的agent_id(1-5)相等,则输出name和codename。
  5. 单击右上方的调试,选择调试集群,单击确认。如果还没有Session集群,详情请参见创建Session集群

    image

更多Regular Joins用法详情请参见双流JOIN语句

Interval Joins

Interval Joins是将两组数据进行连接的过程,每组数据被划分为若干区间,区间由开始时间和结束时间定义。每组中的数据根据其时间戳被分配到相应的区间。区间连接通常用于比较在一定时间间隔内的两组数据。

Interval Joins图解

image

Interval Joins示例

此示例将展示如何在具有时间上下文相关的事件的表之间执行连接。将订单时间(order_time)与发货时间(shipment_time)相差三天的数据筛选出来。

参见Regular Joins新建ETL作业。

CREATE TEMPORARY TABLE orders (
  id INT,
  order_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)    --根据本机时间,随机获取-4,-3,-2小时前的时间
)
WITH (
  'connector' = 'datagen',     --datagen连接器,可以周期性生成随机数据
  'rows-per-second'='10',      --生成随机数据的速率,10条/s
  'fields.id.kind'='sequence', --序列生成器
  'fields.id.start'='1',       --序列值从1开始
  'fields.id.end'='100'        --序列值从100结束
);

CREATE TEMPORARY TABLE shipments (
  order_id INT,
  shipment_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1))+1 AS INT), CURRENT_TIMESTAMP)   --根据本机时间,随机获取-2,-1,0小时前的时间
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='5',
  'fields.order_id.kind'='sequence',
  'fields.order_id.start'='1',
  'fields.order_id.end'='100'
);

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(HOUR,o.order_time,s.shipment_time) AS hour_diff    --订单时间(order_time)与发货时间(shipment_time)相差的时间
FROM orders o
JOIN shipments s ON o.id = s.order_id   
WHERE 
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' HOUR AND s.shipment_time;  --根据发货时间筛选,订单是不是在三小时内下单的

单击右上方的调试,单击确认。调试结果如下:

image

更多Interval joins的用法详情请参见IntervalJoin语句

Temporal Joins

时间连接常用于连接时态表(在Flink中也称为动态表)。时态表是随时间变化的表,每条记录都关联了一个或多个时间段。如汇率或商品的价格在不同时间会有波动,此时需要采用时间连接,将事务发生的时间对应到当时相应的汇率或价格进行计算。

Temporal Joins图解

image

Temporal Joins示例

此示例将展示汇率在不同时间段发生变化后,订单也需要采用当时生效的汇率进行计算的业务场景。

(可选)生成模拟数据

  1. 参见Regular Joins创建ETL作业。

  1. 使用Faker连接器生成模拟数据,以Upsert Kafka连接器写入Kafka模拟汇率动态表。

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE currency_rates_faker (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
  'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE transactions_faker (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.id.expression' = '#{Internet.UUID}',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
  'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
  'rows-per-second' = '2'
);

BEGIN STATEMENT SET;

INSERT INTO currency_rates
SELECT * FROM currency_rates_faker;

INSERT INTO transactions
SELECT * FROM transactions_faker;

END;
  1. 单击右上方的部署,进行作业部署。

  2. 单击左侧导航栏的运维中心 > 作业运维,单击目标作业操作列的启动,选择无状态启动后单击启动

参见Regular Joins新建ETL作业,读取模拟数据进行调试。

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'currency_rates',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'transactions',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  c.eur_rate,
  t.total,
  c.currency_code,
  c.rate_time,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code 
;

单击右上方的调试,单击确认。调试结果如下:

如图所示,汇率发生了两次变更,时间分别在20:16:1120:35:22。其中有一个事务订单发生的时间在20:35:14,但此时汇率还没有发生变化,所以需要采用20:16:11变更的汇率进行计算。

image

Lookup Joins

查找连接常用于利用从外部系统查询的数据实现数据扩展或补充。并非所有数据都会频繁更新,实时工作流中亦是如此。在某些情况下,您可能需要使用存储在外部的静态数据来丰富流数据。例如,商品数据可能存储在需要与Flink直接连接的关系数据库中。Flink SQL允许您查找引用数据,并使用查找连接将其与流数据结合。连接要求一个表具有时间属性,而另一个表则通过连接器进行连接,例如MySQL连接器。

Lookup Joins图解

image
说明
  • 必须加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN维表当前时刻数据快照所看到的每条数据。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。

  • ON条件中必须包含维表实际能支持随机查找的字段的等值条件。

Lookup Joins示例

此示例将展示连接外部连接器的静态数据丰富订单数据,补充商品名称。

参见Regular Joins新建ETL作业。

CREATE TEMPORARY TABLE orders ( 
    order_id STRING,
    product_id INT,
    order_total INT
) WITH (
  'connector' = 'faker',   --faker模拟数据生成连接器
  'fields.order_id.expression' = '#{Internet.uuid}',   --随机生成UUID
  'fields.product_id.expression' = '#{number.numberBetween ''1'',''5''}',   --从数字1-5中生成一个数
  'fields.order_total.expression' = '#{number.numberBetween ''1000'',''5000''}',  --从数字1000-5000中生成一个数
  'number-of-rows' = '10'  --生成的数据数量
);

--连接MySQL的商品静态数据
CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    o.order_id,
    p.product_name,
    o.order_total,
  CASE 
    WHEN o.order_total > 3000 THEN 1    
    ELSE 0
  END AS is_importance          --增加字段is_importance,如果销售额大于3000则值为1,表示是否为重要订单。
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() AS p --FOR SYSTEM_TIME AS OF PROCTIME()子句确保在联接运算符处理orders行时,orders的每一行都与join条件匹配的products行连接。
ON o.product_id = p.product_id;

MySQL商品数据表示例

image

单击右上方的调试,单击确认。调试结果如下:

image

更多Lookup Joins的用法详情请参见维表JOIN语句

Lateral Joins

Lateral Joins允许在FROM子句中指定子查询,针对外部查询的每一行执行此子查询,从而提高 SQL 查询的灵活性和性能,通常能够通过减少表扫描的次数来优化查询效率。然而,当内部查询复杂或处理的数据量较大时,此操作可能会导致性能下降。

image

Lateral Joins示例

此示例将合计销售订单记录,将筛选出销售记录前三的商品和销售条数。

参见Regular Joins新建ETL作业。

CREATE TEMPORARY TABLE sale (
  sale_id STRING,
  product_id INT,
  sale_num INT
)
WITH (
  'connector' = 'faker',    --faker模拟数据生成连接器
  'fields.sale_id.expression' = '#{Internet.uuid}',   --随机生成UUID
  'fields.product_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',   --从五个数字中选择生成一个数
  'fields.sale_num.expression' = '#{number.numberBetween ''1'',''10''}',  --从数字1-10中随机生成一个整数
  'number-of-rows' = '50'   --生成50条数据
);

CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING,
 PRIMARY KEY(product_id) NOT ENFORCED
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    p.product_name,
    s.total_sales
FROM  products p
LEFT JOIN LATERAL
    (SELECT SUM(sale_num) AS total_sales FROM sale WHERE sale.product_id = p.product_id) s ON TRUE
    ORDER BY total_sales DESC
    LIMIT 3;

单击右上方的调试,单击确认。调试结果如下:

image

相关文档

  • 本页导读 (1)
  • Joins概览
  • Regular Joins
  • Interval Joins
  • Temporal Joins
  • Lookup Joins
  • Lateral Joins
  • 相关文档
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等