读取数据

更新时间:
复制为 MD 格式

本文将为您介绍读取Fluss数据的流批方式。

数据读取

流模式读取

支持多种消费模式,对于主键表,默认消费模式是initial,首先消费全量数据,然后再消费增量数据。

  1. 登录实时计算管理控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据开发 > ETL

  4. 单击+后,单击新建流作业,填写文件名称并选择引擎版本,单击创建

  5. 编写作业代码。

    SELECT * FROM `fluss-catalog`.`my_db`.`my_table`;
    
    # earliest模式
    SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

批模式读取

  • 主键表与日志表均支持 LIMIT 读取,可快速返回指定条数结果,极大便利数据预览。

  • 主键表与日志表(0.9-ali-1.0 及以上版本)均已支持 COUNT(*) 统计查询。

  • 主键表额外支持基于主键和索引键的精确匹配的点查功能。

  1. 登录实时计算管理控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据开发 > ETL

  4. 单击+后,单击新建批作业,填写文件名称并选择引擎版本SQL方言,单击创建

  5. 编写作业代码。

    # LIMIT 读取
    SELECT * FROM `fluss-catalog`.`my_db`.`my_table` LIMIT 10;
    
    # 精确查询(仅主键表)
    SELECT * FROM `fluss-catalog`.`my_db`.`my_pk_tbl` WHERE shop_id = 10000 and user_id = 123456;
    
    # 聚合查询
    SELECT count(*) FROM `fluss-catalog`.`my_db`.`my_log_table`;

查询优化

谓词下推

SELECT ... WHERE 中编写的过滤条件,Flink 自动将符合条件的谓词下推到 Fluss 服务端。服务端根据每个 RecordBatch 的列统计信息评估过滤谓词,跳过不匹配的数据块,减少网络传输和反序列化开销。

支持的过滤操作符

  • 比较操作符:=<>>>=<<=

  • IN (...)

  • IS NULLIS NOT NULL

  • BETWEEN ... AND ...

  • LIKE:支持前缀匹配('abc%')、后缀匹配('%abc')和包含匹配('%abc%'

  • 逻辑组合:ANDOR

使用限制

  • 仅适用于日志表,主键表不支持。

  • 过滤条件中引用的所有列必须开启列统计信息(table.statistics.columns)。

  • 任一引用列未开启统计,该过滤条件不会被下推。

  • 基于 batch 级统计的粗粒度过滤,可能存在 false positive(多返回数据),查询结果语义不受影响。

使用示例

步骤一:建表并开启列统计

CREATE TABLE sensor_log (
  sensor_id BIGINT,
  temperature DOUBLE,
  humidity DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'table.statistics.columns' = 'sensor_id,temperature'
);

步骤二:使用 WHERE 条件查询

SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;

步骤三:通过 EXPLAIN 验证谓词下推

EXPLAIN SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;

EXPLAIN 输出中,TableSourceScan 节点的 filter=[...] 子句表明过滤条件已被下推到 Fluss 服务端。

说明

EXPLAIN 输出中 Calc 节点仍保留过滤条件是正常行为。Flink 客户端对接收到的数据再次执行过滤,作为安全兜底。

关于谓词下推的原理和配置详情,请参见表特性

数据查询

说明
  • 主键表与日志表均支持 LIMIT 读取,可快速返回指定条数结果,极大便利数据预览。

  • 主键表与日志表(0.9-ali-1.0 及以上版本)均已支持 COUNT(*) 统计查询。

  • 主键表额外支持基于主键和索引键的精确匹配的点查功能。

  1. 登录实时计算管理控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据查询 > 新建查询脚本

    SELECT * FROM `fluss-catalog`.`my_db`.`customer` 
    WHERE c_custkey = 1;
  4. 选中代码块,单击运行即可查看结果。

消费模式

消费模式

主键表

日志表

适用场景

full(默认)

首先消费全量数据,然后再消费增量数据。

从最早的位点开始消费。

适用于需要处理全量数据的场景,如全量数据分析。

earliest

从最早的 binlog 位点开始消费。

从最早的位点开始消费。

适用于需要处理历史变更数据的场景,如初次加载。

latest

从最新的 binlog 位点开始消费,

从最新的位点开始消费

适用于仅关注最新数据的场景,如实时监控。

timestamp

从指定的时间(由配置项 scan.startup.timestamp 指定)开始消费 binlog。

从指定的时间(由配置项 scan.startup.timestamp 指定)开始消费

适用于基于特定时间点的数据恢复或回溯。

SQL示例

// 默认full模式
SELECT * FROM my_table ;

// earliest 消费模式
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

// latest 消费模式
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;

// 使用时间戳的方式
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '1678883047356') */;

// 使用时间字符串的方式
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '2023-12-09 23:09:12') */;