本文将为您介绍读取Fluss数据的流批方式。
数据读取
流模式读取
支持多种消费模式,对于主键表,默认消费模式是initial,首先消费全量数据,然后再消费增量数据。
登录实时计算管理控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击。
单击+后,单击新建流作业,填写文件名称并选择引擎版本,单击创建。
编写作业代码。
SELECT * FROM `fluss-catalog`.`my_db`.`my_table`; # earliest模式 SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
批模式读取
主键表与日志表均支持 LIMIT 读取,可快速返回指定条数结果,极大便利数据预览。
主键表与日志表(0.9-ali-1.0 及以上版本)均已支持 COUNT(*) 统计查询。
主键表额外支持基于主键和索引键的精确匹配的点查功能。
登录实时计算管理控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击。
单击+后,单击新建批作业,填写文件名称并选择引擎版本和SQL方言,单击创建。
编写作业代码。
# LIMIT 读取 SELECT * FROM `fluss-catalog`.`my_db`.`my_table` LIMIT 10; # 精确查询(仅主键表) SELECT * FROM `fluss-catalog`.`my_db`.`my_pk_tbl` WHERE shop_id = 10000 and user_id = 123456; # 聚合查询 SELECT count(*) FROM `fluss-catalog`.`my_db`.`my_log_table`;
查询优化
谓词下推
在 SELECT ... WHERE 中编写的过滤条件,Flink 自动将符合条件的谓词下推到 Fluss 服务端。服务端根据每个 RecordBatch 的列统计信息评估过滤谓词,跳过不匹配的数据块,减少网络传输和反序列化开销。
支持的过滤操作符
比较操作符:
=、<>、>、>=、<、<=IN (...)IS NULL、IS NOT NULLBETWEEN ... AND ...LIKE:支持前缀匹配('abc%')、后缀匹配('%abc')和包含匹配('%abc%')逻辑组合:
AND、OR
使用限制
仅适用于日志表,主键表不支持。
过滤条件中引用的所有列必须开启列统计信息(
table.statistics.columns)。任一引用列未开启统计,该过滤条件不会被下推。
基于 batch 级统计的粗粒度过滤,可能存在 false positive(多返回数据),查询结果语义不受影响。
使用示例
步骤一:建表并开启列统计
CREATE TABLE sensor_log (
sensor_id BIGINT,
temperature DOUBLE,
humidity DOUBLE,
event_time TIMESTAMP(3)
) WITH (
'table.statistics.columns' = 'sensor_id,temperature'
);步骤二:使用 WHERE 条件查询
SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;步骤三:通过 EXPLAIN 验证谓词下推
EXPLAIN SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;EXPLAIN 输出中,TableSourceScan 节点的 filter=[...] 子句表明过滤条件已被下推到 Fluss 服务端。
EXPLAIN 输出中 Calc 节点仍保留过滤条件是正常行为。Flink 客户端对接收到的数据再次执行过滤,作为安全兜底。
关于谓词下推的原理和配置详情,请参见表特性。
数据查询
主键表与日志表均支持 LIMIT 读取,可快速返回指定条数结果,极大便利数据预览。
主键表与日志表(0.9-ali-1.0 及以上版本)均已支持 COUNT(*) 统计查询。
主键表额外支持基于主键和索引键的精确匹配的点查功能。
登录实时计算管理控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击。
SELECT * FROM `fluss-catalog`.`my_db`.`customer` WHERE c_custkey = 1;选中代码块,单击运行即可查看结果。
消费模式
消费模式 | 主键表 | 日志表 | 适用场景 |
full(默认) | 首先消费全量数据,然后再消费增量数据。 | 从最早的位点开始消费。 | 适用于需要处理全量数据的场景,如全量数据分析。 |
earliest | 从最早的 binlog 位点开始消费。 | 从最早的位点开始消费。 | 适用于需要处理历史变更数据的场景,如初次加载。 |
latest | 从最新的 binlog 位点开始消费, | 从最新的位点开始消费 | 适用于仅关注最新数据的场景,如实时监控。 |
timestamp | 从指定的时间(由配置项 scan.startup.timestamp 指定)开始消费 binlog。 | 从指定的时间(由配置项 scan.startup.timestamp 指定)开始消费 | 适用于基于特定时间点的数据恢复或回溯。 |
SQL示例
// 默认full模式
SELECT * FROM my_table ;
// earliest 消费模式
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
// latest 消费模式
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;
// 使用时间戳的方式
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '1678883047356') */;
// 使用时间字符串的方式
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '2023-12-09 23:09:12') */;