搭建海量气象格点数据系统

更新时间:
复制 MD 格式

基于表格存储 Tablestore-Grid 搭建五维气象格点数据系统,支持多维切片查询和数据集多条件检索。

背景信息

气象格点数据是典型的多维时空数据,每天产生几十 TB 到上百 TB 的数据量且持续增长,对存储扩展性和多维查询性能要求极高。

一个格点数据集是一个五维结构,包含物理量(温度、湿度、风速等)、时间、高度、经度和纬度。气象预报和分析场景需要对这五个维度进行各种组合切片查询:

  • 经纬度平面查询:读取固定时间和高度下的完整平面数据,例如未来三小时全球地面温度分布。

  • 时间序列查询:读取某个格点在一段时间内的变化数据,例如某地未来 3 小时到 72 小时的温度变化。

  • 多物理量查询:读取某一时刻某一高度某一点的全部物理量数据。

  • 跨模式对比查询:对比不同气象机构或不同模式系统产生的预报数据。

传统方案采用关系型数据库和文件系统存储格点数据,随着数据规模增长,在扩展性、维护成本和查询性能上逐渐难以满足需求。

Tablestore-Grid 示例项目封装了五维格点数据的存储和查询操作。借助表格存储的自动水平扩展和多元索引的多条件检索,可以搭建支撑 PB 级数据存储和毫秒级多维查询的气象格点数据系统。

数据模型

规整的五维网格数据构成一个数据集(GridDataSet),五维结构如下。

维度

说明

variable

物理量,例如温度、湿度、风速等。

time

时间维度,例如气象预报中的预报时效(未来 3 小时、6 小时等)。

z

z 轴,表示空间高度。

x

x 轴,表示经度或纬度。

y

y 轴,表示经度或纬度。

即 GridDataSet = F(variable, time, z, x, y)。每个数据集除五维数据外,还包含以下信息。

名称

说明

GridDataSetId

数据集的唯一标识。

Attributes

自定义属性,例如数据的产生时间、来源、预报类型、精度等。对自定义属性建立多元索引后,可以通过组合条件检索符合条件的数据集。

本方案使用 meta 表和 data 表分别存储数据集的元数据和格点数据。

meta 表:主键为 GridDataSetId,属性列存储数据集的维度信息(各维大小、数据类型)和自定义属性。对 meta 表创建多元索引后,可以按来源、精度、状态、创建时间等条件进行组合查询和排序。

data 表:使用四个主键列定位每一行数据。

主键

说明

GridDataSetId

数据集的唯一标识。

Variable

物理量名称,即五维模型中的第一维。

Time

时间,即五维模型中的第二维。

Z

高度,即五维模型中的第三维。

四列主键定位一行数据,该行保存后两维(x、y)构成的二维平面数据。平面数据按块切分后存储到多个属性列中,查询时只需读取覆盖目标区域的数据块,兼顾全量读取和局部查询的性能。

快速体验

下载气象格点数据示例代码,该项目包含建表、数据录入、切片查询和多条件检索的完整实现。

  • 示例代码下载:grid.demo.zip

  • 运行前,在用户主目录创建配置文件 tablestoreConf.json,填入表格存储实例的 Endpoint、AccessKey ID、AccessKey Secret 和实例名称。

  • 准备 NetCDF 格式的测试数据文件,将文件路径配置到 ExampleConfig.java

  • 依次运行 CreateStoreExample(建表和索引)、DataImportExample(录入数据)、DataFetchExample(切片查询)和 MetaQueryExample(多条件检索)。

方案实现

以下步骤基于 Tablestore-Grid 示例项目实现完整的气象格点数据系统。

说明

Tablestore-Grid 是一个封装了 meta 表和 data 表的创建、数据写入和查询操作的示例项目,提供 GridStore、GridDataWriter 和 GridDataFetcher 三个核心接口。该项目的源码包含在示例代码包中,需配合 Tablestore SDK 5.17.4 或以上版本和 NetCDF Java 库使用。

步骤一:初始化和创建数据表

通过 TableStoreGrid 初始化客户端,调用 createStore 创建 meta 表和 data 表,然后创建多元索引以支持数据集的多条件检索。

// Initialize TableStoreGrid
TableStoreGridConfig config = new TableStoreGridConfig();
config.setTableStoreEndpoint(endpoint);
config.setAccessId(accessKeyId);
config.setAccessKey(accessKeySecret);
config.setTableStoreInstance(instanceName);
config.setDataTableName("grid_data_table");
config.setMetaTableName("grid_meta_table");

TableStoreGrid tableStoreGrid = new TableStoreGrid(config);

// Create meta table and data table
tableStoreGrid.createStore();

