Storing and querying CPFS file metadata with Tablestore


CPFS (Cloud Parallel File Storage) is widely used in high-performance computing, AI training, and similar scenarios. As the number of files grows, understanding the file system's capacity distribution, access hotness, and Fileset quota usage becomes essential for resource management. This topic describes how to collect CPFS file metadata with a scanning tool, write the data to Tablestore, and visualize it in Grafana, so that operations teams can monitor the state of the file system in real time.

Solution architecture

The overall data flow is as follows:

  1. CPFS file system: The scan script runs on a Linux machine with CPFS mounted. It walks the specified directories and collects each file's path, size, access time, modification time, and other metadata, together with Fileset quota information. Scan results are first saved locally, then written to Tablestore and archived as a package in OSS.

  2. Tablestore: Serves as the persistent store for the metadata. Its search index supports flexible queries by time range, file path, Fileset, and other dimensions, and a SQL mapping table connects it to Grafana.

  3. Grafana: Connects to Tablestore through the Alibaba Cloud Tablestore data source plugin. Import the prebuilt dashboard to visualize file system capacity, file counts, and Fileset usage.
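As a rough illustration of the first stage of this data flow, a minimal metadata scan over a directory tree can be sketched as below. This is a simplified stand-in for the full meta_stat.py tool shown later, not the actual scanner; the record keys merely mirror the table schema used in this topic.

```python
import os

def scan_metadata(root):
    """Walk `root` and collect per-file metadata similar to what the
    scanner records: path, size, and access/modify/change times in ms."""
    records = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                st = os.stat(path)
            except OSError:
                continue  # file vanished mid-scan; skip it
            records.append({
                "file_name": path,
                "fileSize": st.st_size,
                "aTime": int(st.st_atime * 1000),
                "mTime": int(st.st_mtime * 1000),
                "cTime": int(st.st_ctime * 1000),
            })
    return records
```

In the real tool these records are buffered to local CSV files before being batch-written to Tablestore and archived to OSS.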


Preparations

Complete the following environment and resource preparations before you start.

  1. Mount the CPFS file system: This topic uses CPFS for LINGJUN as an example and mounts the CPFS root directory to /cpfs on a Linux server. For mounting instructions, see the CPFS product documentation.

  2. Python environment: Make sure the machine that runs the scan has Python 3.8 or later installed.

  3. Tablestore instance: An instance has been created in the Tablestore console, and you have its instance name and endpoint.

  4. OSS bucket: A bucket for archiving scan results has been created, and you have noted its name and region.

  5. Grafana: The machine used for visual monitoring has the Grafana service installed.

Step 1: Install dependencies

Install the required Python packages on the machine that runs the scan.

pip3 install alibabacloud-oss-v2 tablestore alibabacloud-nas20170626
Note

The packages require Python 3.8 or later. Run python3 --version to check the current version before installing.
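The version requirement above can also be checked programmatically before the scanner starts; a minimal sketch (the helper name is illustrative, not part of the tool):

```python
import sys

MIN_VERSION = (3, 8)

def interpreter_ok(version_info=None, required=MIN_VERSION):
    """Return True when the interpreter meets the minimum (major, minor)."""
    vi = sys.version_info if version_info is None else version_info
    return tuple(vi[:2]) >= required

if __name__ == "__main__":
    if not interpreter_ok():
        raise SystemExit(f"Python {MIN_VERSION[0]}.{MIN_VERSION[1]}+ required, "
                         f"found {sys.version_info.major}.{sys.version_info.minor}")
```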

Step 2: Initialize Tablestore

Before the first scan, create the data table, the search index, and the SQL mapping table in the Tablestore instance. The init_ots.py script performs all three initialization tasks automatically.

The script does the following:

  • Data table: Uses file_name (the file path) as the primary key, stores the file metadata fields, and sets the TTL to 86400 seconds (1 day).

  • Search index: Indexes fields such as file size, timestamps, and Fileset information to support multi-dimensional queries and aggregations.

  • SQL mapping table: Creates a SQL mapping for the data table so that the Grafana Tablestore data source plugin can query the data with SQL.

Save the following script as init_ots.py.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
OTS 初始化脚本
用于创建数据表、多元索引和 SQL 映射表
"""

import sys
import argparse
from tablestore import OTSClient, TableMeta, TableOptions, ReservedThroughput, CapacityUnit
from tablestore import FieldSchema, FieldType, SearchIndexMeta, IndexSetting, AnalyzerType, SplitAnalyzerParameter

def create_table(client, table_name):
    """
    Create the data table.
    Primary key: file_name (String)
    TTL: 86400
    """
    print(f"[1/3] Creating data table: {table_name}")

    try:
        # Define the primary key schema
        schema_of_primary_key = [('file_name', 'STRING')]

        # Build the table metadata
        table_meta = TableMeta(table_name, schema_of_primary_key)

        # Table options; TTL is set to 86400 seconds (1 day)
        table_options = TableOptions(
            time_to_live=86400,
            max_version=1,
            max_time_deviation=86400,
            allow_update=False
        )

        # Reserved read/write throughput must be set at creation; the default is 0
        reserved_throughput = ReservedThroughput(CapacityUnit(0, 0))

        # Issue the create-table request
        client.create_table(table_meta, table_options, reserved_throughput)

        print(f"✓ Data table created: {table_name}")
        return True

    except Exception as e:
        print(f"✗ Failed to create data table: {str(e)}")
        return False


def create_search_index(client, table_name, index_name):
    """
    Create the search index.
    TTL: 86400
    """
    print(f"[2/3] Creating search index: {index_name}")

    try:
        # Define the field schemas
        fields = [
            # file_name: Text, tokenized by the "/" delimiter, case sensitive
            FieldSchema('file_name', FieldType.TEXT, index=True,
                        analyzer=AnalyzerType.SPLIT,
                        analyzer_parameter=SplitAnalyzerParameter("/")),

            # aTime: Long
            FieldSchema('aTime', FieldType.LONG, index=True),

            # wTime: Long
            FieldSchema('wTime', FieldType.LONG, index=True),

            # cluster_id: Keyword
            FieldSchema('cluster_id', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # fileSize: Long
            FieldSchema('fileSize', FieldType.LONG, index=True),

            # cTime: Long
            FieldSchema('cTime', FieldType.LONG, index=True),

            # region: Keyword
            FieldSchema('region', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # mTime: Long
            FieldSchema('mTime', FieldType.LONG, index=True),

            # fileset_id: Keyword
            FieldSchema('fileset_id', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # fileset_name: Keyword
            FieldSchema('fileset_name', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # fileset_file_count_quota: Long
            FieldSchema('fileset_file_count_quota', FieldType.LONG, index=True),

            # fileset_size_quota: Long
            FieldSchema('fileset_size_quota', FieldType.LONG, index=True),
        ]

        # Index settings
        index_setting = IndexSetting(routing_fields=['file_name'])

        # Build the search index metadata
        index_meta = SearchIndexMeta(
            fields,
            index_setting=index_setting,
            index_sort=None,
            time_to_live=86400
        )

        # Issue the create-search-index request
        client.create_search_index(table_name, index_name, index_meta)

        print(f"✓ Search index created: {index_name}")
        return True

    except Exception as e:
        print(f"✗ Failed to create search index: {str(e)}")
        return False


def create_mapping_table(client, table_name):
    """
    Create the SQL mapping table.
    """
    print(f"[3/3] Creating SQL mapping table")

    try:
        # Build the SQL statement
        sql_query = f"""CREATE TABLE `{table_name}` (
            `file_name` VARCHAR(1024),
            `aTime` BIGINT(20),
            `wTime` BIGINT(20),
            `cluster_id` MEDIUMTEXT,
            `fileSize` BIGINT(20),
            `cTime` BIGINT(20),
            `region` MEDIUMTEXT,
            `mTime` BIGINT(20),
            `fileset_id` MEDIUMTEXT,
            `fileset_name` MEDIUMTEXT,
            `fileset_file_count_quota` BIGINT(20),
            `fileset_size_quota` BIGINT(20),
            PRIMARY KEY(`file_name`)
        )"""

        # Execute the SQL statement
        client.exe_sql_query(sql_query)

        print(f"✓ SQL mapping table created")
        return True

    except Exception as e:
        print(f"✗ Failed to create SQL mapping table: {str(e)}")
        return False


def main():
    """
    Entry point.
    """
    print("=" * 60)
    print("OTS initialization script")
    print("=" * 60)

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='OTS initialization script - creates the data table, search index, and SQL mapping table')
    parser.add_argument('--endpoint', required=True, help='Endpoint of the OTS instance')
    parser.add_argument('--instance-name', required=True, help='OTS instance name')
    parser.add_argument('--access-key-id', required=True, help='Access Key ID')
    parser.add_argument('--access-key-secret', required=True, help='Access Key Secret')
    parser.add_argument('--table-name', required=True, help='Name of the data table to create')
    parser.add_argument('--search-index-name', required=True, help='Name of the search index to create')

    args = parser.parse_args()

    # Print the configuration
    print("\nConfiguration:")
    print(f"  Endpoint: {args.endpoint}")
    print(f"  Instance Name: {args.instance_name}")
    print(f"  Table Name: {args.table_name}")
    print(f"  Search Index Name: {args.search_index_name}")
    print()

    try:
        # Initialize the OTS client
        print("Connecting to the OTS instance...")
        client = OTSClient(
            args.endpoint,
            args.access_key_id,
            args.access_key_secret,
            args.instance_name
        )
        print("✓ Connected\n")

        # Step 1: create the data table; any failure aborts the script
        if not create_table(client, args.table_name):
            sys.exit(1)

        print()

        # Step 2: create the search index
        if not create_search_index(client, args.table_name, args.search_index_name):
            sys.exit(1)

        print()

        # Step 3: create the SQL mapping table
        if not create_mapping_table(client, args.table_name):
            sys.exit(1)

        print()
        print("=" * 60)
        print("✓ All initialization steps completed!")
        print("=" * 60)

    except Exception as e:
        print(f"\n✗ Script failed: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Run the initialization script with the following command.

python3 init_ots.py \
  --endpoint https://<instance-name>.<region-id>.ots.aliyuncs.com \
  --instance-name <instance-name> \
  --access-key-id <AccessKey ID> \
  --access-key-secret <AccessKey Secret> \
  --table-name bmcpfs_test \
  --search-index-name bmcpfs_test_idx
Note
  • --table-name and --search-index-name use the sample values bmcpfs_test and bmcpfs_test_idx; change them as needed. If you use different names, also update the corresponding settings in bmcpfs_stat_tool_config.json and the variables in the Grafana dashboard in the later steps.

  • --endpoint has the format https://<instance-name>.<region-id>.ots.aliyuncs.com and can be found on the instance details page in the Tablestore console.
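Because the public endpoint follows a fixed pattern, it can be derived from the instance name and region ID. A small sketch (VPC and internal endpoints use different domains, so the console instance details page remains authoritative; the instance name below is hypothetical):

```python
def ots_endpoint(instance_name: str, region_id: str) -> str:
    """Build the public Tablestore endpoint for an instance."""
    return f"https://{instance_name}.{region_id}.ots.aliyuncs.com"

# Example with a hypothetical instance name:
# ots_endpoint("myinstance", "cn-hangzhou")
# → "https://myinstance.cn-hangzhou.ots.aliyuncs.com"
```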

Step 3: Deploy the scan script

Create a deployment directory on the machine that runs the scan (this topic uses /opt/cpfs-scanner as an example) and create the following files in it.

  1. Create meta_stat.py with the following content.

    from __future__ import annotations  # required on Python 3.8 for the list[...] annotations below

    import argparse
    import concurrent
    import csv
    import fcntl
    import json
    import logging
    import os
    import re
    import shutil
    import subprocess
    import sys
    import tarfile
    import time
    from collections import OrderedDict
    from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
    from datetime import datetime, timezone
    from logging.handlers import RotatingFileHandler
    from typing import Dict, Optional
    
    import alibabacloud_oss_v2 as oss
    import tablestore
    from alibabacloud_oss_v2.exceptions import OperationError
    from alibabacloud_nas20170626.client import Client as NAS20170626Client
    from alibabacloud_credentials.client import Client as CredentialClient
    from alibabacloud_credentials.models import Config as CredentialConfig
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_nas20170626 import models as nas20170626_models
    from alibabacloud_tea_util import models as util_models
    from tablestore import PutRowItem
    
    VERSION = "1.3a"
    PUT = "PUT"
    UPDATE = "UPDATE"
    IGNORE = "IGNORE"
    NEW_ATIME = "NEW_ATIME"
    AVAILABLE_OTS_COLUMN_CONDITION = [IGNORE, NEW_ATIME]
    AVAILABLE_OTS_WRITE_TYPE = [UPDATE, PUT]
    TABLESTORE_BATCH_SIZE = 200
    # Logging setup
    SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
    LOG_FILE = os.path.join(SCRIPT_DIR, 'meta_stat.log')
    
    # Initialize the logger
    logger = logging.getLogger('meta_stat')
    logger.setLevel(logging.DEBUG)
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_formatter = logging.Formatter('%(levelname)s: %(message)s')
    console_handler.setFormatter(console_formatter)
    console_handler.setLevel(logging.INFO)
    logger.addHandler(console_handler)
    
    # File handler (with rotation)
    file_handler = RotatingFileHandler(LOG_FILE, maxBytes=1024 * 1024 * 5, backupCount=5)
    file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(file_handler)
    
    # Other constants
    PYTHON_FILE_PATH = os.path.abspath(__file__)
    SCRIPT_DIR = os.path.dirname(PYTHON_FILE_PATH)
    os.chdir(SCRIPT_DIR)  # switch to the script's directory
    PYTHON_PATH = sys.executable
    DATE = datetime.now().strftime("%Y%m%dT%H%M")
    SYSTEM_DIRS = [".sys_recyclebin", ".snapshots", ".sys_healthchecker",
                   ".audit_log", ".msgq", ".fakesnapshots"]
    
    thread_executor = ThreadPoolExecutor(max_workers=10)
    MAX_OTS_FUTURES = 10
    
    
    def read_from_json(key, filename, default=None):
        config_path = os.path.join(SCRIPT_DIR, filename)
        if not os.path.exists(config_path):
            if default is not None:
                return default
            raise FileNotFoundError(f"Config file not found: {config_path}")
        with open(config_path, 'r') as f:
            try:
                config_json = json.load(f)
            except json.JSONDecodeError:
                raise ValueError(f"Invalid JSON in config file; please check the format. Path: {config_path}")
        value = config_json.get(key)
        if value is None:
            if default is not None:
                return default
            raise ValueError(
                f"Missing required field in {filename}: {key}. For .meta_stat_key.json, make sure it contains access_key_id and access_key_secret")
        return value
    
    
    BASE_PATH = read_from_json("BASE_PATH", '.meta_stat_key.json')
    if not BASE_PATH.endswith('/'):
        BASE_PATH += '/'
    OUTPUT_DIR_PREFIX = read_from_json("OUTPUT_DIR_PREFIX", '.meta_stat_key.json')
    OSS_CONFIG_FILE_NAME = os.path.basename(read_from_json("OSS_KEY", '.meta_stat_key.json'))
    
    
    class OssTool:
        def __init__(self):
            self.region = read_from_json('OSS_REGION', '.meta_stat_key.json')
            self.bucket = read_from_json('OSS_BUCKET', '.meta_stat_key.json')
            self.key = read_from_json('OSS_KEY', '.meta_stat_key.json')
            self.endpoint = read_from_json('OSS_ENDPOINT', '.meta_stat_key.json')
            self.client = None
    
        def init_client(self):
            if self.client is not None:
                return
    
            try:
                credentials_provider = oss.credentials.StaticCredentialsProvider(
                    access_key_id=read_from_json('oss_access_key_id', '.meta_stat_key.json'),
                    access_key_secret=read_from_json('oss_access_key_secret', '.meta_stat_key.json')
                )
            except Exception as e:
                raise RuntimeError(f"Failed to load local credentials; make sure .meta_stat_key.json exists and is configured correctly. Error: {e}")
    
            cfg = oss.config.load_default()
            cfg.credentials_provider = credentials_provider
            cfg.region = self.region
            if self.endpoint:
                cfg.endpoint = self.endpoint
            self.client = oss.Client(cfg)
    
        def download_file_if_newer(self):
            self.init_client()
            _file_path = os.path.join(SCRIPT_DIR, os.path.basename(self.key))
            try:
                head_result = self.client.head_object(oss.HeadObjectRequest(bucket=self.bucket, key=self.key))
                if_modified_since = self.get_local_file_mtime(_file_path)
    
                if head_result.last_modified <= if_modified_since:
                    logger.info("Time:%s, remote file unchanged; skipping download." % datetime.now())
                    return False
    
                result = self.client.get_object_to_file(oss.GetObjectRequest(bucket=self.bucket, key=self.key),
                                                        _file_path)
    
                if result.status_code == 200:
                    logger.info(f"File updated and saved to: {_file_path}")
                    return True
                else:
                    logger.warning(f"Download failed, status code: {result.status_code}")
                    return False
    
            except OperationError as e:
                error = e.kwargs.get('error')
                if error and error.status_code == 304:
                    logger.info("Time:%s, remote file unchanged; skipping download." % datetime.now())
                    return False
                else:
                    logger.error(f"Request error: {e}", exc_info=True)
                    return False
            except Exception as e:
                logger.exception(f"Unexpected error: {e}")
                return False
    
        def upload_file_to_oss(self, local_file_path, oss_key):
            self.init_client()
    
            request = oss.PutObjectRequest(
                bucket=self.bucket,
                key=oss_key,
                body=open(local_file_path, 'rb')
            )
    
            try:
                result = self.client.put_object(request)
    
                if result.status_code == 200:
                    logger.info(f"File uploaded to OSS: oss://{self.bucket}/{oss_key}")
                    return True
                else:
                    logger.warning(f"Upload failed, status code: {result.status_code}")
                    return False
    
            except OperationError as e:
                error = e.kwargs.get('error')
                if error and error.status_code == 404:
                    logger.error("Target bucket or key does not exist; check the configuration.")
                else:
                    logger.error(f"Request error: {e}", exc_info=True)
                return False
            except Exception as e:
                logger.exception(f"Unexpected error: {e}")
                return False
    
        def upload_dir_to_oss(self, new_dir_path, oss_path):
            # Track elapsed time
            start_time = time.time()
            for file in os.listdir(new_dir_path):
                if file.endswith(".csv"):
                    file_path = os.path.join(new_dir_path, file)
                    self.upload_file_to_oss(file_path, os.path.join(oss_path, file))
            logger.info(f"OSS upload finished in {time.time() - start_time} seconds")
    
        @staticmethod
        def get_local_file_mtime(_file_path):
            if not os.path.exists(_file_path):
                return datetime.fromtimestamp(0, tz=timezone.utc)
            mtime = os.path.getmtime(_file_path)
            return datetime.fromtimestamp(mtime, tz=timezone.utc)
    
    
    class TablestoreWriter:
        def __init__(self):
            ots_config = read_from_json("otsConfig", OSS_CONFIG_FILE_NAME)
            self.table_name = ots_config.get("tableName")
            self.endpoint = ots_config.get("endpoint")
            self.instance_name = ots_config.get("instanceName")
            self.row_exist_exception_str = ots_config.get("rowExistenceException")
            self.column_condition = ots_config.get("columnCondition")
            self.ots_write_type = ots_config.get("writeType")
            self.access_key_id = read_from_json("ots_access_key_id", '.meta_stat_key.json')
            self.access_key_secret = read_from_json("ots_access_key_secret", '.meta_stat_key.json')
            self._tablestore_client = tablestore.OTSClient(
                self.endpoint,
                self.access_key_id,
                self.access_key_secret,
                self.instance_name
            )
    
        def single_put(self, row: tablestore.PutRowItem) -> bool:
            retry_count = 20
            for i in range(retry_count):
                try:
                    self._tablestore_client.put_row(self.table_name, row.row)
                    return True
                except Exception as e:
                    if i == retry_count - 1:
                        print("put single row failed", e)
                        return False
            return True
    
        def single_update(self, row: tablestore.UpdateRowItem) -> bool:
            retry_count = 20
            for i in range(retry_count):
                try:
                    self._tablestore_client.update_row(self.table_name, row.row, row.condition)
                    return True
                except Exception as e:
                    if i == retry_count - 1:
                        print("update single row failed", e)
                        return False
    
        @staticmethod
        def batch_write(rows: list[tablestore.UpdateRowItem]) -> int:
            global ots_writer, thread_executor
            if ots_writer is None:
                ots_writer = TablestoreWriter()
    
            succeed_cnt = 0
            row_items = []
            cur_batch_size = 0
            futures = set()
            for idx, updateItem in enumerate(rows):
                row_items.append(updateItem)
                cur_batch_size = cur_batch_size + 1
                if cur_batch_size == TABLESTORE_BATCH_SIZE or idx == len(rows) - 1:
                    batch_write_req = tablestore.BatchWriteRowRequest()
                    batch_write_req.add(tablestore.TableInBatchWriteRowItem(ots_writer.table_name, list(row_items)))
                    row_items.clear()
                    futures.add(
                        thread_executor.submit(TablestoreWriter.single_batch_update, batch_write_req, cur_batch_size))
                    cur_batch_size = 0
            for future in futures:
                succeed_cnt += future.result()
            return succeed_cnt
    
        @staticmethod
        def single_batch_update(batch_write_req, cur_batch_size):
            succeed_cnt = 0
            try:
                batch_write_row_response = ots_writer._tablestore_client.batch_write_row(batch_write_req)
                if not batch_write_row_response.is_all_succeed():
                    succeed_cnt += len(batch_write_row_response.get_succeed_of_update())
                    succeed_cnt += len(batch_write_row_response.get_succeed_of_put())
                    # update row item
                    failed_update = batch_write_row_response.get_failed_of_update()
                    for row_item in failed_update:
                        if not row_item.is_ok and row_item.error_code != "OTSConditionCheckFail":
                            logger.warning("update single row failed")
                            single_update_item = batch_write_req.items[ots_writer.table_name].row_items[row_item.index]
                            if ots_writer.single_update(single_update_item):
                                succeed_cnt += 1
    
                    # put row item
                    failed_put = batch_write_row_response.get_failed_of_put()
                    for row_item in failed_put:
                        if not row_item.is_ok:
                            logger.warning("put single row failed")
                            single_put_item = batch_write_req.items[ots_writer.table_name].row_items[row_item.index]
                            if ots_writer.single_put(single_put_item):
                                succeed_cnt += 1
                else:
                    succeed_cnt += cur_batch_size
            except Exception as e:
                print("batch update failed: ", e)
            return succeed_cnt
    
        @staticmethod
        def create_put_row_item(pk: list[tuple], columns: list[tuple],
                                column_conditions: list[tablestore.ColumnCondition]) -> PutRowItem:
            global ots_writer
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            if pk is None or len(pk) != 1:
                return None
            row = tablestore.Row(pk, columns)
            return tablestore.PutRowItem(row,
                                         tablestore.Condition(ots_writer.row_exist_exception_str, column_conditions[0]))
    
        @staticmethod
        def create_update_row_item(pk: list[tuple], columns: list[tuple],
                                   column_conditions: list[tablestore.ColumnCondition]) -> tablestore.UpdateRowItem:
            global ots_writer
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            if pk is None:
                return None
            row = tablestore.Row(pk, {"update": columns})
            if column_conditions is None or len(column_conditions) == 0:
                return tablestore.UpdateRowItem(row, None)
            elif len(column_conditions) == 1:
                return tablestore.UpdateRowItem(row, tablestore.Condition(ots_writer.row_exist_exception_str,
                                                                          column_conditions[0]))
            else:
                composite_condition = tablestore.CompositeColumnCondition(tablestore.LogicalOperator.AND)
                for condition in column_conditions:
                    composite_condition.add_sub_condition(condition)
                row_conditions = tablestore.Condition(ots_writer.row_exist_exception_str, composite_condition)
                return tablestore.UpdateRowItem(row, row_conditions)
    
        def extract_pk(self, pk_fields, _dict) -> list[tuple]:
            ots_pk_fields = []
            for field_key, field_value in pk_fields.items():
                ots_pk_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value)
                ots_pk_fields.append(ots_pk_field)
            return ots_pk_fields
    
        @staticmethod
        def less_than_condition(column_name: str, time_stamp: int) -> tablestore.SingleColumnCondition:
            return tablestore.SingleColumnCondition(column_name=column_name, column_value=time_stamp,
                                                    comparator=tablestore.ComparatorType.LESS_THAN)
    
        @staticmethod
        def upload_dir_to_ots(new_dir_path):
            """
            Filter the files in the directory that end with .ots,
            read the CSV data in batches of 200 rows and write them to
            Tablestore, then delete each source file once it is uploaded.
            """
            global ots_writer
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            with ProcessPoolExecutor(max_workers=MAX_OTS_FUTURES) as executor:
                start_time = time.time()
                # Track elapsed time
                for file in os.listdir(new_dir_path):
                    if file.endswith(".ots"):
                        file_path = os.path.join(new_dir_path, file)
                        TablestoreWriter.upload_csv_to_ots(executor, file_path)
                        # Delete the file once its upload is done
                        os.remove(file_path)
                        logger.info(f"Deleted file: {file_path}; elapsed {time.time() - start_time} seconds")
                logger.info(f"Tablestore upload finished in {time.time() - start_time} seconds")
    
        @staticmethod
        def upload_csv_to_ots(executor, file_path):
            global ots_writer
            logger.info(f"Uploading file: {file_path}")
            start_time = time.time()
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            fields = read_from_json('fields', OSS_CONFIG_FILE_NAME)
            fields_value_to_key = {value: key for key, value in fields.items()}
            pk_fields = read_from_json('pkFields', OSS_CONFIG_FILE_NAME)
            with open(file_path, 'r', newline='', encoding='utf-8') as f:
                future_set = set()
                timestamp_ms = int(time.time() * 1000)
                reader = csv.reader(f)
                _ = next(reader)  # skip the header row
                rows = []
                for idx, row in enumerate(reader):
                    if len(row) != 6:
                        logger.warning(f"Skipping malformed line {idx + 1}, unexpected field count: {row}")
                        continue
                    path, size, inode, atime_ms, mtime_ms, ctime_ms = row
                    try:
                        _dict = {
                            'path': path,
                            'size': int(size),
                            'inode': int(inode),
                            'atime_ms': int(atime_ms),
                            'mtime_ms': int(mtime_ms),
                            'ctime_ms': int(ctime_ms),
                            'wtime_ms': int(timestamp_ms)
                        }
                    except ValueError as e:
                        logger.error(f"Numeric conversion failed at line {idx + 1}: {e}")
                        continue
    
                    # Build the primary key from the configured pk fields
                    pk = ots_writer.extract_pk(pk_fields, _dict)
                    if not pk:
                        logger.warning(f"Failed to extract primary key from _dict: {_dict}")
                        continue
                    if ots_writer.column_condition == IGNORE:
                        conditions = [None]
                    elif ots_writer.column_condition == NEW_ATIME:
                        conditions = [TablestoreWriter.less_than_condition(fields_value_to_key['$atime_ms'],
                                                                           _dict['atime_ms'])]
                    else:
                        conditions = [None]
                    if ots_writer.ots_write_type == UPDATE:
                        # TODO: consider making these conditions configurable
                        ots_fields = []
                        for field_key, field_value in fields.items():
                            ots_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value)
                            if ots_field is not None:
                                ots_fields.append(ots_field)
                        update_row_item = TablestoreWriter.create_update_row_item(pk, ots_fields, conditions)
                        if update_row_item:
                            rows.append(update_row_item)
                    elif ots_writer.ots_write_type == PUT:
                        ots_fields = []
                        for field_key, field_value in fields.items():
                            ots_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value)
                            if ots_field is not None:
                                ots_fields.append(ots_field)
                        put_row_item = TablestoreWriter.create_put_row_item(pk, ots_fields, conditions)
                        if put_row_item:
                            rows.append(put_row_item)
                    else:
                        logger.error(f"Unknown Tablestore write type: {ots_writer.ots_write_type}")
                        continue
    
                    # Submit a batch every 10 * TABLESTORE_BATCH_SIZE rows
                    if len(rows) >= 10 * TABLESTORE_BATCH_SIZE:
                        future_set.add(executor.submit(TablestoreWriter.batch_write, list(rows)))
                        rows.clear()
    
                        # Cap the number of in-flight futures
                        if len(future_set) >= MAX_OTS_FUTURES:
                            done, future_set = concurrent.futures.wait(
                                future_set,
                                return_when=concurrent.futures.FIRST_COMPLETED
                            )
    
                            # Optional: surface exceptions from completed futures
                            for future in done:
                                try:
                                    future.result()
                                except Exception as e:
                                    logger.error(f"Tablestore submission failed: {e}")
                # Submit the remaining rows
                if rows:
                    future_set.add(executor.submit(TablestoreWriter.batch_write, list(rows)))
                # Wait for all tasks to finish and handle exceptions
                for future in as_completed(future_set):
                    try:
                        future.result()
                    except Exception as e:
                        logger.info(f"Batch failed with exception: {e}")
                logger.info(f"Tablestore writes finished in {time.time() - start_time} seconds")
    
        @staticmethod
        def get_ots_update_field(_dict, field_key, field_value):
            global nas_client
            if field_value[0] != '$':
                return field_key, field_value
            else:
                if field_value == "$dajiang_asset_Id":
                    # Split the path and look for the asset ID segment
                    path = _dict['path']
                    parts = path.strip('/').split('/')
                    for i in range(len(parts) - 1):
                        # Match a 10-char segment whose parent segment equals its first 3 chars
                        if len(parts[i + 1]) == 10 and parts[i] == parts[i + 1][:3]:
                            return field_key, parts[i + 1]
                    return None
                if field_value == "$dajiang_file_name":
                    # Split the path, locate the asset segment, and return the rest as the file name
                    path = _dict['path']
                    parts = path.strip('/').split('/')
                    for i in range(len(parts) - 1):
                        # Match a 10-char segment whose parent segment equals its first 3 chars
                        if len(parts[i + 1]) == 10 and parts[i] == parts[i + 1][:3]:
                            return field_key, '/'.join(parts[i + 1:])
                if field_value in ["$fset_id", "$fset_name", "$fset_file_count_quota", "$fset_size_quota"]:
                    # Fetch the fileset info for this file
                    if nas_client is None:
                        logger.warning("NAS client is not initialized; cannot fetch fileset info")
                        return None

                    # Read the fsid from the config
                    fsid = read_from_json("fsid", OSS_CONFIG_FILE_NAME, None)
                    if not fsid:
                        logger.warning("fsid not found in the config")
                        return None

                    # Make sure we have a full fsid (with prefix)
                    if not fsid.startswith('bmcpfs-') and not fsid.startswith('cpfs-'):
                        full_fsid = f"bmcpfs-{fsid}"
                    else:
                        full_fsid = fsid

                    # Look up the fileset info for this file path
                    file_path = _dict.get('path', '')
                    fset_info = nas_client.get_fset_info_by_path(full_fsid, file_path)

                    if fset_info:
                        # Return the value matching the requested field
                        if field_value == "$fset_id":
                            return field_key, fset_info['fset_id']
                        elif field_value == "$fset_name":
                            return field_key, fset_info['description']
                        elif field_value == "$fset_file_count_quota":
                            v = fset_info['file_count_limit']
                            return (field_key, v) if v is not None else None
                        elif field_value == "$fset_size_quota":
                            v = fset_info['size_limit']
                            return (field_key, v) if v is not None else None
                    else:
                        logger.debug(f"No fileset info found for path {file_path}")
                        return None
                return field_key, _dict.get(field_value[1:], field_value)
    
    
    class NasClient:
        """NAS OpenAPI 客户端,用于获取 fileset 信息"""
        def __init__(self):
            self.client = None
            # 缓存 fileset 信息 {fsid: [{fset_id, file_system_path, description, quota}, ...]}
            self.fileset_cache = {}
            self.last_cache_time = {}  # 记录每个 fsid 的缓存时间
            self.cache_ttl = 3600  # 缓存有效期 1 小时
    
        def init_client(self, access_key_id: str, access_key_secret: str, endpoint: str):
            """初始化 NAS 客户端"""
            if self.client is not None:
                return
    
            try:
                credentials_config = CredentialConfig(
                    type='access_key',
                    access_key_id=access_key_id,
                    access_key_secret=access_key_secret
                )
                credentials_client = CredentialClient(credentials_config)
    
                config = open_api_models.Config(
                    credential=credentials_client,
                    endpoint=endpoint
                )
                self.client = NAS20170626Client(config)
                logger.info("NAS 客户端初始化成功")
            except Exception as e:
                logger.error(f"NAS 客户端初始化失败: {e}")
                raise RuntimeError(f"无法初始化 NAS 客户端: {e}")
    
        def get_filesets(self, fsid: str) -> Optional[list]:
            """获取指定文件系统的所有 fileset 信息,使用缓存"""
            current_time = time.time()
    
            # 检查缓存是否存在且未过期
            if fsid in self.fileset_cache and fsid in self.last_cache_time:
                if current_time - self.last_cache_time[fsid] < self.cache_ttl:
                    logger.debug(f"使用缓存的 fileset 信息: {fsid}")
                    return self.fileset_cache[fsid]
    
            # 调用 API 获取 fileset 信息
            try:
                request = nas20170626_models.DescribeFilesetsRequest()
                request.file_system_id = fsid
                runtime = util_models.RuntimeOptions()
    
                resp = self.client.describe_filesets_with_options(request, runtime)
    
                if resp.body and resp.body.entries and resp.body.entries.entrie:
                    filesets = []
                    for entry in resp.body.entries.entrie:
                        if entry.fset_id and entry.file_system_path:
                            # 提取 quota 信息(可能为空)
                            file_count_limit = None
                            size_limit = None
                            if entry.quota:
                                file_count_limit = entry.quota.file_count_limit
                                size_limit = entry.quota.size_limit
                            
                            filesets.append({
                                'fset_id': entry.fset_id,
                                'file_system_path': entry.file_system_path,
                                'description': entry.description,
                                'file_count_limit': file_count_limit,
                                'size_limit': size_limit
                            })
    
                    # 更新缓存
                    self.fileset_cache[fsid] = filesets
                    self.last_cache_time[fsid] = current_time
                    logger.info(f"成功获取 {fsid} 的 {len(filesets)} 个 fileset 信息")
                    return filesets
                else:
                    logger.warning(f"文件系统 {fsid} 没有找到任何 fileset")
                    return []
    
            except Exception as e:
                logger.error(f"获取 fileset 信息失败: {e}")
                return None
    
        def get_fset_info_by_path(self, fsid: str, file_path: str) -> Optional[dict]:
            """根据文件路径获取对应的 fileset 完整信息"""
            filesets = self.get_filesets(fsid)
            if filesets is None:
                return None
    
            # 查找最匹配的 fileset(最长路径匹配)
            matched_fset = None
            matched_path_length = -1
    
            for fset in filesets:
                fs_path = fset['file_system_path']
                # 确保 file system path 以 / 结尾,便于匹配
                if not fs_path.endswith('/'):
                    fs_path += '/'
    
                # 检查文件路径是否以 file system path 为前缀
                if file_path.startswith(fs_path):
                    # 选择路径最长的匹配(最精确的匹配)
                    if len(fs_path) > matched_path_length:
                        matched_path_length = len(fs_path)
                        matched_fset = fset
    
            return matched_fset
    
    oss_tool: Optional[OssTool] = None
    ots_writer: Optional[TablestoreWriter] = None
    nas_client: Optional[NasClient] = None
    
    
    def run_bmstat_for_dir(mode, fsid):
        input_dir = mode.input_dir
        if not check_path_exists_with_sudo(input_dir):
            logger.error(f"Input directory {input_dir} does not exist")
            return
        temp_output_dir = f"{OUTPUT_DIR_PREFIX}{fsid}/"
        os.makedirs(temp_output_dir, exist_ok=True)
        # 如果 input_dir 是 /cpfs/{fsid}/ 根目录,则排除系统目录
        if input_dir == f"/cpfs/{fsid}/":
            exclude_dirs = [f"{input_dir}{d}" for d in SYSTEM_DIRS]
        else:
            exclude_dirs = []
        exclude_dirs.extend(mode.exclude_dir_set)
    
        # 构造子命令
        command = [
            "sudo",
            PYTHON_PATH, "nas_stat_util.py",
            "-t", "inode_list",
            "-i", input_dir,
            "--backup-count", "0",
            "--output-format", "path,size,inode,atime_ms,mtime_ms,ctime_ms",
            "--filter-inode-type", "regular_file",
            "--output-without-prefix", BASE_PATH[0:len(BASE_PATH) - 1],
            "--use-csv-format"
        ]
    
        if exclude_dirs:
            command.extend(["--exclude-full-path-dirs", ",".join(exclude_dirs)])
        if mode.ots_output_path_set:
            command.extend(["--ots-output-path", ",".join(mode.ots_output_path_set)])
        if mode.output_path_set:
            command.extend(["-o", ",".join([f"{temp_output_dir}{d}" for d in mode.output_path_set])])
    
        logger.info(f"Running command: {' '.join(command)}")
        try:
            result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            logger.info(f"Finished {mode.input_dir}: {result.stdout.decode()}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Error running command for {mode.input_dir}: {e.stderr.decode()}")
    
    
    def update_crontab(new_cron_expr):
        """
        查找 crontab 中包含 {PYTHON_FILE_PATH} --type run 的行,
        将其前面的 cron 表达式替换为 new_cron_expr。
        如果不存在则添加一行新的。
        """
        try:
            # 获取当前 crontab 内容
            result = subprocess.run(['crontab', '-l'], capture_output=True, text=True)
            if result.returncode not in (0, 1):  # 1 是空文件的正常状态码
                logger.error(f"无法读取 crontab: {result.stderr}")
                return
    
            lines = result.stdout.splitlines()
            updated = False
            new_lines = []
    
            pattern = re.compile(
                r'.*{PYTHON_FILE_PATH}\s+--type\s+run.*'.format(PYTHON_FILE_PATH=re.escape(PYTHON_FILE_PATH)))
    
            for line in lines:
                if pattern.search(line):
                    # 匹配成功,替换前5个字段(即 cron 表达式)
                    parts = line.strip().split(maxsplit=5)
                    if len(parts) >= 6:
                        command_part = parts[5]
                        new_line = f"{new_cron_expr} {command_part}"
                        new_lines.append(new_line)
                        updated = True
                    else:
                        new_lines.append(line)
                else:
                    new_lines.append(line)
            # 如果没有找到匹配项,添加新行
            if not updated:
                new_cron_line = f"{new_cron_expr} {PYTHON_PATH} {PYTHON_FILE_PATH} --type run"
                new_lines.append(new_cron_line)
                logger.info("未找到匹配行,已新增 cron 任务。")
    
            # 写回新的 crontab
            new_crontab = '\n'.join(new_lines) + '\n'
            subprocess.run(['crontab', '-'], input=new_crontab, text=True, check=True)
            logger.info("Crontab 已更新。")
    
        except Exception as e:
            logger.error(f"更新 crontab 失败: {e}")
    
    
    def remove_crontab_entry():
        """
        删除 crontab 中包含指定命令的行。
        """
        try:
            result = subprocess.run(['crontab', '-l'], capture_output=True, text=True)
            if result.returncode not in (0, 1):  # 1 是空文件时的正常状态码
                logger.error(f"无法读取 crontab: {result.stderr}")
                return
    
            lines = result.stdout.splitlines()
            new_lines = []
    
            for line in lines:
                # 匹配包含 PYTHON_FILE_PATH 和 --type run 的行
                if re.search(r'.*{PYTHON_FILE_PATH}\s+--type\s+run.*'.format(PYTHON_FILE_PATH=re.escape(PYTHON_FILE_PATH)), line):
                    logger.info(f"移除 cron 行: {line}")
                    continue
                new_lines.append(line)
    
            # 写回新的 crontab
            new_crontab = '\n'.join(new_lines) + '\n'
            subprocess.run(['crontab', '-'], input=new_crontab, text=True, check=True)
            logger.info("Crontab 已清理。")
    
        except Exception as e:
            logger.error(f"删除 crontab 失败: {e}")
    
    
    def download_only():
        """
        仅执行 OSS 配置文件下载逻辑。
        配置在crontab中每分钟执行
        * * * * * {PYTHON_PATH} {PYTHON_FILE_PATH} --type download
        """
        download_success = oss_tool.download_file_if_newer()
        if download_success:
            file_path = os.path.join(SCRIPT_DIR, os.path.basename(read_from_json('OSS_KEY', '.meta_stat_key.json')))
            meta_stat_config = parse_config(file_path)
            schedule = meta_stat_config.get('schedule')
            if schedule['type'] == 'cron':
                cron_expr = schedule.get('cron')
                if cron_expr:
                    update_crontab(cron_expr)
                else:
                    logger.info("缺少 cron 表达式,无法更新任务")
                    return
            else:
                # 删除crontab 中的内容
                remove_crontab_entry()
    
            if schedule['type'] in ('once', 'one'):
                # 直接调度
                run_only()
                return
    
    
    def compress_directory(new_dir_name, output_dir):
        """
        将指定目录打包并压缩为 .tar.gz 文件。
    
        :param new_dir_name: 要压缩的目录名
        :param output_dir: 输出目录
        :return: 生成的压缩文件路径
        """
    
        base_name = os.path.basename(new_dir_name)
        tar_path = os.path.join(output_dir, f"{base_name}.tar")
        gz_path = f"{tar_path}.gz"
    
        # Step 1: 创建 .tar 文件
        with tarfile.open(tar_path, "w") as tar:
            tar.add(new_dir_name, arcname=base_name)
    
        # Step 2: 使用 pigz 压缩 .tar 文件
        try:
            subprocess.run(['pigz', '-f', '-v', tar_path], check=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"压缩失败: {e}")
            return None
    
        # Step 3: 删除原始目录和tar文件
        if os.path.exists(tar_path):
            os.remove(tar_path)
            logger.info(f"已删除原始 .tar 文件: {tar_path}")
        if os.path.exists(new_dir_name):
            shutil.rmtree(new_dir_name)
            logger.info(f"已删除原始目录: {new_dir_name}")
    
        logger.info(f"压缩文件夹逻辑完成: {gz_path}")
        return gz_path
    
    
    def run_only():
        """
        执行逻辑。如果调用模式为cron
        则配置在crontab中执行
        {schedule['cron']} {PYTHON_PATH} {PYTHON_FILE_PATH} --type run
        """
        global ots_writer
        lock_file = os.path.join(SCRIPT_DIR, '.meta_stat.lock')
        try:
            lock_fd = os.open(lock_file, os.O_CREAT | os.O_RDWR)
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            logger.info("Another instance is already running. Exiting.")
            exit(1)
    
        begin_date = datetime.now().strftime("%Y%m%dT%H%M")
        file_path = os.path.join(SCRIPT_DIR, os.path.basename(read_from_json('OSS_KEY', '.meta_stat_key.json')))
        meta_stat_config = parse_config(file_path)
        # 获取目录输出
        dir_output_modes, ots_file_name_to_input_dir_dict = determine_output_modes(meta_stat_config)
        fsid = read_from_json("fsid", OSS_CONFIG_FILE_NAME, 'fsid')
        if fsid.startswith('bmcpfs-'):
            fsid = fsid[7:]
        elif fsid.startswith('cpfs-'):
            fsid = fsid[5:]
        for dir_path in dir_output_modes:
            mode = dir_output_modes[dir_path]
            run_bmstat_for_dir(mode, fsid)
        end_date = datetime.now().strftime("%Y%m%dT%H%M")
        new_dir_name = f"beg_{begin_date}_end_{end_date}_{fsid}"
        new_dir_path = f"{OUTPUT_DIR_PREFIX}{new_dir_name}"
        temp_output_dir = f"{OUTPUT_DIR_PREFIX}{fsid}/"
        os.rename(temp_output_dir, new_dir_path)
        # 上传并删除本地文件
        TablestoreWriter.upload_dir_to_ots(new_dir_path)
        # 上传oss
        if meta_stat_config['scan_to_files']:
            oss_upload_type = read_from_json("ossUploadType", OSS_CONFIG_FILE_NAME, 'gz')  # 可选值: gz(默认)或 csv
            if oss_upload_type == 'csv':
                oss_tool.upload_dir_to_oss(new_dir_path, meta_stat_config['oss_path'])
            local_file_path = compress_directory(new_dir_path, OUTPUT_DIR_PREFIX)
            if oss_upload_type == 'gz':
                oss_tool.upload_file_to_oss(local_file_path,
                                            os.path.join(meta_stat_config['oss_path'], f"{new_dir_name}.tar.gz"))
            # 本地只保留最近一个gz,删除其他的
            for file in os.listdir(OUTPUT_DIR_PREFIX):
                if file.startswith("beg_") and file.endswith(".tar.gz") and file != f"{new_dir_name}.tar.gz":
                    os.remove(f"{OUTPUT_DIR_PREFIX}{file}")
    
    
    def upload_only():
        upload_to_tablestore_list = read_from_json('uploadToTablestore', OSS_CONFIG_FILE_NAME)
        for file_path in upload_to_tablestore_list:
            if not os.path.exists(file_path):
                logger.error(f"文件不存在: {file_path}")
                exit(1)
        with ProcessPoolExecutor(max_workers=MAX_OTS_FUTURES) as executor:
            for file_path in upload_to_tablestore_list:
                TablestoreWriter.upload_csv_to_ots(executor, file_path)
    
    
    def main():
        global oss_tool, nas_client
        # 创建命令行参数解析器
        parser = argparse.ArgumentParser(description="Download file from OSS with conditional check")
    
        # 添加命令行参数
        parser.add_argument('--type', choices=['download', 'run', 'upload'], required=True,
                            help='指定类型: download - 仅检查并下载配置; run - 直接执行统计任务; upload - 将本地 CSV 上传至 Tablestore')
        # 解析命令行参数
        args = parser.parse_args()
    
        oss_tool = OssTool()
    
        # 初始化 NAS 客户端(如果配置了 NAS 相关参数)
        try:
            nas_access_key_id = read_from_json('nas_access_key_id', '.meta_stat_key.json', None)
            nas_access_key_secret = read_from_json('nas_access_key_secret', '.meta_stat_key.json', None)
            nas_endpoint = read_from_json('nas_endpoint', '.meta_stat_key.json', None)
    
            if nas_access_key_id and nas_access_key_secret and nas_endpoint:
                nas_client = NasClient()
                nas_client.init_client(nas_access_key_id, nas_access_key_secret, nas_endpoint)
                logger.info("NAS 客户端初始化成功")
            else:
                logger.info("未配置 NAS 相关参数,$fset 字段映射将不可用")
                nas_client = None
        except Exception as e:
            logger.warning(f"NAS 客户端初始化失败: {e},$fset 字段映射将不可用")
            nas_client = None
    
        if args.type == 'download':
            download_only()
        elif args.type == 'run':
            run_only()
        elif args.type == 'upload':
            upload_only()
    
    
    def determine_output_modes(meta_stat_config):
        scan_to_files = set(meta_stat_config['scan_to_files'])
        scan_to_tablestore = set(meta_stat_config['scan_to_tablestore'])
        all_directories = scan_to_files | scan_to_tablestore
        dir_output_modes = OrderedDict()
        ots_file_name_to_input_dir_dict = {}
    
        for dir_path in sorted(all_directories):
            output_mode = DirectoryOutputMode(input_dir=dir_path)
            dir_output_modes[dir_path] = output_mode
        # 先确定每个任务需要输出什么内容
        for _item in scan_to_files:
            output_file_name = _item
            if output_file_name.startswith('/'):
                output_file_name = output_file_name[1:]
            if output_file_name.endswith('/'):
                output_file_name = output_file_name[:-1]
            output_file_name = output_file_name.replace('/', '_') + ".csv"
            dir_output_modes[_item].output_path_set.add(output_file_name)
        for _item in scan_to_tablestore:
            output_file_name = _item
            if output_file_name.startswith('/'):
                output_file_name = output_file_name[1:]
            if output_file_name.endswith('/'):
                output_file_name = output_file_name[:-1]
            output_file_name = output_file_name.replace('/', '_') + ".ots"
            dir_output_modes[_item].output_path_set.add(output_file_name)
            ots_file_name_to_input_dir_dict[output_file_name] = _item
        # 如果一个目录b是现存某个任务a的子目录,则a目录中的exclude需要加上b目录,且b目录的输出需要加上a目录的输出
        for _item_b in dir_output_modes:
            for _item_a in dir_output_modes:
                if _item_b.startswith(_item_a) and _item_b != _item_a:
                    dir_output_modes[_item_a].exclude_dir_set.add(_item_b)
                    dir_output_modes[_item_b].output_path_set.update(dir_output_modes[_item_a].output_path_set)
    
        return dir_output_modes, ots_file_name_to_input_dir_dict
    
    
    class DirectoryOutputMode:
        def __init__(self, input_dir: str, exclude_dir_set=None, output_path_set=None, ots_output_path_set=None):
            self.input_dir = input_dir
            if exclude_dir_set is None:
                exclude_dir_set = set()
            self.exclude_dir_set = exclude_dir_set
            if output_path_set is None:
                output_path_set = set()
            self.output_path_set = output_path_set
            if ots_output_path_set is None:
                ots_output_path_set = set()
            self.ots_output_path_set = ots_output_path_set

        def __repr__(self):
            return f"DirectoryOutputMode(input_dir={self.input_dir}, exclude_dir_set={self.exclude_dir_set}, output_path_set={self.output_path_set}, ots_output_path_set={self.ots_output_path_set})"
    
    
    def is_valid_cron(cron_str: str) -> bool:
        """
        判断输入字符串是否为合法的 cron 表达式(不包含秒字段)。
        要求恰好 5 个以空白分隔的字段,每个字段支持 *, /, -, 逗号和数字组合,
        不校验具体取值范围。
        """
        fields = cron_str.strip().split()
        if len(fields) != 5:
            return False
        field_pattern = re.compile(r'^(\*|[\d\-*/]+)(,(\*|[\d\-*/]+))*$')
        return all(field_pattern.match(f) for f in fields)
    
    
    def check_path_exists_with_sudo(path):
        try:
            result = subprocess.run(
                ['sudo', 'test', '-e', path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=5
            )
            return result.returncode == 0
        except Exception as e:
            print(f"Error checking path: {e}")
            return False
    
    
    def parse_config(config_path: str) -> Dict:
        """
        解析配置文件,返回结构化数据
        :param config_path: JSON 配置文件路径
        :return: 包含配置字段的字典
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
    
            # 提取字段
            fsid = config.get("fsid")
            scan_to_tablestore = config.get("scanToTablestore", [])
            ots_config = config.get("otsConfig", {})
            fields = config.get("fields", {})
            pk_fields = config.get("pkFields", {})
            scan_to_files = config.get("scanToFiles", [])
            oss_path = config.get("ossPath")
            schedule = config.get("schedule", {})
    
            if scan_to_files is None:
                scan_to_files = []
            if oss_path is None:
                oss_path = ""
            if oss_path.startswith('/'):
                oss_path = oss_path[1:]
    
            # 基础校验
            if not fsid or not isinstance(fsid, str):
                raise ValueError("缺少有效的 fsid 字段")
            # 去掉前缀后,fsid 只能包含字母和数字 [a-zA-Z0-9]
            if fsid.startswith('bmcpfs-'):
                fsid = fsid[7:]
            elif fsid.startswith('cpfs-'):
                fsid = fsid[5:]
            if not fsid.isalnum():
                raise ValueError("fsid 只能包含字母和数字")
    
            if not isinstance(scan_to_files, list):
                raise ValueError("scanToFiles 必须是列表类型")
            # 更新scan_to_files
            concat_paths(fsid, scan_to_files)
    
            # 判断路径是否存在
            for path in scan_to_files:
                if not check_path_exists_with_sudo(path):
                    raise ValueError(f"路径 {path} 不存在")
    
            if not isinstance(scan_to_tablestore, list):
                raise ValueError("scanToTablestore 必须是列表类型")
    
            concat_paths(fsid, scan_to_tablestore)
    
            for path in scan_to_tablestore:
                if not check_path_exists_with_sudo(path):
                    raise ValueError(f"路径 {path} 不存在")
    
            if not isinstance(ots_config, dict):
                raise ValueError("otsConfig 必须是字典类型")
    
            # 校验ots_config
            for key in ['instanceName', 'tableName', 'endpoint', 'rowExistenceException', 'columnCondition', 'writeType']:
                if key not in ots_config:
                    raise ValueError(f"otsConfig 缺少 {key} 字段")
                if not ots_config[key]:
                    raise ValueError(f"otsConfig {key} 字段不能为空")
                if key == 'rowExistenceException':
                    if ots_config[key] not in [tablestore.RowExistenceExpectation.IGNORE,
                                               tablestore.RowExistenceExpectation.EXPECT_EXIST]:
                        raise ValueError(f"otsConfig {key} 字段只能为 IGNORE, EXPECT_EXIST")
                if key == 'columnCondition':
                    if ots_config[key] not in [IGNORE, NEW_ATIME]:
                        raise ValueError(f"otsConfig {key} 字段只能为 IGNORE, NEW_ATIME")
                if key == 'writeType':
                    if ots_config[key] not in AVAILABLE_OTS_WRITE_TYPE:
                        raise ValueError(f"otsConfig {key} 字段只能为 PUT, DELETE")
    
            if not isinstance(fields, dict):
                raise ValueError("fields 必须是字典类型")
    
            if not isinstance(pk_fields, dict):
                raise ValueError("pkFields 必须是字典类型")
    
            if not isinstance(schedule, dict):
                raise ValueError("schedule 必须是字典类型")
            if 'type' not in schedule:
                raise ValueError("schedule 缺少 type 字段")
    
            if schedule['type'] == 'cron':
                # 判断是否是合法的cron表达式
                cron_expr = schedule.get('cron')
                if not cron_expr:
                    raise ValueError("cron 字段不能为空")
                if not is_valid_cron(cron_expr):
                    raise ValueError(f"Invalid cron expression: {cron_expr}")
    
            if oss_path and not oss_path.endswith('/'):
                oss_path += '/'
    
            return {
                "fsid": fsid,
                "scan_to_files": scan_to_files,
                "scan_to_tablestore": scan_to_tablestore,
                "ots_config": ots_config,
                "fields": fields,
                "pk_fields": pk_fields,
                "oss_path": oss_path,
                "schedule": schedule,
            }
    
        except FileNotFoundError:
            raise FileNotFoundError(f"配置文件不存在: {config_path}")
        except json.JSONDecodeError:
            raise ValueError(f"配置文件格式错误(非合法 JSON): {config_path}")
        except Exception as e:
            raise RuntimeError(f"解析配置时出错: {e}")
    
    
    def concat_paths(fsid, paths):
        for i, path in enumerate(paths):
            concat_path = BASE_PATH
            if not concat_path.endswith('/'):
                concat_path += '/'
            if path.startswith('/'):
                path = path[1:]
            concat_path += path
            if not concat_path.endswith('/'):
                concat_path += '/'
            paths[i] = concat_path
    
    
    if __name__ == '__main__':
        main()
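
上面脚本中,`NasClient.get_fset_info_by_path` 通过"最长前缀匹配"为文件定位其所属的 fileset:当文件路径同时命中多个 fileset 挂载路径的前缀时,取前缀最长(最精确)的一个。下面是一个可独立运行的最小示例,便于理解该匹配逻辑(其中 `match_fileset` 函数名、示例路径与 fset_id 均为虚构,仅用于演示):

```python
def match_fileset(filesets, file_path):
    """在 filesets 中为 file_path 选出挂载路径前缀最长的一项,无匹配时返回 None"""
    matched, matched_len = None, -1
    for fset in filesets:
        fs_path = fset['file_system_path']
        # 确保挂载路径以 / 结尾,避免 /team_a 误匹配 /team_ab
        if not fs_path.endswith('/'):
            fs_path += '/'
        if file_path.startswith(fs_path) and len(fs_path) > matched_len:
            matched_len = len(fs_path)
            matched = fset

    return matched


filesets = [
    {'fset_id': 'fset-team', 'file_system_path': '/cpfs/demo/team_a'},
    {'fset_id': 'fset-train', 'file_system_path': '/cpfs/demo/team_a/train'},
]
# 两个前缀同时命中,取更长的 /cpfs/demo/team_a/train/
print(match_fileset(filesets, '/cpfs/demo/team_a/train/data.bin')['fset_id'])
# 输出: fset-train
```

与脚本中的实现一致,未命中任何 fileset 挂载路径的文件会得到 `None`,对应的 `$fset_*` 字段不会写入 Tablestore。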
    
  2. 创建 nas_stat_util.py,内容如下。

    # coding=utf-8
    import argparse
    import concurrent
    import csv
    import io
    import os
    import stat
    import sys
    import threading
    import time
    from concurrent import futures
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from logging.handlers import RotatingFileHandler
    from multiprocessing import Manager
    from queue import Queue
    from stat import S_ISREG, S_ISDIR
    
    TOOL_VERSION = "1.3"
    INODE_LIST = 'inode_list'
    DIR_TREE = 'dir_tree'
    ALL = 'all'
    REGULAR_FILE = 'regular_file'
    DIR = 'dir'
    VALID_TYPES = [INODE_LIST, DIR_TREE]
    VALID_FILTER_INODE_TYPES = [ALL, REGULAR_FILE, DIR]
    BATCH_LINES = 10000
    thread_pool = None
    
    
    class DirectoryStats:
        def __init__(self, args, _stat):
            self.item_count = 0
            self.item_size = 0
            self.skipped_item_count = 0
            if args is not None:
                if _stat is not None:
                    if args.use_atime:
                        self.atime = _stat.st_atime
                    if args.use_mtime:
                        self.mtime = _stat.st_mtime
                    if args.use_ctime:
                        self.ctime = _stat.st_ctime
                    if args.use_inode:
                        self.inode = int(_stat.st_ino)
                    if args.use_uid:
                        self.uid = _stat.st_uid
                    if args.use_gid:
                        self.gid = _stat.st_gid
                if args.use_all_inode_num:
                    self.all_inode_num = 0
                if args.use_all_size:
                    self.all_size = 0
    
        def add_item_count(self, size):
            self.item_count += size
    
        def add_item_size(self, size):
            self.item_size += size
    
        def add_skipped_item_count(self, size):
            self.skipped_item_count += size
    
        def add_all_inode_num(self, size):
            # 属性可能未在 __init__ 中创建,使用 getattr 兜底
            if getattr(self, 'all_inode_num', None) is None:
                self.all_inode_num = 0
            self.all_inode_num += size

        def add_all_size(self, size):
            if getattr(self, 'all_size', None) is None:
                self.all_size = 0
            self.all_size += size
    
        def __repr__(self):
            return (f"DirectoryStats(item_count={self.item_count}, "
                    f"item_size={self.item_size}, skipped_item_count={self.skipped_item_count})")
    
    
    class DirectoryStatMap:
        def __init__(self):
            self.directory_map = {}
            self.inode_num = 0
            self.wait_walk_dir_list = []
    
        def get_or_create(self, directory, args, _stat):
            if directory not in self.directory_map:
                if _stat is None:
                    try:
                        _stat = os.lstat(directory)
                    except OSError:
                        # 目录可能已被删除或无权限,保持 _stat 为 None
                        pass
                self.directory_map[directory] = DirectoryStats(args, _stat)
            return self.directory_map[directory]
    
        def add_inode_num(self, num):
            self.inode_num += num
    
        def add_wait_walk_dir(self, directory):
            self.wait_walk_dir_list.append(directory)
    
    
    def output_timestamp(args, timestamp, with_ms=False):
        if args.human_readable:
            return time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(timestamp))
        if with_ms:
            return int(timestamp * 1000)
        return int(timestamp)
    
    
    def output_bytes(args, num_bytes):
        if args.human_readable:
            for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
                if abs(num_bytes) < 1024.0:
                    return f"{num_bytes:3.1f}{unit}"
                num_bytes /= 1024.0
            return f"{num_bytes:.1f}YB"
        return num_bytes
    
    
    def to_csv_format(text):
        output = io.StringIO()
        writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL, lineterminator='')
        writer.writerow([text])
        return output.getvalue()
    
    
    def format_output_line(args, item_path, _stat=None, dir_stat=None):
        try:
            output_parts = []
            for field in args.output_format_list:
                try:
                    if args.stat_type == INODE_LIST:
                        if field == 'path':
                            if args.output_without_prefix:
                                raw_path = item_path.replace(args.output_without_prefix, '', 1)
                            else:
                                raw_path = item_path
                            if args.use_csv_format:
                                result_path = to_csv_format(raw_path)
                            else:
                                result_path = raw_path
                            output_parts.append(result_path)
                        elif field == 'size':
                            output_parts.append(
                                output_bytes(args,
                                             _stat.st_size if args.use_file_size else _stat.st_blocks * args.block_size))
                        elif field == 'raw_size':
                            output_parts.append(_stat.st_size if args.use_file_size else _stat.st_blocks * args.block_size)
                        elif field == 'inode':
                            output_parts.append(_stat.st_ino)
                        elif field == 'atime':
                            output_parts.append(output_timestamp(args, _stat.st_atime))
                        elif field == 'mtime':
                            output_parts.append(output_timestamp(args, _stat.st_mtime))
                        elif field == 'ctime':
                            output_parts.append(output_timestamp(args, _stat.st_ctime))
                        elif field == 'atime_ms':
                            output_parts.append(output_timestamp(args, _stat.st_atime, True))
                        elif field == 'mtime_ms':
                            output_parts.append(output_timestamp(args, _stat.st_mtime, True))
                        elif field == 'ctime_ms':
                            output_parts.append(output_timestamp(args, _stat.st_ctime, True))
                        elif field == 'uid':
                            output_parts.append(_stat.st_uid)
                        elif field == 'gid':
                            output_parts.append(_stat.st_gid)
                    elif args.stat_type == DIR_TREE:
                        if field == 'path':
                            if args.output_without_prefix:
                                raw_path = item_path.replace(args.output_without_prefix, '', 1)
                            else:
                                raw_path = item_path
                            if args.use_csv_format:
                                result_path = to_csv_format(raw_path)
                            else:
                                result_path = raw_path
                            output_parts.append(result_path)
                        elif field == 'inode_num':
                            output_parts.append(dir_stat.item_count)
                        elif field == 'size':
                            output_parts.append(output_bytes(args, dir_stat.item_size if (
                                args.use_file_size) else dir_stat.item_size * args.block_size))
                        elif field == 'raw_size':
                            output_parts.append(
                                dir_stat.item_size if args.use_file_size else dir_stat.item_size * args.block_size)
                        elif field == 'skip_num':
                            output_parts.append(dir_stat.skipped_item_count)
                        elif field == 'inode':
                            output_parts.append(dir_stat.inode)
                        elif field == 'atime':
                            output_parts.append(output_timestamp(args, dir_stat.atime))
                        elif field == 'mtime':
                            output_parts.append(output_timestamp(args, dir_stat.mtime))
                        elif field == 'ctime':
                            output_parts.append(output_timestamp(args, dir_stat.ctime))
                        elif field == 'atime_ms':
                            output_parts.append(output_timestamp(args, dir_stat.atime, True))
                        elif field == 'mtime_ms':
                            output_parts.append(output_timestamp(args, dir_stat.mtime, True))
                        elif field == 'ctime_ms':
                            output_parts.append(output_timestamp(args, dir_stat.ctime, True))
                        elif field == 'uid':
                            output_parts.append(dir_stat.uid)
                        elif field == 'gid':
                            output_parts.append(dir_stat.gid)
                        elif field == 'all_inode_num':
                            output_parts.append(dir_stat.all_inode_num)
                        elif field == 'all_size':
                            output_parts.append(output_bytes(args, dir_stat.all_size if (
                                args.use_file_size) else dir_stat.all_size * args.block_size))
                except Exception as e:
                    # Distinguish per-field failures from whole-line failures below
                    print(f"Error formatting field '{field}': {e}")
                    output_parts.append('')
            if output_parts:
                output_parts = [str(part) for part in output_parts]
                return ','.join(output_parts) + '\n'
            return ''
        except Exception as e:
            print(f"Error formatting output line: {e}")
            return ''
    
    
    def inode_stat_print(args, local_dir_stat_map, item_path=None, _stat=None, thread_local_output_lines=None):
        if thread_local_output_lines is None:
            thread_local_output_lines = []
        if args.stat_type == INODE_LIST:
            thread_local_output_lines.append(format_output_line(args, item_path, _stat=_stat))
            if len(thread_local_output_lines) >= BATCH_LINES:
                output_stat_lines(args, local_dir_stat_map, thread_local_output_lines)
    
    
    def dir_stat_map_print(args, dir_stat_map, output_handle, print_elapsed_time=False, clear_screen=False):
        try:
            if clear_screen:
                os.system('cls' if os.name == 'nt' else 'clear')
            function_local_output_lines = []
            for iter_dir_path, iter_dir_stat in sorted(dir_stat_map.directory_map.items()):
                function_local_output_lines.append(format_output_line(args, iter_dir_path, dir_stat=iter_dir_stat))
    
                if len(function_local_output_lines) >= BATCH_LINES:
                    with args.m_output_file_lock:
                        output_handle.writelines(function_local_output_lines)
                        function_local_output_lines.clear()
    
            with args.m_output_file_lock:
                output_handle.writelines(function_local_output_lines)
                function_local_output_lines.clear()
                if print_elapsed_time:
                    end_time = time.time()
                    elapsed_time = end_time - args.start_time
                    output_handle.write(f"Elapsed time: {elapsed_time} seconds\n")
        except Exception as e:
            print(f"Error printing directory statistics: {e}")
    
    
    def output_total_line_print(args, dir_stat_map, output_handle, print_elapsed_time=False, clear_screen=False):
        if clear_screen:
            os.system('cls' if os.name == 'nt' else 'clear')
        with args.m_output_file_lock:
            output_handle.write(f"Output total lines: {dir_stat_map.inode_num}\n")
            if print_elapsed_time:
                end_time = time.time()
                elapsed_time = end_time - args.start_time
                output_handle.write(f"Elapsed time: {elapsed_time} seconds\n")
    
    
    def stat_print_at_runtime(args, dir_stat_map, output_handle, print_elapsed_time=False):
        time.sleep(args.print_process_time)
        while not args.stat_finish:
            if args.stat_type == DIR_TREE:
                dir_stat_map_print(args, dir_stat_map, output_handle, print_elapsed_time, clear_screen=True)
            elif args.stat_type == INODE_LIST:
                output_total_line_print(args, dir_stat_map, output_handle, print_elapsed_time, clear_screen=True)
            else:
                break
            time.sleep(args.print_process_time)
    
    
    def filter_inode(args, _stat):
        if args.min_size and _stat.st_size < args.min_size:
            return False
        if args.max_size and _stat.st_size > args.max_size:
            return False
        if args.atime and _stat.st_atime > args.atime:
            return False
        if args.atime_after and _stat.st_atime < args.atime_after:
            return False
        if args.mtime and _stat.st_mtime > args.mtime:
            return False
        if args.mtime_after and _stat.st_mtime < args.mtime_after:
            return False
        if args.ctime and _stat.st_ctime > args.ctime:
            return False
        if args.ctime_after and _stat.st_ctime < args.ctime_after:
            return False
        if args.filter_inode_type != ALL:
            if args.filter_inode_type == REGULAR_FILE and not S_ISREG(_stat.st_mode):
                return False
            if args.filter_inode_type == DIR and not S_ISDIR(_stat.st_mode):
                return False
    
        return True
    
    
    class CustomArgumentParser(argparse.ArgumentParser):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
    
            self.common_group = self.add_argument_group('Common arguments')
            self.common_group.add_argument("-t", "--type", dest="stat_type", required=True, choices=VALID_TYPES,
                                           help="supported types : " + ','.join(VALID_TYPES))
            self.common_group.add_argument("-i", "--input", dest="input", required=True,
                                           help="the input directories, split by comma")
            self.common_group.add_argument("--input-from-file", dest="input_from_file", action="store_true",
                                           help="read input dir from a file, one line one input dir")
            self.common_group.add_argument("-o", "--output-path", dest="output_path",
                                           default="/tmp/nas_stat_util.output", help="the output path")
            self.common_group.add_argument("--human", dest="human_readable", action="store_true",
                                           help="print human readable size")
            self.common_group.add_argument("--processes", dest="processes", type=int,
                                           default=10, help="the stat process concurrency")
            self.common_group.add_argument("--threads", dest="threads", type=int, default=10,
                                           help="the stat threads concurrency")
            self.common_group.add_argument("--backup-count", dest="backup_count", type=int, default=9,
                                           help="output backup count")
            self.common_group.add_argument("--print-process-time", dest="print_process_time", type=int, default=0,
                                           help="print process time (seconds)")
            self.common_group.add_argument("--exclude-dirs", dest="exclude_dirs", help="exclude dir names, split by comma")
            self.common_group.add_argument("--exclude-full-path-dirs", dest="exclude_full_path_dirs",
                                           help="exclude full dir paths, split by comma")
            self.common_group.add_argument("--output-format", dest="output_format",
                                           help="Output format each line,"
                                                " type file default: path,size."
                                                " Available choices is: [path,size,inode,atime,mtime,ctime]."
                                                " type dir default: path,inode_num,size,skip_num."
                                                " Available choices is: [path,inode_num,size,skip_num,inode,atime,mtime,ctime,all_inode_num,all_size]."
                                           )
            self.common_group.add_argument("--output-without-head", dest="output_without_header", action='store_true',
                                           help="output file without head line")
            self.common_group.add_argument("--output-without-prefix", dest="output_without_prefix", default='',
                                           help="output path without prefix")
            self.common_group.add_argument("--use-file-size", dest="use_file_size", action='store_true',
                                           help="output file size instead of disk usage")
            self.common_group.add_argument("--use-csv-format", dest="use_csv_format", action='store_true',
                                           help="use csv format for path")
            self.common_group.add_argument("--block-size", dest="block_size", type=int, default=512,
                                           help="file block size in storage, Usually 512 bytes")
            self.common_group.add_argument("-s", "--min-size", dest="min_size", type=int,
                                           help="Files whose size is bigger than this value, unit: byte")
            self.common_group.add_argument("--max-size", dest="max_size", type=int,
                                           help="Files whose size is smaller than this value, unit: byte")
            self.common_group.add_argument("-a", "--atime", dest="atime", type=int,
                                           help="Files whose access time is earlier than this value")
            self.common_group.add_argument("--atime-after", dest="atime_after", type=int,
                                           help="Files whose access time is later than this value")
            self.common_group.add_argument("-m", "--mtime", dest="mtime", type=int,
                                           help="Files whose modify time is earlier than this value")
            self.common_group.add_argument("--mtime-after", dest="mtime_after", type=int,
                                           help="Files whose modify time is later than this value")
            self.common_group.add_argument("-c", "--ctime", dest="ctime", type=int,
                                           help="Files whose change time is earlier than this value")
            self.common_group.add_argument("--ctime-after", dest="ctime_after", type=int,
                                           help="Files whose change time is later than this value")
            self.common_group.add_argument("--filter-inode-type", dest="filter_inode_type",
                                           choices=VALID_FILTER_INODE_TYPES, default=ALL,
                                           help=f"filter inode type, supported types : {VALID_FILTER_INODE_TYPES}, default: all")
    
            self.type_dir_group = self.add_argument_group('type dir_tree arguments')
            self.type_dir_group.add_argument("-d", "--depth", dest="depth", type=int, default=2,
                                             help="max depth of show dirs")
    
            self.type_file_group = self.add_argument_group('type file arguments')
    
    
    def add_dir_stat_map(args, sum_dir_stat_map, local_dir_stat_map):
        if sum_dir_stat_map is None or local_dir_stat_map is None:
            return
        if local_dir_stat_map.directory_map is not None:
            for iter_dir_path, iter_dir_stat in local_dir_stat_map.directory_map.items():
                if iter_dir_path not in sum_dir_stat_map.directory_map:
                    sum_dir_stat_map.directory_map[iter_dir_path] = iter_dir_stat
                else:
                    dir_stat = sum_dir_stat_map.directory_map[iter_dir_path]
                    dir_stat.add_item_count(iter_dir_stat.item_count)
                    dir_stat.add_item_size(iter_dir_stat.item_size)
                    dir_stat.add_skipped_item_count(iter_dir_stat.skipped_item_count)
                    if args.use_all_inode_num:
                        dir_stat.add_all_inode_num(iter_dir_stat.all_inode_num)
                    if args.use_all_size:
                        dir_stat.add_all_size(iter_dir_stat.all_size)
        if local_dir_stat_map.inode_num is not None:
            sum_dir_stat_map.add_inode_num(local_dir_stat_map.inode_num)
        if local_dir_stat_map.wait_walk_dir_list is not None:
            sum_dir_stat_map.wait_walk_dir_list.extend(local_dir_stat_map.wait_walk_dir_list)
    
    
    def stat_process(args, dir_list):
        global thread_pool
        if not thread_pool:
            thread_pool = ThreadPoolExecutor(max_workers=args.threads)
        args.thread_lock = threading.Lock()
        process_dir_stat_map = DirectoryStatMap()
        args.process_wait_queue_size = args.m_process_wait_queue.qsize()
        args.thread_future_set = set()
        args.thread_wait_queue = Queue()
        for dir_path in dir_list:
            future = thread_pool.submit(stat_walk, args, dir_path, True, None, time.perf_counter())
            args.thread_future_set.add(future)
        while len(args.thread_future_set) > 0 or not args.thread_wait_queue.empty():
            while not args.thread_wait_queue.empty():
                if args.m_process_wait_queue.qsize() < args.max_process_queue_size:
                    args.m_process_wait_queue.put(args.thread_wait_queue.get())
                elif len(args.thread_future_set) < args.max_thread_queue_size:
                    future = thread_pool.submit(stat_walk, args, args.thread_wait_queue.get(), True, None,
                                                time.perf_counter())
                    args.thread_future_set.add(future)
                else:
                    break
            args.process_wait_queue_size = args.m_process_wait_queue.qsize()
            try:
                for future in futures.as_completed(args.thread_future_set, timeout=1):
                    thread_dir_stat_map = future.result()
                    add_dir_stat_map(args, process_dir_stat_map, thread_dir_stat_map)
                    args.thread_future_set.remove(future)
            except concurrent.futures.TimeoutError:
                pass
            except Exception as e:
                print(f"threads error processing directory: {e}")
                args.thread_future_set.remove(future)
        return process_dir_stat_map
    
    
    def stat_walk(args, top, need_output, thread_local_output_lines, start_time):
        local_dir_stat_map = DirectoryStatMap()
        # Each thread maintains its own output list
        if thread_local_output_lines is None:
            thread_local_output_lines = []
        try:
            try:
                scandir_it = os.scandir(top)
            except OSError as error:
                add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top)
                return local_dir_stat_map
    
            with scandir_it:
                while True:
                    try:
                        try:
                            entry = next(scandir_it)
                        except StopIteration:
                            break
                    except OSError as error:
                        add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top)
                        break
    
                    try:
                        is_dir = entry.is_dir()
                    except OSError:
                        is_dir = False
    
                    path = os.path.join(top, entry.name)
                    if is_dir:
                        if entry.name in args.exclude_dir_set:
                            continue
                        if not path.endswith('/'):
                            path += '/'
                        if path in args.exclude_full_path_dir_set:
                            continue
                        my_stat(args, local_dir_stat_map, thread_local_output_lines, path)
                        if entry.is_symlink():
                            continue
                        if time.perf_counter() - start_time > 1 and args.thread_wait_queue.qsize() < args.max_thread_queue_size - len(
                                args.thread_future_set) + args.max_process_queue_size - args.process_wait_queue_size:
                            args.thread_wait_queue.put(path)
                        else:
                            _local_dir_stat_map = stat_walk(args, path, False, thread_local_output_lines, start_time)
                            add_dir_stat_map(args, local_dir_stat_map, _local_dir_stat_map)
                    else:
                        my_stat(args, local_dir_stat_map, thread_local_output_lines, path)
            if need_output and args.stat_type == INODE_LIST:
                output_stat_lines(args, local_dir_stat_map, thread_local_output_lines)
            return local_dir_stat_map
        except Exception as error:
            add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top)
            return local_dir_stat_map
    
    
    def output_stat_lines(args, local_dir_stat_map, thread_local_output_lines):
        with args.thread_lock:
            with args.m_output_file_lock:
                for output_path in args.output_path_list:
                    with open(output_path, 'a') as output_handle:
                        output_handle.writelines(thread_local_output_lines)
        local_dir_stat_map.add_inode_num(len(thread_local_output_lines))
        thread_local_output_lines.clear()
    
    
    def my_stat(args, local_dir_stat_map, thread_local_output_lines, item_path):
        try:
            _stat = os.lstat(item_path)
            if not args.need_filter or filter_inode(args, _stat):
                if args.stat_type == DIR_TREE:
                    add_inode_to_dir_stat_map(args, local_dir_stat_map, item_path, _stat)
                    # Record directory information
                    if stat.S_ISDIR(_stat.st_mode) and get_depth_from_str(item_path) <= args.min_key_depth + args.depth:
                        local_dir_stat_map.get_or_create(item_path, args, _stat)
                elif args.stat_type == INODE_LIST:
                    # Already filtered by the check above; no second filter_inode call needed
                    inode_stat_print(args, local_dir_stat_map, item_path, _stat, thread_local_output_lines)
            if args.use_all_inode_num or args.use_all_size:
                if args.stat_type == DIR_TREE:
                    add_inode_to_dir_stat_map(args, local_dir_stat_map, item_path, _stat, all_mode=True)
                    # Record directory information
                    if stat.S_ISDIR(_stat.st_mode) and get_depth_from_str(item_path) <= args.min_key_depth + args.depth:
                        local_dir_stat_map.get_or_create(item_path, args, _stat)
        except Exception as error:
            add_skip_to_dir_stat_map(args, local_dir_stat_map, error, item_path)
    
    
    def roll_output_file(args):
        for output_path in args.output_path_list:
            need_roll = os.path.isfile(output_path)
            if args.backup_count > 0 and need_roll:
                file_handler = RotatingFileHandler(output_path, backupCount=args.backup_count)
                file_handler.doRollover()
    
    
    def validate_input_dirs(args):
        directories = []
        if args.input_from_file:
            with open(args.input, 'r') as file:
                for line in file:
                    line = line.strip()
                    if line:
                        directories.append(line)
        else:
            directories.extend([_dir.strip() for _dir in args.input.split(",")])
        # Check if each directory exists
        for index, directory in enumerate(directories):
            if not directory.endswith('/'):
                directory += '/'
                directories[index] = directory
            if not os.path.isdir(directory):
                raise argparse.ArgumentTypeError(f"Directory '{directory}' does not exist.")
        return directories
    
    
    def check_and_init_args(args):
        args.output_path_list = args.output_path.split(",")
        args.min_key_depth = None
        if args.stat_type == DIR_TREE or args.stat_type == INODE_LIST:
            args.input_dirs = validate_input_dirs(args)
            for _dir in args.input_dirs:
                if args.min_key_depth is None:
                    args.min_key_depth = get_depth_from_str(_dir)
                else:
                    args.min_key_depth = min(args.min_key_depth, get_depth_from_str(_dir))
        args.exclude_dir_set = frozenset()
        if args.exclude_dirs is not None:
            args.exclude_dir_set = frozenset(args.exclude_dirs.split(","))
        args.exclude_full_path_dir_set = frozenset()
        if args.exclude_full_path_dirs is not None:
            exclude_full_path_dir_list = []
            for path in args.exclude_full_path_dirs.split(","):
                if not path.endswith('/'):
                    path += '/'
                exclude_full_path_dir_list += [path]
            args.exclude_full_path_dir_set = frozenset(exclude_full_path_dir_list)
    
        args.max_process_queue_size = 100 * args.processes
        args.max_thread_queue_size = 100 * args.threads
        if args.output_format is None:
            if args.stat_type == DIR_TREE:
                args.output_format_list = ['path', 'inode_num', 'size', 'skip_num']
            elif args.stat_type == INODE_LIST:
                args.output_format_list = ['path', 'size']
        else:
            args.output_format_list = args.output_format.split(',')
        args.use_atime = "atime" in args.output_format_list or "atime_ms" in args.output_format_list
        args.use_mtime = "mtime" in args.output_format_list or "mtime_ms" in args.output_format_list
        args.use_ctime = "ctime" in args.output_format_list or "ctime_ms" in args.output_format_list
        args.use_inode = "inode" in args.output_format_list
        args.use_uid = "uid" in args.output_format_list
        args.use_gid = "gid" in args.output_format_list
        args.use_all_inode_num = "all_inode_num" in args.output_format_list
        args.use_all_size = "all_size" in args.output_format_list
        args.need_filter = (args.min_size is not None or args.max_size is not None
                            or args.atime is not None or args.atime_after is not None
                            or args.ctime is not None or args.ctime_after is not None
                            or args.mtime is not None or args.mtime_after is not None
                            or args.filter_inode_type != ALL)
    
    
    # / -> depth 0
    # /a/ -> depth 1
    # /a.txt -> depth 1
    # /a/b/ -> depth 2
    # /a/b.txt -> depth 2
    def get_depth_from_str(file_path):
        if file_path.endswith('/'):
            return file_path.count('/') - 1
        return file_path.count('/')
    
    
    def get_depth_from_list(file_path_list):
        if file_path_list[-1] == '':
            return len(file_path_list) - 2
        return len(file_path_list) - 1
    
    
    def add_inode_to_dir_stat_map(args, input_dir_stat_map, inode_path, _stat, all_mode=False):
        inode_size = _stat.st_size if args.use_file_size else _stat.st_blocks
        base_depth = args.min_key_depth
        file_path_list = inode_path.split('/')
        file_depth = get_depth_from_list(file_path_list)
        dir_path = ''
        for i in range(0, min(1 + base_depth + args.depth, file_depth)):
            dir_path += file_path_list[i] + '/'
            if i >= base_depth:
                dir_stat = input_dir_stat_map.get_or_create(dir_path, args, _stat)
                if all_mode:
                    dir_stat.add_all_inode_num(1)
                    dir_stat.add_all_size(inode_size)
                else:
                    dir_stat.add_item_count(1)
                    dir_stat.add_item_size(inode_size)
    
    
    def add_skip_to_dir_stat_map(args, input_dir_stat_map, error, inode_path):
        print("inode_path:{}, skip because error:{}".format(inode_path, error))
        file_path_list = inode_path.split('/')
        file_depth = get_depth_from_list(file_path_list)
        base_depth = args.min_key_depth
        if inode_path.endswith('/'):
            file_depth += 1
        dir_path = ''
        for i in range(0, min(1 + base_depth + args.depth, file_depth)):
            dir_path += file_path_list[i] + '/'
            if i >= base_depth:
                dir_stat = input_dir_stat_map.get_or_create(dir_path, args, None)
                dir_stat.add_skipped_item_count(1)
    
    
    def chunks(lst, n):
        for i in range(0, len(lst), n):
            yield lst[i:i + n]
    
    
    def stat_func(args):
        args.stat_finish = False
        dir_stat_map = DirectoryStatMap()
        args.start_time = time.time()
        with Manager() as manager:
            args.m_output_file_lock = manager.Lock()
            args.m_process_wait_queue = manager.Queue()
            args.add_wait_flag = False
            process_future_set = set()
            try:
                if args.print_process_time > 0:
                    threading.Thread(target=stat_print_at_runtime, args=(args, dir_stat_map, sys.stdout, True)).start()
                if args.stat_type == DIR_TREE or args.stat_type == INODE_LIST:
                    roll_output_file(args)
                    init_file_header(args)
                    with ProcessPoolExecutor(max_workers=args.processes) as walk_executor:
                        input_dir_chunks = list(chunks(args.input_dirs, args.max_thread_queue_size))
                        for input_dir_chunk in input_dir_chunks:
                            future = walk_executor.submit(stat_process, args, input_dir_chunk)
                            process_future_set.add(future)
    
                        while len(process_future_set) > 0 or not args.m_process_wait_queue.empty():
                            while not args.m_process_wait_queue.empty():
                                if args.max_process_queue_size <= len(process_future_set):
                                    break
                                _dir = args.m_process_wait_queue.get()
                                future = walk_executor.submit(stat_process, args, [_dir])
                                process_future_set.add(future)
                            try:
                                for future in futures.as_completed(process_future_set, timeout=1):
                                    local_dir_stat_map = future.result()
                                    add_dir_stat_map(args, dir_stat_map, local_dir_stat_map)
                                    process_future_set.remove(future)
                            except concurrent.futures.TimeoutError:
                                pass
                            except Exception as e:
                                print(f"Stat error processing directory: {e}")
                                process_future_set.remove(future)
    
                        args.stat_finish = True
                        if args.stat_type == DIR_TREE:
                            for output_path in args.output_path_list:
                                with open(output_path, 'a+') as output_file:
                                    dir_stat_map_print(args, dir_stat_map, output_file)
                            dir_stat_map_print(args, dir_stat_map, sys.stdout, print_elapsed_time=True,
                                               clear_screen=(args.print_process_time > 0))
                        elif args.stat_type == INODE_LIST:
                            output_total_line_print(args, dir_stat_map, sys.stdout, print_elapsed_time=True,
                                                    clear_screen=(args.print_process_time > 0))
            except Exception as e:
                print(f"Error in stat_func: {e}")
            args.stat_finish = True
    
    
    def init_file_header(args):
        for output_path in args.output_path_list:
            file_exists = os.path.isfile(output_path)
            file_empty = not file_exists or os.path.getsize(output_path) == 0
            with open(output_path, 'a+') as output_file:
                with args.m_output_file_lock:
                    if args.output_without_header:
                        continue
                    if file_empty:
                        output_file.write(','.join(args.output_format_list) + '\n')
                        output_file.flush()
    
    
    def main():
        parser = CustomArgumentParser(description='nas stat util args info')
        args = parser.parse_args()
        check_and_init_args(args)
        stat_func(args)
    
    
    if __name__ == "__main__":
        main()
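
The `--depth` option and the per-directory aggregation both rely on the path-depth convention documented in the comments above `get_depth_from_str`. The snippet below reproduces that function verbatim so the convention can be verified in isolation, outside the scan tool:

```python
# Reproduced from the scan script: depth counts path components,
# ignoring a trailing slash on directories.
def get_depth_from_str(file_path):
    if file_path.endswith('/'):
        return file_path.count('/') - 1
    return file_path.count('/')

# The examples given in the script's own comments:
assert get_depth_from_str('/') == 0
assert get_depth_from_str('/a/') == 1
assert get_depth_from_str('/a.txt') == 1
assert get_depth_from_str('/a/b/') == 2
assert get_depth_from_str('/a/b.txt') == 2
```

Note that a directory path with a trailing slash and a file path at the same level report the same depth, which is what makes the `min_key_depth + depth` comparisons in `my_stat` consistent for both.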
    
  3. Create the `.meta_stat_key.json` file, which stores the AccessKey credentials and basic parameters required to access each cloud service.

    {
        "oss_access_key_id": "<AccessKey ID>",
        "oss_access_key_secret": "<AccessKey Secret>",
        "ots_access_key_id": "<AccessKey ID>",
        "ots_access_key_secret": "<AccessKey Secret>",
        "BASE_PATH": "/cpfs",
        "OUTPUT_DIR_PREFIX": "/opt/cpfs-scanner/bmstat/",
        "OSS_REGION": "<region-id>",
        "OSS_BUCKET": "bmcpfs-scan",
        "OSS_KEY": "bmcpfs_stat_tool_config.json",
        "OSS_ENDPOINT": "oss-<region-id>-internal.aliyuncs.com",
        "nas_access_key_id": "<AccessKey ID>",
        "nas_access_key_secret": "<AccessKey Secret>",
        "nas_endpoint": "nas-vpc.<region-id>.aliyuncs.com"
    }

    The configuration items are described below.

    | Item | Description |
    | --- | --- |
    | oss_access_key_id / oss_access_key_secret | AccessKey pair for accessing OSS; must have read/write permission on the target Bucket |
    | ots_access_key_id / ots_access_key_secret | AccessKey pair for accessing Tablestore; must have read/write permission on the target instance |
    | BASE_PATH | Local mount path of the CPFS file system; `/cpfs` in this example |
    | OUTPUT_DIR_PREFIX | Local output directory for scan result files; make sure the directory is writable |
    | OSS_REGION | Region of the OSS Bucket |
    | OSS_BUCKET | Name of the OSS Bucket that stores the archived scan results |
    | OSS_KEY | Name of the scan behavior configuration file; used to read the configuration locally, and also the object name of this file in OSS (used by `--type download` mode) |
    | OSS_ENDPOINT | OSS endpoint; on servers with VPC access, prefer the internal endpoint to reduce latency and traffic cost |
    | nas_access_key_id / nas_access_key_secret | AccessKey pair for calling the NAS API to query Fileset information |
    | nas_endpoint | NAS API endpoint |

  4. 配置 bmcpfs_stat_tool_config.json,该文件定义扫描范围、字段映射和 Tablestore 写入参数。

    {
        "fsid": "<文件系统ID>",
        "ossPath": "/",
        "scanToFiles": [
        ],
        "scanToTablestore": [
            "/fileset1",
            "/fileset2"
        ],
        "fields": {
            "aTime": "$atime_ms",
            "cTime": "$ctime_ms",
            "mTime": "$mtime_ms",
            "fileSize": "$size",
            "cluster_id": "cluster_id",
            "fileset_id": "$fset_id",
            "fileset_name": "$fset_name",
            "fileset_file_count_quota": "$fset_file_count_quota",
            "fileset_size_quota": "$fset_size_quota",
            "wTime": "$wtime_ms",
            "region": "region-id"
        },
        "pkFields": {
            "file_name": "$path"
        },
        "otsConfig": {
            "instanceName": "<实例名>",
            "tableName": "bmcpfs_test",
            "endpoint": "https://<实例名>.<region-id>.vpc.tablestore.aliyuncs.com",
            "rowExistenceException": "IGNORE",
            "columnCondition": "IGNORE",
            "writeType": "PUT"
        },
        "schedule": {
            "type": "once",
            "cron": "0 * * * *"
        }
    }

    各配置项说明如下。

    | 配置项 | 说明 |
    | --- | --- |
    | fsid(必填) | CPFS 文件系统 ID,可在 CPFS 控制台获取 |
    | scanToTablestore(必填) | 需要扫描并写入 Tablestore 的目录列表,填写以 BASE_PATH 为基准的相对路径 |
    | otsConfig.instanceName(必填) | Tablestore 实例名称 |
    | otsConfig.tableName(必填) | Tablestore 数据表名称,需与步骤二中创建的表名一致 |
    | otsConfig.endpoint(必填) | Tablestore 的 VPC 内网访问地址,格式为 https://<实例名>.<region-id>.vpc.tablestore.aliyuncs.com。如服务器不支持内网访问,请使用公网访问地址 |
    | fields | 扫描字段到 Tablestore 列名的映射关系,$ 开头的值为 CPFS 元数据变量 |
    | pkFields | 主键字段映射,file_name 对应文件的完整路径 |
    | cluster_id | 集群 ID,可自定义 |
    | region | 地域,可自定义 |
    | schedule.type | 调度类型,once 表示单次执行 |
    | schedule.cron | Cron 表达式,当 type 为定时模式时生效 |

    说明

    scanToFiles 和 scanToTablestore 中填写的路径是以 .meta_stat_key.json 中 BASE_PATH 为基准的相对路径。例如,BASE_PATH 为 /cpfs 时,/fileset1 对应实际扫描路径 /cpfs/fileset1。

  5. 为扫描脚本添加可执行权限。

    chmod 755 meta_stat.py nas_stat_util.py
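
完成上述配置后,可用如下片段快速检查 .meta_stat_key.json 是否为合法 JSON 且包含全部必需配置项,便于在正式扫描前发现遗漏(字段列表依据上文示例整理,脚本仅为示意,并非扫描工具自带功能):

```python
import json

# 依据上文 .meta_stat_key.json 示例整理的必需配置项(示意)
REQUIRED_KEYS = [
    "oss_access_key_id", "oss_access_key_secret",
    "ots_access_key_id", "ots_access_key_secret",
    "BASE_PATH", "OUTPUT_DIR_PREFIX",
    "OSS_REGION", "OSS_BUCKET", "OSS_KEY", "OSS_ENDPOINT",
    "nas_access_key_id", "nas_access_key_secret", "nas_endpoint",
]

def missing_keys(config):
    """返回配置字典中缺失或取值为空的必需配置项列表。"""
    return [key for key in REQUIRED_KEYS if not config.get(key)]

def check_meta_stat_key(path=".meta_stat_key.json"):
    """读取配置文件并返回缺失项,空列表表示检查通过。"""
    with open(path) as f:
        return missing_keys(json.load(f))
```

若返回的列表非空,说明对应字段在配置文件中缺失或为空,需补齐后再执行扫描。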

步骤四:执行扫描

执行以下命令启动扫描任务,扫描工具将读取本地配置文件,遍历指定目录,将元数据写入 Tablestore,并将扫描结果打包上传到 OSS。

python3 meta_stat.py --type run

--type 参数支持以下三种模式。

| 参数值 | 说明 |
| --- | --- |
| run | 完整执行:读取本地配置,扫描 CPFS 元数据并写入 Tablestore,将扫描结果打包上传到 OSS |
| download | 从 OSS 下载最新的 bmcpfs_stat_tool_config.json,并根据配置中的调度计划更新本机 crontab |
| upload | 仅将本地已有的扫描结果上传到 Tablestore,不重新扫描 |

如需定期自动执行扫描,可配置 crontab 定时任务。执行以下命令打开 crontab 配置文件:

crontab -e

在文件末尾添加定时任务规则。例如,每天零点执行一次扫描:

0 0 * * * /usr/bin/python3 /opt/cpfs-scanner/meta_stat.py --type run >> /opt/cpfs-scanner/cron.log 2>&1

其中 >> /opt/cpfs-scanner/cron.log 2>&1 将标准输出和标准错误重定向到日志文件,便于排查问题。Python 解释器路径和脚本路径请根据实际部署情况调整。
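
扫描执行时,bmcpfs_stat_tool_config.json 中 fields 映射里以 $ 开头的值会被替换为对应文件的实际元数据,其余值按字面常量写入 Tablestore。该映射过程可用如下示意代码理解(resolve_fields 为本文虚构的函数名,并非扫描脚本中的真实实现):

```python
def resolve_fields(field_map, meta):
    """将 fields 映射应用到单个文件的元数据记录上。

    field_map: 配置中的 fields 映射,如 {"fileSize": "$size", "region": "cn-hangzhou"}
    meta:      扫描得到的元数据字典,如 {"size": 1024, "atime_ms": 1700000000000}
    $ 开头的值从 meta 中取同名变量,其余值按字面写入。
    """
    row = {}
    for column, value in field_map.items():
        if isinstance(value, str) and value.startswith("$"):
            row[column] = meta[value[1:]]
        else:
            row[column] = value
    return row
```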

步骤五:配置 Grafana 展示

1. 安装 Tablestore 数据源插件

  1. 将 aliyun-tablestore-grafana-datasource-adapt-react.zip 上传到 Grafana 的插件目录,如 /var/lib/grafana/plugins。

  2. 解压插件文件。

    unzip aliyun-tablestore-grafana-datasource-adapt-react.zip -d /var/lib/grafana/plugins/
  3. 修改 Grafana 配置文件,允许加载未签名插件。

    vi /etc/grafana/grafana.ini

    找到 allow_loading_unsigned_plugins 配置项,将其值设置为插件名称。

    allow_loading_unsigned_plugins = aliyun-tablestore-grafana-datasource-adapt-react
    说明

    该配置项默认处于注释状态(行首有 ;),修改时需同时去掉行首的 ;,否则配置不会生效。

  4. 重启 Grafana 服务使配置生效。

    systemctl restart grafana-server

2. 添加数据源

  1. 登录 Grafana,进入 Connections > Data sources,单击 Add new data source。

  2. 在搜索框中输入 aliyun-tablestore,选择 aliyun-tablestore-grafana-datasource-adapt-react 类型。

  3. 填写以下连接参数。

    | 参数 | 说明 |
    | --- | --- |
    | Endpoint | Tablestore 实例访问地址,格式为 https://<实例名>.<region-id>.ots.aliyuncs.com |
    | Instance | Tablestore 实例名称 |
    | AccessId | 阿里云账号或 RAM 用户的 AccessKey ID |
    | AccessKey | 阿里云账号或 RAM 用户的 AccessKey Secret |

    说明

    Endpoint 支持公网和 VPC 内网两种类型。如果 Grafana 所在机器与 Tablestore 实例处于同一地域的 VPC 中,建议使用 VPC 内网地址(格式为 https://<实例名>.<region-id>.vpc.tablestore.aliyuncs.com),以降低延迟并避免公网流量费用。否则,使用公网地址。

  4. 单击 Save & test,验证连接是否成功。
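
公网与 VPC 内网两种 Endpoint 仅域名部分不同,其格式可用如下示意函数对照理解(build_endpoint 为本文虚构的演示函数,仅用于说明上文的地址格式):

```python
def build_endpoint(instance, region, vpc=False):
    """按上文格式拼接 Tablestore 访问地址:公网用 ots,VPC 内网用 vpc.tablestore。"""
    domain = "vpc.tablestore" if vpc else "ots"
    return f"https://{instance}.{region}.{domain}.aliyuncs.com"
```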

3. 导入仪表盘

  1. 在导入前,打开仪表盘 JSON 文件,将其中 datasource 字段的 uid 修改为实际添加的数据源 UID(数据源 UID 可在数据源配置页面的 URL 中查看)。以下为文件概览示例 JSON,更多示例模板详见下文 Grafana看板示例模板。

    {
      "annotations": {
        "list": [
          {
            "builtIn": 1,
            "datasource": {
              "type": "grafana",
              "uid": "-- Grafana --"
            },
            "enable": true,
            "hide": true,
            "iconColor": "rgba(0, 211, 255, 1)",
            "name": "Annotations & Alerts",
            "type": "dashboard"
          }
        ]
      },
      "editable": true,
      "fiscalYearStartMonth": 0,
      "graphTooltip": 0,
      "id": 1,
      "links": [
        {
          "asDropdown": false,
          "icon": "external link",
          "includeVars": false,
          "keepTime": false,
          "tags": [],
          "targetBlank": false,
          "title": "New link",
          "tooltip": "",
          "type": "dashboards",
          "url": ""
        }
      ],
      "panels": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "continuous-GrYlRd"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "bytes"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 6,
            "x": 0,
            "y": 0
          },
          "id": 3,
          "options": {
            "colorMode": "background",
            "graphMode": "none",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT SUM(fileSize) AS totalSize  FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "",
              "ycol": ""
            }
          ],
          "title": "Total Capacity Usage",
          "transformations": [
            {
              "id": "prepareTimeSeries",
              "options": {}
            }
          ],
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "continuous-RdYlGr"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "none"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 6,
            "x": 6,
            "y": 0
          },
          "id": 4,
          "options": {
            "colorMode": "background",
            "graphMode": "area",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT COUNT(*) AS file_count FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Total File Count",
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "fixedColor": "orange",
                "mode": "shades"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "none"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 5,
            "x": 12,
            "y": 0
          },
          "id": 5,
          "options": {
            "colorMode": "background",
            "graphMode": "area",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [
                "lastNotNull"
              ],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT COUNT(DISTINCT region) AS regions_count FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Regions Covered",
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "fixedColor": "purple",
                "mode": "shades"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "none"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 7,
            "x": 17,
            "y": 0
          },
          "id": 6,
          "options": {
            "colorMode": "background",
            "graphMode": "none",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [
                "lastNotNull"
              ],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT COUNT(DISTINCT cluster_id) AS cluster_count FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Cluster Count",
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "palette-classic"
              },
              "custom": {
                "axisBorderShow": false,
                "axisCenteredZero": false,
                "axisColorMode": "text",
                "axisLabel": "",
                "axisPlacement": "auto",
                "fillOpacity": 80,
                "gradientMode": "none",
                "hideFrom": {
                  "legend": false,
                  "tooltip": false,
                  "viz": false
                },
                "lineWidth": 1,
                "scaleDistribution": {
                  "log": 2,
                  "type": "log"
                },
                "thresholdsStyle": {
                  "mode": "off"
                }
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "percent"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 10,
            "w": 24,
            "x": 0,
            "y": 4
          },
          "id": 7,
          "options": {
            "barRadius": 0,
            "barWidth": 0.15,
            "fullHighlight": true,
            "groupWidth": 0.7,
            "legend": {
              "calcs": [
                "first"
              ],
              "displayMode": "list",
              "placement": "bottom",
              "showLegend": false
            },
            "orientation": "horizontal",
            "showValue": "auto",
            "stacking": "none",
            "tooltip": {
              "hideZeros": false,
              "mode": "multi",
              "sort": "none"
            },
            "xField": "cluster_id",
            "xTickLabelRotation": 0,
            "xTickLabelSpacing": 0
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT \n    cluster_id,\n    ROUND((SUM(fileSize) * 100.0 / (SELECT SUM(fileSize) FROM `$tableName` USE INDEX($indexName))), 2) AS `Capacity Usage`\nFROM `$tableName` USE INDEX($indexName)\nGROUP BY cluster_id\nORDER BY cluster_id;",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Capacity Distribution by Cluster",
          "type": "barchart"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "palette-classic"
              },
              "mappings": [],
              "thresholds": {
                "mode": "percentage",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "percent"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 6,
            "w": 24,
            "x": 0,
            "y": 14
          },
          "id": 9,
          "options": {
            "minVizHeight": 75,
            "minVizWidth": 75,
            "orientation": "auto",
            "reduceOptions": {
              "calcs": [],
              "fields": "/^percentage$/",
              "values": true
            },
            "showThresholdLabels": false,
            "showThresholdMarkers": false,
            "sizing": "auto"
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "formatType": "Table",
              "query": "SELECT\n  fileset_id,\n  SUM(fileSize) * 100.0 / MAX(fileset_size_quota) AS percentage\nFROM `$tableName` USE INDEX($indexName)\nGROUP BY fileset_id\nORDER BY fileset_id",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Fileset Capacity Usage",
          "type": "gauge"
        }
      ],
      "preload": false,
      "refresh": "5m",
      "schemaVersion": 42,
      "tags": [],
      "templating": {
        "list": [
          {
            "current": {
              "text": "bmcpfs_test",
              "value": "bmcpfs_test"
            },
            "hide": 2,
            "name": "tableName",
            "query": "bmcpfs_test",
            "skipUrlSync": true,
            "type": "constant"
          },
          {
            "current": {
              "text": "bmcpfs_test_idx",
              "value": "bmcpfs_test_idx"
            },
            "hide": 2,
            "name": "indexName",
            "query": "bmcpfs_test_idx",
            "skipUrlSync": true,
            "type": "constant"
          }
        ]
      },
      "time": {
        "from": "now-6h",
        "to": "now"
      },
      "timepicker": {
        "hidden": true
      },
      "timezone": "browser",
      "title": "CPFS Overview",
      "uid": "adqgdbx",
      "version": 64
    }
  2. 进入 Dashboards,单击右上角 New > Import,上传修改后的 JSON 文件。
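
当仪表盘中面板较多时,逐个手工修改 uid 容易遗漏。可用如下脚本递归替换仪表盘 JSON 中所有 Tablestore 数据源的 uid(脚本仅为示意,执行前请先备份 JSON 文件):

```python
PLUGIN_TYPE = "aliyun-tablestore-grafana-datasource-adapt-react"

def replace_datasource_uid(node, new_uid):
    """递归遍历仪表盘 JSON,将所有 Tablestore 数据源的 uid 替换为 new_uid。

    仅匹配 type 为 Tablestore 插件的 datasource 节点,
    内置的 "-- Grafana --" 注解数据源(type 为 grafana)不受影响。
    """
    if isinstance(node, dict):
        if node.get("type") == PLUGIN_TYPE and "uid" in node:
            node["uid"] = new_uid
        for value in node.values():
            replace_datasource_uid(value, new_uid)
    elif isinstance(node, list):
        for item in node:
            replace_datasource_uid(item, new_uid)
    return node
```

配合 json.load 读取仪表盘文件、替换后再用 json.dump 写回,即可完成批量修改。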

4. 自定义表名和索引名(可选)

如果步骤二中使用了自定义的表名或索引名,需要在仪表盘变量中同步修改。

  1. 在仪表盘页面单击右上角 Edit > Settings > Variables。

  2. 找到 tableName 和 indexName 变量,将默认值修改为实际使用的表名和索引名。

  3. 单击 Save dashboard。

Grafana看板示例模板

容量趋势

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "grafana",
          "uid": "-- Grafana --"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 6,
  "links": [
    {
      "asDropdown": true,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "Other Dashboards",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "liveNow": true,
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "continuous-BlYlRd"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "fillOpacity": 80,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "lineWidth": 1,
            "scaleDistribution": {
              "type": "linear"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "file_size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 11,
        "w": 24,
        "x": 0,
        "y": 0
      },
      "id": 9,
      "options": {
        "barRadius": 0,
        "barWidth": 0.1,
        "colorByField": "time",
        "fullHighlight": true,
        "groupWidth": 0.7,
        "legend": {
          "calcs": [],
          "displayMode": "list",
          "placement": "bottom",
          "showLegend": false
        },
        "orientation": "auto",
        "showValue": "auto",
        "stacking": "none",
        "text": {
          "valueSize": 1
        },
        "tooltip": {
          "hideZeros": false,
          "mode": "single",
          "sort": "none"
        },
        "xField": "time",
        "xTickLabelRotation": 0,
        "xTickLabelSpacing": 100
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT DATE(DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR)) AS date ,SUM(fileSize) as file_size\n FROM (\n  SELECT \n    cTime,fileSize \n  FROM \n    $tableName USE INDEX($indexName)  \n  WHERE \n    cTime >= $__unixMilliTimeFrom() AND cTime < $__unixMilliTimeTo()\n) tl\nGROUP BY date\nORDER BY date",
          "refId": "A",
          "xcol": " time",
          "ycol": ""
        }
      ],
      "title": "New File Capacity Statistics",
      "type": "barchart"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "green",
            "mode": "fixed"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "fillOpacity": 80,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "lineWidth": 1,
            "scaleDistribution": {
              "type": "linear"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "displayName": "Total New Files",
          "fieldMinMax": false,
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              }
            ]
          },
          "unit": "none"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 9,
        "w": 24,
        "x": 0,
        "y": 11
      },
      "id": 10,
      "options": {
        "barRadius": 0,
        "barWidth": 0.1,
        "colorByField": "day",
        "fullHighlight": true,
        "groupWidth": 0.7,
        "legend": {
          "calcs": [],
          "displayMode": "list",
          "placement": "right",
          "showLegend": false
        },
        "orientation": "auto",
        "showValue": "auto",
        "stacking": "none",
        "text": {
          "valueSize": 1
        },
        "tooltip": {
          "hideZeros": false,
          "mode": "multi",
          "sort": "none"
        },
        "xTickLabelRotation": 0,
        "xTickLabelSpacing": 100
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT DATE(DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR)) AS date ,COUNT(*) as file_count\nFROM (\n  SELECT \n    cTime,file_name \n  FROM \n    $tableName USE INDEX($indexName)  \n  WHERE \n    cTime >= $__unixMilliTimeFrom() AND cTime < $__unixMilliTimeTo()\n) tl\nGROUP BY date\nORDER BY date",
          "refId": "A",
          "xcol": "time",
          "ycol": ""
        }
      ],
      "title": "New File Count Statistics",
      "type": "barchart"
    }
  ],
  "preload": true,
  "refresh": "5m",
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "hide": 2,
        "label": "Table Name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      },
      {
        "current": {
          "text": "bmcpfs_test_idx",
          "value": "bmcpfs_test_idx"
        },
        "hide": 2,
        "name": "indexName",
        "query": "bmcpfs_test_idx",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-7d",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "browser",
  "title": "CPFS Capacity Trend",
  "uid": "adj2l5q",
  "version": 41
}
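
上述面板的 SQL 中,DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR) 先将毫秒时间戳转换为秒,再平移到 UTC+8 时区后按自然日分桶。该换算逻辑等价于如下 Python 示意:

```python
from datetime import datetime, timedelta, timezone

def day_bucket(ctime_ms):
    """将毫秒时间戳换算为 UTC+8 时区的自然日(ISO 日期字符串)。"""
    tz_utc8 = timezone(timedelta(hours=8))
    return datetime.fromtimestamp(ctime_ms / 1000, tz_utc8).date().isoformat()
```

如需按其他时区统计,调整 SQL 中的 INTERVAL 偏移量即可,对应示意代码中的 hours 参数。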

目录对比

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "grafana",
          "uid": "-- Grafana --"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 7,
  "links": [
    {
      "asDropdown": false,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "New link",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "blue",
            "mode": "fixed"
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 4,
        "w": 8,
        "x": 0,
        "y": 0
      },
      "id": 1,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "lastNotNull"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT COUNT(*) AS total_count\nFROM (\n    SELECT file_name FROM $tableName USE INDEX() WHERE file_name Like CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n    UNION ALL\n    SELECT file_name FROM $tableName USE INDEX() WHERE file_name Like CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) t",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Total File Count",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "light-red",
            "mode": "fixed"
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 4,
        "w": 8,
        "x": 8,
        "y": 0
      },
      "id": 2,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "lastNotNull"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT COUNT(*) AS duplicate_count\nFROM (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) a\nJOIN (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Duplicate File Count",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "orange",
            "mode": "fixed"
          },
          "mappings": [],
          "noValue": "0GB",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          },
          "unit": "bytes"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 4,
        "w": 8,
        "x": 16,
        "y": 0
      },
      "id": 3,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "lastNotNull"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT SUM(a.fileSize) AS duplicate_size\nFROM (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) a\nJOIN (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Duplicate Space Usage",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "auto",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "hideFrom": {
              "viz": false
            },
            "inspect": false,
            "wrapHeaderText": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "Directory A/B Path"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 801
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File Name"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 405
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File Size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 9,
        "w": 24,
        "x": 0,
        "y": 4
      },
      "id": 4,
      "options": {
        "cellHeight": "sm",
        "showHeader": true,
        "sortBy": [
          {
            "desc": false,
            "displayName": "File Size"
          }
        ]
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n    SUBSTRING_INDEX(a.file_name, '/', -1) AS 'File Name',\n    CONCAT('A:',a.file_name,'\\n','B:',b.file_name) AS 'Directory A/B Path',\n    a.fileSize AS 'File Size'\nFROM (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) a\nJOIN (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Duplicate File List",
      "type": "table"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "auto",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "hideFrom": {
              "viz": false
            },
            "inspect": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "File Size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Path Source"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 118
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File Name"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 288
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File Path"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 904
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 7,
        "w": 24,
        "x": 0,
        "y": 13
      },
      "id": 5,
      "options": {
        "cellHeight": "sm",
        "showHeader": true,
        "sortBy": [
          {
            "desc": false,
            "displayName": "File Path"
          }
        ]
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n    'Path A' AS 'Path Source',\n    SUBSTRING_INDEX(file_name, '/', -1) AS 'File Name',\n    file_name AS 'File Path',\n    fileSize AS 'File Size'\nFROM $tableName USE INDEX()\nWHERE \n    file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n    AND (SUBSTRING_INDEX(file_name, '/', -1), fileSize) NOT IN (\n    SELECT \n        SUBSTRING_INDEX(file_name, '/', -1), fileSize\n    FROM \n        $tableName USE INDEX()\n    WHERE \n        file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n  )\n\nUNION ALL\n\nSELECT \n    'Path B' AS 'Path Source',\n    SUBSTRING_INDEX(file_name, '/', -1) AS 'File Name',\n    file_name AS 'File Path',\n    fileSize AS 'File Size'\nFROM $tableName USE INDEX()\nWHERE \n    file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n    AND (SUBSTRING_INDEX(file_name, '/', -1), fileSize) NOT IN (\n    SELECT \n        SUBSTRING_INDEX(file_name, '/', -1), fileSize\n    FROM \n        $tableName USE INDEX()\n    WHERE \n        file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n  )",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Unique File List",
      "type": "table"
    }
  ],
  "preload": false,
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "/fileset1",
          "value": "/fileset1"
        },
        "label": "Source Directory (A)",
        "name": "path1",
        "options": [
          {
            "selected": true,
            "text": "/fileset1",
            "value": "/fileset1"
          }
        ],
        "query": "/fileset1",
        "type": "textbox"
      },
      {
        "current": {
          "text": "/fileset2",
          "value": "/fileset2"
        },
        "label": "Reference Directory (B)",
        "name": "path2",
        "options": [
          {
            "selected": true,
            "text": "/fileset2",
            "value": "/fileset2"
          }
        ],
        "query": "/fileset2",
        "type": "textbox"
      },
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "hide": 2,
        "label": "Table Name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-6h",
    "to": "now"
  },
  "timepicker": {
    "hidden": true
  },
  "timezone": "browser",
  "title": "CPFS Directory Comparison",
  "uid": "adwm968",
  "version": 44
}
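
The comparison queries in this dashboard treat a file under directory A as a duplicate of a file under directory B when the two share the same base name and file size. The same matching rule can be sketched locally in Python (a hypothetical helper for illustration, not part of the scanning toolchain):

```python
import os

def find_duplicates(files_a, files_b):
    """Pair files from two listings that share base name and size,
    mirroring the JOIN condition used by the dashboard SQL."""
    index_b = {}
    for path, size in files_b:
        # Key on (base name, size), the same tuple the SQL joins on.
        index_b.setdefault((os.path.basename(path), size), []).append(path)
    dups = []
    for path, size in files_a:
        for match in index_b.get((os.path.basename(path), size), []):
            dups.append((path, match, size))
    return dups

a = [("/fileset1/model.ckpt", 1024), ("/fileset1/readme.txt", 10)]
b = [("/fileset2/backup/model.ckpt", 1024), ("/fileset2/notes.txt", 10)]
print(find_duplicates(a, b))
# → [('/fileset1/model.ckpt', '/fileset2/backup/model.ckpt', 1024)]
```

Note that this heuristic, like the SQL it mirrors, can report false positives: two different files with the same name and size are counted as duplicates without comparing contents.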

File Search

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": "-- Grafana --",
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 4,
  "links": [
    {
      "asDropdown": true,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "Other Dashboards",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "left",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "inspect": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "File Name"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 667
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Creation Time"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 238
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Region"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 277
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Creation Time"
            },
            "properties": [
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File Size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 21,
        "w": 24,
        "x": 0,
        "y": 0
      },
      "id": 1,
      "options": {
        "cellHeight": "md",
        "showHeader": true,
        "sortBy": []
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n  file_name AS \"File Name\",\n  cTime AS \"Creation Time\",\n  region AS \"Region\",\n  fileSize AS \"File Size\"\nFROM $tableName USE INDEX($indexName)\nWHERE TEXT_MATCH_PHRASE(file_name,'$searchKeyword')\nLIMIT $limit",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "File Search Results",
      "type": "table"
    }
  ],
  "preload": false,
  "refresh": "",
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "/fileset1",
          "value": "/fileset1"
        },
        "label": "File Search",
        "name": "searchKeyword",
        "options": [
          {
            "selected": true,
            "text": "/fileset1",
            "value": "/fileset1"
          }
        ],
        "query": "/fileset1",
        "type": "textbox"
      },
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "hide": 2,
        "label": "Table Name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      },
      {
        "current": {
          "text": "5000",
          "value": "5000"
        },
        "label": "Display Limit (TOP)",
        "name": "limit",
        "options": [
          {
            "selected": false,
            "text": "10",
            "value": "10"
          },
          {
            "selected": false,
            "text": "50",
            "value": "50"
          },
          {
            "selected": false,
            "text": "100",
            "value": "100"
          },
          {
            "selected": false,
            "text": "500",
            "value": "500"
          },
          {
            "selected": false,
            "text": "1000",
            "value": "1000"
          },
          {
            "selected": true,
            "text": "5000",
            "value": "5000"
          },
          {
            "selected": false,
            "text": "10000",
            "value": "10000"
          }
        ],
        "query": "10,50,100,500,1000,5000,10000",
        "type": "custom"
      },
      {
        "current": {
          "text": "bmcpfs_test_idx",
          "value": "bmcpfs_test_idx"
        },
        "hide": 2,
        "name": "indexName",
        "query": "bmcpfs_test_idx",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-1h",
    "to": "now"
  },
  "timepicker": {
    "hidden": true
  },
  "timezone": "",
  "title": "CPFS File Search",
  "uid": "file-search-analysis",
  "version": 33
}

File Search & Analysis

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": "-- Grafana --",
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 3,
  "links": [
    {
      "asDropdown": true,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "Other Dashboards",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "continuous-BlPu"
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          },
          "unit": "none"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 3,
        "w": 12,
        "x": 0,
        "y": 0
      },
      "id": 3,
      "options": {
        "colorMode": "background",
        "graphMode": "none",
        "justifyMode": "auto",
        "orientation": "horizontal",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "last"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT COUNT(*) AS file_count\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n  fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n  aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n  TEXT_MATCH_PHRASE(file_name,'$pathFilter')",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Matching File Count",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "mappings": [],
          "noValue": "0GB",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 100
              }
            ]
          },
          "unit": "bytes"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 3,
        "w": 12,
        "x": 12,
        "y": 0
      },
      "id": 5,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "horizontal",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "last"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "value",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n  SUM(fileSize) AS cpfs_total_capacity_bytes\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n  fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n  aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n  TEXT_MATCH_PHRASE(file_name,'$pathFilter')",
          "refId": "A",
          "xcol": "aTime",
          "ycol": ""
        }
      ],
      "title": "Total Capacity of Matching Files",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "left",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "inspect": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "File Path"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 561
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File Size"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 198
              },
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Modification Time"
            },
            "properties": [
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Access Time"
            },
            "properties": [
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Update Time"
            },
            "properties": [
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 17,
        "w": 24,
        "x": 0,
        "y": 3
      },
      "id": 1,
      "options": {
        "cellHeight": "md",
        "showHeader": true,
        "sortBy": [
          {
            "desc": false,
            "displayName": "File Path"
          }
        ]
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n  file_name AS \"File Path\",\n  fileSize AS \"File Size\",\n  cTime AS \"Modification Time\",\n  aTime AS \"Access Time\",\n  mTime AS \"Update Time\"\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n  fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n  aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n  TEXT_MATCH_PHRASE(file_name,'$pathFilter')\nORDER BY fileSize ASC\nLIMIT $limit",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "File Query Results",
      "type": "table"
    }
  ],
  "preload": false,
  "refresh": "5m",
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "",
          "value": ""
        },
        "label": "File Size Threshold (KiB)",
        "name": "fileSize",
        "options": [
          {
            "selected": true,
            "text": "",
            "value": ""
          }
        ],
        "query": "",
        "type": "textbox"
      },
      {
        "current": {
          "text": "fileset1",
          "value": "fileset1"
        },
        "label": "Directory Filter",
        "name": "pathFilter",
        "options": [
          {
            "selected": true,
            "text": "fileset1",
            "value": "fileset1"
          }
        ],
        "query": "fileset1",
        "type": "textbox"
      },
      {
        "current": {
          "text": "5000",
          "value": "5000"
        },
        "label": "Display Limit (TOP)",
        "name": "limit",
        "options": [
          {
            "selected": false,
            "text": "10",
            "value": "10"
          },
          {
            "selected": false,
            "text": "50",
            "value": "50"
          },
          {
            "selected": false,
            "text": "100",
            "value": "100"
          },
          {
            "selected": false,
            "text": "500",
            "value": "500"
          },
          {
            "selected": false,
            "text": "1000",
            "value": "1000"
          },
          {
            "selected": true,
            "text": "5000",
            "value": "5000"
          },
          {
            "selected": false,
            "text": "10000",
            "value": "10000"
          }
        ],
        "query": "10,50,100,500,1000,5000,10000",
        "type": "custom"
      },
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "description": "",
        "hide": 2,
        "label": "Table Name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      },
      {
        "current": {
          "text": "bmcpfs_test_idx",
          "value": "bmcpfs_test_idx"
        },
        "hide": 2,
        "name": "indexName",
        "query": "bmcpfs_test_idx",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-30d",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "",
  "title": "CPFS File Search & Analysis",
  "uid": "cpfs-analysis",
  "version": 90
}
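
Besides importing through the Grafana UI, each dashboard JSON above can be pushed with Grafana's HTTP API (`POST /api/dashboards/db`). A minimal sketch of preparing the payload; the Grafana URL, API key, and file name are placeholders you must replace:

```python
import json

def build_import_payload(dashboard: dict) -> dict:
    """Wrap a dashboard definition for POST /api/dashboards/db.
    'id' is cleared so Grafana matches an existing dashboard by uid
    and 'overwrite' lets repeated imports update it in place."""
    dash = dict(dashboard)
    dash["id"] = None
    return {"dashboard": dash, "overwrite": True}

# In practice, load the JSON saved from this article and POST it
# with any HTTP client, e.g.:
#   payload = build_import_payload(json.load(open("cpfs-analysis.json")))
#   requests.post("http://<grafana-host>:3000/api/dashboards/db",
#                 headers={"Authorization": "Bearer <API_KEY>"},
#                 json=payload)
payload = build_import_payload(
    {"id": 3, "uid": "cpfs-analysis", "title": "CPFS File Search & Analysis"}
)
print(payload["dashboard"]["id"], payload["overwrite"])
# → None True
```

Remember that the `datasource.uid` values embedded in the panels (`fffrqt7w6setcc` above) must be replaced with the uid of your own Tablestore data source after import.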