CPFS is widely used for high-performance computing and AI training. As file counts grow, understanding the file system's capacity distribution, data hotness, and fileset quota usage is essential for resource management. You can scan CPFS file metadata, store it in Tablestore, and visualize it through Grafana dashboards for real-time monitoring.
Architecture
The overall data flow is as follows:
-
CPFS file system: A scanning script runs on a Linux machine with the CPFS file system mounted. The script traverses specified directories to collect file metadata, including paths, sizes, access times, and modification times, as well as fileset quota information. The script first saves the scan results locally, then writes them to Tablestore, and finally packages and uploads them to OSS for archiving.
-
Tablestore: Provides persistent storage for metadata. It uses Search Index to support flexible queries based on dimensions such as time ranges, file paths, and filesets. It integrates with Grafana using SQL-mapped tables.
-
Grafana: Connects to Tablestore using the Alibaba Cloud Tablestore data source plugin. By importing a pre-built dashboard, you can visualize metrics such as file system capacity, file counts, and fileset usage.

Prerequisites
Prepare the following environment and resources.
-
Mount a CPFS file system: This example uses the CPFS Intelligent Computing Edition. Mount the root directory of the file system to the
/cpfsdirectory on a Linux server. For mounting instructions, see the CPFS documentation:-
CPFS General-purpose Edition: Mount a file system
-
CPFS Intelligent Computing Edition: Mount a CPFS file system for intelligent computing in PAI | Mount a CPFS file system for intelligent computing in ACK
-
-
Python environment: Ensure Python 3.8 or later is installed on the machine that runs the scan.
-
Tablestore instance: Create an instance in the Tablestore console and obtain the instance name and endpoint.
-
OSS bucket: Create an OSS bucket to store archived scan results, and note the bucket name and its region.
-
Grafana: Ensure Grafana is installed on the machine used for visual monitoring.
Step 1: Install dependencies
On the machine where you will run the scan, install the required Python dependency packages.
pip3 install alibabacloud-oss-v2 tablestore alibabacloud-nas20170626
These dependency packages require Python 3.8 or later. Before installing them, run python3 --version to check your Python version.
Step 2: Initialize Tablestore
Before you run the scan, you must create a table, a Search Index, and an SQL mapping table in your Tablestore instance. The init_ots.py script automates these tasks.
The script performs the following actions:
-
Table: Uses
file_name(file path) as the primary key to store file metadata. The time to live (TTL) is set to 86,400 seconds (1 day). -
Search Index: Creates an index on fields such as file size, timestamps, and fileset information to enable multidimensional queries and aggregations.
-
SQL mapping table: Creates an SQL mapping for the table, which enables the Grafana Tablestore data source plugin to query data using SQL.
Save the following script as init_ots.py.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Tablestore initialization script.
Creates a table, a Search Index, and an SQL mapping table.
"""
import sys
import argparse
from tablestore import OTSClient, TableMeta, TableOptions, ReservedThroughput, CapacityUnit
from tablestore import FieldSchema, FieldType, SearchIndexMeta, IndexSetting, AnalyzerType, SplitAnalyzerParameter
def create_table(client, table_name):
"""
Creates a table.
Primary key: file_name (String)
TTL: 86400
"""
print(f"[1/3] Creating table: {table_name}")
try:
# Define the primary key schema.
schema_of_primary_key = [('file_name', 'STRING')]
# Define the table metadata.
table_meta = TableMeta(table_name, schema_of_primary_key)
# Define the table options. Set the TTL to 86,400 seconds.
table_options = TableOptions(
time_to_live=86400,
max_version=1,
max_time_deviation=86400,
allow_update=False
)
# You must specify the reserved throughput when you create a table. The default value is 0.
reserved_throughput = ReservedThroughput(CapacityUnit(0, 0))
# Create the table.
client.create_table(table_meta, table_options, reserved_throughput)
print(f"✓ Table created: {table_name}")
return True
except Exception as e:
print(f"✗ Failed to create table: {str(e)}")
return False
def create_search_index(client, table_name, index_name):
"""
Creates a Search Index.
TTL: 86400
"""
print(f"[2/3] Creating Search Index: {index_name}")
try:
# Define the field schema.
fields = [
# file_name: Text. Uses a split analyzer with "/" as the delimiter. Case-sensitive.
FieldSchema('file_name', FieldType.TEXT, index=True,
analyzer=AnalyzerType.SPLIT,
analyzer_parameter=SplitAnalyzerParameter("/")),
# aTime: Long
FieldSchema('aTime', FieldType.LONG, index=True),
# wTime: Long
FieldSchema('wTime', FieldType.LONG, index=True),
# cluster_id: Keyword
FieldSchema('cluster_id', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
# fileSize: Long
FieldSchema('fileSize', FieldType.LONG, index=True),
# cTime: Long
FieldSchema('cTime', FieldType.LONG, index=True),
# region: Keyword
FieldSchema('region', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
# mTime: Long
FieldSchema('mTime', FieldType.LONG, index=True),
# fileset_id: Keyword
FieldSchema('fileset_id', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
# fileset_name: Keyword
FieldSchema('fileset_name', FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
# fileset_file_count_quota: Long
FieldSchema('fileset_file_count_quota', FieldType.LONG, index=True),
# fileset_size_quota: Long
FieldSchema('fileset_size_quota', FieldType.LONG, index=True),
]
# Configure the index settings.
index_setting = IndexSetting(routing_fields=['file_name'])
# Define the Search Index metadata.
index_meta = SearchIndexMeta(
fields,
index_setting=index_setting,
index_sort=None,
time_to_live=86400
)
# Create the Search Index.
client.create_search_index(table_name, index_name, index_meta)
print(f"✓ Search Index created: {index_name}")
return True
except Exception as e:
print(f"✗ Failed to create Search Index: {str(e)}")
return False
def create_mapping_table(client, table_name):
"""
Creates an SQL mapping table.
"""
print(f"[3/3] Creating SQL mapping table")
try:
# Construct the SQL statement.
sql_query = f"""CREATE TABLE `{table_name}` (
`file_name` VARCHAR(1024),
`aTime` BIGINT(20),
`wTime` BIGINT(20),
`cluster_id` MEDIUMTEXT,
`fileSize` BIGINT(20),
`cTime` BIGINT(20),
`region` MEDIUMTEXT,
`mTime` BIGINT(20),
`fileset_id` MEDIUMTEXT,
`fileset_name` MEDIUMTEXT,
`fileset_file_count_quota` BIGINT(20),
`fileset_size_quota` BIGINT(20),
PRIMARY KEY(`file_name`)
)"""
# Execute the SQL statement.
client.exe_sql_query(sql_query)
print(f"✓ SQL mapping table created")
return True
except Exception as e:
print(f"✗ Failed to create SQL mapping table: {str(e)}")
return False
def main():
"""
Main function
"""
print("=" * 60)
print("Tablestore Initialization Script")
print("=" * 60)
# Parse command-line arguments.
parser = argparse.ArgumentParser(description='Tablestore initialization script - Creates a table, a Search Index, and an SQL mapping table')
parser.add_argument('--endpoint', required=True, help='The endpoint of the Tablestore instance')
parser.add_argument('--instance-name', required=True, help='The name of the Tablestore instance')
parser.add_argument('--access-key-id', required=True, help='Your Access Key ID')
parser.add_argument('--access-key-secret', required=True, help='Your Access Key Secret')
parser.add_argument('--table-name', required=True, help='The name of the table to create')
parser.add_argument('--search-index-name', required=True, help='The name of the Search Index to create')
args = parser.parse_args()
# Print the configuration information.
print("\nConfiguration:")
print(f" Endpoint: {args.endpoint}")
print(f" Instance Name: {args.instance_name}")
print(f" Table Name: {args.table_name}")
print(f" Search Index Name: {args.search_index_name}")
print()
try:
# Initialize the Tablestore client.
print("Connecting to the Tablestore instance...")
client = OTSClient(
args.endpoint,
args.access_key_id,
args.access_key_secret,
args.instance_name
)
print("✓ Connection successful\n")
# Run the initialization steps.
success = True
# Step 1: Create the table
if not create_table(client, args.table_name):
success = False
sys.exit(1)
print()
# Step 2: Create the Search Index
if not create_search_index(client, args.table_name, args.search_index_name):
success = False
sys.exit(1)
print()
# Step 3: Create the SQL mapping table
if not create_mapping_table(client, args.table_name):
success = False
sys.exit(1)
print()
print("=" * 60)
if success:
print("✓ All initialization steps are complete!")
else:
print("✗ An error occurred during initialization")
print("=" * 60)
except Exception as e:
print(f"\n✗ Script execution failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
Run the following command to initialize Tablestore.
python3 init_ots.py \
--endpoint https://<instance-name>.<region-id>.ots.aliyuncs.com \
--instance-name <instance-name> \
--access-key-id <Access Key ID> \
--access-key-secret <Access Key Secret> \
--table-name bmcpfs_test \
--search-index-name bmcpfs_test_idx
-
The
--table-nameand--search-index-nameparameters use the example valuesbmcpfs_testandbmcpfs_test_idx. You can customize these values as needed. If you change these names, you must update the corresponding configurations inbmcpfs_stat_tool_config.jsonand the variables in the Grafana dashboard in later steps. -
The
--endpointparameter must use the formathttps://<instance-name>.<region-id>.ots.aliyuncs.com. You can find the endpoint on the Instance Details page in the Tablestore Console.
Step 3: Deploy scanning script
On the machine that will run the scan, create a deployment directory, such as /opt/cpfs-scanner, and populate it with the following files.
-
Create a file named
meta_stat.pyand add the following code.import argparse import concurrent import csv import fcntl import json import logging import os import re import shutil import subprocess import sys import tarfile import time from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor from datetime import datetime, timezone from logging.handlers import RotatingFileHandler from typing import Dict, Optional import alibabacloud_oss_v2 as oss import tablestore from alibabacloud_oss_v2.exceptions import OperationError from alibabacloud_nas20170626.client import Client as NAS20170626Client from alibabacloud_credentials.client import Client as CredentialClient from alibabacloud_credentials.models import Config as CredentialConfig from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_nas20170626 import models as nas20170626_models from alibabacloud_tea_util import models as util_models from tablestore import PutRowItem VERSION = "1.3a" PUT = "PUT" UPDATE = "UPDATE" IGNORE = "IGNORE" NEW_ATIME = "NEW_ATIME" AVAILABLE_OTS_COLUMN_CONDITION = [IGNORE, NEW_ATIME] AVAILABLE_OTS_WRITE_TYPE = [UPDATE, PUT] TABLESTORE_BATCH_SIZE = 200 # Set up logging. SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) LOG_FILE = os.path.join(SCRIPT_DIR, 'meta_stat.log') # Initialize the logger. logger = logging.getLogger('meta_stat') logger.setLevel(logging.DEBUG) # Console handler. console_handler = logging.StreamHandler() console_formatter = logging.Formatter('%(levelname)s: %(message)s') console_handler.setFormatter(console_formatter) console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) # File handler (with rotation). file_handler = RotatingFileHandler(LOG_FILE, maxBytes=1024 * 1024 * 5, backupCount=5) file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(file_formatter) file_handler.setLevel(logging.DEBUG) logger.addHandler(file_handler) # Other constant definitions. PYTHON_FILE_PATH = os.path.abspath(__file__) SCRIPT_DIR = os.path.dirname(PYTHON_FILE_PATH) os.chdir(SCRIPT_DIR) # Change to the script's directory. PYTHON_PATH = sys.executable DATE = datetime.now().strftime("%Y%m%dT%H%M") SYSTEM_DIRS = [".sys_recyclebin", ".snapshots", ".sys_healthchecker", ".audit_log", ".msgq", ".fakesnapshots"] thread_executor = ThreadPoolExecutor(max_workers=10) MAX_OTS_FUTURES = 10 def read_from_json(key, filename, default=None): config_path = os.path.join(SCRIPT_DIR, filename) if not os.path.exists(config_path): if default is not None: return default raise FileNotFoundError(f"Credential file not found: {config_path}") with open(config_path, 'r') as f: try: config_json = json.load(f) except json.JSONDecodeError: raise ValueError(f"Invalid format in .meta_stat_key.json. Check if the file is valid JSON. Path: {config_path}") value = config_json.get(key) if value is None: if default is not None: return default raise ValueError( f".meta_stat_key.json is missing the required field: {key}. Make sure the file contains access_key_id and access_key_secret.") return value BASE_PATH = read_from_json("BASE_PATH", '.meta_stat_key.json') if not BASE_PATH.endswith('/'): BASE_PATH += '/' OUTPUT_DIR_PREFIX = read_from_json("OUTPUT_DIR_PREFIX", '.meta_stat_key.json') OSS_CONFIG_FILE_NAME = os.path.basename(read_from_json("OSS_KEY", '.meta_stat_key.json')) class OssTool: def __init__(self): self.region = read_from_json('OSS_REGION', '.meta_stat_key.json') self.bucket = read_from_json('OSS_BUCKET', '.meta_stat_key.json') self.key = read_from_json('OSS_KEY', '.meta_stat_key.json') self.endpoint = read_from_json('OSS_ENDPOINT', '.meta_stat_key.json') self.client = None def init_client(self): if self.client is not None: return try: credentials_provider = oss.credentials.StaticCredentialsProvider( access_key_id=read_from_json('oss_access_key_id', '.meta_stat_key.json'), access_key_secret=read_from_json('oss_access_key_secret', '.meta_stat_key.json') ) except Exception as e: raise RuntimeError(f"Failed to load the local key file. Make sure that .meta_stat_key.json exists and is correctly configured. Error: {e}") cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider cfg.region = self.region if self.endpoint: cfg.endpoint = self.endpoint self.client = oss.Client(cfg) def download_file_if_newer(self): self.init_client() _file_path = os.path.join(SCRIPT_DIR, os.path.basename(self.key)) try: head_result = self.client.head_object(oss.HeadObjectRequest(bucket=self.bucket, key=self.key)) if_modified_since = self.get_local_file_mtime(_file_path) if head_result.last_modified <= if_modified_since: logger.info("Time:%s, File is up to date. Skipping download." % datetime.now()) return False result = self.client.get_object_to_file(oss.GetObjectRequest(bucket=self.bucket, key=self.key), _file_path) if result.status_code == 200: logger.info(f"File has been updated and saved to: {_file_path}") return True else: logger.warning(f"Download failed. Status code: {result.status_code}") return False except OperationError as e: error = e.kwargs.get('error') if error and error.status_code == 304: logger.info("Time:%s, File is up to date. Skipping download." % datetime.now()) return False else: logger.error(f"Request failed: {e}", exc_info=True) return False except Exception as e: logger.exception(f"An exception occurred: {e}") return False def upload_file_to_oss(self, local_file_path, oss_key): self.init_client() request = oss.PutObjectRequest( bucket=self.bucket, key=oss_key, body=open(local_file_path, 'rb') ) try: result = self.client.put_object(request) if result.status_code == 200: logger.info(f"File has been uploaded to OSS: oss://{self.bucket}/{oss_key}") return True else: logger.warning(f"Upload failed. Status code: {result.status_code}") return False except OperationError as e: error = e.kwargs.get('error') if error and error.status_code == 404: logger.error("The destination bucket or path does not exist. Check your configuration.") else: logger.error(f"Request failed: {e}", exc_info=True) return False except Exception as e: logger.exception(f"An exception occurred: {e}") return False def upload_dir_to_oss(self, new_dir_path, oss_path): # Record the start time. start_time = time.time() for file in os.listdir(new_dir_path): if file.endswith(".csv"): file_path = os.path.join(new_dir_path, file) self.upload_file_to_oss(file_path, os.path.join(oss_path, file)) logger.info(f"OSS upload complete. Time elapsed: {time.time() - start_time} seconds") @staticmethod def get_local_file_mtime(_file_path): if not os.path.exists(_file_path): return datetime.fromtimestamp(0, tz=timezone.utc) mtime = os.path.getmtime(_file_path) return datetime.fromtimestamp(mtime, tz=timezone.utc) class TablestoreWriter: def __init__(self): ots_config = read_from_json("otsConfig", OSS_CONFIG_FILE_NAME) self.table_name = ots_config.get("tableName") self.endpoint = ots_config.get("endpoint") self.instance_name = ots_config.get("instanceName") self.row_exist_exception_str = ots_config.get("rowExistenceException") self.column_condition = ots_config.get("columnCondition") self.ots_write_type = ots_config.get("writeType") self.access_key_id = read_from_json("ots_access_key_id", '.meta_stat_key.json') self.access_key_secret = read_from_json("ots_access_key_secret", '.meta_stat_key.json') self._tablestore_client = tablestore.OTSClient( self.endpoint, self.access_key_id, self.access_key_secret, self.instance_name ) def single_put(self, row: tablestore.PutRowItem) -> bool: retry_count = 20 for i in range(retry_count): try: self._tablestore_client.put_row(self.table_name, row.row) return True except Exception as e: if i == retry_count - 1: print("Failed to update single row", e) return False return True def single_update(self, row: tablestore.UpdateRowItem) -> bool: retry_count = 20 for i in range(retry_count): try: self._tablestore_client.update_row(self.table_name, row.row, row.condition) return True except Exception as e: if i == retry_count - 1: print("Failed to update single row", e) return False @staticmethod def batch_write(rows: list[tablestore.UpdateRowItem]) -> int: global ots_writer, thread_executor if ots_writer is None: ots_writer = TablestoreWriter() succeed_cnt = 0 row_items = [] cur_batch_size = 0 futures = set() for idx, updateItem in enumerate(rows): row_items.append(updateItem) cur_batch_size = cur_batch_size + 1 if cur_batch_size == TABLESTORE_BATCH_SIZE or idx == len(rows) - 1: batch_write_req = tablestore.BatchWriteRowRequest() batch_write_req.add(tablestore.TableInBatchWriteRowItem(ots_writer.table_name, list(row_items))) row_items.clear() futures.add( thread_executor.submit(TablestoreWriter.single_batch_update, batch_write_req, cur_batch_size)) cur_batch_size = 0 for future in futures: succeed_cnt += future.result() return succeed_cnt @staticmethod def single_batch_update(batch_write_req, cur_batch_size): succeed_cnt = 0 try: batch_write_row_response = ots_writer._tablestore_client.batch_write_row(batch_write_req) if not batch_write_row_response.is_all_succeed(): succeed_cnt += len(batch_write_row_response.get_succeed_of_update()) succeed_cnt += len(batch_write_row_response.get_succeed_of_put()) # Update row item. failed_update = batch_write_row_response.get_failed_of_update() for row_item in failed_update: if not row_item.is_ok and row_item.error_code != "OTSConditionCheckFail": logger.warning("Failed to update single row") single_update_item = batch_write_req.items[ots_writer.table_name].row_items[row_item.index] if ots_writer.single_update(single_update_item): succeed_cnt += 1 # Put row item. failed_put = batch_write_row_response.get_failed_of_put() for row_item in failed_put: if not row_item.is_ok: logger.warning("Failed to put single row") single_put_item = batch_write_req.items[ots_writer.table_name].row_items[row_item.index] if ots_writer.single_put(single_put_item): succeed_cnt += 1 else: succeed_cnt += cur_batch_size except Exception as e: print("batch update failed: ", e) return succeed_cnt @staticmethod def create_put_row_item(pk: list[tuple], columns: list[tuple], column_conditions: list[tablestore.ColumnCondition]) -> PutRowItem: global ots_writer if ots_writer is None: ots_writer = TablestoreWriter() if pk is None or len(pk) != 1: return None row = tablestore.Row(pk, columns) return tablestore.PutRowItem(row, tablestore.Condition(ots_writer.row_exist_exception_str, column_conditions[0])) @staticmethod def create_update_row_item(pk: list[tuple], columns: list[tuple], column_conditions: list[tablestore.ColumnCondition]) -> tablestore.UpdateRowItem: global ots_writer if ots_writer is None: ots_writer = TablestoreWriter() if pk is None: return None row = tablestore.Row(pk, {"update": columns}) if column_conditions is None or len(column_conditions) == 0: return tablestore.UpdateRowItem(row, None) elif len(column_conditions) == 1: return tablestore.UpdateRowItem(row, tablestore.Condition(ots_writer.row_exist_exception_str, column_conditions[0])) else: composite_condition = tablestore.CompositeColumnCondition(tablestore.LogicalOperator.AND) for condition in column_conditions: composite_condition.add_sub_condition(condition) row_conditions = tablestore.Condition(ots_writer.row_exist_exception_str, composite_condition) return tablestore.UpdateRowItem(row, row_conditions) def extract_pk(self, pk_fields, _dict) -> list[tuple]: ots_pk_fields = [] for field_key, field_value in pk_fields.items(): ots_pk_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value) ots_pk_fields.append(ots_pk_field) return ots_pk_fields @staticmethod def less_than_condition(column_name: str, time_stamp: int) -> tablestore.SingleColumnCondition: return tablestore.SingleColumnCondition(column_name=column_name, column_value=time_stamp, comparator=tablestore.ComparatorType.LESS_THAN) @staticmethod def upload_dir_to_ots(new_dir_path): """ Uploads all `.ots` files from a directory to Tablestore and deletes them upon completion. Data is read from the files and written to Tablestore in batches. """ global ots_writer if ots_writer is None: ots_writer = TablestoreWriter() with ProcessPoolExecutor(max_workers=MAX_OTS_FUTURES) as executor: start_time = time.time() # Record the start time. for file in os.listdir(new_dir_path): if file.endswith(".ots"): file_path = os.path.join(new_dir_path, file) TablestoreWriter.upload_csv_to_ots(executor, file_path) # Delete the file after it has been uploaded. os.remove(file_path) logger.info(f"Deleted file: {file_path}. Time elapsed: {time.time() - start_time} seconds") logger.info(f"Tablestore upload complete. Time elapsed: {time.time() - start_time} seconds") @staticmethod def upload_csv_to_ots(executor, file_path): global ots_writer logger.info(f"Starting to upload file: {file_path}") start_time = time.time() if ots_writer is None: ots_writer = TablestoreWriter() fields = read_from_json('fields', OSS_CONFIG_FILE_NAME) fields_value_to_key = {value: key for key, value in fields.items()} pk_fields = read_from_json('pkFields', OSS_CONFIG_FILE_NAME) with open(file_path, 'r', newline='', encoding='utf-8') as f: future_set = set() timestamp_ms = int(time.time() * 1000) reader = csv.reader(f) _ = next(reader) # Skip the header row. rows = [] for idx, row in enumerate(reader): if len(row) != 6: logger.warning(f"Skipping invalid row {idx + 1}: incorrect number of fields. Row: {row}") continue path, size, inode, atime_ms, mtime_ms, ctime_ms = row try: _dict = { 'path': path, 'size': int(size), 'inode': int(inode), 'atime_ms': int(atime_ms), 'mtime_ms': int(mtime_ms), 'ctime_ms': int(ctime_ms), 'wtime_ms': int(timestamp_ms) } except ValueError as e: logger.error(f"Value conversion failed at line {idx + 1}: {e}") continue # Construct the primary key (assetId, fileName). pk = ots_writer.extract_pk(pk_fields, _dict) if not pk: logger.warning(f"Failed to extract primary key from _dict: {_dict}") continue if ots_writer.column_condition == IGNORE: conditions = [None] elif ots_writer.column_condition == NEW_ATIME: conditions = [TablestoreWriter.less_than_condition(fields_value_to_key['$atime_ms'], _dict['atime_ms'])] else: conditions = [None] if ots_writer.ots_write_type == UPDATE: # TODO: Consider making these conditions configurable in the future. ots_fields = [] for field_key, field_value in fields.items(): ots_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value) if ots_field is not None: ots_fields.append(ots_field) update_row_item = TablestoreWriter.create_update_row_item(pk, ots_fields, conditions) if update_row_item: rows.append(update_row_item) elif ots_writer.ots_write_type == PUT: ots_fields = [] for field_key, field_value in fields.items(): ots_field = TablestoreWriter.get_ots_update_field(_dict, field_key, field_value) if ots_field is not None: ots_fields.append(ots_field) put_row_item = TablestoreWriter.create_put_row_item(pk, ots_fields, conditions) if put_row_item: rows.append(put_row_item) else: logger.error(f"Unknown Tablestore write type: {ots_writer.ots_write_type}") continue # Submit a batch every 10 * TABLESTORE_BATCH_SIZE rows. if len(rows) >= 10 * TABLESTORE_BATCH_SIZE: future_set.add(executor.submit(TablestoreWriter.batch_write, list(rows))) rows.clear() # Limit the maximum number of in-flight futures. if len(future_set) >= MAX_OTS_FUTURES: done, future_set = concurrent.futures.wait( future_set, return_when=concurrent.futures.FIRST_COMPLETED ) # Optional: Check for exceptions. for future in done: try: future.result() except Exception as e: logger.error(f"Tablestore submission failed: {e}") # Submit the remaining data. if rows: future_set.add(executor.submit(TablestoreWriter.batch_write, list(rows))) # Wait for all tasks to complete and handle exceptions. for future in as_completed(future_set): try: future.result() except Exception as e: logger.info(f"Batch failed with exception: {e}") logger.info(f"Tablestore submission complete. Time elapsed: {time.time() - start_time} seconds") @staticmethod def get_ots_update_field(_dict, field_key, field_value): global nas_client if field_value[0] != '$': return field_key, field_value else: if field_value == "$dajiang_asset_Id": # Split the path and find the matching asset ID. path = _dict['path'] parts = path.strip('/').split('/') for i in range(len(parts) - 1): # Check if the current part is 10 digits long and the previous part is its first 3 digits. if len(parts[i + 1]) == 10 and parts[i] == parts[i + 1][:3]: return field_key, parts[i + 1] return None if field_value == "$dajiang_file_name": # Split the path and find the matching asset ID. path = _dict['path'] parts = path.strip('/').split('/') for i in range(len(parts) - 1): # Check if the current part is 10 digits long and the previous part is its first 3 digits. if len(parts[i + 1]) == 10 and parts[i] == parts[i + 1][:3]: return field_key, '/'.join(parts[i + 1:]) if field_value in ["$fset_id", "$fset_name", "$fset_file_count_quota", "$fset_size_quota"]: # Get the fileset information for the file. global nas_client if nas_client is None: logger.warning("NAS client is not initialized. Cannot retrieve fileset information.") return None # Get the fsid from the configuration. fsid = read_from_json("fsid", OSS_CONFIG_FILE_NAME, None) if not fsid: logger.warning("Cannot get fsid from the configuration.") return None # Ensure the fsid is complete (includes a prefix). if not fsid.startswith('bmcpfs-') and not fsid.startswith('cpfs-'): full_fsid = f"bmcpfs-{fsid}" else: full_fsid = fsid # Get the file path and find the corresponding fileset information. file_path = _dict.get('path', '') fset_info = nas_client.get_fset_info_by_path(full_fsid, file_path) if fset_info: # Return the value based on the field type. if field_value == "$fset_id": return field_key, fset_info['fset_id'] elif field_value == "$fset_name": return field_key, fset_info['description'] elif field_value == "$fset_file_count_quota": v = fset_info['file_count_limit'] return (field_key, v) if v is not None else None elif field_value == "$fset_size_quota": v = fset_info['size_limit'] return (field_key, v) if v is not None else None else: logger.debug(f"Cannot find fileset information for the path {file_path}") return None return field_key, _dict.get(field_value[1:], field_value) class NasClient: """NAS OpenAPI client for retrieving fileset information.""" def __init__(self): self.client = None # Cache for fileset information: {fsid: [{fset_id, file_system_path, description, quota}, ...]} self.fileset_cache = {} self.last_cache_time = {} # Records the cache time for each fsid. self.cache_ttl = 3600 # Cache TTL: 1 hour. def init_client(self, access_key_id: str, access_key_secret: str, endpoint: str): """Initializes the NAS client.""" if self.client is not None: return try: credentials_config = CredentialConfig( type='access_key', access_key_id=access_key_id, access_key_secret=access_key_secret ) credentials_client = CredentialClient(credentials_config) config = open_api_models.Config( credential=credentials_client, endpoint=endpoint ) self.client = NAS20170626Client(config) logger.info("NAS client initialized successfully.") except Exception as e: logger.error(f"Failed to initialize NAS client: {e}") raise RuntimeError(f"Cannot initialize NAS client: {e}") def get_filesets(self, fsid: str) -> Optional[list]: """Gets all fileset information for a specified file system, using a cache.""" current_time = time.time() # Check if the cache exists and has not expired. if fsid in self.fileset_cache and fsid in self.last_cache_time: if current_time - self.last_cache_time[fsid] < self.cache_ttl: logger.debug(f"Using cached fileset information for: {fsid}") return self.fileset_cache[fsid] # Call the API to get fileset information. try: request = nas20170626_models.DescribeFilesetsRequest() request.file_system_id = fsid runtime = util_models.RuntimeOptions() resp = self.client.describe_filesets_with_options(request, runtime) if resp.body and resp.body.entries and resp.body.entries.entrie: filesets = [] for entry in resp.body.entries.entrie: if entry.fset_id and entry.file_system_path: # Extract quota information (can be empty). file_count_limit = None size_limit = None if entry.quota: file_count_limit = entry.quota.file_count_limit size_limit = entry.quota.size_limit filesets.append({ 'fset_id': entry.fset_id, 'file_system_path': entry.file_system_path, 'description': entry.description, 'file_count_limit': file_count_limit, 'size_limit': size_limit }) # Update the cache. self.fileset_cache[fsid] = filesets self.last_cache_time[fsid] = current_time logger.info(f"Successfully retrieved {len(filesets)} filesets for {fsid}") return filesets else: logger.warning(f"No filesets found for file system {fsid}") return [] except Exception as e: logger.error(f"Failed to get fileset information: {e}") return None def get_fset_info_by_path(self, fsid: str, file_path: str) -> Optional[dict]: """Gets the complete fileset information based on a file path.""" filesets = self.get_filesets(fsid) if filesets is None: return None # Find the best-matching fileset (longest path match). matched_fset = None matched_path_length = -1 for fset in filesets: fs_path = fset['file_system_path'] # Make sure that the file system path ends with a forward slash (/) for easier matching. if not fs_path.endswith('/'): fs_path += '/' # Check if the file path starts with the file system path. if file_path.startswith(fs_path): # Select the longest match (most specific match). if len(fs_path) > matched_path_length: matched_path_length = len(fs_path) matched_fset = fset return matched_fset oss_tool: OssTool = None ots_writer: TablestoreWriter = None nas_client: NasClient = None def run_bmstat_for_dir(mode, fsid): input_dir = mode.input_dir if not check_path_exists_with_sudo(input_dir): logger.error(f"Input directory {input_dir} does not exist") return temp_output_dir = f"{OUTPUT_DIR_PREFIX}{fsid}/" os.makedirs(temp_output_dir, exist_ok=True) # If input_dir matches /cpfs/{fsid}/. if input_dir == f"/cpfs/{fsid}/": exclude_dirs = [f"{input_dir}{d}" for d in SYSTEM_DIRS] else: exclude_dirs = [] exclude_dirs.extend(mode.exclude_dir_set) # Construct the subcommand. command = [ "sudo", PYTHON_PATH, "nas_stat_util.py", "-t", "inode_list", "-i", input_dir, "--backup-count", "0", "--output-format", "path,size,inode,atime_ms,mtime_ms,ctime_ms", "--filter-inode-type", "regular_file", "--output-without-prefix", BASE_PATH[0:len(BASE_PATH) - 1], "--use-csv-format" ] if exclude_dirs: command.extend(["--exclude-full-path-dirs", ",".join(exclude_dirs)]) if mode.ots_output_path_set: command.extend(["--ots-output-path", ",".join(mode.ots_output_path_set)]) if mode.output_path_set: command.extend(["-o", ",".join([f"{temp_output_dir}{d}" for d in mode.output_path_set])]) logger.info(f"Running command: {' '.join(command)}") try: result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info(f"Finished {mode.input_dir}: {result.stdout.decode()}") except subprocess.CalledProcessError as e: logger.error(f"Error running command for {mode.input_dir}: {e.stderr.decode()}") def update_crontab(new_cron_expr): """ Updates the crontab entry for the script's 'run' command. It replaces the schedule for an existing entry or adds a new one if it does not exist. """ try: # Get the current crontab content. result = subprocess.run(['crontab', '-l'], capture_output=True, text=True) if result.returncode not in (0, 1): # A return code of 1 is normal for an empty file. logger.error(f"Failed to read crontab: {result.stderr}") return lines = result.stdout.splitlines() updated = False new_lines = [] pattern = re.compile( r'.*{PYTHON_FILE_PATH}\s+--type\s+run.*'.format(PYTHON_FILE_PATH=re.escape(PYTHON_FILE_PATH))) for line in lines: if pattern.search(line): # If a match is found, replace the first 5 fields (the cron expression). parts = line.strip().split(maxsplit=5) if len(parts) >= 6: command_part = parts[5] new_line = f"{new_cron_expr} {command_part}" new_lines.append(new_line) updated = True else: new_lines.append(line) else: new_lines.append(line) # If no match is found, add a new line. if not updated: new_cron_line = f"{new_cron_expr} {PYTHON_PATH} {PYTHON_FILE_PATH} --type run" new_lines.append(new_cron_line) logger.info("No matching line found. A new cron job has been added.") # Write back the new crontab. new_crontab = '\n'.join(new_lines) + '\n' subprocess.run(['crontab', '-'], input=new_crontab, text=True, check=True) logger.info("Crontab updated.") except Exception as e: logger.error(f"Failed to update crontab: {e}") def remove_crontab_entry(): """ Removes the crontab entry for the script's 'run' command. """ try: result = subprocess.run(['crontab', '-l'], capture_output=True, text=True) if result.returncode not in (0, 1): # A return code of 1 is normal for an empty file. logger.error(f"Failed to read crontab: {result.stderr}") return lines = result.stdout.splitlines() new_lines = [] for line in lines: # Match the line containing PYTHON_FILE_PATH and --type run. if re.search(r'.*{PYTHON_FILE_PATH}\s+--type\s+run.*'.format(PYTHON_FILE_PATH=PYTHON_FILE_PATH), line): logger.info(f"Removing cron line: {line}") continue new_lines.append(line) # Write back the new crontab. new_crontab = '\n'.join(new_lines) + '\n' subprocess.run(['crontab', '-'], input=new_crontab, text=True, check=True) logger.info("Crontab has been cleaned up.") except Exception as e: logger.error(f"Failed to delete crontab entry: {e}") def download_only(): """ Performs the download-only operation. It checks for and downloads an updated configuration file from OSS. Typically scheduled to run frequently. """ download_success = oss_tool.download_file_if_newer() if download_success: file_path = os.path.join(SCRIPT_DIR, os.path.basename(read_from_json('OSS_KEY', '.meta_stat_key.json'))) meta_stat_config = parse_config(file_path) schedule = meta_stat_config.get('schedule') if schedule['type'] == 'cron': cron_expr = schedule.get('cron') if cron_expr: update_crontab(cron_expr) else: logger.info("Missing cron expression. Cannot update the task.") return else: # Remove the cron job entry. remove_crontab_entry() if schedule['type'] == 'once' or schedule['type'] == 'one': # Schedule the task directly. run_only() return def compress_directory(new_dir_name, output_dir): """ Compresses a directory into a .tar.gz archive using pigz. :param new_dir_name: The name of the directory to compress. :param output_dir: The output directory. :return: The path of the generated compressed file. """ base_name = os.path.basename(new_dir_name) tar_path = os.path.join(output_dir, f"{base_name}.tar") gz_path = f"{tar_path}.gz" # Step 1: Create the .tar file. with tarfile.open(tar_path, "w") as tar: tar.add(new_dir_name, arcname=base_name) # Step 2: Compress the .tar file by using pigz. try: subprocess.run(['pigz', '-f', '-v', tar_path], check=True) except subprocess.CalledProcessError as e: logger.error(f"Compression failed: {e}") return None # Step 3: Delete the original directory and .tar file. if os.path.exists(tar_path): os.remove(tar_path) logger.info(f"Deleted the original .tar file: {tar_path}") if os.path.exists(new_dir_name): shutil.rmtree(new_dir_name) logger.info(f"Deleted the original directory: {new_dir_name}") logger.info(f"Directory compression complete: {gz_path}") return gz_path def run_only(): """ Runs the main metadata scanning and uploading process. This is the core 'run' operation, which can be triggered directly or by a scheduled cron job. """ global ots_writer lock_file = os.path.join(SCRIPT_DIR, '.meta_stat.lock') try: lock_fd = os.open(lock_file, os.O_CREAT | os.O_RDWR) fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except (IOError, OSError): logger.info("Another instance is already running. Exiting.") exit(1) begin_date = datetime.now().strftime("%Y%m%dT%H%M") file_path = os.path.join(SCRIPT_DIR, os.path.basename(read_from_json('OSS_KEY', '.meta_stat_key.json'))) meta_stat_config = parse_config(file_path) # Get the directory output modes. dir_output_modes, ots_file_name_to_input_dir_dict = determine_output_modes(meta_stat_config) fsid = read_from_json("fsid", OSS_CONFIG_FILE_NAME, 'fsid') if fsid.startswith('bmcpfs-'): fsid = fsid[7:] elif fsid.startswith('cpfs-'): fsid = fsid[5:] for dir_path in dir_output_modes: mode = dir_output_modes[dir_path] run_bmstat_for_dir(mode, fsid) end_date = datetime.now().strftime("%Y%m%dT%H%M") new_dir_name = f"beg_{begin_date}_end_{end_date}_{fsid}" new_dir_path = f"{OUTPUT_DIR_PREFIX}{new_dir_name}" temp_output_dir = f"{OUTPUT_DIR_PREFIX}{fsid}/" os.rename(temp_output_dir, new_dir_path) # Upload and delete the local files. TablestoreWriter.upload_dir_to_ots(new_dir_path) # Upload to OSS. if meta_stat_config['scan_to_files']: oss_upload_type = read_from_json("ossUploadType", OSS_CONFIG_FILE_NAME, 'gz') # csv if oss_upload_type == 'csv': oss_tool.upload_dir_to_oss(new_dir_path, meta_stat_config['oss_path']) local_file_path = compress_directory(new_dir_path, OUTPUT_DIR_PREFIX) if oss_upload_type == 'gz': oss_tool.upload_file_to_oss(local_file_path, os.path.join(meta_stat_config['oss_path'], f"{new_dir_name}.tar.gz")) # Keep only the latest .tar.gz file locally and delete the others. for file in os.listdir(OUTPUT_DIR_PREFIX): if file.startswith(f"beg_") and file.endswith(".tar.gz") and file != f"{new_dir_name}.tar.gz": os.remove(f"{OUTPUT_DIR_PREFIX}{file}") def upload_only(): upload_to_tablestore_list = read_from_json('uploadToTablestore', OSS_CONFIG_FILE_NAME) for file_path in upload_to_tablestore_list: if not os.path.exists(file_path): logger.error(f"File does not exist: {file_path}") exit(1) with ProcessPoolExecutor(max_workers=MAX_OTS_FUTURES) as executor: for file_path in upload_to_tablestore_list: TablestoreWriter.upload_csv_to_ots(executor, file_path) def main(): global oss_tool, nas_client # Create a command-line argument parser. parser = argparse.ArgumentParser(description="Download file from OSS with conditional check") # Add command-line arguments. parser.add_argument('--type', choices=['download', 'run', 'upload'], required=True, help='Specify the operation type: `download` only checks and downloads the configuration; `run` executes the statistics task; `upload` executes the upload task.') # Parse the command-line arguments. args = parser.parse_args() oss_tool = OssTool() # Initialize the NAS client (if NAS-related parameters are configured). try: nas_access_key_id = read_from_json('nas_access_key_id', '.meta_stat_key.json', None) nas_access_key_secret = read_from_json('nas_access_key_secret', '.meta_stat_key.json', None) nas_endpoint = read_from_json('nas_endpoint', '.meta_stat_key.json', None) if nas_access_key_id and nas_access_key_secret and nas_endpoint: nas_client = NasClient() nas_client.init_client(nas_access_key_id, nas_access_key_secret, nas_endpoint) logger.info("NAS client initialized successfully.") else: logger.info("NAS-related parameters are not configured. The `$fset` field mapping will be unavailable.") nas_client = None except Exception as e: logger.warning(f"Failed to initialize the NAS client: {e}. The `$fset` field mapping will be unavailable.") nas_client = None if args.type == 'download': download_only() elif args.type == 'run': run_only() elif args.type == 'upload': upload_only() def determine_output_modes(meta_stat_config): scan_to_files = set(meta_stat_config['scan_to_files']) scan_to_tablestore = set(meta_stat_config['scan_to_tablestore']) all_directories = set(scan_to_files).union(set(scan_to_tablestore)) dir_output_modes = OrderedDict() ots_file_name_to_input_dir_dict = {} for dir_path in sorted(all_directories): output_mode = DirectoryOutputMode(input_dir=dir_path) dir_output_modes[dir_path] = output_mode # First, determine the required output for each task. for _item in scan_to_files: output_file_name = _item if output_file_name.startswith('/'): output_file_name = output_file_name[1:] if output_file_name.endswith('/'): output_file_name = output_file_name[:-1] output_file_name = output_file_name.replace('/', '_') + ".csv" dir_output_modes[_item].output_path_set.add(output_file_name) for _item in scan_to_tablestore: output_file_name = _item if output_file_name.startswith('/'): output_file_name = output_file_name[1:] if output_file_name.endswith('/'): output_file_name = output_file_name[:-1] output_file_name = output_file_name.replace('/', '_') + ".ots" dir_output_modes[_item].output_path_set.add(output_file_name) ots_file_name_to_input_dir_dict[output_file_name] = _item # If a directory `b` is a subdirectory of an existing task `a`, add `b` to `a`'s exclude list and add `a`'s output targets to `b`'s. for _item_b in dir_output_modes: for _item_a in dir_output_modes: if _item_b.startswith(_item_a) and _item_b != _item_a: dir_output_modes[_item_a].exclude_dir_set.add(_item_b) dir_output_modes[_item_b].output_path_set.update(dir_output_modes[_item_a].output_path_set) return dir_output_modes, ots_file_name_to_input_dir_dict class DirectoryOutputMode: def __init__(self, input_dir: str, exclude_dir_list=None, output_path_set=None, ots_output_path_set=None): self.input_dir = input_dir if exclude_dir_list is None: exclude_dir_list = set() self.exclude_dir_set = exclude_dir_list if output_path_set is None: output_path_set = set() self.output_path_set = output_path_set if ots_output_path_set is None: ots_output_path_set = set() self.ots_output_path_set = ots_output_path_set def __repr__(self): return f"DirectoryOutputMode(input_dir={self.input_dir}, exclude_dir_list={self.exclude_dir_set}, output_path_set={self.output_path_set}, ots_output_path_set={self.ots_output_path_set})" def is_valid_cron(cron_str: str) -> bool: """ Checks if the input string is a valid cron expression (5 fields). Supports *, /, -, and number combinations. Does not validate the value range. """ cron_pattern = r'^(\s*(\*|[\d\-\*/]+)(,\s*(\*|[\d\-\*/]+))*){5}$' return bool(re.match(cron_pattern, cron_str.strip())) def check_path_exists_with_sudo(path): try: result = subprocess.run( ['sudo', 'test', '-e', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5 ) return result.returncode == 0 except Exception as e: print(f"Error checking path: {e}") return False def parse_config(config_path: str) -> Dict: """ Parses the configuration file and returns structured data. :param config_path: The path to the JSON configuration file. :return: A dictionary containing the configuration fields. """ try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) # Extract fields. fsid = config.get("fsid") scan_to_tablestore = config.get("scanToTablestore", []) ots_config = config.get("otsConfig", {}) fields = config.get("fields", {}) pk_fields = config.get("pkFields", {}) scan_to_files = config.get("scanToFiles", []) oss_path = config.get("ossPath") schedule = config.get("schedule", {}) if scan_to_files is None: scan_to_files = [] if oss_path is None: oss_path = "" if oss_path.startswith('/'): oss_path = oss_path[1:] # Basic validation. if not fsid or not isinstance(fsid, str): raise ValueError("Missing a valid fsid field") # The fsid can contain only alphanumeric characters. if fsid.startswith('bmcpfs-'): fsid = fsid[7:] elif fsid.startswith('cpfs-'): fsid = fsid[5:] if not fsid.isalnum(): raise ValueError("fsid can contain only letters and digits") if not isinstance(scan_to_files, list): raise ValueError("scanToFiles must be a list") # Update scan_to_files. concat_paths(fsid, scan_to_files) # Check if the path exists. for path in scan_to_files: if not check_path_exists_with_sudo(path): raise ValueError(f"Path {path} does not exist") if not isinstance(scan_to_tablestore, list): raise ValueError("scanToTablestore must be a list") concat_paths(fsid, scan_to_tablestore) for path in scan_to_tablestore: if not check_path_exists_with_sudo(path): raise ValueError(f"Path {path} does not exist") if not isinstance(ots_config, dict): raise ValueError("otsConfig must be a dictionary") # Validate ots_config. for key in ['instanceName', 'tableName', 'endpoint', 'rowExistenceException', 'columnCondition', 'writeType']: if key not in ots_config: raise ValueError(f"otsConfig is missing the {key} field") if not ots_config[key]: raise ValueError(f"The {key} field in otsConfig cannot be empty") if key == 'rowExistenceException': if ots_config[key] not in [tablestore.RowExistenceExpectation.IGNORE, tablestore.RowExistenceExpectation.EXPECT_EXIST]: raise ValueError(f"The {key} field in otsConfig must be IGNORE or EXPECT_EXIST") if key == 'columnCondition': if ots_config[key] not in [IGNORE, NEW_ATIME]: raise ValueError(f"The {key} field in otsConfig must be IGNORE or NEW_ATIME") if key == 'writeType': if ots_config[key] not in AVAILABLE_OTS_WRITE_TYPE: raise ValueError(f"The {key} field in otsConfig must be PUT or UPDATE") if not isinstance(fields, dict): raise ValueError("fields must be a dictionary") if not isinstance(pk_fields, dict): raise ValueError("pkFields must be a dictionary") if not isinstance(schedule, dict): raise ValueError("schedule must be a dictionary") if schedule['type'] == 'cron': # Check if it is a valid cron expression. cron_expr = schedule.get('cron') if not cron_expr: raise ValueError("The cron field cannot be empty") if not is_valid_cron(cron_expr): raise ValueError(f"Invalid cron expression: {cron_expr}") if oss_path and not oss_path.endswith('/'): oss_path += '/' return { "fsid": fsid, "scan_to_files": scan_to_files, "scan_to_tablestore": scan_to_tablestore, "ots_config": ots_config, "fields": fields, "oss_path": oss_path, "schedule": schedule, } except FileNotFoundError: raise FileNotFoundError(f"Configuration file not found: {config_path}") except json.JSONDecodeError: raise ValueError(f"Invalid configuration file format (not valid JSON): {config_path}") except Exception as e: raise RuntimeError(f"Error parsing the configuration: {e}") def concat_paths(fsid, paths): for i, path in enumerate(paths): concat_path = BASE_PATH if not concat_path.endswith('/'): concat_path += '/' if path.startswith('/'): path = path[1:] concat_path += path if not concat_path.endswith('/'): concat_path += '/' paths[i] = concat_path if __name__ == '__main__': main() -
Create a
nas_stat_util.pyfile with the following content.# coding=utf-8 import argparse import concurrent import csv import io import os import stat import sys import threading import time from concurrent import futures from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from logging.handlers import RotatingFileHandler from multiprocessing import Manager from queue import Queue from stat import S_ISREG, S_ISDIR TOOL_VERSION = "1.3" INODE_LIST = 'inode_list' DIR_TREE = 'dir_tree' ALL = 'all' REGULAR_FILE = 'regular_file' DIR = 'dir' VALID_TYPES = [INODE_LIST, DIR_TREE] VALID_FILTER_INODE_TYPES = [ALL, REGULAR_FILE, DIR] BATCH_LINES = 10000 thread_pool = None class DirectoryStats: def __init__(self, args, _stat): self.item_count = 0 self.item_size = 0 self.skipped_item_count = 0 if args is not None: if _stat is not None: if args.use_atime: self.atime = _stat.st_atime if args.use_mtime: self.mtime = _stat.st_mtime if args.use_ctime: self.ctime = _stat.st_ctime if args.use_inode: self.inode = int(_stat.st_ino) if args.use_uid: self.uid = _stat.st_uid if args.use_gid: self.gid = _stat.st_gid if args.use_all_inode_num: self.all_inode_num = 0 if args.use_all_size: self.all_size = 0 def add_item_count(self, size): self.item_count += size def add_item_size(self, size): self.item_size += size def add_skipped_item_count(self, size): self.skipped_item_count += size def add_all_inode_num(self, size): if self.all_inode_num is None: self.all_inode_num = 0 self.all_inode_num += size def add_all_size(self, size): if self.all_size is None: self.all_size = 0 self.all_size += size def __repr__(self): return (f"DirectoryStats(item_count={self.item_count}, " f"item_size={self.item_size}, skipped_item_count={self.skipped_item_count})") class DirectoryStatMap: def __init__(self): self.directory_map = {} self.inode_num = 0 self.wait_walk_dir_list = [] def get_or_create(self, directory, args, _stat): if directory not in self.directory_map: if _stat is None: try: _stat = os.lstat(directory) except: pass self.directory_map[directory] = DirectoryStats(args, _stat) return self.directory_map[directory] def add_inode_num(self, num): self.inode_num += num def add_wait_walk_dir(self, directory): self.wait_walk_dir_list.append(directory) def output_timestamp(args, timestamp, with_ms=False): if args.human_readable: return time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(timestamp)) if with_ms: return int(timestamp * 1000) return int(timestamp) def output_bytes(args, num_bytes): if args.human_readable: for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: if abs(num_bytes) < 1024.0: return f"{num_bytes:3.1f}{unit}" num_bytes /= 1024.0 return f"{num_bytes:.1f}YB" return num_bytes def to_csv_format(text): output = io.StringIO() writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL, lineterminator='') writer.writerow([text]) return output.getvalue() def format_output_line(args, item_path, _stat=None, dir_stat=None): try: output_parts = [] for field in args.output_format_list: try: if args.stat_type == INODE_LIST: if field == 'path': if args.output_without_prefix: raw_path = item_path.replace(args.output_without_prefix, '', 1) else: raw_path = item_path if args.use_csv_format: result_path = to_csv_format(raw_path) else: result_path = raw_path output_parts.append(result_path) elif field == 'size': output_parts.append( output_bytes(args, _stat.st_size if args.use_file_size else _stat.st_blocks * args.block_size)) elif field == 'raw_size': output_parts.append(_stat.st_size if args.use_file_size else _stat.st_blocks * args.block_size) elif field == 'inode': output_parts.append(_stat.st_ino) elif field == 'atime': output_parts.append(output_timestamp(args, _stat.st_atime)) elif field == 'mtime': output_parts.append(output_timestamp(args, _stat.st_mtime)) elif field == 'ctime': output_parts.append(output_timestamp(args, _stat.st_ctime)) elif field == 'atime_ms': output_parts.append(output_timestamp(args, _stat.st_atime, True)) elif field == 'mtime_ms': output_parts.append(output_timestamp(args, _stat.st_mtime, True)) elif field == 'ctime_ms': output_parts.append(output_timestamp(args, _stat.st_ctime, True)) elif field == 'uid': output_parts.append(_stat.st_uid) elif field == 'gid': output_parts.append(_stat.st_gid) elif args.stat_type == DIR_TREE: if field == 'path': if args.output_without_prefix: raw_path = item_path.replace(args.output_without_prefix, '', 1) else: raw_path = item_path if args.use_csv_format: result_path = to_csv_format(raw_path) else: result_path = raw_path output_parts.append(result_path) elif field == 'inode_num': output_parts.append(dir_stat.item_count) elif field == 'size': output_parts.append(output_bytes(args, dir_stat.item_size if ( args.use_file_size) else dir_stat.item_size * args.block_size)) elif field == 'raw_size': output_parts.append( dir_stat.item_size if args.use_file_size else dir_stat.item_size * args.block_size) elif field == 'skip_num': output_parts.append(dir_stat.skipped_item_count) elif field == 'inode': output_parts.append(dir_stat.inode) elif field == 'atime': output_parts.append(output_timestamp(args, dir_stat.atime)) elif field == 'mtime': output_parts.append(output_timestamp(args, dir_stat.mtime)) elif field == 'ctime': output_parts.append(output_timestamp(args, dir_stat.ctime)) elif field == 'atime_ms': output_parts.append(output_timestamp(args, dir_stat.atime, True)) elif field == 'mtime_ms': output_parts.append(output_timestamp(args, dir_stat.mtime, True)) elif field == 'ctime_ms': output_parts.append(output_timestamp(args, dir_stat.ctime, True)) elif field == 'uid': output_parts.append(dir_stat.uid) elif field == 'gid': output_parts.append(dir_stat.gid) elif field == 'all_inode_num': output_parts.append(dir_stat.all_inode_num) elif field == 'all_size': output_parts.append(output_bytes(args, dir_stat.all_size if ( args.use_file_size) else dir_stat.all_size * args.block_size)) except Exception as e: print(f"Error formatting output field: {e}") output_parts.append('') if output_parts: output_parts = [str(part) for part in output_parts] return ','.join(output_parts) + '\n' return '' except Exception as e: print(f"Error formatting output line: {e}") return '' def inode_stat_print(args, local_dir_stat_map, item_path=None, _stat=None, thread_local_output_lines=None): if thread_local_output_lines is None: thread_local_output_lines = [] if args.stat_type == INODE_LIST: thread_local_output_lines.append(format_output_line(args, item_path, _stat=_stat)) if len(thread_local_output_lines) >= BATCH_LINES: output_stat_lines(args, local_dir_stat_map, thread_local_output_lines) def dir_stat_map_print(args, dir_stat_map, output_handle, print_elapsed_time=False, clear_screen=False): try: if clear_screen: os.system('cls' if os.name == 'nt' else 'clear') function_local_output_lines = [] for iter_dir_path, iter_dir_stat in sorted(dir_stat_map.directory_map.items()): function_local_output_lines.append(format_output_line(args, iter_dir_path, dir_stat=iter_dir_stat)) if len(function_local_output_lines) >= BATCH_LINES: with args.m_output_file_lock: output_handle.writelines(function_local_output_lines) function_local_output_lines.clear() with args.m_output_file_lock: output_handle.writelines(function_local_output_lines) function_local_output_lines.clear() if print_elapsed_time: end_time = time.time() elapsed_time = end_time - args.start_time output_handle.write(f"Elapsed time: {elapsed_time} seconds\n") except Exception as e: print(f"Error printing directory statistics: {e}") def output_total_line_print(args, dir_stat_map, output_handle, print_elapsed_time=False, clear_screen=False): if clear_screen: os.system('cls' if os.name == 'nt' else 'clear') with args.m_output_file_lock: output_handle.write(f"Total lines output: {dir_stat_map.inode_num}\n") if print_elapsed_time: end_time = time.time() elapsed_time = end_time - args.start_time output_handle.write(f"Elapsed time: {elapsed_time} seconds\n") def stat_print_at_runtime(args, dir_stat_map, output_handle, print_elapsed_time=False): time.sleep(args.print_process_time) while not args.stat_finish: if args.stat_type == DIR_TREE: dir_stat_map_print(args, dir_stat_map, output_handle, print_elapsed_time, clear_screen=True) elif args.stat_type == INODE_LIST: output_total_line_print(args, dir_stat_map, output_handle, print_elapsed_time, clear_screen=True) else: break time.sleep(args.print_process_time) def filter_inode(args, _stat): if args.min_size and _stat.st_size < args.min_size: return False if args.max_size and _stat.st_size > args.max_size: return False if args.atime and _stat.st_atime > args.atime: return False if args.atime_after and _stat.st_atime < args.atime_after: return False if args.mtime and _stat.st_mtime > args.mtime: return False if args.mtime_after and _stat.st_mtime < args.mtime_after: return False if args.ctime and _stat.st_ctime > args.ctime: return False if args.ctime_after and _stat.st_ctime < args.ctime_after: return False if args.filter_inode_type != ALL: if args.filter_inode_type == REGULAR_FILE and not S_ISREG(_stat.st_mode): return False if args.filter_inode_type == DIR and not S_ISDIR(_stat.st_mode): return False return True class CustomArgumentParser(argparse.ArgumentParser): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.common_group = self.add_argument_group('Common arguments') self.common_group.add_argument("-t", "--type", dest="stat_type", required=True, choices=VALID_TYPES, help="The type of statistics to generate. Supported types: " + ','.join(VALID_TYPES)) self.common_group.add_argument("-i", "--input", dest="input", required=True, help="A comma-separated list of input directories to scan.") self.common_group.add_argument("--input-from-file", dest="input_from_file", action="store_true", help="Read input directories from a file, with one directory path per line.") self.common_group.add_argument("-o", "--output-path", dest="output_path", default="/tmp/nas_stat_util.output", help="The path to the output file. Multiple paths can be specified as a comma-separated list.") self.common_group.add_argument("--human", dest="human_readable", action="store_true", help="Print sizes in a human-readable format (e.g., 1K, 2M, 3G).") self.common_group.add_argument("--processes", dest="processes", type=int, default=10, help="The number of concurrent processes for scanning.") self.common_group.add_argument("--threads", dest="threads", type=int, default=10, help="The number of concurrent threads per process.") self.common_group.add_argument("--backup-count", dest="backup_count", type=int, default=9, help="The number of old output files to retain as backups.") self.common_group.add_argument("--print-process-time", dest="print_process_time", type=int, default=0, help="The interval in seconds to print progress. Set to 0 to disable.") self.common_group.add_argument("--exclude-dirs", dest="exclude_dirs", help="A comma-separated list of directory names to exclude.") self.common_group.add_argument("--exclude-full-path-dirs", dest="exclude_full_path_dirs", help="A comma-separated list of full directory paths to exclude.") self.common_group.add_argument("--output-format", dest="output_format", help=('Specifies the output fields as a comma-separated list. ' 'Available fields for inode_list: path,size,raw_size,inode,atime,mtime,ctime,atime_ms,mtime_ms,ctime_ms,uid,gid (default: path,size). ' 'Available fields for dir_tree: path,inode_num,size,raw_size,skip_num,inode,atime,mtime,ctime,atime_ms,mtime_ms,ctime_ms,uid,gid,all_inode_num,all_size (default: path,inode_num,size,skip_num).') ) self.common_group.add_argument("--output-without-head", dest="output_without_header", action='store_true', help="Omit the header from the output file.") self.common_group.add_argument("--output-without-prefix", dest="output_without_prefix", default='', help="A prefix to remove from all output paths.") self.common_group.add_argument("--use-file-size", dest="use_file_size", action='store_true', help="Use file size for calculations instead of disk usage.") self.common_group.add_argument("--use-csv-format", dest="use_csv_format", action='store_true', help="Enclose the path field in quotes, following CSV format.") self.common_group.add_argument("--block-size", dest="block_size", type=int, default=512, help="The block size for disk usage calculations, in bytes. Default: 512.") self.common_group.add_argument("-s", "--min-size", dest="min_size", type=int, help="Minimum file size in bytes. Files smaller than this will be excluded.") self.common_group.add_argument("--max-size", dest="max_size", type=int, help="Maximum file size in bytes. Files larger than this will be excluded.") self.common_group.add_argument("-a", "--atime", dest="atime", type=int, help="Includes files accessed on or before this Unix timestamp.") self.common_group.add_argument("--atime-after", dest="atime_after", type=int, help="Includes files accessed on or after this Unix timestamp.") self.common_group.add_argument("-m", "--mtime", dest="mtime", type=int, help="Includes files modified on or before this Unix timestamp.") self.common_group.add_argument("--mtime-after", dest="mtime_after", type=int, help="Includes files modified on or after this Unix timestamp.") self.common_group.add_argument("-c", "--ctime", dest="ctime", type=int, help="Includes files with an inode change on or before this Unix timestamp.") self.common_group.add_argument("--ctime-after", dest="ctime_after", type=int, help="Includes files with an inode change on or after this Unix timestamp.") self.common_group.add_argument("--filter-inode-type", dest="filter_inode_type", choices=VALID_FILTER_INODE_TYPES, default=ALL, help=f"Filter by inode type. Supported: {VALID_FILTER_INODE_TYPES}. Default: all.") self.type_dir_group = self.add_argument_group('Directory Tree Type Arguments') self.type_dir_group.add_argument("-d", "--depth", dest="depth", type=int, default=2, help="The maximum directory depth to report in directory tree mode.") self.type_file_group = self.add_argument_group('Inode List Type Arguments') def add_dir_stat_map(args, sum_dir_stat_map, local_dir_stat_map): if sum_dir_stat_map is None or local_dir_stat_map is None: return if local_dir_stat_map.directory_map is not None: for iter_dir_path, iter_dir_stat in local_dir_stat_map.directory_map.items(): if iter_dir_path not in sum_dir_stat_map.directory_map: sum_dir_stat_map.directory_map[iter_dir_path] = iter_dir_stat else: dir_stat = sum_dir_stat_map.directory_map[iter_dir_path] dir_stat.add_item_count(iter_dir_stat.item_count) dir_stat.add_item_size(iter_dir_stat.item_size) dir_stat.add_skipped_item_count(iter_dir_stat.skipped_item_count) if args.use_all_inode_num: dir_stat.add_all_inode_num(iter_dir_stat.all_inode_num) if args.use_all_size: dir_stat.add_all_size(iter_dir_stat.all_size) if local_dir_stat_map.inode_num is not None: sum_dir_stat_map.add_inode_num(local_dir_stat_map.inode_num) if local_dir_stat_map.wait_walk_dir_list is not None: sum_dir_stat_map.wait_walk_dir_list.extend(local_dir_stat_map.wait_walk_dir_list) def stat_process(args, dir_list): global thread_pool if not thread_pool: thread_pool = ThreadPoolExecutor(max_workers=args.threads) args.thread_lock = threading.Lock() process_dir_stat_map = DirectoryStatMap() args.process_wait_queue_size = args.m_process_wait_queue.qsize() args.thread_future_set = set() args.thread_wait_queue = Queue() for dir_path in dir_list: future = thread_pool.submit(stat_walk, args, dir_path, True, None, time.perf_counter()) args.thread_future_set.add(future) while len(args.thread_future_set) > 0 or not args.thread_wait_queue.empty(): while not args.thread_wait_queue.empty(): if args.m_process_wait_queue.qsize() < args.max_process_queue_size: args.m_process_wait_queue.put(args.thread_wait_queue.get()) elif len(args.thread_future_set) < args.max_thread_queue_size: future = thread_pool.submit(stat_walk, args, args.thread_wait_queue.get(), True, None, time.perf_counter()) args.thread_future_set.add(future) else: break args.process_wait_queue_size = args.m_process_wait_queue.qsize() try: for future in futures.as_completed(args.thread_future_set, timeout=1): thread_dir_stat_map = future.result() add_dir_stat_map(args, process_dir_stat_map, thread_dir_stat_map) args.thread_future_set.remove(future) except concurrent.futures.TimeoutError: pass except Exception as e: print(f"Error in worker thread: {e}") args.thread_future_set.remove(future) return process_dir_stat_map def stat_walk(args, top, need_output, thread_local_output_lines, start_time): local_dir_stat_map = DirectoryStatMap() # Each thread maintains its own output list. if thread_local_output_lines is None: thread_local_output_lines = [] try: try: scandir_it = os.scandir(top) except OSError as error: add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top) return local_dir_stat_map with scandir_it: while True: try: try: entry = next(scandir_it) except StopIteration: break except OSError as error: add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top) break try: is_dir = entry.is_dir() except OSError: is_dir = False path = os.path.join(top, entry.name) if is_dir: if entry.name in args.exclude_dir_set: continue if not path.endswith('/'): path += '/' if path in args.exclude_full_path_dir_set: continue my_stat(args, local_dir_stat_map, thread_local_output_lines, path) if entry.is_symlink(): continue if time.perf_counter() - start_time > 1 and args.thread_wait_queue.qsize() < args.max_thread_queue_size - len( args.thread_future_set) + args.max_process_queue_size - args.process_wait_queue_size: args.thread_wait_queue.put(path) else: _local_dir_stat_map = stat_walk(args, path, False, thread_local_output_lines, start_time) add_dir_stat_map(args, local_dir_stat_map, _local_dir_stat_map) else: my_stat(args, local_dir_stat_map, thread_local_output_lines, path) if need_output and args.stat_type == INODE_LIST: output_stat_lines(args, local_dir_stat_map, thread_local_output_lines) return local_dir_stat_map except Exception as error: add_skip_to_dir_stat_map(args, local_dir_stat_map, error, top) return local_dir_stat_map def output_stat_lines(args, local_dir_stat_map, thread_local_output_lines): with args.thread_lock: with args.m_output_file_lock: for output_path in args.output_path_list: with open(output_path, 'a') as output_handle: output_handle.writelines(thread_local_output_lines) local_dir_stat_map.add_inode_num(len(thread_local_output_lines)) thread_local_output_lines.clear() def my_stat(args, local_dir_stat_map, thread_local_output_lines, item_path): try: _stat = os.lstat(item_path) if not args.need_filter or filter_inode(args, _stat): if args.stat_type == DIR_TREE: add_inode_to_dir_stat_map(args, local_dir_stat_map, item_path, _stat) # Record directory information. if stat.S_ISDIR(_stat.st_mode) and get_depth_from_str(item_path) <= args.min_key_depth + args.depth: local_dir_stat_map.get_or_create(item_path, args, _stat) elif args.stat_type == INODE_LIST: if filter_inode(args, _stat): inode_stat_print(args, local_dir_stat_map, item_path, _stat, thread_local_output_lines) if args.use_all_inode_num or args.use_all_size: if args.stat_type == DIR_TREE: add_inode_to_dir_stat_map(args, local_dir_stat_map, item_path, _stat, all_mode=True) # Record directory information. if stat.S_ISDIR(_stat.st_mode) and get_depth_from_str(item_path) <= args.min_key_depth + args.depth: local_dir_stat_map.get_or_create(item_path, args, _stat) except Exception as error: add_skip_to_dir_stat_map(args, local_dir_stat_map, error, item_path) def roll_output_file(args): for output_path in args.output_path_list: need_roll = os.path.isfile(output_path) if args.backup_count > 0 and need_roll: file_handler = RotatingFileHandler(output_path, backupCount=args.backup_count) file_handler.doRollover() def validate_input_dirs(args): directories = [] if args.input_from_file: with open(args.input, 'r') as file: for line in file: line = line.strip() if line: directories.append(line) else: directories.extend([_dir.strip() for _dir in args.input.split(",")]) # Check if each directory exists. for index, directory in enumerate(directories): if not directory.endswith('/'): directory += '/' directories[index] = directory if not os.path.isdir(directory): raise argparse.ArgumentTypeError(f"Directory '{directory}' does not exist.") return directories def check_and_init_args(args): args.output_path_list = args.output_path.split(",") args.min_key_depth = None if args.stat_type == DIR_TREE or args.stat_type == INODE_LIST: args.input_dirs = validate_input_dirs(args) for _dir in args.input_dirs: if args.min_key_depth is None: args.min_key_depth = get_depth_from_str(_dir) else: args.min_key_depth = min(args.min_key_depth, get_depth_from_str(_dir)) args.exclude_dir_set = frozenset() if args.exclude_dirs is not None: args.exclude_dir_set = frozenset(args.exclude_dirs.split(",")) args.exclude_full_path_dir_set = frozenset() if args.exclude_full_path_dirs is not None: exclude_full_path_dir_list = [] for path in args.exclude_full_path_dirs.split(","): if not path.endswith('/'): path += '/' exclude_full_path_dir_list += [path] args.exclude_full_path_dir_set = frozenset(exclude_full_path_dir_list) args.max_process_queue_size = 100 * args.processes args.max_thread_queue_size = 100 * args.threads if args.output_format is None: if args.stat_type == DIR_TREE: args.output_format_list = ['path', 'inode_num', 'size', 'skip_num'] elif args.stat_type == INODE_LIST: args.output_format_list = ['path', 'size'] else: args.output_format_list = args.output_format.split(',') args.use_atime = "atime" in args.output_format_list or "atime_ms" in args.output_format_list args.use_mtime = "mtime" in args.output_format_list or "mtime_ms" in args.output_format_list args.use_ctime = "ctime" in args.output_format_list or "ctime_ms" in args.output_format_list args.use_inode = "inode" in args.output_format_list args.use_uid = "uid" in args.output_format_list args.use_gid = "gid" in args.output_format_list args.use_all_inode_num = "all_inode_num" in args.output_format_list args.use_all_size = "all_size" in args.output_format_list args.need_filter = (args.min_size is not None or args.max_size is not None or args.atime is not None or args.atime_after is not None or args.ctime is not None or args.ctime_after is not None or args.mtime is not None or args.mtime_after is not None or args.filter_inode_type is not None) # / -> depth 0 # /a/ -> depth 1 # /a.txt -> depth 1 # /a/b/ -> depth 2 # /a/b.txt -> depth 2 def get_depth_from_str(file_path): if file_path.endswith('/'): return file_path.count('/') - 1 return file_path.count('/') def get_depth_from_list(file_path_list): if file_path_list[-1] == '': return len(file_path_list) - 2 return len(file_path_list) - 1 def add_inode_to_dir_stat_map(args, input_dir_stat_map, inode_path, _stat, all_mode=False): inode_size = _stat.st_size if args.use_file_size else _stat.st_blocks base_depth = args.min_key_depth file_path_list = inode_path.split('/') file_depth = get_depth_from_list(file_path_list) dir_path = '' for i in range(0, min(1 + base_depth + args.depth, file_depth)): dir_path += file_path_list[i] + '/' if i >= base_depth: dir_stat = input_dir_stat_map.get_or_create(dir_path, args, _stat) if all_mode: dir_stat.add_all_inode_num(1) dir_stat.add_all_size(inode_size) else: dir_stat.add_item_count(1) dir_stat.add_item_size(inode_size) def add_skip_to_dir_stat_map(args, input_dir_stat_map, error, inode_path): print(f"Skipping path due to an error. Path: {inode_path}, Error: {error}") file_path_list = inode_path.split('/') file_depth = get_depth_from_list(file_path_list) base_depth = args.min_key_depth if inode_path.endswith('/'): file_depth += 1 dir_path = '' for i in range(0, min(1 + base_depth + args.depth, file_depth)): dir_path += file_path_list[i] + '/' if i >= base_depth: dir_stat = input_dir_stat_map.get_or_create(dir_path, args, None) dir_stat.add_skipped_item_count(1) def chunks(lst, n): for i in range(0, len(lst), n): yield lst[i:i + n] def stat_func(args): args.stat_finish = False dir_stat_map = DirectoryStatMap() args.start_time = time.time() with Manager() as manager: args.m_output_file_lock = manager.Lock() args.m_process_wait_queue = manager.Queue() args.add_wait_flag = False process_future_set = set() try: if args.print_process_time > 0: threading.Thread(target=stat_print_at_runtime, args=(args, dir_stat_map, sys.stdout, True)).start() if args.stat_type == DIR_TREE or args.stat_type == INODE_LIST: roll_output_file(args) init_file_header(args) with ProcessPoolExecutor(max_workers=args.processes) as walk_executor: input_dir_chunks = list(chunks(args.input_dirs, args.max_thread_queue_size)) for input_dir_chunk in input_dir_chunks: future = walk_executor.submit(stat_process, args, input_dir_chunk) process_future_set.add(future) while len(process_future_set) > 0 or not args.m_process_wait_queue.empty(): while not args.m_process_wait_queue.empty(): if args.max_process_queue_size <= len(process_future_set): break _dir = args.m_process_wait_queue.get() future = walk_executor.submit(stat_process, args, [_dir]) process_future_set.add(future) try: for future in futures.as_completed(process_future_set, timeout=1): local_dir_stat_map = future.result() add_dir_stat_map(args, dir_stat_map, local_dir_stat_map) process_future_set.remove(future) except concurrent.futures.TimeoutError: pass except Exception as e: print(f"Error in worker process: {e}") process_future_set.remove(future) args.stat_finish = True if args.stat_type == DIR_TREE: for output_path in args.output_path_list: with open(output_path, 'a+') as output_file: dir_stat_map_print(args, dir_stat_map, output_file) dir_stat_map_print(args, dir_stat_map, sys.stdout, print_elapsed_time=True, clear_screen=(args.print_process_time > 0)) elif args.stat_type == INODE_LIST: output_total_line_print(args, dir_stat_map, sys.stdout, print_elapsed_time=True, clear_screen=(args.print_process_time > 0)) except Exception as e: print(f"Error in stat_func: {e}") args.stat_finish = True def init_file_header(args): for output_path in args.output_path_list: file_exists = os.path.isfile(output_path) file_empty = not file_exists or os.path.getsize(output_path) == 0 with open(output_path, 'a+') as output_file: with args.m_output_file_lock: if args.output_without_header: continue if file_empty: output_file.write(','.join(args.output_format_list) + '\n') output_file.flush() def main(): parser = CustomArgumentParser(description='Scans and collects metadata statistics for NAS file systems.') args = parser.parse_args() check_and_init_args(args) stat_func(args) if __name__ == "__main__": main() -
Configure the
.meta_stat_key.jsonfile, which stores the keys and parameters for accessing various cloud services.{ "oss_access_key_id": "<AccessKey ID>", "oss_access_key_secret": "<AccessKey Secret>", "ots_access_key_id": "<AccessKey ID>", "ots_access_key_secret": "<AccessKey Secret>", "BASE_PATH": "/cpfs", "OUTPUT_DIR_PREFIX": "/opt/cpfs-scanner/bmstat/", "OSS_REGION": "<region-id>", "OSS_BUCKET": "bmcpfs-scan", "OSS_KEY": "bmcpfs_stat_tool_config.json", "OSS_ENDPOINT": "oss-<region-id>-internal.aliyuncs.com", "nas_access_key_id": "<AccessKey ID>", "nas_access_key_secret": "<AccessKey Secret>", "nas_endpoint": "nas-vpc.<region-id>.aliyuncs.com" }The following table describes each configuration item.
Parameter
Description
oss_access_key_id/oss_access_key_secretThe AccessKey for accessing OSS. This key must have read and write permissions on the target bucket.
ots_access_key_id/ots_access_key_secretThe AccessKey for accessing Tablestore. This key must have read and write permissions on the target instance.
BASE_PATHThe mount path of the CPFS file system. For example,
/cpfs.OUTPUT_DIR_PREFIXThe local output directory for scan result files. Ensure you have write permissions on this directory.
OSS_REGIONThe region of the OSS bucket.
OSS_BUCKETThe name of the OSS bucket for archived scan result files.
OSS_KEYThe name of the scan behavior configuration file. This name is used to read the configuration locally and is also the object name of the file in OSS (for the
--type downloadmode).OSS_ENDPOINTThe OSS endpoint. If your server supports internal network access, use an internal endpoint to reduce latency and data transfer costs.
nas_access_key_id/nas_access_key_secretThe AccessKey for querying fileset information via the NAS API.
nas_endpointThe endpoint for the NAS API.
-
Configure the
bmcpfs_stat_tool_config.jsonfile. This file defines the scan scope, field mappings, and Tablestore write parameters.{ "fsid": "<file-system-id>", "ossPath": "/", "scanToFiles": [ ], "scanToTablestore": [ "/fileset1", "/fileset2" ], "fields": { "aTime": "$atime_ms", "cTime": "$ctime_ms", "mTime": "$mtime_ms", "fileSize": "$size", "cluster_id": "cluster_id", "fileset_id": "$fset_id", "fileset_name": "$fset_name", "fileset_file_count_quota": "$fset_file_count_quota", "fileset_size_quota": "$fset_size_quota", "wTime": "$wtime_ms", "region": "region-id" }, "pkFields": { "file_name": "$path" }, "otsConfig": { "instanceName": "<instance-name>", "tableName": "bmcpfs_test", "endpoint": "https://<instance-name>.<region-id>.vpc.tablestore.aliyuncs.com", "rowExistenceException": "IGNORE", "columnCondition": "IGNORE", "writeType": "PUT" }, "schedule": { "type": "once", "cron": "0 * * * *" } }The following table describes each configuration item.
Parameter
Description
fsid(Required)The CPFS file system ID. You can obtain this ID from the CPFS console.
scanToTablestore(Required)Specifies the directories to scan and write to Tablestore. These paths are relative to the
BASE_PATHparameter.otsConfig.instanceName(Required)The Tablestore instance name.
otsConfig.tableName(Required)The Tablestore table name. This must match the table name that you created in Step 2.
otsConfig.endpoint(Required)The Tablestore VPC endpoint, in the format
https://<instance-name>.<region-id>.vpc.tablestore.aliyuncs.com. If your server cannot access the VPC endpoint, use a public endpoint.fieldsThe mapping from scanned fields to Tablestore column names. Values that start with
$are CPFS metadata variables.pkFieldsThe primary key field mapping.
file_namemaps to the full path of the file.cluster_idThe cluster ID. You can customize this value.
regionThe region. You can customize this value.
schedule.typeThe scheduling type. The value
onceindicates a one-time execution.schedule.cronThe Cron expression. This parameter applies when
typeis configured for scheduled execution.NotePaths specified in
scanToFilesandscanToTablestoreare relative to theBASE_PATHdefined in the.meta_stat_key.jsonfile. For example, ifBASE_PATHis/cpfs, the path/fileset1corresponds to the actual scan path/cpfs/fileset1. -
Make the scanning scripts executable.
chmod 755 meta_stat.py nas_stat_util.py
Step 4: Run the scan
Run the following command to start the scan. The tool reads the local configuration, scans the specified directories, writes metadata to Tablestore, and uploads packaged results to OSS.
python3 meta_stat.py --type run
The --type parameter supports the following three modes.
|
Value |
Description |
|
|
Performs a full execution: reads the local configuration file, scans CPFS metadata, writes it to Tablestore, and uploads the packaged scan results to OSS. |
|
|
Downloads the latest |
|
|
Only uploads existing local scan results to Tablestore without running a new scan. |
To run scans regularly, configure a scheduled task in crontab. Run the following command to open the crontab file:
crontab -e
Add a scheduled task rule at the end of the file. For example, to run a scan at midnight every day, add the following line:
0 0 * * * /usr/bin/python3 /opt/cpfs-scanner/meta_stat.py --type run >> /opt/cpfs-scanner/cron.log 2>&1
The >> /opt/cpfs-scanner/cron.log 2>&1 part redirects standard output and standard error to a log file for troubleshooting. Adjust the Python interpreter and script paths to match your environment.
Step 5: Configure Grafana display
1. Install Tablestore data source plugin
-
Upload aliyun-tablestore-grafana-datasource-adapt-react.zip to the Grafana plugins directory, such as
/var/lib/grafana/plugins. -
Unzip the plugin file.
unzip aliyun-tablestore-grafana-datasource-adapt-react.zip -d /var/lib/grafana/plugins/ -
Modify the Grafana configuration file to allow loading unsigned plugins.
vi /etc/grafana/grafana.iniFind the
allow_loading_unsigned_pluginsparameter and set it to the plugin name.allow_loading_unsigned_plugins = aliyun-tablestore-grafana-datasource-adapt-reactNoteThis configuration option is commented out by default with a
;at the beginning of the line, and you must remove the leading;when you modify it for the configuration to take effect. -
Restart the Grafana service to apply the changes.
systemctl restart grafana-server
2. Add a data source
-
Log in to Grafana. Go to Connections > Data sources, then click Add new data source.
-
In the search box, enter
aliyun-tablestoreand select the aliyun-tablestore-grafana-datasource-adapt-react type. -
Configure the following connection parameters.
Parameter
Description
Endpoint
The endpoint of the Tablestore instance. The format is
https://<instance-name>.<region-id>.ots.aliyuncs.com.Instance
The name of the Tablestore instance.
AccessKey ID
The AccessKey ID of your Alibaba Cloud account or RAM user.
AccessKey Secret
The AccessKey Secret of your Alibaba Cloud account or RAM user.
NoteThe Endpoint parameter supports both public and VPC addresses. If your Grafana server and Tablestore instance are in the same VPC and region, use the VPC address (format:
https://<instance-name>.<region-id>.vpc.tablestore.aliyuncs.com) to reduce latency and avoid public network traffic fees. Otherwise, use the public address. -
Click Save & test to verify the connection.
3. Import a dashboard
-
Before importing, open the dashboard JSON file and set the
datasourceuidto the UID of your data source. You can find this UID in the URL on the data source settings page. The following code is an example of a file overview JSON. For more sample templates, see Grafana Dashboard Example Templates.{ "annotations": { "list": [ { "builtIn": 1, "datasource": { "type": "grafana", "uid": "-- Grafana --" }, "enable": true, "hide": true, "iconColor": "rgba(0, 211, 255, 1)", "name": "Annotations & Alerts", "type": "dashboard" } ] }, "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, "id": 1, "links": [ { "asDropdown": false, "icon": "external link", "includeVars": false, "keepTime": false, "tags": [], "targetBlank": false, "title": "New link", "tooltip": "", "type": "dashboards", "url": "" } ], "panels": [ { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "description": "", "fieldConfig": { "defaults": { "color": { "mode": "continuous-GrYlRd" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ { "color": "green", "value": 0 }, { "color": "red", "value": 80 } ] }, "unit": "bytes" }, "overrides": [] }, "gridPos": { "h": 4, "w": 6, "x": 0, "y": 0 }, "id": 3, "options": { "colorMode": "background", "graphMode": "none", "justifyMode": "auto", "orientation": "horizontal", "percentChangeColorMode": "standard", "reduceOptions": { "calcs": [], "fields": "", "values": false }, "showPercentChange": false, "textMode": "auto", "textStyle": { "fontSize": "40px", "fontWeight": "bold" }, "wideLayout": true }, "pluginVersion": "12.3.1", "targets": [ { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "formatType": "Table", "query": "SELECT SUM(fileSize) AS totalSize FROM `$tableName` USE INDEX($indexName)", "refId": "A", "xcol": "", "ycol": "" } ], "title": "Total Capacity Usage", "transformations": [ { "id": "prepareTimeSeries", "options": {} } ], "type": "stat" }, { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "description": "", "fieldConfig": { "defaults": { "color": { "mode": "continuous-RdYlGr" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ { "color": "green", "value": 0 }, { "color": "red", "value": 80 } ] }, "unit": "none" }, "overrides": [] }, "gridPos": { "h": 4, "w": 6, "x": 6, "y": 0 }, "id": 4, "options": { "colorMode": "background", "graphMode": "area", "justifyMode": "auto", "orientation": "horizontal", "percentChangeColorMode": "standard", "reduceOptions": { "calcs": [], "fields": "", "values": false }, "showPercentChange": false, "textMode": "auto", "textStyle": { "fontSize": "40px", "fontWeight": "bold" }, "wideLayout": true }, "pluginVersion": "12.3.1", "targets": [ { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "formatType": "Table", "query": "SELECT COUNT(*) AS file_count FROM `$tableName` USE INDEX($indexName)", "refId": "A", "xcol": "_time", "ycol": "" } ], "title": "Total File Count", "type": "stat" }, { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "description": "", "fieldConfig": { "defaults": { "color": { "fixedColor": "orange", "mode": "shades" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ { "color": "green", "value": 0 }, { "color": "red", "value": 80 } ] }, "unit": "none" }, "overrides": [] }, "gridPos": { "h": 4, "w": 5, "x": 12, "y": 0 }, "id": 5, "options": { "colorMode": "background", "graphMode": "area", "justifyMode": "auto", "orientation": "horizontal", "percentChangeColorMode": "standard", "reduceOptions": { "calcs": [ "lastNotNull" ], "fields": "", "values": false }, "showPercentChange": false, "textMode": "auto", "textStyle": { "fontSize": "40px", "fontWeight": "bold" }, "wideLayout": true }, "pluginVersion": "12.3.1", "targets": [ { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "formatType": "Table", "query": "SELECT COUNT(DISTINCT region) AS regions_count FROM `$tableName` USE INDEX($indexName)", "refId": "A", "xcol": "_time", "ycol": "" } ], "title": "Regions Covered", "type": "stat" }, { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "description": "", "fieldConfig": { "defaults": { "color": { "fixedColor": "purple", "mode": "shades" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ { "color": "green", "value": 0 }, { "color": "red", "value": 80 } ] }, "unit": "none" }, "overrides": [] }, "gridPos": { "h": 4, "w": 7, "x": 17, "y": 0 }, "id": 6, "options": { "colorMode": "background", "graphMode": "none", "justifyMode": "auto", "orientation": "horizontal", "percentChangeColorMode": "standard", "reduceOptions": { "calcs": [ "lastNotNull" ], "fields": "", "values": false }, "showPercentChange": false, "textMode": "auto", "textStyle": { "fontSize": "40px", "fontWeight": "bold" }, "wideLayout": true }, "pluginVersion": "12.3.1", "targets": [ { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "formatType": "Table", "query": "SELECT COUNT(DISTINCT cluster_id) AS cluster_count FROM `$tableName` USE INDEX($indexName)", "refId": "A", "xcol": "_time", "ycol": "" } ], "title": "Cluster Count", "type": "stat" }, { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "fillOpacity": 80, "gradientMode": "none", "hideFrom": { "legend": false, "tooltip": false, "viz": false }, "lineWidth": 1, "scaleDistribution": { "log": 2, "type": "log" }, "thresholdsStyle": { "mode": "off" } }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ { "color": "green", "value": 0 }, { "color": "red", "value": 80 } ] }, "unit": "percent" }, "overrides": [] }, "gridPos": { "h": 10, "w": 24, "x": 0, "y": 4 }, "id": 7, "options": { "barRadius": 0, "barWidth": 0.15, "fullHighlight": true, "groupWidth": 0.7, "legend": { "calcs": [ "first" ], "displayMode": "list", "placement": "bottom", "showLegend": false }, "orientation": "horizontal", "showValue": "auto", "stacking": "none", "tooltip": { "hideZeros": false, "mode": "multi", "sort": "none" }, "xField": "cluster_id", "xTickLabelRotation": 0, "xTickLabelSpacing": 0 }, "pluginVersion": "12.3.1", "targets": [ { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "formatType": "Table", "query": "SELECT \n cluster_id,\n ROUND((SUM(fileSize) * 100.0 / (SELECT SUM(fileSize) FROM `$tableName` USE INDEX($indexName))), 2) AS `Capacity Usage`\nFROM `$tableName` USE INDEX($indexName)\nGROUP BY cluster_id\nORDER BY cluster_id;", "refId": "A", "xcol": "_time", "ycol": "" } ], "title": "Capacity Distribution by Cluster", "type": "barchart" }, { "datasource": { "type": "aliyun-tablestore-grafana-datasource-adapt-react", "uid": "fffrqt7w6setcc" }, "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "thresholds": { "mode": "percentage", "steps": [ { "color": "green", "value": 0 }, { "color": "red", "value": 80 } ] }, "unit": "percent" }, "overrides": [] }, "gridPos": { "h": 6, "w": 24, "x": 0, "y": 14 }, "id": 9, "options": { "minVizHeight": 75, "minVizWidth": 75, "orientation": "auto", "reduceOptions": { "calcs": [], "fields": "/^percentage$/", "values": true }, "showThresholdLabels": false, "showThresholdMarkers": false, "sizing": "auto" }, "pluginVersion": "12.3.1", "targets": [ { "formatType": "Table", "query": "SELECT\n fileset_id,\n SUM(fileSize) * 100.0 / MAX(fileset_size_quota) AS percentage\nFROM `$tableName` USE INDEX($indexName)\nGROUP BY fileset_id\nORDER BY fileset_id", "refId": "A", "xcol": "_time", "ycol": "" } ], "title": "Fileset Capacity Usage", "type": "gauge" } ], "preload": false, "refresh": "5m", "schemaVersion": 42, "tags": [], "templating": { "list": [ { "current": { "text": "bmcpfs_test", "value": "bmcpfs_test" }, "hide": 2, "name": "tableName", "query": "bmcpfs_test", "skipUrlSync": true, "type": "constant" }, { "current": { "text": "bmcpfs_test_idx", "value": "bmcpfs_test_idx" }, "hide": 2, "name": "indexName", "query": "bmcpfs_test_idx", "skipUrlSync": true, "type": "constant" } ] }, "time": { "from": "now-6h", "to": "now" }, "timepicker": { "hidden": true }, "timezone": "browser", "title": "CPFS Overview", "uid": "adqgdbx", "version": 64 } -
Go to Dashboards, click New > Import in the upper-right corner, then upload the modified JSON file.
4. Customize table and index names (Optional)
If your table and index names differ from the default values in the dashboard template, you must update the corresponding dashboard variables.
-
On the dashboard page, click Edit > Settings > Variables in the upper-right corner.
-
Find the
tableNameandindexNamevariables, and change their default values to your table and index names. -
Click Save dashboard.
Sample Grafana dashboard templates
CPFS capacity trend
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 6,
"links": [
{
"asDropdown": true,
"icon": "external link",
"includeVars": false,
"keepTime": false,
"tags": [],
"targetBlank": false,
"title": "Other Dashboards",
"tooltip": "",
"type": "dashboards",
"url": ""
}
],
"liveNow": true,
"panels": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "continuous-BlYlRd"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineWidth": 1,
"scaleDistribution": {
"type": "linear"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "file_size"
},
"properties": [
{
"id": "unit",
"value": "bytes"
}
]
}
]
},
"gridPos": {
"h": 11,
"w": 24,
"x": 0,
"y": 0
},
"id": 9,
"options": {
"barRadius": 0,
"barWidth": 0.1,
"colorByField": "time",
"fullHighlight": true,
"groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": false
},
"orientation": "auto",
"showValue": "auto",
"stacking": "none",
"text": {
"valueSize": 1
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
},
"xField": "time",
"xTickLabelRotation": 0,
"xTickLabelSpacing": 100
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT DATE(DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR)) AS date ,SUM(fileSize) as file_size\n FROM (\n SELECT \n cTime,fileSize \n FROM \n $tableName USE INDEX($indexName) \n WHERE \n cTime >= $__unixMilliTimeFrom() AND cTime < $__unixMilliTimeTo()\n) tl\nGROUP BY date\nORDER BY date",
"refId": "A",
"xcol": " time",
"ycol": ""
}
],
"title": "New file capacity",
"type": "barchart"
},
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"fixedColor": "green",
"mode": "fixed"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineWidth": 1,
"scaleDistribution": {
"type": "linear"
},
"thresholdsStyle": {
"mode": "off"
}
},
"displayName": "Total New Files",
"fieldMinMax": false,
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 11
},
"id": 10,
"options": {
"barRadius": 0,
"barWidth": 0.1,
"colorByField": "day",
"fullHighlight": true,
"groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "right",
"showLegend": false
},
"orientation": "auto",
"showValue": "auto",
"stacking": "none",
"text": {
"valueSize": 1
},
"tooltip": {
"hideZeros": false,
"mode": "multi",
"sort": "none"
},
"xTickLabelRotation": 0,
"xTickLabelSpacing": 100
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT DATE(DATE_ADD(FROM_UNIXTIME(cTime / 1000), INTERVAL 8 HOUR)) AS date ,COUNT(*) as file_count\nFROM (\n SELECT \n cTime,file_name \n FROM \n $tableName USE INDEX($indexName) \n WHERE \n cTime >= $__unixMilliTimeFrom() AND cTime < $__unixMilliTimeTo()\n) tl\nGROUP BY date\nORDER BY date",
"refId": "A",
"xcol": "time",
"ycol": ""
}
],
"title": "New file count",
"type": "barchart"
}
],
"preload": true,
"refresh": "5m",
"schemaVersion": 42,
"tags": [],
"templating": {
"list": [
{
"current": {
"text": "bmcpfs_test",
"value": "bmcpfs_test"
},
"hide": 2,
"label": "Table name",
"name": "tableName",
"query": "bmcpfs_test",
"skipUrlSync": true,
"type": "constant"
},
{
"current": {
"text": "bmcpfs_test_idx",
"value": "bmcpfs_test_idx"
},
"hide": 2,
"name": "indexName",
"query": "bmcpfs_test_idx",
"skipUrlSync": true,
"type": "constant"
}
]
},
"time": {
"from": "now-7d",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "CPFS capacity trend",
"uid": "adj2l5q",
"version": 41
}
CPFS directory comparison
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 7,
"links": [
{
"asDropdown": false,
"icon": "external link",
"includeVars": false,
"keepTime": false,
"tags": [],
"targetBlank": false,
"title": "New link",
"tooltip": "",
"type": "dashboards",
"url": ""
}
],
"panels": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"fixedColor": "blue",
"mode": "fixed"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 4,
"w": 8,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"showPercentChange": false,
"textMode": "auto",
"wideLayout": true
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT COUNT(*) AS total_count\nFROM (\n SELECT file_name FROM $tableName USE INDEX() WHERE file_name Like CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n UNION ALL\n SELECT file_name FROM $tableName USE INDEX() WHERE file_name Like CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) t",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "Total file count",
"type": "stat"
},
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"fixedColor": "light-red",
"mode": "fixed"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 4,
"w": 8,
"x": 8,
"y": 0
},
"id": 2,
"options": {
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"showPercentChange": false,
"textMode": "auto",
"wideLayout": true
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT COUNT(*) AS duplicate_count\nFROM (\n SELECT file_name, fileSize\n FROM $tableName USE INDEX()\n WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) a\nJOIN (\n SELECT file_name, fileSize\n FROM $tableName USE INDEX()\n WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "Duplicate file count",
"type": "stat"
},
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"fixedColor": "orange",
"mode": "fixed"
},
"mappings": [],
"noValue": "0GB",
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
},
"unit": "bytes"
},
"overrides": []
},
"gridPos": {
"h": 4,
"w": 8,
"x": 16,
"y": 0
},
"id": 3,
"options": {
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"showPercentChange": false,
"textMode": "auto",
"wideLayout": true
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT SUM(a.fileSize) AS duplicate_size\nFROM (\n SELECT file_name, fileSize\n FROM $tableName USE INDEX()\n WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) a\nJOIN (\n SELECT file_name, fileSize\n FROM $tableName USE INDEX()\n WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "Space used by duplicates",
"type": "stat"
},
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"align": "auto",
"cellOptions": {
"type": "auto"
},
"footer": {
"reducers": []
},
"hideFrom": {
"viz": false
},
"inspect": false,
"wrapHeaderText": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "Directory A/B path"
},
"properties": [
{
"id": "custom.width",
"value": 801
}
]
},
{
"matcher": {
"id": "byName",
"options": "File name"
},
"properties": [
{
"id": "custom.width",
"value": 405
}
]
},
{
"matcher": {
"id": "byName",
"options": "File size"
},
"properties": [
{
"id": "unit",
"value": "bytes"
}
]
}
]
},
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 4
},
"id": 4,
"options": {
"cellHeight": "sm",
"showHeader": true,
"sortBy": [
{
"desc": false,
"displayName": "File size"
}
]
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT \n SUBSTRING_INDEX(a.file_name, '/', -1) AS 'File name',\n CONCAT('A:',a.file_name,'\\n','B:',b.file_name) AS 'Directory A/B path',\n a.fileSize AS 'File size'\nFROM (\n SELECT file_name, fileSize\n FROM $tableName USE INDEX()\n WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) a\nJOIN (\n SELECT file_name, fileSize\n FROM $tableName USE INDEX()\n WHERE file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n) b\nON (SUBSTRING_INDEX(a.file_name, '/', -1) = SUBSTRING_INDEX(b.file_name, '/', -1) AND a.fileSize = b.fileSize)",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "Duplicate file list",
"type": "table"
},
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"align": "auto",
"cellOptions": {
"type": "auto"
},
"footer": {
"reducers": []
},
"hideFrom": {
"viz": false
},
"inspect": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "File size"
},
"properties": [
{
"id": "unit",
"value": "bytes"
}
]
},
{
"matcher": {
"id": "byName",
"options": "Path source"
},
"properties": [
{
"id": "custom.width",
"value": 118
}
]
},
{
"matcher": {
"id": "byName",
"options": "File name"
},
"properties": [
{
"id": "custom.width",
"value": 288
}
]
},
{
"matcher": {
"id": "byName",
"options": "File path"
},
"properties": [
{
"id": "custom.width",
"value": 904
}
]
}
]
},
"gridPos": {
"h": 7,
"w": 24,
"x": 0,
"y": 13
},
"id": 5,
"options": {
"cellHeight": "sm",
"showHeader": true,
"sortBy": [
{
"desc": false,
"displayName": "File path"
}
]
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT \n 'Path A' AS 'Path source',\n SUBSTRING_INDEX(file_name, '/', -1) AS 'File name',\n file_name AS 'File path',\n fileSize AS 'File size'\nFROM $tableName USE INDEX()\nWHERE \n file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n AND (SUBSTRING_INDEX(file_name, '/', -1), fileSize) NOT IN (\n SELECT \n SUBSTRING_INDEX(file_name, '/', -1), fileSize\n FROM \n $tableName USE INDEX()\n WHERE \n file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n )\n\nUNION ALL\n\nSELECT \n 'Path B' AS 'Path source',\n SUBSTRING_INDEX(file_name, '/', -1) AS 'File name',\n file_name AS 'File path',\n fileSize AS 'File size'\nFROM $tableName USE INDEX()\nWHERE \n file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path2', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n AND (SUBSTRING_INDEX(file_name, '/', -1), fileSize) NOT IN (\n SELECT \n SUBSTRING_INDEX(file_name, '/', -1), fileSize\n FROM \n $tableName USE INDEX()\n WHERE \n file_name LIKE CONCAT(REPLACE(REPLACE(REPLACE('$path1', '%', '\\\\%'),'\\\\','\\\\\\\\'),'_','\\\\_'),'%')\n )",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "Unique file list",
"type": "table"
}
],
"preload": false,
"schemaVersion": 42,
"tags": [],
"templating": {
"list": [
{
"current": {
"text": "/fileset1",
"value": "/fileset1"
},
"label": "Source directory (A)",
"name": "path1",
"options": [
{
"selected": true,
"text": "/fileset1",
"value": "/fileset1"
}
],
"query": "/fileset1",
"type": "textbox"
},
{
"current": {
"text": "/fileset2",
"value": "/fileset2"
},
"label": "Reference directory (B)",
"name": "path2",
"options": [
{
"selected": true,
"text": "/fileset2",
"value": "/fileset2"
}
],
"query": "/fileset2",
"type": "textbox"
},
{
"current": {
"text": "bmcpfs_test",
"value": "bmcpfs_test"
},
"hide": 2,
"label": "Table name",
"name": "tableName",
"query": "bmcpfs_test",
"skipUrlSync": true,
"type": "constant"
}
]
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {
"hidden": true
},
"timezone": "browser",
"title": "CPFS directory comparison",
"uid": "adwm968",
"version": 44
}
CPFS file search
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 4,
"links": [
{
"asDropdown": true,
"icon": "external link",
"includeVars": false,
"keepTime": false,
"tags": [],
"targetBlank": false,
"title": "Other Dashboards",
"tooltip": "",
"type": "dashboards",
"url": ""
}
],
"panels": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"align": "left",
"cellOptions": {
"type": "auto"
},
"footer": {
"reducers": []
},
"inspect": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "File name"
},
"properties": [
{
"id": "custom.width",
"value": 667
}
]
},
{
"matcher": {
"id": "byName",
"options": "Creation time"
},
"properties": [
{
"id": "custom.width",
"value": 238
}
]
},
{
"matcher": {
"id": "byName",
"options": "Region"
},
"properties": [
{
"id": "custom.width",
"value": 277
}
]
},
{
"matcher": {
"id": "byName",
"options": "Creation time"
},
"properties": [
{
"id": "unit",
"value": "dateTimeAsIso"
}
]
},
{
"matcher": {
"id": "byName",
"options": "File size"
},
"properties": [
{
"id": "unit",
"value": "bytes"
}
]
}
]
},
"gridPos": {
"h": 21,
"w": 24,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"cellHeight": "md",
"showHeader": true,
"sortBy": []
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT \n file_name AS \"File name\",\n cTime AS \"Creation time\",\n region AS \"Region\",\n fileSize AS \"File size\"\nFROM $tableName USE INDEX($indexName)\nWHERE TEXT_MATCH_PHRASE(file_name,'$searchKeyword')\nLIMIT $limit",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "File search results",
"type": "table"
}
],
"preload": false,
"refresh": "",
"schemaVersion": 42,
"tags": [],
"templating": {
"list": [
{
"current": {
"text": "/fileset1",
"value": "/fileset1"
},
"label": "File search",
"name": "searchKeyword",
"options": [
{
"selected": true,
"text": "/fileset1",
"value": "/fileset1"
}
],
"query": "/fileset1",
"type": "textbox"
},
{
"current": {
"text": "bmcpfs_test",
"value": "bmcpfs_test"
},
"hide": 2,
"label": "Table name",
"name": "tableName",
"query": "bmcpfs_test",
"skipUrlSync": true,
"type": "constant"
},
{
"current": {
"text": "5000",
"value": "5000"
},
"label": "Display limit",
"name": "limit",
"options": [
{
"selected": false,
"text": "10",
"value": "10"
},
{
"selected": false,
"text": "50",
"value": "50"
},
{
"selected": false,
"text": "100",
"value": "100"
},
{
"selected": false,
"text": "500",
"value": "500"
},
{
"selected": false,
"text": "1000",
"value": "1000"
},
{
"selected": true,
"text": "5000",
"value": "5000"
},
{
"selected": false,
"text": "10000",
"value": "10000"
}
],
"query": "10,50,100,500,1000,5000,10000",
"type": "custom"
},
{
"current": {
"text": "bmcpfs_test_idx",
"value": "bmcpfs_test_idx"
},
"hide": 2,
"name": "indexName",
"query": "bmcpfs_test_idx",
"skipUrlSync": true,
"type": "constant"
}
]
},
"time": {
"from": "now-1h",
"to": "now"
},
"timepicker": {
"hidden": true
},
"timezone": "",
"title": "CPFS file search",
"uid": "file-search-analysis",
"version": 33
}
CPFS file search and analysis
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 3,
"links": [
{
"asDropdown": true,
"icon": "external link",
"includeVars": false,
"keepTime": false,
"tags": [],
"targetBlank": false,
"title": "Other Dashboards",
"tooltip": "",
"type": "dashboards",
"url": ""
}
],
"panels": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "continuous-BlPu"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 3,
"w": 12,
"x": 0,
"y": 0
},
"id": 3,
"options": {
"colorMode": "background",
"graphMode": "none",
"justifyMode": "auto",
"orientation": "horizontal",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"last"
],
"fields": "",
"values": false
},
"showPercentChange": false,
"textMode": "auto",
"wideLayout": true
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT COUNT(*) AS file_count\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n TEXT_MATCH_PHRASE(file_name,'$pathFilter')",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "Matching file count",
"type": "stat"
},
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"noValue": "0GB",
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 100
}
]
},
"unit": "bytes"
},
"overrides": []
},
"gridPos": {
"h": 3,
"w": 12,
"x": 12,
"y": 0
},
"id": 5,
"options": {
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "horizontal",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"last"
],
"fields": "",
"values": false
},
"showPercentChange": false,
"textMode": "value",
"wideLayout": true
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT \n SUM(fileSize) AS cpfs_total_capacity_bytes\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n TEXT_MATCH_PHRASE(file_name,'$pathFilter')",
"refId": "A",
"xcol": "aTime",
"ycol": ""
}
],
"title": "Total size of matching files",
"type": "stat"
},
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"align": "left",
"cellOptions": {
"type": "auto"
},
"footer": {
"reducers": []
},
"inspect": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "File path"
},
"properties": [
{
"id": "custom.width",
"value": 561
}
]
},
{
"matcher": {
"id": "byName",
"options": "File size"
},
"properties": [
{
"id": "custom.width",
"value": 198
},
{
"id": "unit",
"value": "bytes"
}
]
},
{
"matcher": {
"id": "byName",
"options": "Creation time"
},
"properties": [
{
"id": "unit",
"value": "dateTimeAsIso"
}
]
},
{
"matcher": {
"id": "byName",
"options": "Access time"
},
"properties": [
{
"id": "unit",
"value": "dateTimeAsIso"
}
]
},
{
"matcher": {
"id": "byName",
"options": "Modification time"
},
"properties": [
{
"id": "unit",
"value": "dateTimeAsIso"
}
]
}
]
},
"gridPos": {
"h": 17,
"w": 24,
"x": 0,
"y": 3
},
"id": 1,
"options": {
"cellHeight": "md",
"showHeader": true,
"sortBy": [
{
"desc": false,
"displayName": "File path"
}
]
},
"pluginVersion": "12.3.1",
"targets": [
{
"datasource": {
"type": "aliyun-tablestore-grafana-datasource-adapt-react",
"uid": "fffrqt7w6setcc"
},
"formatType": "Table",
"query": "SELECT \n file_name AS \"File path\",\n fileSize AS \"File size\",\n cTime AS \"Creation time\",\n aTime AS \"Access time\",\n mTime AS \"Modification time\"\nFROM `$tableName` USE INDEX($indexName)\nWHERE \n fileSize >= IF('$fileSize'='',0,$fileSize+0) * 1024\nAND \n aTime >= $__unixMilliTimeFrom() AND aTime < $__unixMilliTimeTo()\nAND\n TEXT_MATCH_PHRASE(file_name,'$pathFilter')\nORDER BY fileSize ASC\nLIMIT $limit",
"refId": "A",
"xcol": "_time",
"ycol": ""
}
],
"title": "File query results",
"type": "table"
}
],
"preload": false,
"refresh": "5m",
"schemaVersion": 42,
"tags": [],
"templating": {
"list": [
{
"current": {
"text": "",
"value": ""
},
"label": "File size threshold (KiB)",
"name": "fileSize",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "fileset1",
"value": "fileset1"
},
"label": "Directory filter",
"name": "pathFilter",
"options": [
{
"selected": true,
"text": "fileset1",
"value": "fileset1"
}
],
"query": "fileset1",
"type": "textbox"
},
{
"current": {
"text": "5000",
"value": "5000"
},
"label": "Display limit",
"name": "limit",
"options": [
{
"selected": false,
"text": "10",
"value": "10"
},
{
"selected": false,
"text": "50",
"value": "50"
},
{
"selected": false,
"text": "100",
"value": "100"
},
{
"selected": false,
"text": "500",
"value": "500"
},
{
"selected": false,
"text": "1000",
"value": "1000"
},
{
"selected": true,
"text": "5000",
"value": "5000"
},
{
"selected": false,
"text": "10000",
"value": "10000"
}
],
"query": "10,50,100,500,1000,5000,10000",
"type": "custom"
},
{
"current": {
"text": "bmcpfs_test",
"value": "bmcpfs_test"
},
"description": "",
"hide": 2,
"label": "Table name",
"name": "tableName",
"query": "bmcpfs_test",
"skipUrlSync": true,
"type": "constant"
},
{
"current": {
"text": "bmcpfs_test_idx",
"value": "bmcpfs_test_idx"
},
"hide": 2,
"name": "indexName",
"query": "bmcpfs_test_idx",
"skipUrlSync": true,
"type": "constant"
}
]
},
"time": {
"from": "now-30d",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "CPFS file search & analysis",
"uid": "cpfs-analysis",
"version": 90
}