// Create search index on meta table for multi-condition query
IndexSchema indexSchema = new IndexSchema();
indexSchema.setFieldSchemas(Arrays.asList(
        new FieldSchema("status", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("source", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("accuracy", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("create_time", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true)
));
tableStoreGrid.createMetaIndex("grid_meta_index", indexSchema);

步骤二:录入格点数据

数据录入分为三步:写入数据集元数据、通过 GridDataWriter 按二维平面逐层录入格点数据、更新元数据状态为录入完成。以下示例读取 NetCDF 格式的气象数据文件并录入表格存储。

// Step 1: Write dataset metadata
GridDataSetMeta meta = new GridDataSetMeta(
        "forecast_20260611",           // GridDataSetId
        DataType.FLOAT,                // Data type
        Arrays.asList("temperature"),  // Variables
        24, 10, 720, 1440,            // Dimensions: time=24, z=10, x=720, y=1440
        new StoreOptions(StoreOptions.StoreType.SLICE));

Map<String, Object> attributes = new HashMap<>();
attributes.put("source", "ECMWF");
attributes.put("accuracy", "0.25deg");
attributes.put("status", "INIT");
attributes.put("create_time", System.currentTimeMillis());
meta.setAttributes(attributes);
tableStoreGrid.putDataSetMeta(meta);

// Step 2: Import data from NetCDF file
GridDataWriter writer = tableStoreGrid.getDataWriter(meta);
NetcdfFile ncFile = NetcdfFile.open("forecast_data.nc");
for (Variable variable : ncFile.getVariables()) {
    if (meta.getVariables().contains(variable.getShortName())) {
        for (int t = 0; t < meta.gettSize(); t++) {
            for (int z = 0; z < meta.getzSize(); z++) {
                Array array = variable.read(
                        new int[]{t, z, 0, 0},
                        new int[]{1, 1, meta.getxSize(), meta.getySize()});
                Grid2D grid2D = new Grid2D(
                        array.getDataAsByteBuffer(),
                        variable.getDataType(),
                        new int[]{0, 0},
                        new int[]{meta.getxSize(), meta.getySize()});
                writer.writeGrid2D(variable.getShortName(), t, z, grid2D);
            }
        }
    }
}

// Step 3: Update metadata status to DONE
attributes.put("status", "DONE");
meta.setAttributes(attributes);
tableStoreGrid.updateDataSetMeta(meta);

步骤三:查询格点数据

GridDataFetcher 支持对五维数据进行任意维度的切片查询。通过 setVariablesToGet 指定要读取的物理量,通过起始点(origin)和读取大小(shape)指定其余四维的读取范围。

GridDataFetcher fetcher = tableStoreGrid.getDataFetcher(
        tableStoreGrid.getDataSetMeta("forecast_20260611"));

// Query 1: Get a latitude-longitude plane (fixed time and height)
fetcher.setVariablesToGet(Arrays.asList("temperature"));
fetcher.setOriginShape(
        new int[]{0, 0, 0, 0},       // origin: time=0, z=0, x=0, y=0
        new int[]{1, 1, 720, 1440});  // shape: one full plane
Grid4D grid4D = fetcher.fetch().getVariable("temperature");
Array planeData = grid4D.toArray();

// Query 2: Get a time series at a specific point and height
fetcher.setOriginShape(
        new int[]{0, 0, 360, 720},    // origin: time=0, z=0, specific point
        new int[]{24, 1, 1, 1});      // shape: all 24 time steps, single point
Grid4D timeSeries = fetcher.fetch().getVariable("temperature");

// Query 3: Get an arbitrary 4D sub-space
fetcher.setOriginShape(
        new int[]{6, 2, 100, 200},    // origin: time=6, z=2, x=100, y=200
        new int[]{12, 5, 50, 80});    // shape: 12 time steps, 5 heights, 50x80 area
Grid4D subSpace = fetcher.fetch().getVariable("temperature");

步骤四:多条件检索数据集

对 meta 表建立多元索引后,可以通过 QueryBuilder 组合多个条件检索数据集。以下示例查询已完成入库、最近一天创建、来源为 ECMWF 或 NMC、精度为 0.25deg 的数据集,并按创建时间倒序排列。

// Query: (status == DONE) AND (create_time > last 24h)
//        AND (accuracy == "0.25deg")
//        AND (source == "ECMWF" OR source == "NMC")
QueryGridDataSetResult result = tableStoreGrid.queryDataSets(
        "grid_meta_index",
        QueryBuilder.and()
                .equal("status", "DONE")
                .greaterThan("create_time", System.currentTimeMillis() - 86400000)
                .equal("accuracy", "0.25deg")
                .query(QueryBuilder.or()
                        .equal("source", "ECMWF")
                        .equal("source", "NMC")
                        .build())
                .build(),
        new QueryParams(0, 10, new Sort(
                Arrays.<Sort.Sorter>asList(
                        new FieldSort("create_time", SortOrder.DESC)))));

for (GridDataSetMeta meta : result.getGridDataSetMetas()) {
    System.out.println("DataSet: " + meta.getGridDataSetId()
            + ", Attributes: " + meta.getAttributes());
}

资源清理

重要

以下操作将删除所有气象格点数据相关的表和索引,数据不可恢复。确保不再需要这些数据后再执行清理操作。

// Delete search index first, then drop tables
SyncClient syncClient = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
syncClient.deleteSearchIndex(new DeleteSearchIndexRequest("grid_meta_table", "grid_meta_index"));
syncClient.deleteTable(new DeleteTableRequest("grid_meta_table"));
syncClient.deleteTable(new DeleteTableRequest("grid_data_table"));
syncClient.shutdown();

// Or if using TableStoreGrid instance, close it first
tableStoreGrid.close();