Store and query CPFS file metadata in Tablestore


CPFS is widely used for high-performance computing and AI training. As file counts grow, understanding the file system's capacity distribution, data hotness, and fileset quota usage is essential for resource management. You can scan CPFS file metadata, store it in Tablestore, and visualize it through Grafana dashboards for real-time monitoring.
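One way to turn the collected access times into a "data hotness" view is to bucket files by how long ago they were last read. The sketch below works on millisecond timestamps like the aTime values the scan stores. The 7-day and 30-day thresholds and the `classify_hotness` helper are hypothetical choices for illustration, not part of the tool.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical thresholds; tune them to your own access patterns.
HOT_WINDOW = timedelta(days=7)
WARM_WINDOW = timedelta(days=30)


def classify_hotness(atime_ms: int, now: datetime) -> str:
    """Return 'hot', 'warm', or 'cold' from a last-access time in epoch milliseconds."""
    last_access = datetime.fromtimestamp(atime_ms / 1000, tz=timezone.utc)
    age = now - last_access
    if age <= HOT_WINDOW:
        return "hot"
    if age <= WARM_WINDOW:
        return "warm"
    return "cold"


if __name__ == "__main__":
    now = datetime(2024, 1, 31, tzinfo=timezone.utc)
    one_day_ago_ms = int((now - timedelta(days=1)).timestamp() * 1000)
    ninety_days_ago_ms = int((now - timedelta(days=90)).timestamp() * 1000)
    print(classify_hotness(one_day_ago_ms, now))      # hot
    print(classify_hotness(ninety_days_ago_ms, now))  # cold
```

Passing `now` explicitly keeps the classification reproducible, which matters when you compare results from scans taken on different days.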

Architecture

The overall data flow is as follows:

  1. CPFS file system: A scanning script runs on a Linux machine with the CPFS file system mounted. The script traverses specified directories to collect file metadata, including paths, sizes, access times, and modification times, as well as fileset quota information. The script first saves the scan results locally, then writes them to Tablestore, and finally packages and uploads them to OSS for archiving.

  2. Tablestore: Provides persistent storage for the metadata. A Search Index supports flexible queries across dimensions such as time ranges, file paths, and filesets, and an SQL mapping table lets Grafana query the data with SQL.

  3. Grafana: Connects to Tablestore using the Alibaba Cloud Tablestore data source plugin. By importing a pre-built dashboard, you can visualize metrics such as file system capacity, file counts, and fileset usage.
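The scan described in item 1 can be pictured as a recursive directory walk that records one CSV row per file. The sketch below writes rows in the column order that `upload_csv_to_ots` in meta_stat.py unpacks (path, size, inode, atime_ms, mtime_ms, ctime_ms) and skips the CPFS system directories listed in `SYSTEM_DIRS`. It is a simplified, single-threaded illustration: the `scan_to_csv` name and the header labels are hypothetical, and the actual scanning code in meta_stat.py may work differently.

```python
import csv
import os
import stat

# CPFS system directories that meta_stat.py also excludes (SYSTEM_DIRS).
SYSTEM_DIRS = {".sys_recyclebin", ".snapshots", ".sys_healthchecker",
               ".audit_log", ".msgq", ".fakesnapshots"}


def scan_to_csv(root: str, out_path: str) -> int:
    """Walk `root`, write one row per regular file to `out_path`, and return the row count."""
    count = 0
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        # Header row (hypothetical labels); the uploader skips the first row.
        writer.writerow(["path", "size", "inode", "atime_ms", "mtime_ms", "ctime_ms"])
        for dirpath, dirnames, filenames in os.walk(root):
            # Prune system directories in place so os.walk does not descend into them.
            dirnames[:] = [d for d in dirnames if d not in SYSTEM_DIRS]
            for name in filenames:
                full_path = os.path.join(dirpath, name)
                try:
                    st = os.lstat(full_path)
                except OSError:
                    continue  # The file vanished or is unreadable; skip it.
                if not stat.S_ISREG(st.st_mode):
                    continue  # Skip symlinks, sockets, and other non-regular entries.
                writer.writerow([
                    full_path,
                    st.st_size,
                    st.st_ino,
                    st.st_atime_ns // 1_000_000,  # nanoseconds -> milliseconds
                    st.st_mtime_ns // 1_000_000,
                    st.st_ctime_ns // 1_000_000,
                ])
                count += 1
    return count
```

For example, `scan_to_csv("/cpfs", "/tmp/scan_result.csv")` would scan the mount point used in this article. Writing the output outside the scanned tree keeps the result file from appearing in its own scan.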


Prerequisites

Prepare the following environment and resources.

  1. Mount a CPFS file system: This example uses the CPFS Intelligent Computing Edition. Mount the root directory of the file system to the /cpfs directory on a Linux server. For mounting instructions, see the CPFS documentation.

  2. Python environment: Ensure Python 3.8 or later is installed on the machine that runs the scan.

  3. Tablestore instance: Create an instance in the Tablestore console and obtain the instance name and endpoint.

  4. OSS bucket: Create an OSS bucket to store archived scan results, and note the bucket name and its region.

  5. Grafana: Ensure Grafana is installed on the machine used for visual monitoring.
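Before you continue, a quick preflight check on the scanning machine can confirm the Python version and that the mount point from item 1 is actually mounted. This is a sketch; the `preflight` helper is hypothetical, and `os.path.ismount` only confirms that something is mounted at the path, not that it is CPFS.

```python
import os
import sys

MIN_PYTHON = (3, 8)


def preflight(mount_point: str = "/cpfs") -> list:
    """Return a list of problems; an empty list means both checks passed."""
    problems = []
    if sys.version_info < MIN_PYTHON:
        problems.append(
            f"Python {MIN_PYTHON[0]}.{MIN_PYTHON[1]}+ required, found {sys.version.split()[0]}"
        )
    if not os.path.ismount(mount_point):
        problems.append(f"{mount_point} is not a mount point")
    return problems
```

Running `print(preflight())` on the scanning machine should print an empty list when the environment is ready.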

Step 1: Install dependencies

On the machine where you will run the scan, install the required Python dependency packages.

pip3 install alibabacloud-oss-v2 tablestore alibabacloud-nas20170626
Note

These dependency packages require Python 3.8 or later. Before installing them, run python3 --version to check your Python version.
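After installation, you can confirm that the interpreter you will use for the scan can find the packages. The module names below are the ones meta_stat.py imports at the top level; the `missing_modules` helper is a hypothetical convenience.

```python
import importlib.util

# Top-level module names provided by the packages installed in Step 1.
REQUIRED_MODULES = ("alibabacloud_oss_v2", "tablestore", "alibabacloud_nas20170626")


def missing_modules(names=REQUIRED_MODULES) -> list:
    """Return the module names that the current interpreter cannot locate."""
    return [name for name in names if importlib.util.find_spec(name) is None]
```

Run it with the same `python3` that will execute the scan. If `pip3` and `python3` point to different interpreters, this is where the mismatch shows up.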

Step 2: Initialize Tablestore

Before you run the scan, you must create a table, a Search Index, and an SQL mapping table in your Tablestore instance. The init_ots.py script automates these tasks.

The script performs the following actions:

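  • Table: Uses file_name (file path) as the primary key to store file metadata. The time to live (TTL) is set to 86,400 seconds (1 day), so rows that a later scan does not rewrite expire after one day. Because the Search Index also uses a TTL, the table is created with updates disabled (allow_update=False).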

  • Search Index: Creates an index on fields such as file size, timestamps, and fileset information to enable multidimensional queries and aggregations.

  • SQL mapping table: Creates an SQL mapping for the table, which enables the Grafana Tablestore data source plugin to query data using SQL.
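To see why the split analyzer on file_name makes path-based queries possible, the sketch below approximates how a "/" delimiter breaks a path into tokens, so that each directory level becomes a separately searchable term. This is a local approximation for illustration only: the exact tokenization, such as how empty tokens and letter case are handled, is determined by Tablestore, and both helpers are hypothetical.

```python
def split_tokens(path: str, delimiter: str = "/") -> list:
    """Approximate a delimiter-based split analyzer: split on the delimiter, drop empty pieces."""
    return [token for token in path.split(delimiter) if token]


def path_has_component(path: str, component: str) -> bool:
    """Local stand-in for a match query on a single path component."""
    return component in split_tokens(path)
```

For example, `split_tokens("/cpfs/datasets/train/part-0001.bin")` returns `["cpfs", "datasets", "train", "part-0001.bin"]`, which is why a query for the term `datasets` can find every file under any directory with that name.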

Save the following script as init_ots.py.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Tablestore initialization script.
Creates a table, a Search Index, and an SQL mapping table.
"""

import sys
import argparse
from tablestore import OTSClient, TableMeta, TableOptions, ReservedThroughput, CapacityUnit
from tablestore import FieldSchema, FieldType, SearchIndexMeta, IndexSetting, AnalyzerType, SplitAnalyzerParameter

def create_table(client, table_name):
    """
    Creates a table.
    Primary key: file_name (String)
    TTL: 86400
    """
    print(f"[1/3] Creating table: {table_name}")

    try:
        # Define the primary key schema.
        schema_of_primary_key = [('file_name', 'STRING')]

        # Define the table metadata.
        table_meta = TableMeta(table_name, schema_of_primary_key)

        # Define the table options. Set the TTL to 86,400 seconds.
        table_options = TableOptions(
            time_to_live=86400,
            max_version=1,
            max_time_deviation=86400,
            allow_update=False
        )

        # You must specify the reserved throughput when you create a table. The default value is 0.
        reserved_throughput = ReservedThroughput(CapacityUnit(0, 0))

        # Create the table.
        client.create_table(table_meta, table_options, reserved_throughput)

        print(f"✓ Table created: {table_name}")
        return True

    except Exception as e:
        print(f"✗ Failed to create table: {str(e)}")
        return False


def create_search_index(client, table_name, index_name):
    """
    Creates a Search Index.
    TTL: 86400
    """
    print(f"[2/3] Creating Search Index: {index_name}")

    try:
        # Define the field schema.
        fields = [
            # file_name: Text. Uses a split analyzer with "/" as the delimiter. Case-sensitive.
            FieldSchema('file_name', FieldType.TEXT, index=True,
                        analyzer=AnalyzerType.SPLIT,
                        analyzer_parameter=SplitAnalyzerParameter("/")),

            # aTime: Long
            FieldSchema('aTime', FieldType.LONG, index=True),

            # wTime: Long
            FieldSchema('wTime', FieldType.LONG, index=True),

            # cluster_id: Keyword
            FieldSchema('cluster_id', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # fileSize: Long
            FieldSchema('fileSize', FieldType.LONG, index=True),

            # cTime: Long
            FieldSchema('cTime', FieldType.LONG, index=True),

            # region: Keyword
            FieldSchema('region', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # mTime: Long
            FieldSchema('mTime', FieldType.LONG, index=True),

            # fileset_id: Keyword
            FieldSchema('fileset_id', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # fileset_name: Keyword
            FieldSchema('fileset_name', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),

            # fileset_file_count_quota: Long
            FieldSchema('fileset_file_count_quota', FieldType.LONG, index=True),

            # fileset_size_quota: Long
            FieldSchema('fileset_size_quota', FieldType.LONG, index=True),
        ]

        # Configure the index settings.
        index_setting = IndexSetting(routing_fields=['file_name'])

        # Define the Search Index metadata.
        index_meta = SearchIndexMeta(
            fields,
            index_setting=index_setting,
            index_sort=None,
            time_to_live=86400
        )

        # Create the Search Index.
        client.create_search_index(table_name, index_name, index_meta)

        print(f"✓ Search Index created: {index_name}")
        return True

    except Exception as e:
        print(f"✗ Failed to create Search Index: {str(e)}")
        return False


def create_mapping_table(client, table_name):
    """
    Creates an SQL mapping table.
    """
    print(f"[3/3] Creating SQL mapping table")

    try:
        # Construct the SQL statement.
        sql_query = f"""CREATE TABLE `{table_name}` (
            `file_name` VARCHAR(1024),
            `aTime` BIGINT(20),
            `wTime` BIGINT(20),
            `cluster_id` MEDIUMTEXT,
            `fileSize` BIGINT(20),
            `cTime` BIGINT(20),
            `region` MEDIUMTEXT,
            `mTime` BIGINT(20),
            `fileset_id` MEDIUMTEXT,
            `fileset_name` MEDIUMTEXT,
            `fileset_file_count_quota` BIGINT(20),
            `fileset_size_quota` BIGINT(20),
            PRIMARY KEY(`file_name`)
        )"""

        # Execute the SQL statement.
        client.exe_sql_query(sql_query)

        print(f"✓ SQL mapping table created")
        return True

    except Exception as e:
        print(f"✗ Failed to create SQL mapping table: {str(e)}")
        return False


def main():
    """
    Main function
    """
    print("=" * 60)
    print("Tablestore Initialization Script")
    print("=" * 60)

    # Parse command-line arguments.
    parser = argparse.ArgumentParser(description='Tablestore initialization script - Creates a table, a Search Index, and an SQL mapping table')
    parser.add_argument('--endpoint', required=True, help='The endpoint of the Tablestore instance')
    parser.add_argument('--instance-name', required=True, help='The name of the Tablestore instance')
    parser.add_argument('--access-key-id', required=True, help='Your Access Key ID')
    parser.add_argument('--access-key-secret', required=True, help='Your Access Key Secret')
    parser.add_argument('--table-name', required=True, help='The name of the table to create')
    parser.add_argument('--search-index-name', required=True, help='The name of the Search Index to create')

    args = parser.parse_args()

    # Print the configuration information.
    print("\nConfiguration:")
    print(f"  Endpoint: {args.endpoint}")
    print(f"  Instance Name: {args.instance_name}")
    print(f"  Table Name: {args.table_name}")
    print(f"  Search Index Name: {args.search_index_name}")
    print()

    try:
        # Initialize the Tablestore client.
        print("Connecting to the Tablestore instance...")
        client = OTSClient(
            args.endpoint,
            args.access_key_id,
            args.access_key_secret,
            args.instance_name
        )
        print("✓ Connection successful\n")

        # Run the initialization steps. Each step exits with status 1 on failure.

        # Step 1: Create the table
        if not create_table(client, args.table_name):
            sys.exit(1)

        print()

        # Step 2: Create the Search Index
        if not create_search_index(client, args.table_name, args.search_index_name):
            sys.exit(1)

        print()

        # Step 3: Create the SQL mapping table
        if not create_mapping_table(client, args.table_name):
            sys.exit(1)

        print()
        print("=" * 60)
        print("✓ All initialization steps are complete!")
        print("=" * 60)

    except Exception as e:
        print(f"\n✗ Script execution failed: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
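The SQL mapping table in `create_mapping_table` mirrors the Search Index schema: LONG fields become BIGINT(20) columns, KEYWORD fields become MEDIUMTEXT columns, and the file_name primary key becomes VARCHAR(1024). If you add a field to the index, the mapping table needs a matching column. The sketch below derives the statement from one field list so the two stay in sync; the `build_mapping_ddl` helper and the `SQL_TYPES` table are hypothetical conveniences that restate the mappings used in init_ots.py.

```python
# Type mappings taken from init_ots.py (Search Index type -> SQL column type).
SQL_TYPES = {"LONG": "BIGINT(20)", "KEYWORD": "MEDIUMTEXT"}


def build_mapping_ddl(table_name: str, pk_name: str, fields: list) -> str:
    """Build a CREATE TABLE statement for an SQL mapping table.

    `fields` is a list of (column_name, search_index_type) pairs for non-primary-key columns.
    """
    columns = [f"`{pk_name}` VARCHAR(1024)"]
    for name, index_type in fields:
        columns.append(f"`{name}` {SQL_TYPES[index_type]}")
    columns.append(f"PRIMARY KEY(`{pk_name}`)")
    body = ",\n    ".join(columns)
    return f"CREATE TABLE `{table_name}` (\n    {body}\n)"


# The non-primary-key fields defined in create_search_index.
FIELDS = [
    ("aTime", "LONG"), ("wTime", "LONG"), ("cluster_id", "KEYWORD"),
    ("fileSize", "LONG"), ("cTime", "LONG"), ("region", "KEYWORD"),
    ("mTime", "LONG"), ("fileset_id", "KEYWORD"), ("fileset_name", "KEYWORD"),
    ("fileset_file_count_quota", "LONG"), ("fileset_size_quota", "LONG"),
]
```

`print(build_mapping_ddl("bmcpfs_test", "file_name", FIELDS))` produces a statement equivalent to the one in `create_mapping_table`, which you could pass to `client.exe_sql_query`.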

Run the following command to initialize Tablestore.

python3 init_ots.py \
  --endpoint https://<instance-name>.<region-id>.ots.aliyuncs.com \
  --instance-name <instance-name> \
  --access-key-id <Access Key ID> \
  --access-key-secret <Access Key Secret> \
  --table-name bmcpfs_test \
  --search-index-name bmcpfs_test_idx
Note
  • The --table-name and --search-index-name parameters use the example values bmcpfs_test and bmcpfs_test_idx. You can customize these values as needed. If you change these names, you must update the corresponding configurations in bmcpfs_stat_tool_config.json and the variables in the Grafana dashboard in later steps.

  • The --endpoint parameter must use the format https://<instance-name>.<region-id>.ots.aliyuncs.com. You can find the endpoint on the Instance Details page in the Tablestore console.
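Because a mismatched endpoint and instance name is an easy mistake, you can check the two values against the format above before running init_ots.py. This sketch only accepts the endpoint form given in the note; other endpoint forms would be rejected. The `check_endpoint` helper is hypothetical.

```python
import re

# Matches https://<instance-name>.<region-id>.ots.aliyuncs.com (optional trailing slash).
ENDPOINT_PATTERN = re.compile(
    r"^https://(?P<instance>[A-Za-z0-9-]+)\.(?P<region>[a-z0-9-]+)\.ots\.aliyuncs\.com/?$"
)


def check_endpoint(endpoint: str, instance_name: str) -> str:
    """Return the region ID from the endpoint, or raise ValueError on a format or name mismatch."""
    match = ENDPOINT_PATTERN.match(endpoint)
    if match is None:
        raise ValueError(f"Unexpected endpoint format: {endpoint}")
    if match.group("instance") != instance_name:
        raise ValueError(
            f"Endpoint instance '{match.group('instance')}' does not match "
            f"--instance-name '{instance_name}'"
        )
    return match.group("region")
```

For example, `check_endpoint("https://myinst.cn-hangzhou.ots.aliyuncs.com", "myinst")` returns `"cn-hangzhou"`.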

Step 3: Deploy scanning script

On the machine that will run the scan, create a deployment directory, such as /opt/cpfs-scanner, and populate it with the following files.
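meta_stat.py reads its settings through `read_from_json` from a file named .meta_stat_key.json in the deployment directory. A quick check like the following can catch a missing field before a long scan starts. The key list is inferred from the `read_from_json` calls in the meta_stat.py code shown in this step; `REQUIRED_KEYS` and `check_key_file` are hypothetical names, and the authoritative set of fields is whatever your version of the script reads.

```python
import json

# Keys that meta_stat.py reads from .meta_stat_key.json via read_from_json().
REQUIRED_KEYS = (
    "BASE_PATH", "OUTPUT_DIR_PREFIX",
    # An empty OSS_ENDPOINT skips the endpoint override; null is treated as missing.
    "OSS_REGION", "OSS_BUCKET", "OSS_KEY", "OSS_ENDPOINT",
    "oss_access_key_id", "oss_access_key_secret",
    "ots_access_key_id", "ots_access_key_secret",
)


def check_key_file(path: str) -> list:
    """Return the required keys that are missing or null in the JSON key file."""
    with open(path, "r", encoding="utf-8") as f:
        config = json.load(f)
    return [key for key in REQUIRED_KEYS if config.get(key) is None]
```

For example, `check_key_file("/opt/cpfs-scanner/.meta_stat_key.json")` returns an empty list when every field is present.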

  1. Create a file named meta_stat.py and add the following code.

    from __future__ import annotations  # Allows list[...] annotations on Python 3.8.
    import argparse
    import concurrent.futures
    import csv
    import fcntl
    import json
    import logging
    import os
    import re
    import shutil
    import subprocess
    import sys
    import tarfile
    import time
    from collections import OrderedDict
    from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
    from datetime import datetime, timezone
    from logging.handlers import RotatingFileHandler
    from typing import Dict, Optional
    
    import alibabacloud_oss_v2 as oss
    import tablestore
    from alibabacloud_oss_v2.exceptions import OperationError
    from alibabacloud_nas20170626.client import Client as NAS20170626Client
    from alibabacloud_credentials.client import Client as CredentialClient
    from alibabacloud_credentials.models import Config as CredentialConfig
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_nas20170626 import models as nas20170626_models
    from alibabacloud_tea_util import models as util_models
    from tablestore import PutRowItem
    
    VERSION = "1.3a"
    PUT = "PUT"
    UPDATE = "UPDATE"
    IGNORE = "IGNORE"
    NEW_ATIME = "NEW_ATIME"
    AVAILABLE_OTS_COLUMN_CONDITION = [IGNORE, NEW_ATIME]
    AVAILABLE_OTS_WRITE_TYPE = [UPDATE, PUT]
    TABLESTORE_BATCH_SIZE = 200
    # Set up logging.
    SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
    LOG_FILE = os.path.join(SCRIPT_DIR, 'meta_stat.log')
    
    # Initialize the logger.
    logger = logging.getLogger('meta_stat')
    logger.setLevel(logging.DEBUG)
    
    # Console handler.
    console_handler = logging.StreamHandler()
    console_formatter = logging.Formatter('%(levelname)s: %(message)s')
    console_handler.setFormatter(console_formatter)
    console_handler.setLevel(logging.INFO)
    logger.addHandler(console_handler)
    
    # File handler (with rotation).
    file_handler = RotatingFileHandler(LOG_FILE, maxBytes=1024 * 1024 * 5, backupCount=5)
    file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(file_handler)
    
    # Other constant definitions.
    PYTHON_FILE_PATH = os.path.abspath(__file__)
    SCRIPT_DIR = os.path.dirname(PYTHON_FILE_PATH)
    os.chdir(SCRIPT_DIR)  # Change to the script's directory.
    PYTHON_PATH = sys.executable
    DATE = datetime.now().strftime("%Y%m%dT%H%M")
    SYSTEM_DIRS = [".sys_recyclebin", ".snapshots", ".sys_healthchecker",
                   ".audit_log", ".msgq", ".fakesnapshots"]
    
    thread_executor = ThreadPoolExecutor(max_workers=10)
    MAX_OTS_FUTURES = 10
    
    
    def read_from_json(key, filename, default=None):
        """Reads `key` from a JSON file in the script directory; returns `default` if it is provided and the key is unavailable."""
        config_path = os.path.join(SCRIPT_DIR, filename)
        if not os.path.exists(config_path):
            if default is not None:
                return default
            raise FileNotFoundError(f"Configuration file not found: {config_path}")
        with open(config_path, 'r') as f:
            try:
                config_json = json.load(f)
            except json.JSONDecodeError:
                raise ValueError(f"Invalid format in {filename}. Check that the file is valid JSON. Path: {config_path}")
        value = config_json.get(key)
        if value is None:
            if default is not None:
                return default
            raise ValueError(f"{filename} is missing the required field: {key}.")
        return value
    
    
    BASE_PATH = read_from_json("BASE_PATH", '.meta_stat_key.json')
    if not BASE_PATH.endswith('/'):
        BASE_PATH += '/'
    OUTPUT_DIR_PREFIX = read_from_json("OUTPUT_DIR_PREFIX", '.meta_stat_key.json')
    OSS_CONFIG_FILE_NAME = os.path.basename(read_from_json("OSS_KEY", '.meta_stat_key.json'))
    
    
    class OssTool:
        def __init__(self):
            self.region = read_from_json('OSS_REGION', '.meta_stat_key.json')
            self.bucket = read_from_json('OSS_BUCKET', '.meta_stat_key.json')
            self.key = read_from_json('OSS_KEY', '.meta_stat_key.json')
            self.endpoint = read_from_json('OSS_ENDPOINT', '.meta_stat_key.json')
            self.client = None
    
        def init_client(self):
            if self.client is not None:
                return
    
            try:
                credentials_provider = oss.credentials.StaticCredentialsProvider(
                    access_key_id=read_from_json('oss_access_key_id', '.meta_stat_key.json'),
                    access_key_secret=read_from_json('oss_access_key_secret', '.meta_stat_key.json')
                )
            except Exception as e:
                raise RuntimeError(f"Failed to load the local key file. Make sure that .meta_stat_key.json exists and is correctly configured. Error: {e}")
    
            cfg = oss.config.load_default()
            cfg.credentials_provider = credentials_provider
            cfg.region = self.region
            if self.endpoint:
                cfg.endpoint = self.endpoint
            self.client = oss.Client(cfg)
    
        def download_file_if_newer(self):
            self.init_client()
            _file_path = os.path.join(SCRIPT_DIR, os.path.basename(self.key))
            try:
                head_result = self.client.head_object(oss.HeadObjectRequest(bucket=self.bucket, key=self.key))
                if_modified_since = self.get_local_file_mtime(_file_path)
    
                if head_result.last_modified <= if_modified_since:
                    logger.info("Time:%s, File is up to date. Skipping download." % datetime.now())
                    return False
    
                result = self.client.get_object_to_file(oss.GetObjectRequest(bucket=self.bucket, key=self.key),
                                                        _file_path)
    
                if result.status_code == 200:
                    logger.info(f"File has been updated and saved to: {_file_path}")
                    return True
                else:
                    logger.warning(f"Download failed. Status code: {result.status_code}")
                    return False
    
            except OperationError as e:
                error = e.kwargs.get('error')
                if error and error.status_code == 304:
                    logger.info("Time:%s, File is up to date. Skipping download." % datetime.now())
                    return False
                else:
                    logger.error(f"Request failed: {e}", exc_info=True)
                    return False
            except Exception as e:
                logger.exception(f"An exception occurred: {e}")
                return False
    
        def upload_file_to_oss(self, local_file_path, oss_key):
            self.init_client()
    
            try:
                # Open the file with a context manager so the handle is closed after the upload.
                with open(local_file_path, 'rb') as body:
                    request = oss.PutObjectRequest(
                        bucket=self.bucket,
                        key=oss_key,
                        body=body
                    )
                    result = self.client.put_object(request)
    
                if result.status_code == 200:
                    logger.info(f"File has been uploaded to OSS: oss://{self.bucket}/{oss_key}")
                    return True
                else:
                    logger.warning(f"Upload failed. Status code: {result.status_code}")
                    return False
    
            except OperationError as e:
                error = e.kwargs.get('error')
                if error and error.status_code == 404:
                    logger.error("The destination bucket or path does not exist. Check your configuration.")
                else:
                    logger.error(f"Request failed: {e}", exc_info=True)
                return False
            except Exception as e:
                logger.exception(f"An exception occurred: {e}")
                return False
    
        def upload_dir_to_oss(self, new_dir_path, oss_path):
            # Record the start time.
            start_time = time.time()
            for file in os.listdir(new_dir_path):
                if file.endswith(".csv"):
                    file_path = os.path.join(new_dir_path, file)
                    self.upload_file_to_oss(file_path, os.path.join(oss_path, file))
            logger.info(f"OSS upload complete. Time elapsed: {time.time() - start_time} seconds")
    
        @staticmethod
        def get_local_file_mtime(_file_path):
            if not os.path.exists(_file_path):
                return datetime.fromtimestamp(0, tz=timezone.utc)
            mtime = os.path.getmtime(_file_path)
            return datetime.fromtimestamp(mtime, tz=timezone.utc)
    
    
    class TablestoreWriter:
        def __init__(self):
            ots_config = read_from_json("otsConfig", OSS_CONFIG_FILE_NAME)
            self.table_name = ots_config.get("tableName")
            self.endpoint = ots_config.get("endpoint")
            self.instance_name = ots_config.get("instanceName")
            self.row_exist_exception_str = ots_config.get("rowExistenceException")
            self.column_condition = ots_config.get("columnCondition")
            self.ots_write_type = ots_config.get("writeType")
            self.access_key_id = read_from_json("ots_access_key_id", '.meta_stat_key.json')
            self.access_key_secret = read_from_json("ots_access_key_secret", '.meta_stat_key.json')
            self._tablestore_client = tablestore.OTSClient(
                self.endpoint,
                self.access_key_id,
                self.access_key_secret,
                self.instance_name
            )
    
        def single_put(self, row: tablestore.PutRowItem) -> bool:
            """Retries a single PutRow call up to retry_count times."""
            retry_count = 20
            for i in range(retry_count):
                try:
                    # Pass the row condition so the retry applies the same check as the batch write.
                    self._tablestore_client.put_row(self.table_name, row.row, row.condition)
                    return True
                except Exception as e:
                    if i == retry_count - 1:
                        logger.error(f"Failed to put single row: {e}")
                        return False
            return False
    
        def single_update(self, row: tablestore.UpdateRowItem) -> bool:
            """Retries a single UpdateRow call up to retry_count times."""
            retry_count = 20
            for i in range(retry_count):
                try:
                    self._tablestore_client.update_row(self.table_name, row.row, row.condition)
                    return True
                except Exception as e:
                    if i == retry_count - 1:
                        logger.error(f"Failed to update single row: {e}")
                        return False
            return False
    
        @staticmethod
        def batch_write(rows: list[tablestore.UpdateRowItem]) -> int:
            global ots_writer, thread_executor
            if ots_writer is None:
                ots_writer = TablestoreWriter()
    
            succeed_cnt = 0
            row_items = []
            cur_batch_size = 0
            futures = set()
            for idx, updateItem in enumerate(rows):
                row_items.append(updateItem)
                cur_batch_size = cur_batch_size + 1
                if cur_batch_size == TABLESTORE_BATCH_SIZE or idx == len(rows) - 1:
                    batch_write_req = tablestore.BatchWriteRowRequest()
                    batch_write_req.add(tablestore.TableInBatchWriteRowItem(ots_writer.table_name, list(row_items)))
                    row_items.clear()
                    futures.add(
                        thread_executor.submit(TablestoreWriter.single_batch_update, batch_write_req, cur_batch_size))
                    cur_batch_size = 0
            for future in futures:
                succeed_cnt += future.result()
            return succeed_cnt
    
        @staticmethod
        def single_batch_update(batch_write_req, cur_batch_size):
            succeed_cnt = 0
            try:
                batch_write_row_response = ots_writer._tablestore_client.batch_write_row(batch_write_req)
                if not batch_write_row_response.is_all_succeed():
                    succeed_cnt += len(batch_write_row_response.get_succeed_of_update())
                    succeed_cnt += len(batch_write_row_response.get_succeed_of_put())
                    # Update row item.
                    failed_update = batch_write_row_response.get_failed_of_update()
                    for row_item in failed_update:
                        if not row_item.is_ok and row_item.error_code != "OTSConditionCheckFail":
                            logger.warning("Failed to update single row")
                            single_update_item = batch_write_req.items[ots_writer.table_name].row_items[row_item.index]
                            if ots_writer.single_update(single_update_item):
                                succeed_cnt += 1
    
                    # Put row item.
                    failed_put = batch_write_row_response.get_failed_of_put()
                    for row_item in failed_put:
                        if not row_item.is_ok:
                            logger.warning("Failed to put single row")
                            single_put_item = batch_write_req.items[ots_writer.table_name].row_items[row_item.index]
                            if ots_writer.single_put(single_put_item):
                                succeed_cnt += 1
                else:
                    succeed_cnt += cur_batch_size
            except Exception as e:
                logger.error(f"Batch write failed: {e}")
            return succeed_cnt
    
        @staticmethod
        def create_put_row_item(pk: list[tuple], columns: list[tuple],
                                column_conditions: list[tablestore.ColumnCondition]) -> PutRowItem:
            global ots_writer
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            if pk is None or len(pk) != 1:
                return None
            row = tablestore.Row(pk, columns)
            return tablestore.PutRowItem(row,
                                         tablestore.Condition(ots_writer.row_exist_exception_str, column_conditions[0]))
    
        @staticmethod
        def create_update_row_item(pk: list[tuple], columns: list[tuple],
                                   column_conditions: list[tablestore.ColumnCondition]) -> tablestore.UpdateRowItem:
            global ots_writer
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            if pk is None:
                return None
            row = tablestore.Row(pk, {"update": columns})
            if column_conditions is None or len(column_conditions) == 0:
                return tablestore.UpdateRowItem(row, None)
            elif len(column_conditions) == 1:
                return tablestore.UpdateRowItem(row, tablestore.Condition(ots_writer.row_exist_exception_str,
                                                                          column_conditions[0]))
            else:
                composite_condition = tablestore.CompositeColumnCondition(tablestore.LogicalOperator.AND)
                for condition in column_conditions:
                    composite_condition.add_sub_condition(condition)
                row_conditions = tablestore.Condition(ots_writer.row_exist_exception_str, composite_condition)
                return tablestore.UpdateRowItem(row, row_conditions)
    
        def extract_pk(self, pk_fields, _dict) -> list[tuple]:
            ots_pk_fields = []
            for field_key, field_value in pk_fields.items():
                ots_pk_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value)
                ots_pk_fields.append(ots_pk_field)
            return ots_pk_fields
    
        @staticmethod
        def less_than_condition(column_name: str, time_stamp: int) -> tablestore.SingleColumnCondition:
            return tablestore.SingleColumnCondition(column_name=column_name, column_value=time_stamp,
                                                    comparator=tablestore.ComparatorType.LESS_THAN)
    
        @staticmethod
        def upload_dir_to_ots(new_dir_path):
            """
            Uploads all `.ots` files from a directory to Tablestore and deletes them upon completion.
            Data is read from the files and written to Tablestore in batches.
            """
            global ots_writer
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            with ProcessPoolExecutor(max_workers=MAX_OTS_FUTURES) as executor:
                # Record the start time.
                start_time = time.time()
                for file in os.listdir(new_dir_path):
                    if file.endswith(".ots"):
                        file_path = os.path.join(new_dir_path, file)
                        TablestoreWriter.upload_csv_to_ots(executor, file_path)
                        # Delete the file after it has been uploaded.
                        os.remove(file_path)
                        logger.info(f"Deleted file: {file_path}. Time elapsed: {time.time() - start_time} seconds")
                logger.info(f"Tablestore upload complete. Time elapsed: {time.time() - start_time} seconds")
    
        @staticmethod
        def upload_csv_to_ots(executor, file_path):
            global ots_writer
            logger.info(f"Starting to upload file: {file_path}")
            start_time = time.time()
            if ots_writer is None:
                ots_writer = TablestoreWriter()
            fields = read_from_json('fields', OSS_CONFIG_FILE_NAME)
            fields_value_to_key = {value: key for key, value in fields.items()}
            pk_fields = read_from_json('pkFields', OSS_CONFIG_FILE_NAME)
            with open(file_path, 'r', newline='', encoding='utf-8') as f:
                future_set = set()
                timestamp_ms = int(time.time() * 1000)
                reader = csv.reader(f)
                _ = next(reader)  # Skip the header row.
                rows = []
                for idx, row in enumerate(reader):
                    if len(row) != 6:
                        logger.warning(f"Skipping invalid row {idx + 1}: incorrect number of fields. Row: {row}")
                        continue
                    path, size, inode, atime_ms, mtime_ms, ctime_ms = row
                    try:
                        _dict = {
                            'path': path,
                            'size': int(size),
                            'inode': int(inode),
                            'atime_ms': int(atime_ms),
                            'mtime_ms': int(mtime_ms),
                            'ctime_ms': int(ctime_ms),
                            'wtime_ms': int(timestamp_ms)
                        }
                    except ValueError as e:
                        logger.error(f"Value conversion failed at line {idx + 1}: {e}")
                        continue
    
                    # Construct the primary key from the pkFields mapping in the configuration file.
                    pk = ots_writer.extract_pk(pk_fields, _dict)
                    if not pk:
                        logger.warning(f"Failed to extract primary key from _dict: {_dict}")
                        continue
                    if ots_writer.column_condition == IGNORE:
                        conditions = [None]
                    elif ots_writer.column_condition == NEW_ATIME:
                        conditions = [TablestoreWriter.less_than_condition(fields_value_to_key['$atime_ms'],
                                                                           _dict['atime_ms'])]
                    else:
                        conditions = [None]
                    if ots_writer.ots_write_type == UPDATE:
                        # TODO: Consider making these conditions configurable in the future.
                        ots_fields = []
                        for field_key, field_value in fields.items():
                            ots_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value)
                            if ots_field is not None:
                                ots_fields.append(ots_field)
                        update_row_item = TablestoreWriter.create_update_row_item(pk, ots_fields, conditions)
                        if update_row_item:
                            rows.append(update_row_item)
                    elif ots_writer.ots_write_type == PUT:
                        ots_fields = []
                        for field_key, field_value in fields.items():
                            ots_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value)
                            if ots_field is not None:
                                ots_fields.append(ots_field)
                        put_row_item = TablestoreWriter.create_put_row_item(pk, ots_fields, conditions)
                        if put_row_item:
                            rows.append(put_row_item)
                    else:
                        logger.error(f"Unknown Tablestore write type: {ots_writer.ots_write_type}")
                        continue
    
                    # Submit a batch every 10 * TABLESTORE_BATCH_SIZE rows.
                    if len(rows) >= 10 * TABLESTORE_BATCH_SIZE:
                        future_set.add(executor.submit(TablestoreWriter.batch_write, list(rows)))
                        rows.clear()
    
                        # Limit the maximum number of in-flight futures.
                        if len(future_set) >= MAX_OTS_FUTURES:
                            done, future_set = concurrent.futures.wait(
                                future_set,
                                return_when=concurrent.futures.FIRST_COMPLETED
                            )
    
                            # Optional: Check for exceptions.
                            for future in done:
                                try:
                                    future.result()
                                except Exception as e:
                                    logger.error(f"Tablestore submission failed: {e}")
                # Submit the remaining data.
                if rows:
                    future_set.add(executor.submit(TablestoreWriter.batch_write, list(rows)))
                # Wait for all tasks to complete and handle exceptions.
                for future in as_completed(future_set):
                    try:
                        future.result()
                    except Exception as e:
                        logger.error(f"Batch failed with exception: {e}")
                logger.info(f"Tablestore submission complete. Time elapsed: {time.time() - start_time:.2f} seconds")
    
        @staticmethod
        def get_ots_update_field(_dict, field_key, field_value):
            global nas_client
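            if not isinstance(field_value, str) or not field_value.startswith('$'):
                # Constant value (not a $-prefixed placeholder): write it as-is.
                return field_key, field_value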
            else:
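                # Worked example for the two $dajiang_* mappings below (the path is hypothetical):
                #   path  = "/data/123/1234567890/img/a.jpg"
                #   parts = ['data', '123', '1234567890', 'img', 'a.jpg']
                #   '1234567890' has 10 characters and its parent directory '123' equals its first 3 characters, so:
                #   $dajiang_asset_Id  -> '1234567890'
                #   $dajiang_file_name -> '1234567890/img/a.jpg'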
                if field_value == "$dajiang_asset_Id":
                    # Split the path and find the matching asset ID.
                    path = _dict['path']
                    parts = path.strip('/').split('/')
                    for i in range(len(parts) - 1):
                        # Match a 10-character directory name whose parent directory is named after its first 3 characters.
                        if len(parts[i + 1]) == 10 and parts[i] == parts[i + 1][:3]:
                            return field_key, parts[i + 1]
                    return None
                if field_value == "$dajiang_file_name":
                    # Split the path and return everything from the matching asset ID directory onward.
                    path = _dict['path']
                    parts = path.strip('/').split('/')
                    for i in range(len(parts) - 1):
                        # Match a 10-character directory name whose parent directory is named after its first 3 characters.
                        if len(parts[i + 1]) == 10 and parts[i] == parts[i + 1][:3]:
                            return field_key, '/'.join(parts[i + 1:])
                    # No asset ID directory in the path: skip the field instead of writing the literal placeholder.
                    return None
                if field_value in ["$fset_id", "$fset_name", "$fset_file_count_quota", "$fset_size_quota"]:
                    # Get the fileset information for the file.
                    if nas_client is None:
                        logger.warning("NAS client is not initialized. Cannot retrieve fileset information.")
                        return None
                    
                    # Get the fsid from the configuration.
                    fsid = read_from_json("fsid", OSS_CONFIG_FILE_NAME, None)
                    if not fsid:
                        logger.warning("Cannot get fsid from the configuration.")
                        return None
                    
                    # Ensure the fsid is complete (includes a prefix).
                    if not fsid.startswith('bmcpfs-') and not fsid.startswith('cpfs-'):
                        full_fsid = f"bmcpfs-{fsid}"
                    else:
                        full_fsid = fsid
                    
                    # Get the file path and find the corresponding fileset information.
                    file_path = _dict.get('path', '')
                    fset_info = nas_client.get_fset_info_by_path(full_fsid, file_path)
                    
                    if fset_info:
                        # Return the value based on the field type.
                        if field_value == "$fset_id":
                            return field_key, fset_info['fset_id']
                        elif field_value == "$fset_name":
                            return field_key, fset_info['description']
                        elif field_value == "$fset_file_count_quota":
                            v = fset_info['file_count_limit']
                            return (field_key, v) if v is not None else None
                        elif field_value == "$fset_size_quota":
                            v = fset_info['size_limit']
                            return (field_key, v) if v is not None else None
                    else:
                        logger.debug(f"Cannot find fileset information for the path {file_path}")
                        return None
                return field_key, _dict.get(field_value[1:], field_value)
    
    
    class NasClient:
        """NAS OpenAPI client for retrieving fileset information."""
        def __init__(self):
            self.client = None
            # Cache for fileset information: {fsid: [{fset_id, file_system_path, description, file_count_limit, size_limit}, ...]}
            self.fileset_cache = {}
            self.last_cache_time = {}  # Records the cache time for each fsid.
            self.cache_ttl = 3600  # Cache TTL: 1 hour.
    
        def init_client(self, access_key_id: str, access_key_secret: str, endpoint: str):
            """Initializes the NAS client."""
            if self.client is not None:
                return
    
            try:
                credentials_config = CredentialConfig(
                    type='access_key',
                    access_key_id=access_key_id,
                    access_key_secret=access_key_secret
                )
                credentials_client = CredentialClient(credentials_config)
    
                config = open_api_models.Config(
                    credential=credentials_client,
                    endpoint=endpoint
                )
                self.client = NAS20170626Client(config)
                logger.info("NAS client initialized successfully.")
            except Exception as e:
                logger.error(f"Failed to initialize NAS client: {e}")
                raise RuntimeError(f"Cannot initialize NAS client: {e}")
    
        def get_filesets(self, fsid: str) -> Optional[list]:
            """Gets all fileset information for a specified file system, using a cache."""
            current_time = time.time()
    
            # Check if the cache exists and has not expired.
            if fsid in self.fileset_cache and fsid in self.last_cache_time:
                if current_time - self.last_cache_time[fsid] < self.cache_ttl:
                    logger.debug(f"Using cached fileset information for: {fsid}")
                    return self.fileset_cache[fsid]
    
            # Call the API to get fileset information.
            try:
                request = nas20170626_models.DescribeFilesetsRequest()
                request.file_system_id = fsid
                runtime = util_models.RuntimeOptions()
    
                resp = self.client.describe_filesets_with_options(request, runtime)
    
                if resp.body and resp.body.entries and resp.body.entries.entrie:
                    filesets = []
                    for entry in resp.body.entries.entrie:
                        if entry.fset_id and entry.file_system_path:
                            # Extract quota information (can be empty).
                            file_count_limit = None
                            size_limit = None
                            if entry.quota:
                                file_count_limit = entry.quota.file_count_limit
                                size_limit = entry.quota.size_limit
                            
                            filesets.append({
                                'fset_id': entry.fset_id,
                                'file_system_path': entry.file_system_path,
                                'description': entry.description,
                                'file_count_limit': file_count_limit,
                                'size_limit': size_limit
                            })
    
                    # Update the cache.
                    self.fileset_cache[fsid] = filesets
                    self.last_cache_time[fsid] = current_time
                    logger.info(f"Successfully retrieved {len(filesets)} filesets for {fsid}")
                    return filesets
                else:
                    logger.warning(f"No filesets found for file system {fsid}")
                    return []
    
            except Exception as e:
                logger.error(f"Failed to get fileset information: {e}")
                return None
    
        def get_fset_info_by_path(self, fsid: str, file_path: str) -> Optional[dict]:
            """Gets the complete fileset information based on a file path."""
            filesets = self.get_filesets(fsid)
            if filesets is None:
                return None
    
            # Find the best-matching fileset (longest path match).
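            # Example (hypothetical fileset paths): with filesets at "/proj" and "/proj/team-a/",
            # the file "/proj/team-a/data/x.bin" matches both "/proj/" and "/proj/team-a/";
            # the longer path wins, so the team-a fileset is returned. Paths are compared with a
            # trailing slash, so "/proj-old/y.bin" does not match the "/proj" fileset.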
            matched_fset = None
            matched_path_length = -1
    
            for fset in filesets:
                fs_path = fset['file_system_path']
                # Make sure that the file system path ends with a forward slash (/) for easier matching.
                if not fs_path.endswith('/'):
                    fs_path += '/'
    
                # Check if the file path starts with the file system path.
                if file_path.startswith(fs_path):
                    # Select the longest match (most specific match).
                    if len(fs_path) > matched_path_length:
                        matched_path_length = len(fs_path)
                        matched_fset = fset
    
            return matched_fset
    
    oss_tool: OssTool = None
    ots_writer: TablestoreWriter = None
    nas_client: NasClient = None
    
    
    def run_bmstat_for_dir(mode, fsid):
        input_dir = mode.input_dir
        if not check_path_exists_with_sudo(input_dir):
            logger.error(f"Input directory {input_dir} does not exist")
            return
        temp_output_dir = f"{OUTPUT_DIR_PREFIX}{fsid}/"
        os.makedirs(temp_output_dir, exist_ok=True)
        # If input_dir matches /cpfs/{fsid}/.
        if input_dir == f"/cpfs/{fsid}/":
            exclude_dirs = [f"{input_dir}{d}" for d in SYSTEM_DIRS]
        else:
            exclude_dirs = []
        exclude_dirs.extend(mode.exclude_dir_set)
    
        # Construct the subcommand.
        command = [
            "sudo",
            # Use an absolute path so the command also works when cron starts the script from another directory.
            PYTHON_PATH, os.path.join(SCRIPT_DIR, "nas_stat_util.py"),
            "-t", "inode_list",
            "-i", input_dir,
            "--backup-count", "0",
            "--output-format", "path,size,inode,atime_ms,mtime_ms,ctime_ms",
            "--filter-inode-type", "regular_file",
            "--output-without-prefix", BASE_PATH[:-1],
            "--use-csv-format"
        ]
    
        if exclude_dirs:
            command.extend(["--exclude-full-path-dirs", ",".join(exclude_dirs)])
        if mode.ots_output_path_set:
            command.extend(["--ots-output-path", ",".join(mode.ots_output_path_set)])
        if mode.output_path_set:
            command.extend(["-o", ",".join([f"{temp_output_dir}{d}" for d in mode.output_path_set])])
    
        logger.info(f"Running command: {' '.join(command)}")
        try:
            result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            logger.info(f"Finished {mode.input_dir}: {result.stdout.decode()}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Error running command for {mode.input_dir}: {e.stderr.decode()}")
    
    
    def update_crontab(new_cron_expr):
        """
        Updates the crontab entry for the script's 'run' command.
        It replaces the schedule for an existing entry or adds a new one if it does not exist.
        """
        try:
            # Get the current crontab content.
            result = subprocess.run(['crontab', '-l'], capture_output=True, text=True)
            if result.returncode not in (0, 1):  # `crontab -l` exits with 1 when the user has no crontab yet.
                logger.error(f"Failed to read crontab: {result.stderr}")
                return
    
            lines = result.stdout.splitlines()
            updated = False
            new_lines = []
    
            pattern = re.compile(
                r'.*{PYTHON_FILE_PATH}\s+--type\s+run.*'.format(PYTHON_FILE_PATH=re.escape(PYTHON_FILE_PATH)))
    
            for line in lines:
                if pattern.search(line):
                    # If a match is found, replace the first 5 fields (the cron expression).
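                    # Example: with new_cron_expr = "30 1 * * *", the line
                    #   "0 2 * * * /usr/bin/python3 /root/meta_stat.py --type run"
                    # is split into 5 schedule fields plus the command, and becomes
                    #   "30 1 * * * /usr/bin/python3 /root/meta_stat.py --type run"
                    # (the interpreter and script paths here are hypothetical).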
                    parts = line.strip().split(maxsplit=5)
                    if len(parts) >= 6:
                        command_part = parts[5]
                        new_line = f"{new_cron_expr} {command_part}"
                        new_lines.append(new_line)
                        updated = True
                    else:
                        new_lines.append(line)
                else:
                    new_lines.append(line)
            # If no match is found, add a new line.
            if not updated:
                new_cron_line = f"{new_cron_expr} {PYTHON_PATH} {PYTHON_FILE_PATH} --type run"
                new_lines.append(new_cron_line)
                logger.info("No matching line found. A new cron job has been added.")
    
            # Write back the new crontab.
            new_crontab = '\n'.join(new_lines) + '\n'
            subprocess.run(['crontab', '-'], input=new_crontab, text=True, check=True)
            logger.info("Crontab updated.")
    
        except Exception as e:
            logger.error(f"Failed to update crontab: {e}")
    
    
    def remove_crontab_entry():
        """
        Removes the crontab entry for the script's 'run' command.
        """
        try:
            result = subprocess.run(['crontab', '-l'], capture_output=True, text=True)
            if result.returncode not in (0, 1):  # `crontab -l` exits with 1 when the user has no crontab yet.
                logger.error(f"Failed to read crontab: {result.stderr}")
                return
    
            lines = result.stdout.splitlines()
            new_lines = []
    
            for line in lines:
                # Match the line containing PYTHON_FILE_PATH and --type run (the path is escaped so that regex metacharacters match literally).
                if re.search(r'.*{PYTHON_FILE_PATH}\s+--type\s+run.*'.format(PYTHON_FILE_PATH=re.escape(PYTHON_FILE_PATH)), line):
                    logger.info(f"Removing cron line: {line}")
                    continue
                new_lines.append(line)
    
            # Write back the new crontab.
            new_crontab = '\n'.join(new_lines) + '\n'
            subprocess.run(['crontab', '-'], input=new_crontab, text=True, check=True)
            logger.info("Crontab has been cleaned up.")
    
        except Exception as e:
            logger.error(f"Failed to delete crontab entry: {e}")
    
    
    def download_only():
        """
        Performs the download-only operation. It checks for and downloads an updated
        configuration file from OSS. Typically scheduled to run frequently.
        """
        download_success = oss_tool.download_file_if_newer()
        if download_success:
            file_path = os.path.join(SCRIPT_DIR, os.path.basename(read_from_json('OSS_KEY', '.meta_stat_key.json')))
            meta_stat_config = parse_config(file_path)
            schedule = meta_stat_config.get('schedule') or {}
            schedule_type = schedule.get('type')
            if schedule_type == 'cron':
                cron_expr = schedule.get('cron')
                if cron_expr:
                    update_crontab(cron_expr)
                else:
                    logger.warning("Missing cron expression. Cannot update the task.")
                    return
            else:
                # Not a cron schedule: remove any existing cron job entry.
                remove_crontab_entry()
    
            if schedule_type in ('once', 'one'):
                # Run the task once, immediately.
                run_only()
                return
    
    
    def compress_directory(new_dir_name, output_dir):
        """
        Compresses a directory into a .tar.gz archive using pigz.
    
        :param new_dir_name: The name of the directory to compress.
        :param output_dir: The output directory.
        :return: The path of the generated compressed file.
        """
    
        base_name = os.path.basename(new_dir_name)
        tar_path = os.path.join(output_dir, f"{base_name}.tar")
        gz_path = f"{tar_path}.gz"
    
        # Step 1: Create the .tar file.
        with tarfile.open(tar_path, "w") as tar:
            tar.add(new_dir_name, arcname=base_name)
    
        # Step 2: Compress the .tar file by using pigz.
        try:
            subprocess.run(['pigz', '-f', '-v', tar_path], check=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"Compression failed: {e}")
            return None
    
        # Step 3: Delete the original directory and .tar file.
        if os.path.exists(tar_path):
            os.remove(tar_path)
            logger.info(f"Deleted the original .tar file: {tar_path}")
        if os.path.exists(new_dir_name):
            shutil.rmtree(new_dir_name)
            logger.info(f"Deleted the original directory: {new_dir_name}")
    
        logger.info(f"Directory compression complete: {gz_path}")
        return gz_path
    
    
    def run_only():
        """
        Runs the main metadata scanning and uploading process. This is the core
        'run' operation, which can be triggered directly or by a scheduled cron job.
        """
        global ots_writer
        lock_file = os.path.join(SCRIPT_DIR, '.meta_stat.lock')
        try:
            lock_fd = os.open(lock_file, os.O_CREAT | os.O_RDWR)
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            logger.info("Another instance is already running. Exiting.")
            exit(1)
    
        begin_date = datetime.now().strftime("%Y%m%dT%H%M")
        file_path = os.path.join(SCRIPT_DIR, os.path.basename(read_from_json('OSS_KEY', '.meta_stat_key.json')))
        meta_stat_config = parse_config(file_path)
        # Get the directory output modes.
        dir_output_modes, ots_file_name_to_input_dir_dict = determine_output_modes(meta_stat_config)
        fsid = read_from_json("fsid", OSS_CONFIG_FILE_NAME, 'fsid')
        if fsid.startswith('bmcpfs-'):
            fsid = fsid[7:]
        elif fsid.startswith('cpfs-'):
            fsid = fsid[5:]
        for dir_path in dir_output_modes:
            mode = dir_output_modes[dir_path]
            run_bmstat_for_dir(mode, fsid)
        end_date = datetime.now().strftime("%Y%m%dT%H%M")
        new_dir_name = f"beg_{begin_date}_end_{end_date}_{fsid}"
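        # Example (hypothetical values): a scan that starts at 2025-01-01 02:00 and ends at 02:15
        # for fsid "abc123" produces the directory name "beg_20250101T0200_end_20250101T0215_abc123".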
        new_dir_path = f"{OUTPUT_DIR_PREFIX}{new_dir_name}"
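        temp_output_dir = f"{OUTPUT_DIR_PREFIX}{fsid}/"
        # Create the directory if no scan task produced output, so that the rename does not fail.
        os.makedirs(temp_output_dir, exist_ok=True)
        os.rename(temp_output_dir, new_dir_path)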
        # Upload and delete the local files.
        TablestoreWriter.upload_dir_to_ots(new_dir_path)
        # Upload the scan results to OSS (only when scanToFiles is configured).
        if meta_stat_config['scan_to_files']:
            # ossUploadType: 'gz' (default) uploads one .tar.gz archive; 'csv' uploads the files in the output directory as-is.
            oss_upload_type = read_from_json("ossUploadType", OSS_CONFIG_FILE_NAME, 'gz')
            if oss_upload_type == 'csv':
                oss_tool.upload_dir_to_oss(new_dir_path, meta_stat_config['oss_path'])
            local_file_path = compress_directory(new_dir_path, OUTPUT_DIR_PREFIX)
            if local_file_path is None:
                logger.error(f"Failed to compress {new_dir_path}. Skipping the archive upload.")
                return
            if oss_upload_type == 'gz':
                oss_tool.upload_file_to_oss(local_file_path,
                                            os.path.join(meta_stat_config['oss_path'], f"{new_dir_name}.tar.gz"))
            # Keep only the latest .tar.gz file locally and delete the others.
            for file in os.listdir(OUTPUT_DIR_PREFIX):
                if file.startswith("beg_") and file.endswith(".tar.gz") and file != f"{new_dir_name}.tar.gz":
                    os.remove(f"{OUTPUT_DIR_PREFIX}{file}")
    
    
    def upload_only():
        upload_to_tablestore_list = read_from_json('uploadToTablestore', OSS_CONFIG_FILE_NAME) or []
        for file_path in upload_to_tablestore_list:
            if not os.path.exists(file_path):
                logger.error(f"File does not exist: {file_path}")
                exit(1)
        with ProcessPoolExecutor(max_workers=MAX_OTS_FUTURES) as executor:
            for file_path in upload_to_tablestore_list:
                TablestoreWriter.upload_csv_to_ots(executor, file_path)
    
    
    def main():
        global oss_tool, nas_client
        # Create a command-line argument parser.
        parser = argparse.ArgumentParser(description="Scan CPFS file metadata and write the results to Tablestore and OSS")
    
        # Add command-line arguments.
        parser.add_argument('--type', choices=['download', 'run', 'upload'], required=True,
                            help='Operation type: `download` checks for and downloads an updated configuration from OSS and applies its schedule; `run` executes the scan task; `upload` uploads the files listed in uploadToTablestore to Tablestore.')
        # Parse the command-line arguments.
        args = parser.parse_args()
    
        oss_tool = OssTool()
    
        # Initialize the NAS client (if NAS-related parameters are configured).
        try:
            nas_access_key_id = read_from_json('nas_access_key_id', '.meta_stat_key.json', None)
            nas_access_key_secret = read_from_json('nas_access_key_secret', '.meta_stat_key.json', None)
            nas_endpoint = read_from_json('nas_endpoint', '.meta_stat_key.json', None)
    
            if nas_access_key_id and nas_access_key_secret and nas_endpoint:
                nas_client = NasClient()
                nas_client.init_client(nas_access_key_id, nas_access_key_secret, nas_endpoint)
                logger.info("NAS client initialized successfully.")
            else:
                logger.info("NAS-related parameters are not configured. The `$fset` field mapping will be unavailable.")
                nas_client = None
        except Exception as e:
            logger.warning(f"Failed to initialize the NAS client: {e}. The `$fset` field mapping will be unavailable.")
            nas_client = None
    
        if args.type == 'download':
            download_only()
        elif args.type == 'run':
            run_only()
        elif args.type == 'upload':
            upload_only()
    
    
    def determine_output_modes(meta_stat_config):
        scan_to_files = set(meta_stat_config['scan_to_files'])
        scan_to_tablestore = set(meta_stat_config['scan_to_tablestore'])
        all_directories = set(scan_to_files).union(set(scan_to_tablestore))
        dir_output_modes = OrderedDict()
        ots_file_name_to_input_dir_dict = {}
    
        for dir_path in sorted(all_directories):
            output_mode = DirectoryOutputMode(input_dir=dir_path)
            dir_output_modes[dir_path] = output_mode
        # First, determine the required output for each task.
        for _item in scan_to_files:
            output_file_name = _item
            if output_file_name.startswith('/'):
                output_file_name = output_file_name[1:]
            if output_file_name.endswith('/'):
                output_file_name = output_file_name[:-1]
            output_file_name = output_file_name.replace('/', '_') + ".csv"
            dir_output_modes[_item].output_path_set.add(output_file_name)
        for _item in scan_to_tablestore:
            output_file_name = _item
            if output_file_name.startswith('/'):
                output_file_name = output_file_name[1:]
            if output_file_name.endswith('/'):
                output_file_name = output_file_name[:-1]
            output_file_name = output_file_name.replace('/', '_') + ".ots"
            dir_output_modes[_item].output_path_set.add(output_file_name)
            ots_file_name_to_input_dir_dict[output_file_name] = _item
        # If a directory `b` is a subdirectory of an existing task `a`, add `b` to `a`'s exclude list and add `a`'s output targets to `b`'s.
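        # Example (hypothetical paths, after concat_paths): scan_to_files = {"/cpfs/x/proj/"} and
        # scan_to_tablestore = {"/cpfs/x/proj/a/"}.
        #   - "/cpfs/x/proj/" targets cpfs_x_proj.csv and is scanned with "/cpfs/x/proj/a/" excluded.
        #   - "/cpfs/x/proj/a/" targets cpfs_x_proj_a.ots and also inherits cpfs_x_proj.csv,
        #     so the subdirectory is scanned only once for both outputs.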
        for _item_b in dir_output_modes:
            for _item_a in dir_output_modes:
                if _item_b.startswith(_item_a) and _item_b != _item_a:
                    dir_output_modes[_item_a].exclude_dir_set.add(_item_b)
                    dir_output_modes[_item_b].output_path_set.update(dir_output_modes[_item_a].output_path_set)
    
        return dir_output_modes, ots_file_name_to_input_dir_dict
    
    
    class DirectoryOutputMode:
        def __init__(self, input_dir: str, exclude_dir_list=None, output_path_set=None, ots_output_path_set=None):
            self.input_dir = input_dir
            if exclude_dir_list is None:
                exclude_dir_list = set()
            self.exclude_dir_set = exclude_dir_list
            if output_path_set is None:
                output_path_set = set()
            self.output_path_set = output_path_set
            if ots_output_path_set is None:
                ots_output_path_set = set()
            self.ots_output_path_set = ots_output_path_set
    
        def __repr__(self):
            return f"DirectoryOutputMode(input_dir={self.input_dir}, exclude_dir_list={self.exclude_dir_set}, output_path_set={self.output_path_set}, ots_output_path_set={self.ots_output_path_set})"
    
    
    def is_valid_cron(cron_str: str) -> bool:
        """
        Checks if the input string is a valid cron expression (5 whitespace-separated fields).
        Each field may combine numbers with *, /, -, and commas. Value ranges are not validated.
        """
        fields = cron_str.split()
        if len(fields) != 5:
            return False
        field_pattern = re.compile(r'^[\d\-\*/]+(,[\d\-\*/]+)*$')
        return all(field_pattern.match(field) for field in fields)
    
    
    def check_path_exists_with_sudo(path):
        try:
            result = subprocess.run(
                ['sudo', 'test', '-e', path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=5
            )
            return result.returncode == 0
        except Exception as e:
            logger.error(f"Error checking path {path}: {e}")
            return False
    
    
    def parse_config(config_path: str) -> Dict:
        """
        Parses the configuration file and returns structured data.
        :param config_path: The path to the JSON configuration file.
        :return: A dictionary containing the configuration fields.
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
    
            # Extract fields.
            fsid = config.get("fsid")
            scan_to_tablestore = config.get("scanToTablestore", [])
            ots_config = config.get("otsConfig", {})
            fields = config.get("fields", {})
            pk_fields = config.get("pkFields", {})
            scan_to_files = config.get("scanToFiles", [])
            oss_path = config.get("ossPath")
            schedule = config.get("schedule", {})
    
            if scan_to_files is None:
                scan_to_files = []
            if oss_path is None:
                oss_path = ""
            if oss_path.startswith('/'):
                oss_path = oss_path[1:]
    
            # Basic validation.
            if not fsid or not isinstance(fsid, str):
                raise ValueError("Missing a valid fsid field")
            # Strip the bmcpfs-/cpfs- prefix; the remaining ID can contain only letters and digits.
            if fsid.startswith('bmcpfs-'):
                fsid = fsid[7:]
            elif fsid.startswith('cpfs-'):
                fsid = fsid[5:]
            if not fsid.isalnum():
                raise ValueError("fsid can contain only letters and digits")
    
            if not isinstance(scan_to_files, list):
                raise ValueError("scanToFiles must be a list")
            # Update scan_to_files.
            concat_paths(fsid, scan_to_files)
    
            # Check if the path exists.
            for path in scan_to_files:
                if not check_path_exists_with_sudo(path):
                    raise ValueError(f"Path {path} does not exist")
    
            if not isinstance(scan_to_tablestore, list):
                raise ValueError("scanToTablestore must be a list")
    
            concat_paths(fsid, scan_to_tablestore)
    
            for path in scan_to_tablestore:
                if not check_path_exists_with_sudo(path):
                    raise ValueError(f"Path {path} does not exist")
    
            if not isinstance(ots_config, dict):
                raise ValueError("otsConfig must be a dictionary")
    
            # Validate ots_config.
            for key in ['instanceName', 'tableName', 'endpoint', 'rowExistenceException', 'columnCondition', 'writeType']:
                if key not in ots_config:
                    raise ValueError(f"otsConfig is missing the {key} field")
                if not ots_config[key]:
                    raise ValueError(f"The {key} field in otsConfig cannot be empty")
                if key == 'rowExistenceException':
                    if ots_config[key] not in [tablestore.RowExistenceExpectation.IGNORE,
                                               tablestore.RowExistenceExpectation.EXPECT_EXIST]:
                        raise ValueError(f"The {key} field in otsConfig must be IGNORE or EXPECT_EXIST")
                if key == 'columnCondition':
                    if ots_config[key] not in [IGNORE, NEW_ATIME]:
                        raise ValueError(f"The {key} field in otsConfig must be IGNORE or NEW_ATIME")
                if key == 'writeType':
                    if ots_config[key] not in AVAILABLE_OTS_WRITE_TYPE:
                        raise ValueError(f"The {key} field in otsConfig must be PUT or UPDATE")
    
            if not isinstance(fields, dict):
                raise ValueError("fields must be a dictionary")
    
            if not isinstance(pk_fields, dict):
                raise ValueError("pkFields must be a dictionary")
    
            if not isinstance(schedule, dict):
                raise ValueError("schedule must be a dictionary")
    
            if schedule.get('type') == 'cron':
                # Check if it is a valid cron expression.
                cron_expr = schedule.get('cron')
                if not cron_expr:
                    raise ValueError("The cron field cannot be empty")
                if not is_valid_cron(cron_expr):
                    raise ValueError(f"Invalid cron expression: {cron_expr}")
    
            if oss_path and not oss_path.endswith('/'):
                oss_path += '/'
    
            return {
                "fsid": fsid,
                "scan_to_files": scan_to_files,
                "scan_to_tablestore": scan_to_tablestore,
                "ots_config": ots_config,
                "fields": fields,
                "pk_fields": pk_fields,
                "oss_path": oss_path,
                "schedule": schedule,
            }
    
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Configuration file not found: {config_path}") from e
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid configuration file format (not valid JSON): {config_path}") from e
        except Exception as e:
            # Chain the original exception so that the root cause stays visible in the traceback.
            raise RuntimeError(f"Error parsing the configuration: {e}") from e
    
    
    def concat_paths(fsid, paths):
        for i, path in enumerate(paths):
            concat_path = BASE_PATH
            if not concat_path.endswith('/'):
                concat_path += '/'
            if path.startswith('/'):
                path = path[1:]
            concat_path += path
            if not concat_path.endswith('/'):
                concat_path += '/'
            paths[i] = concat_path
    
    
    if __name__ == '__main__':
        main()
    
  2. Create a nas_stat_util.py file with the following content.

    # coding=utf-8
    import argparse
    import concurrent
    import csv
    import io
    import os
    import stat
    import sys
    import threading
    import time
    from concurrent import futures
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from logging.handlers import RotatingFileHandler
    from multiprocessing import Manager
    from queue import Queue
    from stat import S_ISREG, S_ISDIR
    
    TOOL_VERSION = "1.3"
    INODE_LIST = 'inode_list'
    DIR_TREE = 'dir_tree'
    ALL = 'all'
    REGULAR_FILE = 'regular_file'
    DIR = 'dir'
    VALID_TYPES = [INODE_LIST, DIR_TREE]
    VALID_FILTER_INODE_TYPES = [ALL, REGULAR_FILE, DIR]
    BATCH_LINES = 10000
    thread_pool = None
    
    
    class DirectoryStats:
        def __init__(self, args, _stat):
            self.item_count = 0
            self.item_size = 0
            self.skipped_item_count = 0
            if args is not None:
                if _stat is not None:
                    if args.use_atime:
                        self.atime = _stat.st_atime
                    if args.use_mtime:
                        self.mtime = _stat.st_mtime
                    if args.use_ctime:
                        self.ctime = _stat.st_ctime
                    if args.use_inode:
                        self.inode = int(_stat.st_ino)
                    if args.use_uid:
                        self.uid = _stat.st_uid
                    if args.use_gid:
                        self.gid = _stat.st_gid
                if args.use_all_inode_num:
                    self.all_inode_num = 0
                if args.use_all_size:
                    self.all_size = 0
    
        def add_item_count(self, size):
            self.item_count += size
    
        def add_item_size(self, size):
            self.item_size += size
    
        def add_skipped_item_count(self, size):
            self.skipped_item_count += size
    
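        def add_all_inode_num(self, size):
            # The attribute is created in __init__ only when all_inode_num is requested,
            # so fall back to 0 instead of raising AttributeError.
            if getattr(self, 'all_inode_num', None) is None:
                self.all_inode_num = 0
            self.all_inode_num += size
    
        def add_all_size(self, size):
            # The attribute is created in __init__ only when all_size is requested,
            # so fall back to 0 instead of raising AttributeError.
            if getattr(self, 'all_size', None) is None:
                self.all_size = 0
            self.all_size += size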
    
        def __repr__(self):
            return (f"DirectoryStats(item_count={self.item_count}, "
                    f"item_size={self.item_size}, skipped_item_count={self.skipped_item_count})")
    
    
    class DirectoryStatMap:
        def __init__(self):
            self.directory_map = {}
            self.inode_num = 0
            self.wait_walk_dir_list = []
    
        def get_or_create(self, directory, args, _stat):
            if directory not in self.directory_map:
                if _stat is None:
                    try:
                        _stat = os.lstat(directory)
                    except OSError:
                        # Keep going without timestamps or ownership if the directory cannot be stat'ed.
                        pass
                self.directory_map[directory] = DirectoryStats(args, _stat)
            return self.directory_map[directory]
    
        def add_inode_num(self, num):
            self.inode_num += num
    
        def add_wait_walk_dir(self, directory):
            self.wait_walk_dir_list.append(directory)
    
    
    def output_timestamp(args, timestamp, with_ms=False):
        if args.human_readable:
            return time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(timestamp))
        if with_ms:
            return int(timestamp * 1000)
        return int(timestamp)
    
    
    def output_bytes(args, num_bytes):
        if args.human_readable:
            for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']:
                if abs(num_bytes) < 1024.0:
                    return f"{num_bytes:3.1f}{unit}"
                num_bytes /= 1024.0
            return f"{num_bytes:.1f}YB"
        return num_bytes
    
    
    def to_csv_format(text):
        output = io.StringIO()
        writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL, lineterminator='')
        writer.writerow([text])
        return output.getvalue()
    
    
    def format_output_line(args, item_path, _stat=None, dir_stat=None):
        try:
            output_parts = []
            for field in args.output_format_list:
                try:
                    if args.stat_type == INODE_LIST:
                        if field == 'path':
                            if args.output_without_prefix:
                                raw_path = item_path.replace(args.output_without_prefix, '', 1)
                            else:
                                raw_path = item_path
                            if args.use_csv_format:
                                result_path = to_csv_format(raw_path)
                            else:
                                result_path = raw_path
                            output_parts.append(result_path)
                        elif field == 'size':
                            output_parts.append(
                                output_bytes(args,
                                             _stat.st_size if args.use_file_size else _stat.st_blocks * args.block_size))
                        elif field == 'raw_size':
                            output_parts.append(_stat.st_size if args.use_file_size else _stat.st_blocks * args.block_size)
                        elif field == 'inode':
                            output_parts.append(_stat.st_ino)
                        elif field == 'atime':
                            output_parts.append(output_timestamp(args, _stat.st_atime))
                        elif field == 'mtime':
                            output_parts.append(output_timestamp(args, _stat.st_mtime))
                        elif field == 'ctime':
                            output_parts.append(output_timestamp(args, _stat.st_ctime))
                        elif field == 'atime_ms':
                            output_parts.append(output_timestamp(args, _stat.st_atime, True))
                        elif field == 'mtime_ms':
                            output_parts.append(output_timestamp(args, _stat.st_mtime, True))
                        elif field == 'ctime_ms':
                            output_parts.append(output_timestamp(args, _stat.st_ctime, True))
                        elif field == 'uid':
                            output_parts.append(_stat.st_uid)
                        elif field == 'gid':
                            output_parts.append(_stat.st_gid)
                    elif args.stat_type == DIR_TREE:
                        if field == 'path':
                            if args.output_without_prefix:
                                raw_path = item_path.replace(args.output_without_prefix, '', 1)
                            else:
                                raw_path = item_path
                            if args.use_csv_format:
                                result_path = to_csv_format(raw_path)
                            else:
                                result_path = raw_path
                            output_parts.append(result_path)
                        elif field == 'inode_num':
                            output_parts.append(dir_stat.item_count)
                        elif field == 'size':
                            output_parts.append(output_bytes(args, dir_stat.item_size if (
                                args.use_file_size) else dir_stat.item_size * args.block_size))
                        elif field == 'raw_size':
                            output_parts.append(
                                dir_stat.item_size if args.use_file_size else dir_stat.item_size * args.block_size)
                        elif field == 'skip_num':
                            output_parts.append(dir_stat.skipped_item_count)
                        elif field == 'inode':
                            output_parts.append(dir_stat.inode)
                        elif field == 'atime':
                            output_parts.append(output_timestamp(args, dir_stat.atime))
                        elif field == 'mtime':
                            output_parts.append(output_timestamp(args, dir_stat.mtime))
                        elif field == 'ctime':
                            output_parts.append(output_timestamp(args, dir_stat.ctime))
                        elif field == 'atime_ms':
                            output_parts.append(output_timestamp(args, dir_stat.atime, True))
                        elif field == 'mtime_ms':
                            output_parts.append(output_timestamp(args, dir_stat.mtime, True))
                        elif field == 'ctime_ms':
                            output_parts.append(output_timestamp(args, dir_stat.ctime, True))
                        elif field == 'uid':
                            output_parts.append(dir_stat.uid)
                        elif field == 'gid':
                            output_parts.append(dir_stat.gid)
                        elif field == 'all_inode_num':
                            output_parts.append(dir_stat.all_inode_num)
                        elif field == 'all_size':
                            output_parts.append(output_bytes(args, dir_stat.all_size if (
                                args.use_file_size) else dir_stat.all_size * args.block_size))
                except Exception as e:
                    print(f"Error formatting output field: {e}")
                    output_parts.append('')
            if output_parts:
                output_parts = [str(part) for part in output_parts]
                return ','.join(output_parts) + '\n'
            return ''
        except Exception as e:
            print(f"Error formatting output line: {e}")
            return ''
    
    
    def inode_stat_print(args, local_dir_stat_map, item_path=None, _stat=None, thread_local_output_lines=None):
        if thread_local_output_lines is None:
            thread_local_output_lines = []
        if args.stat_type == INODE_LIST:
            thread_local_output_lines.append(format_output_line(args, item_path, _stat=_stat))
            if len(thread_local_output_lines) >= BATCH_LINES:
                output_stat_lines(args, local_dir_stat_map, thread_local_output_lines)
    
    
    def dir_stat_map_print(args, dir_stat_map, output_handle, print_elapsed_time=False, clear_screen=False):
        try:
            if clear_screen:
                os.system('cls' if os.name == 'nt' else 'clear')
            function_local_output_lines = []
            for iter_dir_path, iter_dir_stat in sorted(dir_stat_map.directory_map.items()):
                function_local_output_lines.append(format_output_line(args, iter_dir_path, dir_stat=iter_dir_stat))
    
                if len(function_local_output_lines) >= BATCH_LINES:
                    with args.m_output_file_lock:
                        output_handle.writelines(function_local_output_lines)
                        function_local_output_lines.clear()
    
            with args.m_output_file_lock:
                output_handle.writelines(function_local_output_lines)
                function_local_output_lines.clear()
                if print_elapsed_time:
                    end_time = time.time()
                    elapsed_time = end_time - args.start_time
                    output_handle.write(f"Elapsed time: {elapsed_time} seconds\n")
        except Exception as e:
            print(f"Error printing directory statistics: {e}")
    
    
    def output_total_line_print(args, dir_stat_map, output_handle, print_elapsed_time=False, clear_screen=False):
        if clear_screen:
            os.system('cls' if os.name == 'nt' else 'clear')
        with args.m_output_file_lock:
            output_handle.write(f"Total lines output: {dir_stat_map.inode_num}\n")
            if print_elapsed_time:
                end_time = time.time()
                elapsed_time = end_time - args.start_time
                output_handle.write(f"Elapsed time: {elapsed_time} seconds\n")
    
    
    def stat_print_at_runtime(args, dir_stat_map, output_handle, print_elapsed_time=False):
        time.sleep(args.print_process_time)
        while not args.stat_finish:
            if args.stat_type == DIR_TREE:
                dir_stat_map_print(args, dir_stat_map, output_handle, print_elapsed_time, clear_screen=True)
            elif args.stat_type == INODE_LIST:
                output_total_line_print(args, dir_stat_map, output_handle, print_elapsed_time, clear_screen=True)
            else:
                break
            time.sleep(args.print_process_time)
    
    
    def filter_inode(args, _stat):
        if args.min_size and _stat.st_size < args.min_size:
            return False
        if args.max_size and _stat.st_size > args.max_size:
            return False
        if args.atime and _stat.st_atime > args.atime:
            return False
        if args.atime_after and _stat.st_atime < args.atime_after:
            return False
        if args.mtime and _stat.st_mtime > args.mtime:
            return False
        if args.mtime_after and _stat.st_mtime < args.mtime_after:
            return False
        if args.ctime and _stat.st_ctime > args.ctime:
            return False
        if args.ctime_after and _stat.st_ctime < args.ctime_after:
            return False
        if args.filter_inode_type != ALL:
            if args.filter_inode_type == REGULAR_FILE and not S_ISREG(_stat.st_mode):
                return False
            if args.filter_inode_type == DIR and not S_ISDIR(_stat.st_mode):
                return False
    
        return True
    
    
    class CustomArgumentParser(argparse.ArgumentParser):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
    
            self.common_group = self.add_argument_group('Common arguments')
            self.common_group.add_argument("-t", "--type", dest="stat_type", required=True, choices=VALID_TYPES,
                                           help="The type of statistics to generate. Supported types: " + ','.join(VALID_TYPES))
            self.common_group.add_argument("-i", "--input", dest="input", required=True,
                                           help="A comma-separated list of input directories to scan.")
            self.common_group.add_argument("--input-from-file", dest="input_from_file", action="store_true",
                                           help="Read input directories from a file, with one directory path per line.")
            self.common_group.add_argument("-o", "--output-path", dest="output_path",
                                           default="/tmp/nas_stat_util.output", help="The path to the output file. Multiple paths can be specified as a comma-separated list.")
            self.common_group.add_argument("--human", dest="human_readable", action="store_true",
                                       help="Print sizes in a human-readable format (e.g., 1.0KB, 2.0MB) and timestamps as local date-time strings.")
            self.common_group.add_argument("--processes", dest="processes", type=int,
                                           default=10, help="The number of concurrent processes for scanning.")
            self.common_group.add_argument("--threads", dest="threads", type=int, default=10,
                                           help="The number of concurrent threads per process.")
            self.common_group.add_argument("--backup-count", dest="backup_count", type=int, default=9,
                                           help="The number of old output files to retain as backups.")
            self.common_group.add_argument("--print-process-time", dest="print_process_time", type=int, default=0,
                                           help="The interval in seconds to print progress. Set to 0 to disable.")
            self.common_group.add_argument("--exclude-dirs", dest="exclude_dirs", help="A comma-separated list of directory names to exclude.")
            self.common_group.add_argument("--exclude-full-path-dirs", dest="exclude_full_path_dirs",
                                           help="A comma-separated list of full directory paths to exclude.")
            self.common_group.add_argument("--output-format", dest="output_format",
                                           help=('Specifies the output fields as a comma-separated list. '
                                                 'Available fields for inode_list: path,size,raw_size,inode,atime,mtime,ctime,atime_ms,mtime_ms,ctime_ms,uid,gid (default: path,size). '
                                                 'Available fields for dir_tree: path,inode_num,size,raw_size,skip_num,inode,atime,mtime,ctime,atime_ms,mtime_ms,ctime_ms,uid,gid,all_inode_num,all_size (default: path,inode_num,size,skip_num).')
                                           )
            self.common_group.add_argument("--output-without-head", dest="output_without_header", action='store_true',
                                           help="Omit the header from the output file.")
            self.common_group.add_argument("--output-without-prefix", dest="output_without_prefix", default='',
                                           help="A prefix to remove from all output paths.")
            self.common_group.add_argument("--use-file-size", dest="use_file_size", action='store_true',
                                           help="Use file size for calculations instead of disk usage.")
            self.common_group.add_argument("--use-csv-format", dest="use_csv_format", action='store_true',
                                           help="Enclose the path field in quotes, following CSV format.")
            self.common_group.add_argument("--block-size", dest="block_size", type=int, default=512,
                                           help="The block size for disk usage calculations, in bytes. Default: 512.")
            self.common_group.add_argument("-s", "--min-size", dest="min_size", type=int,
                                           help="Minimum file size in bytes. Files smaller than this will be excluded.")
            self.common_group.add_argument("--max-size", dest="max_size", type=int,
                                           help="Maximum file size in bytes. Files larger than this will be excluded.")
            self.common_group.add_argument("-a", "--atime", dest="atime", type=int,
                                           help="Includes files accessed on or before this Unix timestamp.")
            self.common_group.add_argument("--atime-after", dest="atime_after", type=int,
                                           help="Includes files accessed on or after this Unix timestamp.")
            self.common_group.add_argument("-m", "--mtime", dest="mtime", type=int,
                                           help="Includes files modified on or before this Unix timestamp.")
            self.common_group.add_argument("--mtime-after", dest="mtime_after", type=int,
                                           help="Includes files modified on or after this Unix timestamp.")
            self.common_group.add_argument("-c", "--ctime", dest="ctime", type=int,
                                           help="Includes files with an inode change on or before this Unix timestamp.")
            self.common_group.add_argument("--ctime-after", dest="ctime_after", type=int,
                                           help="Includes files with an inode change on or after this Unix timestamp.")
            self.common_group.add_argument("--filter-inode-type", dest="filter_inode_type",
                                           choices=VALID_FILTER_INODE_TYPES, default=ALL,
                                           help=f"Filter by inode type. Supported: {VALID_FILTER_INODE_TYPES}. Default: all.")
    
            self.type_dir_group = self.add_argument_group('Directory Tree Type Arguments')
            self.type_dir_group.add_argument("-d", "--depth", dest="depth", type=int, default=2,
                                             help="The maximum directory depth to report in directory tree mode.")
    
            self.type_file_group = self.add_argument_group('Inode List Type Arguments')
    
    
    def add_dir_stat_map(args, sum_dir_stat_map, local_dir_stat_map):
        if sum_dir_stat_map is None or local_dir_stat_map is None:
            return
        if local_dir_stat_map.directory_map is not None:
            for iter_dir_path, iter_dir_stat in local_dir_stat_map.directory_map.items():
                if iter_dir_path not in sum_dir_stat_map.directory_map:
                    sum_dir_stat_map.directory_map[iter_dir_path] = iter_dir_stat
                else:
                    dir_stat = sum_dir_stat_map.directory_map[iter_dir_path]
                    dir_stat.add_item_count(iter_dir_stat.item_count)
                    dir_stat.add_item_size(iter_dir_stat.item_size)
                    dir_stat.add_skipped_item_count(iter_dir_stat.skipped_item_count)
                    if args.use_all_inode_num:
                        dir_stat.add_all_inode_num(iter_dir_stat.all_inode_num)
                    if args.use_all_size:
                        dir_stat.add_all_size(iter_dir_stat.all_size)
        if local_dir_stat_map.inode_num is not None:
            sum_dir_stat_map.add_inode_num(local_dir_stat_map.inode_num)
        if local_dir_stat_map.wait_walk_dir_list is not None:
            sum_dir_stat_map.wait_walk_dir_list.extend(local_dir_stat_map.wait_walk_dir_list)
    
    
    def stat_process(args, dir_list):
        global thread_pool
        if not thread_pool:
            thread_pool = ThreadPoolExecutor(max_workers=args.threads)
        args.thread_lock = threading.Lock()
        process_dir_stat_map = DirectoryStatMap()
        args.process_wait_queue_size = args.m_process_wait_queue.qsize()
        args.thread_future_set = set()
        args.thread_wait_queue = Queue()
        for dir_path in dir_list:
            future = thread_pool.submit(stat_walk, args, dir_path, True, None, time.perf_counter())
            args.thread_future_set.add(future)
        while len(args.thread_future_set) > 0 or not args.thread_wait_queue.empty():
            while not args.thread_wait_queue.empty():
                if args.m_process_wait_queue.qsize() < args.max_process_queue_size:
                    args.m_process_wait_queue.put(args.thread_wait_queue.get())
                elif len(args.thread_future_set) < args.max_thread_queue_size:
                    future = thread_pool.submit(stat_walk, args, args.thread_wait_queue.get(), True, None,
                                                time.perf_counter())
                    args.thread_future_set.add(future)
                else:
                    break
            args.process_wait_queue_size = args.m_process_wait_queue.qsize()
            try:
                for future in futures.as_completed(args.thread_future_set, timeout=1):
                    thread_dir_stat_map = future.result()
                    add_dir_stat_map(args, process_dir_stat_map, thread_dir_stat_map)
                    args.thread_future_set.remove(future)
            except concurrent.futures.TimeoutError:
                pass
            except Exception as e:
                print(f"Error in worker thread: {e}")
                args.thread_future_set.remove(future)
        return process_dir_stat_map
    
    
    def stat_walk(args, top, need_output, thread_local_output_lines, start_time):
        local_dir_stat_map = DirectoryStatMap()
        # Each thread maintains its own output list.
        if thread_local_output_lines is None:
            thread_local_output_lines = []
        try:
            try:
                scandir_it = os.scandir(top)
            except OSError as error:
                add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top)
                return local_dir_stat_map
    
            with scandir_it:
                while True:
                    try:
                        try:
                            entry = next(scandir_it)
                        except StopIteration:
                            break
                    except OSError as error:
                        add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top)
                        break
    
                    try:
                        is_dir = entry.is_dir()
                    except OSError:
                        is_dir = False
    
                    path = os.path.join(top, entry.name)
                    if is_dir:
                        if entry.name in args.exclude_dir_set:
                            continue
                        if not path.endswith('/'):
                            path += '/'
                        if path in args.exclude_full_path_dir_set:
                            continue
                        my_stat(args, local_dir_stat_map, thread_local_output_lines, path)
                        if entry.is_symlink():
                            continue
                        if time.perf_counter() - start_time > 1 and args.thread_wait_queue.qsize() < args.max_thread_queue_size - len(
                                args.thread_future_set) + args.max_process_queue_size - args.process_wait_queue_size:
                            args.thread_wait_queue.put(path)
                        else:
                            _local_dir_stat_map = stat_walk(args, path, False, thread_local_output_lines, start_time)
                            add_dir_stat_map(args, local_dir_stat_map, _local_dir_stat_map)
                    else:
                        my_stat(args, local_dir_stat_map, thread_local_output_lines, path)
            if need_output and args.stat_type == INODE_LIST:
                output_stat_lines(args, local_dir_stat_map, thread_local_output_lines)
            return local_dir_stat_map
        except Exception as error:
            add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top)
            return local_dir_stat_map
    
    
    def output_stat_lines(args, local_dir_stat_map, thread_local_output_lines):
        with args.thread_lock:
            with args.m_output_file_lock:
                for output_path in args.output_path_list:
                    with open(output_path, 'a') as output_handle:
                        output_handle.writelines(thread_local_output_lines)
        local_dir_stat_map.add_inode_num(len(thread_local_output_lines))
        thread_local_output_lines.clear()
    
    
    def my_stat(args, local_dir_stat_map, thread_local_output_lines, item_path):
        try:
            _stat = os.lstat(item_path)
            if not args.need_filter or filter_inode(args, _stat):
                if args.stat_type == DIR_TREE:
                    add_inode_to_dir_stat_map(args, local_dir_stat_map, item_path, _stat)
                    # Record directory information.
                    if stat.S_ISDIR(_stat.st_mode) and get_depth_from_str(item_path) <= args.min_key_depth + args.depth:
                        local_dir_stat_map.get_or_create(item_path, args, _stat)
                elif args.stat_type == INODE_LIST:
                    # The filter has already been applied by the enclosing condition.
                    inode_stat_print(args, local_dir_stat_map, item_path, _stat, thread_local_output_lines)
            if args.use_all_inode_num or args.use_all_size:
                if args.stat_type == DIR_TREE:
                    add_inode_to_dir_stat_map(args, local_dir_stat_map, item_path, _stat, all_mode=True)
                    # Record directory information.
                    if stat.S_ISDIR(_stat.st_mode) and get_depth_from_str(item_path) <= args.min_key_depth + args.depth:
                        local_dir_stat_map.get_or_create(item_path, args, _stat)
        except Exception as error:
            add_skip_to_dir_stat_map(args, local_dir_stat_map, error, item_path)
    
    
    def roll_output_file(args):
        for output_path in args.output_path_list:
            need_roll = os.path.isfile(output_path)
            if args.backup_count > 0 and need_roll:
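                file_handler = RotatingFileHandler(output_path, backupCount=args.backup_count)
                file_handler.doRollover()
                # doRollover() reopens an empty output file; close the handle so it is not leaked.
                file_handler.close()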
    
    
    def validate_input_dirs(args):
        directories = []
        if args.input_from_file:
            with open(args.input, 'r') as file:
                for line in file:
                    line = line.strip()
                    if line:
                        directories.append(line)
        else:
            directories.extend([_dir.strip() for _dir in args.input.split(",")])
        # Check if each directory exists.
        for index, directory in enumerate(directories):
            if not directory.endswith('/'):
                directory += '/'
                directories[index] = directory
            if not os.path.isdir(directory):
                raise argparse.ArgumentTypeError(f"Directory '{directory}' does not exist.")
        return directories
    
    
    def check_and_init_args(args):
        args.output_path_list = args.output_path.split(",")
        args.min_key_depth = None
        if args.stat_type == DIR_TREE or args.stat_type == INODE_LIST:
            args.input_dirs = validate_input_dirs(args)
            for _dir in args.input_dirs:
                if args.min_key_depth is None:
                    args.min_key_depth = get_depth_from_str(_dir)
                else:
                    args.min_key_depth = min(args.min_key_depth, get_depth_from_str(_dir))
        args.exclude_dir_set = frozenset()
        if args.exclude_dirs is not None:
            args.exclude_dir_set = frozenset(args.exclude_dirs.split(","))
        args.exclude_full_path_dir_set = frozenset()
        if args.exclude_full_path_dirs is not None:
            exclude_full_path_dir_list = []
            for path in args.exclude_full_path_dirs.split(","):
                if not path.endswith('/'):
                    path += '/'
                exclude_full_path_dir_list += [path]
            args.exclude_full_path_dir_set = frozenset(exclude_full_path_dir_list)
    
        args.max_process_queue_size = 100 * args.processes
        args.max_thread_queue_size = 100 * args.threads
        if args.output_format is None:
            if args.stat_type == DIR_TREE:
                args.output_format_list = ['path', 'inode_num', 'size', 'skip_num']
            elif args.stat_type == INODE_LIST:
                args.output_format_list = ['path', 'size']
        else:
            args.output_format_list = args.output_format.split(',')
        args.use_atime = "atime" in args.output_format_list or "atime_ms" in args.output_format_list
        args.use_mtime = "mtime" in args.output_format_list or "mtime_ms" in args.output_format_list
        args.use_ctime = "ctime" in args.output_format_list or "ctime_ms" in args.output_format_list
        args.use_inode = "inode" in args.output_format_list
        args.use_uid = "uid" in args.output_format_list
        args.use_gid = "gid" in args.output_format_list
        args.use_all_inode_num = "all_inode_num" in args.output_format_list
        args.use_all_size = "all_size" in args.output_format_list
        args.need_filter = (args.min_size is not None or args.max_size is not None
                            or args.atime is not None or args.atime_after is not None
                            or args.ctime is not None or args.ctime_after is not None
                            or args.mtime is not None or args.mtime_after is not None
                            or args.filter_inode_type is not None)
    
    
    # / -> depth 0
    # /a/ -> depth 1
    # /a.txt -> depth 1
    # /a/b/ -> depth 2
    # /a/b.txt -> depth 2
    def get_depth_from_str(file_path):
        if file_path.endswith('/'):
            return file_path.count('/') - 1
        return file_path.count('/')
    
    
    def get_depth_from_list(file_path_list):
        if file_path_list[-1] == '':
            return len(file_path_list) - 2
        return len(file_path_list) - 1
    
    
    def add_inode_to_dir_stat_map(args, input_dir_stat_map, inode_path, _stat, all_mode=False):
        inode_size = _stat.st_size if args.use_file_size else _stat.st_blocks
        base_depth = args.min_key_depth
        file_path_list = inode_path.split('/')
        file_depth = get_depth_from_list(file_path_list)
        dir_path = ''
        for i in range(0, min(1 + base_depth + args.depth, file_depth)):
            dir_path += file_path_list[i] + '/'
            if i >= base_depth:
                dir_stat = input_dir_stat_map.get_or_create(dir_path, args, _stat)
                if all_mode:
                    dir_stat.add_all_inode_num(1)
                    dir_stat.add_all_size(inode_size)
                else:
                    dir_stat.add_item_count(1)
                    dir_stat.add_item_size(inode_size)
    
    
    def add_skip_to_dir_stat_map(args, input_dir_stat_map, error, inode_path):
        print(f"Skipping path due to an error. Path: {inode_path}, Error: {error}")
        file_path_list = inode_path.split('/')
        file_depth = get_depth_from_list(file_path_list)
        base_depth = args.min_key_depth
        if inode_path.endswith('/'):
            file_depth += 1
        dir_path = ''
        for i in range(0, min(1 + base_depth + args.depth, file_depth)):
            dir_path += file_path_list[i] + '/'
            if i >= base_depth:
                dir_stat = input_dir_stat_map.get_or_create(dir_path, args, None)
                dir_stat.add_skipped_item_count(1)
    
    
    def chunks(lst, n):
        for i in range(0, len(lst), n):
            yield lst[i:i + n]
    
    
    def stat_func(args):
        args.stat_finish = False
        dir_stat_map = DirectoryStatMap()
        args.start_time = time.time()
        with Manager() as manager:
            args.m_output_file_lock = manager.Lock()
            args.m_process_wait_queue = manager.Queue()
            args.add_wait_flag = False
            process_future_set = set()
            try:
                if args.print_process_time > 0:
                    threading.Thread(target=stat_print_at_runtime, args=(args, dir_stat_map, sys.stdout, True)).start()
                if args.stat_type == DIR_TREE or args.stat_type == INODE_LIST:
                    roll_output_file(args)
                    init_file_header(args)
                    with ProcessPoolExecutor(max_workers=args.processes) as walk_executor:
                        input_dir_chunks = list(chunks(args.input_dirs, args.max_thread_queue_size))
                        for input_dir_chunk in input_dir_chunks:
                            future = walk_executor.submit(stat_process, args, input_dir_chunk)
                            process_future_set.add(future)
    
                        while len(process_future_set) > 0 or not args.m_process_wait_queue.empty():
                            while not args.m_process_wait_queue.empty():
                                if args.max_process_queue_size <= len(process_future_set):
                                    break
                                _dir = args.m_process_wait_queue.get()
                                future = walk_executor.submit(stat_process, args, [_dir])
                                process_future_set.add(future)
                            try:
                                for future in futures.as_completed(process_future_set, timeout=1):
                                    local_dir_stat_map = future.result()
                                    add_dir_stat_map(args, dir_stat_map, local_dir_stat_map)
                                    process_future_set.remove(future)
                            except concurrent.futures.TimeoutError:
                                pass
                            except Exception as e:
                                print(f"Error in worker process: {e}")
                                process_future_set.remove(future)
    
                        args.stat_finish = True
                        if args.stat_type == DIR_TREE:
                            for output_path in args.output_path_list:
                                with open(output_path, 'a+') as output_file:
                                    dir_stat_map_print(args, dir_stat_map, output_file)
                            dir_stat_map_print(args, dir_stat_map, sys.stdout, print_elapsed_time=True,
                                               clear_screen=(args.print_process_time > 0))
                        elif args.stat_type == INODE_LIST:
                            output_total_line_print(args, dir_stat_map, sys.stdout, print_elapsed_time=True,
                                                    clear_screen=(args.print_process_time > 0))
            except Exception as e:
                print(f"Error in stat_func: {e}")
            args.stat_finish = True
    
    
    def init_file_header(args):
        for output_path in args.output_path_list:
            file_exists = os.path.isfile(output_path)
            file_empty = not file_exists or os.path.getsize(output_path) == 0
            with open(output_path, 'a+') as output_file:
                with args.m_output_file_lock:
                    if args.output_without_header:
                        continue
                    if file_empty:
                        output_file.write(','.join(args.output_format_list) + '\n')
                        output_file.flush()
    
    
    def main():
        parser = CustomArgumentParser(description='Scans and collects metadata statistics for NAS file systems.')
        args = parser.parse_args()
        check_and_init_args(args)
        stat_func(args)
    
    
    if __name__ == "__main__":
        main()
    
  3. Configure the .meta_stat_key.json file. It stores the AccessKey pairs and connection parameters that the scanning script uses to access OSS, Tablestore, and the NAS API.

    {
        "oss_access_key_id": "<AccessKey ID>",
        "oss_access_key_secret": "<AccessKey Secret>",
        "ots_access_key_id": "<AccessKey ID>",
        "ots_access_key_secret": "<AccessKey Secret>",
        "BASE_PATH": "/cpfs",
        "OUTPUT_DIR_PREFIX": "/opt/cpfs-scanner/bmstat/",
        "OSS_REGION": "<region-id>",
        "OSS_BUCKET": "bmcpfs-scan",
        "OSS_KEY": "bmcpfs_stat_tool_config.json",
        "OSS_ENDPOINT": "oss-<region-id>-internal.aliyuncs.com",
        "nas_access_key_id": "<AccessKey ID>",
        "nas_access_key_secret": "<AccessKey Secret>",
        "nas_endpoint": "nas-vpc.<region-id>.aliyuncs.com"
    }

    The following table describes each configuration item.

    | Parameter | Description |
    | --- | --- |
    | oss_access_key_id / oss_access_key_secret | The AccessKey pair for accessing OSS. It must have read and write permissions on the target bucket. |
    | ots_access_key_id / ots_access_key_secret | The AccessKey pair for accessing Tablestore. It must have read and write permissions on the target instance. |
    | BASE_PATH | The mount path of the CPFS file system, for example, /cpfs. |
    | OUTPUT_DIR_PREFIX | The local output directory for scan result files. The user that runs the script must have write permissions on this directory. |
    | OSS_REGION | The region of the OSS bucket. |
    | OSS_BUCKET | The name of the OSS bucket that stores archived scan result files. |
    | OSS_KEY | The file name of the scan configuration file. The script reads the local configuration file with this name, and in --type download mode the same name is used as the object name in OSS. |
    | OSS_ENDPOINT | The OSS endpoint. If your server can access OSS over the internal network, use an internal endpoint to reduce latency and data transfer costs. |
    | nas_access_key_id / nas_access_key_secret | The AccessKey pair for querying fileset information through the NAS API. |
    | nas_endpoint | The NAS API endpoint. |

  4. Configure the bmcpfs_stat_tool_config.json file. This file defines the scan scope, field mappings, Tablestore write parameters, and schedule.

    {
        "fsid": "<file-system-id>",
        "ossPath": "/",
        "scanToFiles": [
        ],
        "scanToTablestore": [
            "/fileset1",
            "/fileset2"
        ],
        "fields": {
            "aTime": "$atime_ms",
            "cTime": "$ctime_ms",
            "mTime": "$mtime_ms",
            "fileSize": "$size",
            "cluster_id": "cluster_id",
            "fileset_id": "$fset_id",
            "fileset_name": "$fset_name",
            "fileset_file_count_quota": "$fset_file_count_quota",
            "fileset_size_quota": "$fset_size_quota",
            "wTime": "$wtime_ms",
            "region": "region-id"
        },
        "pkFields": {
            "file_name": "$path"
        },
        "otsConfig": {
            "instanceName": "<instance-name>",
            "tableName": "bmcpfs_test",
            "endpoint": "https://<instance-name>.<region-id>.vpc.tablestore.aliyuncs.com",
            "rowExistenceException": "IGNORE",
            "columnCondition": "IGNORE",
            "writeType": "PUT"
        },
        "schedule": {
            "type": "once",
            "cron": "0 * * * *"
        }
    }

    The following table describes each configuration item.

    | Parameter | Description |
    | --- | --- |
    | fsid (Required) | The CPFS file system ID. You can obtain this ID from the CPFS console. |
    | scanToTablestore (Required) | The directories to scan and write to Tablestore. These paths are relative to BASE_PATH. |
    | otsConfig.instanceName (Required) | The Tablestore instance name. |
    | otsConfig.tableName (Required) | The Tablestore table name. It must match the name of the table that you created in Step 2. |
    | otsConfig.endpoint (Required) | The Tablestore VPC endpoint, in the format https://<instance-name>.<region-id>.vpc.tablestore.aliyuncs.com. If your server cannot reach the VPC endpoint, use a public endpoint. |
    | fields | Maps Tablestore column names (the keys) to the values written for each file. Values that start with $ are CPFS metadata variables. |
    | pkFields | The primary key mapping. file_name maps to the full path of the file ($path). |
    | fields.cluster_id | The cluster ID. You can set this to a custom value. |
    | fields.region | The region. You can set this to a custom value. |
    | schedule.type | The scheduling type. The value once indicates a one-time execution. |
    | schedule.cron | The cron expression, for example, 0 * * * * (at minute 0 of every hour). It takes effect only when type specifies scheduled execution. |

    Note

    Paths specified in scanToFiles and scanToTablestore are relative to the BASE_PATH defined in the .meta_stat_key.json file. For example, if BASE_PATH is /cpfs, the path /fileset1 corresponds to the actual scan path /cpfs/fileset1.

  5. Make the scanning scripts executable.

    chmod 755 meta_stat.py nas_stat_util.py
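The path rule in the Note of step 4 and the $-prefixed values in fields can be illustrated with a short sketch. It assumes that each scanned file is available as a dictionary keyed by the variable names without the $ (path, size, and so on), and that values without a $ are written as literal constants. The helpers resolve_scan_path, build_row, and missing_keys are hypothetical and are not part of meta_stat.py.

```python
import posixpath

# Keys listed in the sample .meta_stat_key.json in step 3.
REQUIRED_KEYS = {
    "oss_access_key_id", "oss_access_key_secret",
    "ots_access_key_id", "ots_access_key_secret",
    "BASE_PATH", "OUTPUT_DIR_PREFIX", "OSS_REGION", "OSS_BUCKET",
    "OSS_KEY", "OSS_ENDPOINT",
    "nas_access_key_id", "nas_access_key_secret", "nas_endpoint",
}


def missing_keys(key_config):
    """Return the sorted list of required keys absent from a loaded key file."""
    return sorted(REQUIRED_KEYS - set(key_config))


def resolve_scan_path(base_path, rel_path):
    """Join a scanToTablestore or scanToFiles entry onto BASE_PATH.

    posixpath.join("/cpfs", "/fileset1") returns "/fileset1" because the second
    argument is absolute, so the leading slash is stripped before joining.
    """
    return posixpath.normpath(posixpath.join(base_path, rel_path.lstrip("/")))


def build_row(fields, pk_fields, record):
    """Map one scanned record onto primary key and attribute columns.

    Assumption: "$name" is looked up as record["name"]; any other value is
    treated as a literal constant (for example, cluster_id or region).
    """
    def resolve(value):
        if isinstance(value, str) and value.startswith("$"):
            return record.get(value[1:])
        return value

    primary_key = {col: resolve(v) for col, v in pk_fields.items()}
    attributes = {col: resolve(v) for col, v in fields.items()}
    return primary_key, attributes
```

For example, resolve_scan_path("/cpfs", "/fileset1") returns /cpfs/fileset1, which matches the example in the Note. A dictionary loaded with json.load from .meta_stat_key.json can be passed to missing_keys before a scan to catch an incomplete key file early.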

Step 4: Run the scan

Run the following command to start the scan. The tool reads the local configuration, scans the specified directories, writes metadata to Tablestore, and uploads packaged results to OSS.

python3 meta_stat.py --type run

The --type parameter supports the following three modes.

| Value | Description |
| --- | --- |
| run | Performs a full execution: reads the local configuration file, scans CPFS metadata, writes it to Tablestore, and uploads the packaged scan results to OSS. |
| download | Downloads the latest bmcpfs_stat_tool_config.json file from OSS and updates the local crontab based on the scheduling plan in the configuration file. |
| upload | Only uploads existing local scan results to Tablestore without running a new scan. |

To run scans on a schedule, add an entry to the crontab of the user that runs the scan. Run the following command to edit it:

crontab -e

Add the entry at the end of the file. For example, to run a scan every day at midnight (00:00), add the following line:

0 0 * * * /usr/bin/python3 /opt/cpfs-scanner/meta_stat.py --type run >> /opt/cpfs-scanner/cron.log 2>&1

The >> /opt/cpfs-scanner/cron.log 2>&1 part redirects standard output and standard error to a log file for troubleshooting. Adjust the Python interpreter and script paths to match your environment.
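To check at which minutes a five-field cron expression fires, such as 0 0 * * * above or the sample schedule.cron value 0 * * * *, you can use a small matcher like the following. It is a hypothetical helper, not part of meta_stat.py, and it deliberately supports only * and a single integer in each field. Ranges (1-5), lists (1,2), and steps (*/15) are not handled.

```python
from datetime import datetime, timedelta


def _matches(spec, value):
    """Match one cron field; only "*" and a single integer are supported."""
    return spec == "*" or int(spec) == value


def cron_matches(expr, when):
    """Return True if the five-field cron expression fires at minute `when`."""
    minute, hour, dom, month, dow = expr.split()
    # Cron numbers weekdays 0-6 starting on Sunday (7 is also Sunday), while
    # datetime.weekday() numbers them 0-6 starting on Monday.
    cron_dow = (when.weekday() + 1) % 7
    dom_ok = _matches(dom, when.day)
    dow_ok = dow == "*" or int(dow) % 7 == cron_dow
    if dom != "*" and dow != "*":
        # When both day fields are restricted, cron fires if either one matches.
        day_ok = dom_ok or dow_ok
    else:
        day_ok = dom_ok and dow_ok
    return (_matches(minute, when.minute) and _matches(hour, when.hour)
            and _matches(month, when.month) and day_ok)


def firing_times(expr, start, minutes):
    """List the minutes in [start, start + minutes) at which expr fires."""
    start = start.replace(second=0, microsecond=0)
    return [start + timedelta(minutes=i) for i in range(minutes)
            if cron_matches(expr, start + timedelta(minutes=i))]
```

For example, firing_times("0 * * * *", datetime(2024, 1, 1), 24 * 60) returns 24 entries, one per hour, while the same call with "0 0 * * *" returns only midnight.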

Step 5: Configure Grafana visualization

1. Install the Tablestore data source plugin

  1. Upload aliyun-tablestore-grafana-datasource-adapt-react.zip to the Grafana plugins directory, such as /var/lib/grafana/plugins.

  2. Unzip the plugin file.

    unzip aliyun-tablestore-grafana-datasource-adapt-react.zip -d /var/lib/grafana/plugins/

  3. Modify the Grafana configuration file to allow loading unsigned plugins.

    vi /etc/grafana/grafana.ini

    In the [plugins] section, find the allow_loading_unsigned_plugins parameter and set it to the plugin name.

    allow_loading_unsigned_plugins = aliyun-tablestore-grafana-datasource-adapt-react

    Note

    By default, this option is commented out with a leading semicolon (;). Remove the ; so that the setting takes effect.

  4. Restart the Grafana service to apply the changes.

    systemctl restart grafana-server
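If the plugin still does not load after the restart, you can check that the setting is active (not still commented out) with a short script. The following sketch uses Python's configparser. The function name is hypothetical, and it assumes grafana.ini follows standard INI syntax; configparser may not interpret every edge case exactly as Grafana does.

```python
import configparser


def unsigned_plugins_allowed(ini_text, plugin_id):
    """Return True if [plugins] allow_loading_unsigned_plugins lists plugin_id.

    configparser treats lines starting with ";" or "#" as comments, so a
    setting that is still commented out is reported as missing.
    """
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    # Keys may appear before the first section header in some Grafana ini
    # files; a dummy section prevents MissingSectionHeaderError.
    parser.read_string("[__top__]\n" + ini_text)
    raw = parser.get("plugins", "allow_loading_unsigned_plugins", fallback="")
    allowed = {item.strip() for item in raw.split(",") if item.strip()}
    return plugin_id in allowed
```

For example, pass the text of /etc/grafana/grafana.ini and the plugin ID aliyun-tablestore-grafana-datasource-adapt-react. A False result usually means that the leading ; was not removed or that the option is outside the [plugins] section.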

2. Add a data source

  1. Log in to Grafana. Go to Connections > Data sources, then click Add new data source.

  2. In the search box, enter aliyun-tablestore and select the aliyun-tablestore-grafana-datasource-adapt-react type.

  3. Configure the following connection parameters.

    | Parameter | Description |
    | --- | --- |
    | Endpoint | The endpoint of the Tablestore instance. The format is https://<instance-name>.<region-id>.ots.aliyuncs.com. |
    | Instance | The name of the Tablestore instance. |
    | AccessKey ID | The AccessKey ID of your Alibaba Cloud account or RAM user. |
    | AccessKey Secret | The AccessKey Secret of your Alibaba Cloud account or RAM user. |

    Note

    The Endpoint parameter supports both public and VPC addresses. If your Grafana server and Tablestore instance are in the same VPC and region, use the VPC address (format: https://<instance-name>.<region-id>.vpc.tablestore.aliyuncs.com) to reduce latency and avoid public network traffic fees. Otherwise, use the public address.

  4. Click Save & test to verify the connection.
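The two endpoint formats used in this topic differ only in the domain suffix. The following sketch builds either form and parses one back into its instance name and region. You can use it to confirm that the Grafana Endpoint and otsConfig.endpoint point to the same instance. The helper names are hypothetical, and the parser recognizes only the two formats shown in this topic.

```python
from urllib.parse import urlsplit

PUBLIC_SUFFIX = ".ots.aliyuncs.com"
VPC_SUFFIX = ".vpc.tablestore.aliyuncs.com"


def tablestore_endpoint(instance_name, region_id, vpc=False):
    """Build an endpoint in the public or VPC format used in this topic."""
    suffix = VPC_SUFFIX if vpc else PUBLIC_SUFFIX
    return f"https://{instance_name}.{region_id}{suffix}"


def parse_endpoint(url):
    """Return (instance_name, region_id, is_vpc) for a recognized endpoint."""
    host = urlsplit(url).hostname or ""
    for suffix, is_vpc in ((VPC_SUFFIX, True), (PUBLIC_SUFFIX, False)):
        if host.endswith(suffix):
            instance, _, region = host[: -len(suffix)].partition(".")
            return instance, region, is_vpc
    raise ValueError(f"unrecognized Tablestore endpoint: {url}")
```

For example, parse_endpoint(a)[:2] == parse_endpoint(b)[:2] is True when a public endpoint and a VPC endpoint refer to the same instance in the same region.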

3. Import a dashboard

  1. Before importing, open the dashboard JSON file and replace every datasource uid value (fffrqt7w6setcc in the example below) with the UID of your data source. You can find this UID in the URL of the data source settings page. The following JSON is an example CPFS Overview dashboard. For more sample templates, see Grafana Dashboard Example Templates.

    {
      "annotations": {
        "list": [
          {
            "builtIn": 1,
            "datasource": {
              "type": "grafana",
              "uid": "-- Grafana --"
            },
            "enable": true,
            "hide": true,
            "iconColor": "rgba(0, 211, 255, 1)",
            "name": "Annotations & Alerts",
            "type": "dashboard"
          }
        ]
      },
      "editable": true,
      "fiscalYearStartMonth": 0,
      "graphTooltip": 0,
      "id": 1,
      "links": [
        {
          "asDropdown": false,
          "icon": "external link",
          "includeVars": false,
          "keepTime": false,
          "tags": [],
          "targetBlank": false,
          "title": "New link",
          "tooltip": "",
          "type": "dashboards",
          "url": ""
        }
      ],
      "panels": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "continuous-GrYlRd"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "bytes"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 6,
            "x": 0,
            "y": 0
          },
          "id": 3,
          "options": {
            "colorMode": "background",
            "graphMode": "none",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT SUM(fileSize) AS totalSize  FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "",
              "ycol": ""
            }
          ],
          "title": "Total Capacity Usage",
          "transformations": [
            {
              "id": "prepareTimeSeries",
              "options": {}
            }
          ],
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "continuous-RdYlGr"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "none"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 6,
            "x": 6,
            "y": 0
          },
          "id": 4,
          "options": {
            "colorMode": "background",
            "graphMode": "area",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT COUNT(*) AS file_count FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Total File Count",
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "fixedColor": "orange",
                "mode": "shades"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "none"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 5,
            "x": 12,
            "y": 0
          },
          "id": 5,
          "options": {
            "colorMode": "background",
            "graphMode": "area",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [
                "lastNotNull"
              ],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT COUNT(DISTINCT region) AS regions_count FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Regions Covered",
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "description": "",
          "fieldConfig": {
            "defaults": {
              "color": {
                "fixedColor": "purple",
                "mode": "shades"
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "none"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 4,
            "w": 7,
            "x": 17,
            "y": 0
          },
          "id": 6,
          "options": {
            "colorMode": "background",
            "graphMode": "none",
            "justifyMode": "auto",
            "orientation": "horizontal",
            "percentChangeColorMode": "standard",
            "reduceOptions": {
              "calcs": [
                "lastNotNull"
              ],
              "fields": "",
              "values": false
            },
            "showPercentChange": false,
            "textMode": "auto",
            "textStyle": {
              "fontSize": "40px",
              "fontWeight": "bold"
            },
            "wideLayout": true
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT COUNT(DISTINCT cluster_id) AS cluster_count FROM `$tableName` USE INDEX($indexName)",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Cluster Count",
          "type": "stat"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "palette-classic"
              },
              "custom": {
                "axisBorderShow": false,
                "axisCenteredZero": false,
                "axisColorMode": "text",
                "axisLabel": "",
                "axisPlacement": "auto",
                "fillOpacity": 80,
                "gradientMode": "none",
                "hideFrom": {
                  "legend": false,
                  "tooltip": false,
                  "viz": false
                },
                "lineWidth": 1,
                "scaleDistribution": {
                  "log": 2,
                  "type": "log"
                },
                "thresholdsStyle": {
                  "mode": "off"
                }
              },
              "mappings": [],
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "percent"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 10,
            "w": 24,
            "x": 0,
            "y": 4
          },
          "id": 7,
          "options": {
            "barRadius": 0,
            "barWidth": 0.15,
            "fullHighlight": true,
            "groupWidth": 0.7,
            "legend": {
              "calcs": [
                "first"
              ],
              "displayMode": "list",
              "placement": "bottom",
              "showLegend": false
            },
            "orientation": "horizontal",
            "showValue": "auto",
            "stacking": "none",
            "tooltip": {
              "hideZeros": false,
              "mode": "multi",
              "sort": "none"
            },
            "xField": "cluster_id",
            "xTickLabelRotation": 0,
            "xTickLabelSpacing": 0
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "datasource": {
                "type": "aliyun-tablestore-grafana-datasource-adapt-react",
                "uid": "fffrqt7w6setcc"
              },
              "formatType": "Table",
              "query": "SELECT \n    cluster_id,\n    ROUND((SUM(fileSize) * 100.0 / (SELECT SUM(fileSize) FROM `$tableName` USE INDEX($indexName))), 2) AS `Capacity Usage`\nFROM `$tableName` USE INDEX($indexName)\nGROUP BY cluster_id\nORDER BY cluster_id;",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Capacity Distribution by Cluster",
          "type": "barchart"
        },
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "fieldConfig": {
            "defaults": {
              "color": {
                "mode": "palette-classic"
              },
              "mappings": [],
              "thresholds": {
                "mode": "percentage",
                "steps": [
                  {
                    "color": "green",
                    "value": 0
                  },
                  {
                    "color": "red",
                    "value": 80
                  }
                ]
              },
              "unit": "percent"
            },
            "overrides": []
          },
          "gridPos": {
            "h": 6,
            "w": 24,
            "x": 0,
            "y": 14
          },
          "id": 9,
          "options": {
            "minVizHeight": 75,
            "minVizWidth": 75,
            "orientation": "auto",
            "reduceOptions": {
              "calcs": [],
              "fields": "/^percentage$/",
              "values": true
            },
            "showThresholdLabels": false,
            "showThresholdMarkers": false,
            "sizing": "auto"
          },
          "pluginVersion": "12.3.1",
          "targets": [
            {
              "formatType": "Table",
              "query": "SELECT\n  fileset_id,\n  SUM(fileSize) * 100.0 / MAX(fileset_size_quota) AS percentage\nFROM `$tableName` USE INDEX($indexName)\nGROUP BY fileset_id\nORDER BY fileset_id",
              "refId": "A",
              "xcol": "_time",
              "ycol": ""
            }
          ],
          "title": "Fileset Capacity Usage",
          "type": "gauge"
        }
      ],
      "preload": false,
      "refresh": "5m",
      "schemaVersion": 42,
      "tags": [],
      "templating": {
        "list": [
          {
            "current": {
              "text": "bmcpfs_test",
              "value": "bmcpfs_test"
            },
            "hide": 2,
            "name": "tableName",
            "query": "bmcpfs_test",
            "skipUrlSync": true,
            "type": "constant"
          },
          {
            "current": {
              "text": "bmcpfs_test_idx",
              "value": "bmcpfs_test_idx"
            },
            "hide": 2,
            "name": "indexName",
            "query": "bmcpfs_test_idx",
            "skipUrlSync": true,
            "type": "constant"
          }
        ]
      },
      "time": {
        "from": "now-6h",
        "to": "now"
      },
      "timepicker": {
        "hidden": true
      },
      "timezone": "browser",
      "title": "CPFS Overview",
      "uid": "adqgdbx",
      "version": 64
    }

  2. Go to Dashboards, click New > Import in the upper-right corner, and then upload the modified JSON file.
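The sample dashboard repeats the data source uid in every panel and in most query targets, so editing it by hand is error-prone. The following sketch rewrites every datasource object whose type is aliyun-tablestore-grafana-datasource-adapt-react. It leaves other data sources, such as the built-in Grafana annotation source, untouched. The function names are hypothetical.

```python
import json

PLUGIN_TYPE = "aliyun-tablestore-grafana-datasource-adapt-react"


def replace_datasource_uid(node, new_uid, plugin_type=PLUGIN_TYPE):
    """Recursively set "uid" on every datasource object of plugin_type.

    Returns the number of datasource references that were updated.
    """
    updated = 0
    if isinstance(node, dict):
        ds = node.get("datasource")
        if isinstance(ds, dict) and ds.get("type") == plugin_type:
            ds["uid"] = new_uid
            updated += 1
        for value in node.values():
            updated += replace_datasource_uid(value, new_uid, plugin_type)
    elif isinstance(node, list):
        for item in node:
            updated += replace_datasource_uid(item, new_uid, plugin_type)
    return updated


def rewrite_dashboard_file(src_path, dst_path, new_uid):
    """Load a dashboard JSON file, replace the uid, and write the result."""
    with open(src_path, encoding="utf-8") as f:
        dashboard = json.load(f)
    updated = replace_datasource_uid(dashboard, new_uid)
    with open(dst_path, "w", encoding="utf-8") as f:
        json.dump(dashboard, f, indent=2, ensure_ascii=False)
    return updated
```

For example, rewrite_dashboard_file("cpfs_overview.json", "cpfs_overview_import.json", "<your-datasource-uid>") writes a copy that is ready to import. A return value of 0 usually means that the plugin type in the file does not match.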

4. Customize table and index names (Optional)

If your table and index names differ from the default values in the dashboard template, you must update the corresponding dashboard variables.

  1. On the dashboard page, click Edit > Settings > Variables in the upper-right corner.

  2. Find the tableName and indexName variables, and change their default values to your table and index names.

  3. Click Save dashboard.
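If you prefer to set the table and index names in the JSON file before import, a helper like the following can update the variables. It is a hypothetical sketch that assumes tableName and indexName are defined as constant variables, as in the sample dashboard, where each constant's value is stored in "query" and mirrored in "current".

```python
def set_constant_variable(dashboard, name, value):
    """Set a constant template variable such as tableName or indexName.

    Updates both "query" and "current", matching the layout of the sample
    dashboard. Returns False if no constant variable with that name exists.
    """
    for var in dashboard.get("templating", {}).get("list", []):
        if var.get("name") == name and var.get("type") == "constant":
            var["query"] = value
            var["current"] = {"text": value, "value": value}
            return True
    return False
```

For example, call set_constant_variable(dashboard, "tableName", "<your-table>") and set_constant_variable(dashboard, "indexName", "<your-index>") on the dictionary loaded from the dashboard file, save it, and then import it.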

Sample Grafana dashboard templates

CPFS capacity trend

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "grafana",
          "uid": "-- Grafana --"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 6,
  "links": [
    {
      "asDropdown": true,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "Other Dashboards",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "liveNow": true,
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "continuous-BlYlRd"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "fillOpacity": 80,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "lineWidth": 1,
            "scaleDistribution": {
              "type": "linear"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "file_size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 11,
        "w": 24,
        "x": 0,
        "y": 0
      },
      "id": 9,
      "options": {
        "barRadius": 0,
        "barWidth": 0.1,
        "colorByField": "time",
        "fullHighlight": true,
        "groupWidth": 0.7,
        "legend": {
          "calcs": [],
          "displayMode": "list",
          "placement": "bottom",
          "showLegend": false
        },
        "orientation": "auto",
        "showValue": "auto",
        "stacking": "none",
        "text": {
          "valueSize": 1
        },
        "tooltip": {
          "hideZeros": false,
          "mode": "single",
          "sort": "none"
        },
        "xField": "time",
        "xTickLabelRotation": 0,
        "xTickLabelSpacing": 100
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT DATE(DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR)) AS date ,SUM(fileSize) as file_size\n FROM (\n  SELECT \n    cTime,fileSize \n  FROM \n    $tableName USE INDEX($indexName)  \n  WHERE \n    cTime >= $__unixMilliTimeFrom() AND cTime < $__unixMilliTimeTo()\n) tl\nGROUP BY date\nORDER BY date",
          "refId": "A",
          "xcol": "time",
          "ycol": ""
        }
      ],
      "title": "New file capacity",
      "type": "barchart"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "green",
            "mode": "fixed"
          },
          "custom": {
            "axisBorderShow": false,
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "fillOpacity": 80,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "lineWidth": 1,
            "scaleDistribution": {
              "type": "linear"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "displayName": "Total New Files",
          "fieldMinMax": false,
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              }
            ]
          },
          "unit": "none"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 9,
        "w": 24,
        "x": 0,
        "y": 11
      },
      "id": 10,
      "options": {
        "barRadius": 0,
        "barWidth": 0.1,
        "colorByField": "day",
        "fullHighlight": true,
        "groupWidth": 0.7,
        "legend": {
          "calcs": [],
          "displayMode": "list",
          "placement": "right",
          "showLegend": false
        },
        "orientation": "auto",
        "showValue": "auto",
        "stacking": "none",
        "text": {
          "valueSize": 1
        },
        "tooltip": {
          "hideZeros": false,
          "mode": "multi",
          "sort": "none"
        },
        "xTickLabelRotation": 0,
        "xTickLabelSpacing": 100
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT DATE(DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR)) AS date ,COUNT(*) as file_count\nFROM (\n  SELECT \n    cTime,file_name \n  FROM \n    $tableName USE INDEX($indexName)  \n  WHERE \n    cTime >= $__unixMilliTimeFrom() AND cTime < $__unixMilliTimeTo()\n) tl\nGROUP BY date\nORDER BY date",
          "refId": "A",
          "xcol": "time",
          "ycol": ""
        }
      ],
      "title": "New file count",
      "type": "barchart"
    }
  ],
  "preload": true,
  "refresh": "5m",
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "hide": 2,
        "label": "Table name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      },
      {
        "current": {
          "text": "bmcpfs_test_idx",
          "value": "bmcpfs_test_idx"
        },
        "hide": 2,
        "name": "indexName",
        "query": "bmcpfs_test_idx",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-7d",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "browser",
  "title": "CPFS capacity trend",
  "uid": "adj2l5q",
  "version": 41
}
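The two panels in this template group new files by creation day. Each query divides cTime (a millisecond timestamp) by 1000, adds 8 hours, and takes the date. As a result, days are bucketed in UTC+8 whatever the browser time zone is, assuming FROM_UNIXTIME returns UTC. The following Python sketch reproduces that bucketing on in-memory rows, which can help when you check dashboard numbers against a local scan result. The row layout (dicts with cTime and fileSize keys) is an assumption for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset, mirroring DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR)
# under the assumption that FROM_UNIXTIME yields UTC.
UTC_PLUS_8 = timezone(timedelta(hours=8))


def daily_new_files(rows, start_ms: int, end_ms: int) -> dict:
    """Return {day: (file_count, total_bytes)} for rows with start_ms <= cTime < end_ms.

    Mirrors the "New file count" and "New file capacity" panels: cTime is a
    millisecond Unix timestamp and fileSize is in bytes."""
    buckets = defaultdict(lambda: [0, 0])
    for row in rows:
        c_time = row["cTime"]
        # Same half-open range as the panel WHERE clause.
        if not start_ms <= c_time < end_ms:
            continue
        day = datetime.fromtimestamp(c_time / 1000, tz=UTC_PLUS_8).date()
        buckets[day][0] += 1
        buckets[day][1] += row["fileSize"]
    # Equivalent of GROUP BY date ORDER BY date.
    return {day: (count, size) for day, (count, size) in sorted(buckets.items())}
```

If most of your users are in a different time zone, change INTERVAL 8 HOUR in both panel queries (and the offset in the sketch) to match.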

CPFS directory comparison

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "grafana",
          "uid": "-- Grafana --"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 7,
  "links": [
    {
      "asDropdown": false,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "Other Dashboards",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "blue",
            "mode": "fixed"
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 4,
        "w": 8,
        "x": 0,
        "y": 0
      },
      "id": 1,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "lastNotNull"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT COUNT(*) AS total_count\nFROM (\n    SELECT file_name FROM $tableName USE INDEX() WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n    UNION ALL\n    SELECT file_name FROM $tableName USE INDEX() WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n) t",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Total file count",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "light-red",
            "mode": "fixed"
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 4,
        "w": 8,
        "x": 8,
        "y": 0
      },
      "id": 2,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "lastNotNull"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT COUNT(*) AS duplicate_count\nFROM (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n) a\nJOIN (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Duplicate file count",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "fixedColor": "orange",
            "mode": "fixed"
          },
          "mappings": [],
          "noValue": "0GB",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          },
          "unit": "bytes"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 4,
        "w": 8,
        "x": 16,
        "y": 0
      },
      "id": 3,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "lastNotNull"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT SUM(a.fileSize) AS duplicate_size\nFROM (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n) a\nJOIN (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Space used by duplicates",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "auto",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "hideFrom": {
              "viz": false
            },
            "inspect": false,
            "wrapHeaderText": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "Directory A/B path"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 801
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File name"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 405
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 9,
        "w": 24,
        "x": 0,
        "y": 4
      },
      "id": 4,
      "options": {
        "cellHeight": "sm",
        "showHeader": true,
        "sortBy": [
          {
            "desc": false,
            "displayName": "File size"
          }
        ]
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n    SUBSTRING_INDEX(a.file_name, '/', -1) AS 'File name',\n    CONCAT('A:',a.file_name,'\\n','B:',b.file_name) AS 'Directory A/B path',\n    a.fileSize AS 'File size'\nFROM (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n) a\nJOIN (\n    SELECT file_name, fileSize\n    FROM $tableName USE INDEX()\n    WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Duplicate file list",
      "type": "table"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "auto",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "hideFrom": {
              "viz": false
            },
            "inspect": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "File size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Path source"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 118
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File name"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 288
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File path"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 904
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 7,
        "w": 24,
        "x": 0,
        "y": 13
      },
      "id": 5,
      "options": {
        "cellHeight": "sm",
        "showHeader": true,
        "sortBy": [
          {
            "desc": false,
            "displayName": "File path"
          }
        ]
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n    'Path A' AS 'Path source',\n    SUBSTRING_INDEX(file_name, '/', -1) AS 'File name',\n    file_name AS 'File path',\n    fileSize AS 'File size'\nFROM $tableName USE INDEX()\nWHERE \n    file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n    AND (SUBSTRING_INDEX(file_name, '/', -1), fileSize) NOT IN (\n    SELECT \n        SUBSTRING_INDEX(file_name, '/', -1), fileSize\n    FROM \n        $tableName USE INDEX()\n    WHERE \n        file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n  )\n\nUNION ALL\n\nSELECT \n    'Path B' AS 'Path source',\n    SUBSTRING_INDEX(file_name, '/', -1) AS 'File name',\n    file_name AS 'File path',\n    fileSize AS 'File size'\nFROM $tableName USE INDEX()\nWHERE \n    file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n    AND (SUBSTRING_INDEX(file_name, '/', -1), fileSize) NOT IN (\n    SELECT \n        SUBSTRING_INDEX(file_name, '/', -1), fileSize\n    FROM \n        $tableName USE INDEX()\n    WHERE \n        file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '\\\\', '\\\\\\\\'),'%','\\\\%'),'_','\\\\_'),'%')\n  )",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Unique file list",
      "type": "table"
    }
  ],
  "preload": false,
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "/fileset1",
          "value": "/fileset1"
        },
        "label": "Source directory (A)",
        "name": "path1",
        "options": [
          {
            "selected": true,
            "text": "/fileset1",
            "value": "/fileset1"
          }
        ],
        "query": "/fileset1",
        "type": "textbox"
      },
      {
        "current": {
          "text": "/fileset2",
          "value": "/fileset2"
        },
        "label": "Reference directory (B)",
        "name": "path2",
        "options": [
          {
            "selected": true,
            "text": "/fileset2",
            "value": "/fileset2"
          }
        ],
        "query": "/fileset2",
        "type": "textbox"
      },
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "hide": 2,
        "label": "Table name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-6h",
    "to": "now"
  },
  "timepicker": {
    "hidden": true
  },
  "timezone": "browser",
  "title": "CPFS directory comparison",
  "uid": "adwm968",
  "version": 44
}
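The directory comparison panels filter file_name by a prefix LIKE pattern for each directory. They then match files across the two directories by file name (the part after the last /) plus size. The Python sketch below mirrors that logic on in-memory rows, assuming the same row layout as a scan result (dicts with file_name and fileSize keys). When it builds the prefix pattern, it escapes backslash before % and _. Escaping backslash afterward would double the backslashes just added for the wildcards. All function names are hypothetical.

```python
import posixpath
import re


def escape_like_prefix(prefix: str) -> str:
    """Build a LIKE pattern that matches any value starting with prefix.

    Backslash is escaped first; escaping it after % and _ would double the
    backslashes just added for those wildcards."""
    escaped = prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return escaped + "%"


def like_match(value: str, pattern: str) -> bool:
    """Minimal LIKE evaluator: % is any run, _ is one character, backslash escapes."""
    parts = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == "\\" and i + 1 < len(pattern):
            parts.append(re.escape(pattern[i + 1]))  # escaped character is literal
            i += 2
            continue
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
        i += 1
    return re.fullmatch("".join(parts), value, flags=re.DOTALL) is not None


def _match_key(row: dict) -> tuple:
    # Same key as the SQL: SUBSTRING_INDEX(file_name, '/', -1) plus fileSize.
    return posixpath.basename(row["file_name"]), row["fileSize"]


def compare_dirs(rows: list, path_a: str, path_b: str) -> dict:
    """Return duplicate and unique-file results for directories A and B."""
    a = [r for r in rows if like_match(r["file_name"], escape_like_prefix(path_a))]
    b = [r for r in rows if like_match(r["file_name"], escape_like_prefix(path_b))]
    # Inner join on (name, size): one result per matching (A, B) pair.
    pairs = [(x, y) for x in a for y in b if _match_key(x) == _match_key(y)]
    a_keys = {_match_key(r) for r in a}
    b_keys = {_match_key(r) for r in b}
    return {
        "duplicate_count": len(pairs),
        "duplicate_size": sum(x["fileSize"] for x, _ in pairs),
        "unique_a": [r["file_name"] for r in a if _match_key(r) not in b_keys],
        "unique_b": [r["file_name"] for r in b if _match_key(r) not in a_keys],
    }
```

The filter is a plain prefix match, so /fileset1 also matches /fileset10. To restrict the comparison to one directory, add a trailing slash to the directory variables. As with the SQL JOIN, the duplicate count is the number of matching (A, B) pairs. A file in A that matches two files in B is therefore counted twice.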

CPFS file search

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": "-- Grafana --",
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 4,
  "links": [
    {
      "asDropdown": true,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "Other Dashboards",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "left",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "inspect": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "File name"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 667
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Creation time"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 238
              },
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Region"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 277
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File size"
            },
            "properties": [
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 21,
        "w": 24,
        "x": 0,
        "y": 0
      },
      "id": 1,
      "options": {
        "cellHeight": "md",
        "showHeader": true,
        "sortBy": []
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n  file_name AS \"File name\",\n  cTime AS \"Creation time\",\n  region AS \"Region\",\n  fileSize AS \"File size\"\nFROM $tableName USE INDEX($indexName)\nWHERE TEXT_MATCH_PHRASE(file_name,'$searchKeyword')\nLIMIT $limit",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "File search results",
      "type": "table"
    }
  ],
  "preload": false,
  "refresh": "",
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "/fileset1",
          "value": "/fileset1"
        },
        "label": "File search",
        "name": "searchKeyword",
        "options": [
          {
            "selected": true,
            "text": "/fileset1",
            "value": "/fileset1"
          }
        ],
        "query": "/fileset1",
        "type": "textbox"
      },
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "hide": 2,
        "label": "Table name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      },
      {
        "current": {
          "text": "5000",
          "value": "5000"
        },
        "label": "Display limit",
        "name": "limit",
        "options": [
          {
            "selected": false,
            "text": "10",
            "value": "10"
          },
          {
            "selected": false,
            "text": "50",
            "value": "50"
          },
          {
            "selected": false,
            "text": "100",
            "value": "100"
          },
          {
            "selected": false,
            "text": "500",
            "value": "500"
          },
          {
            "selected": false,
            "text": "1000",
            "value": "1000"
          },
          {
            "selected": true,
            "text": "5000",
            "value": "5000"
          },
          {
            "selected": false,
            "text": "10000",
            "value": "10000"
          }
        ],
        "query": "10,50,100,500,1000,5000,10000",
        "type": "custom"
      },
      {
        "current": {
          "text": "bmcpfs_test_idx",
          "value": "bmcpfs_test_idx"
        },
        "hide": 2,
        "name": "indexName",
        "query": "bmcpfs_test_idx",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-1h",
    "to": "now"
  },
  "timepicker": {
    "hidden": true
  },
  "timezone": "",
  "title": "CPFS file search",
  "uid": "file-search-analysis",
  "version": 33
}

CPFS file search and analysis

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": "-- Grafana --",
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 3,
  "links": [
    {
      "asDropdown": true,
      "icon": "external link",
      "includeVars": false,
      "keepTime": false,
      "tags": [],
      "targetBlank": false,
      "title": "Other Dashboards",
      "tooltip": "",
      "type": "dashboards",
      "url": ""
    }
  ],
  "panels": [
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "continuous-BlPu"
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          },
          "unit": "none"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 3,
        "w": 12,
        "x": 0,
        "y": 0
      },
      "id": 3,
      "options": {
        "colorMode": "background",
        "graphMode": "none",
        "justifyMode": "auto",
        "orientation": "horizontal",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "last"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "auto",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT COUNT(*) AS file_count\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n  fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n  aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n  TEXT_MATCH_PHRASE(file_name,'$pathFilter')",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "Matching file count",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "mappings": [],
          "noValue": "0GB",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 100
              }
            ]
          },
          "unit": "bytes"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 3,
        "w": 12,
        "x": 12,
        "y": 0
      },
      "id": 5,
      "options": {
        "colorMode": "background",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "horizontal",
        "percentChangeColorMode": "standard",
        "reduceOptions": {
          "calcs": [
            "last"
          ],
          "fields": "",
          "values": false
        },
        "showPercentChange": false,
        "textMode": "value",
        "wideLayout": true
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n  SUM(fileSize) AS cpfs_total_capacity_bytes\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n  fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n  aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n  TEXT_MATCH_PHRASE(file_name,'$pathFilter')",
          "refId": "A",
          "xcol": "aTime",
          "ycol": ""
        }
      ],
      "title": "Total size of matching files",
      "type": "stat"
    },
    {
      "datasource": {
        "type": "aliyun-tablestore-grafana-datasource-adapt-react",
        "uid": "fffrqt7w6setcc"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "custom": {
            "align": "left",
            "cellOptions": {
              "type": "auto"
            },
            "footer": {
              "reducers": []
            },
            "inspect": false
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": 0
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": [
          {
            "matcher": {
              "id": "byName",
              "options": "File path"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 561
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "File size"
            },
            "properties": [
              {
                "id": "custom.width",
                "value": 198
              },
              {
                "id": "unit",
                "value": "bytes"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Creation time"
            },
            "properties": [
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Access time"
            },
            "properties": [
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          },
          {
            "matcher": {
              "id": "byName",
              "options": "Modification time"
            },
            "properties": [
              {
                "id": "unit",
                "value": "dateTimeAsIso"
              }
            ]
          }
        ]
      },
      "gridPos": {
        "h": 17,
        "w": 24,
        "x": 0,
        "y": 3
      },
      "id": 1,
      "options": {
        "cellHeight": "md",
        "showHeader": true,
        "sortBy": [
          {
            "desc": false,
            "displayName": "File path"
          }
        ]
      },
      "pluginVersion": "12.3.1",
      "targets": [
        {
          "datasource": {
            "type": "aliyun-tablestore-grafana-datasource-adapt-react",
            "uid": "fffrqt7w6setcc"
          },
          "formatType": "Table",
          "query": "SELECT \n  file_name AS \"File path\",\n  fileSize AS \"File size\",\n  cTime AS \"Creation time\",\n  aTime AS \"Access time\",\n  mTime AS \"Modification time\"\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n  fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n  aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n  TEXT_MATCH_PHRASE(file_name,'$pathFilter')\nORDER BY fileSize ASC\nLIMIT $limit",
          "refId": "A",
          "xcol": "_time",
          "ycol": ""
        }
      ],
      "title": "File query results",
      "type": "table"
    }
  ],
  "preload": false,
  "refresh": "5m",
  "schemaVersion": 42,
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "text": "",
          "value": ""
        },
        "label": "File size threshold (KiB)",
        "name": "fileSize",
        "options": [
          {
            "selected": true,
            "text": "",
            "value": ""
          }
        ],
        "query": "",
        "type": "textbox"
      },
      {
        "current": {
          "text": "fileset1",
          "value": "fileset1"
        },
        "label": "Directory filter",
        "name": "pathFilter",
        "options": [
          {
            "selected": true,
            "text": "fileset1",
            "value": "fileset1"
          }
        ],
        "query": "fileset1",
        "type": "textbox"
      },
      {
        "current": {
          "text": "5000",
          "value": "5000"
        },
        "label": "Display limit",
        "name": "limit",
        "options": [
          {
            "selected": false,
            "text": "10",
            "value": "10"
          },
          {
            "selected": false,
            "text": "50",
            "value": "50"
          },
          {
            "selected": false,
            "text": "100",
            "value": "100"
          },
          {
            "selected": false,
            "text": "500",
            "value": "500"
          },
          {
            "selected": false,
            "text": "1000",
            "value": "1000"
          },
          {
            "selected": true,
            "text": "5000",
            "value": "5000"
          },
          {
            "selected": false,
            "text": "10000",
            "value": "10000"
          }
        ],
        "query": "10,50,100,500,1000,5000,10000",
        "type": "custom"
      },
      {
        "current": {
          "text": "bmcpfs_test",
          "value": "bmcpfs_test"
        },
        "description": "",
        "hide": 2,
        "label": "Table name",
        "name": "tableName",
        "query": "bmcpfs_test",
        "skipUrlSync": true,
        "type": "constant"
      },
      {
        "current": {
          "text": "bmcpfs_test_idx",
          "value": "bmcpfs_test_idx"
        },
        "hide": 2,
        "name": "indexName",
        "query": "bmcpfs_test_idx",
        "skipUrlSync": true,
        "type": "constant"
      }
    ]
  },
  "time": {
    "from": "now-30d",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "",
  "title": "CPFS file search & analysis",
  "uid": "cpfs-analysis",
  "version": 90
}