Migrate data from a self-managed Milvus cluster to an AnalyticDB for PostgreSQL instance
Migrate your Milvus collections—including vector and scalar fields—to an AnalyticDB for PostgreSQL instance using two Python scripts: one to export data from Milvus as CSV files, and one to import those files into AnalyticDB for PostgreSQL.
Prerequisites
Before you begin, make sure you have the following:
Milvus (source)
A Milvus cluster running version 2.3.x or later
Connection credentials: host, port, username, password, database name, and access token
AnalyticDB for PostgreSQL (destination)
An AnalyticDB for PostgreSQL instance with a public endpoint
A database account with permissions to create schemas and tables
Environment
Python 3.8 or later
The following Python libraries:

    pip install psycopg2
    pip install pymilvus==2.3.0
    pip install pyaml
    pip install tqdm
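Before running the scripts, it can help to confirm that the environment meets these requirements. The following is a minimal sketch and not part of the migration scripts: the helper name `missing_modules` is hypothetical, and the module names listed are the ones that the two scripts import.

```python
import importlib.util
import sys

# Import names used by export.py and import.py.
REQUIRED_MODULES = ["psycopg2", "pymilvus", "yaml", "tqdm"]


def missing_modules(names):
    """Return the names from `names` that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


python_ok = sys.version_info >= (3, 8)
missing = missing_modules(REQUIRED_MODULES)
print(f"Python 3.8 or later: {python_ok}")
print(f"Missing modules: {missing or 'none'}")
```

If the second line reports any module names, install the corresponding packages before you continue.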
Data type mapping
The export script automatically maps Milvus data types to the corresponding PostgreSQL types. The following table shows how each type is converted.
| Milvus data type | PostgreSQL type |
|---|---|
| BOOL | bool |
| INT8 | smallint |
| INT16 | smallint |
| INT32 | integer |
| INT64 | bigint |
| FLOAT | real |
| DOUBLE | double precision |
| STRING | text |
| VARCHAR | varchar |
| JSON | json |
| BINARY_VECTOR | bit[] |
| FLOAT_VECTOR | real[] |
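To illustrate how this mapping turns a collection schema into a table definition, here is a small sketch that mirrors the logic of the export script. It is an illustration under assumptions: type names are plain strings so that the code runs without pymilvus, the function name `build_create_table` and the sample schema are hypothetical, and the whitespace of the generated statement differs slightly from what the export script writes.

```python
# Mapping table from this section, keyed by Milvus type name.
MILVUS_TO_PG = {
    "BOOL": "bool",
    "INT8": "smallint",
    "INT16": "smallint",
    "INT32": "integer",
    "INT64": "bigint",
    "FLOAT": "real",
    "DOUBLE": "double precision",
    "STRING": "text",
    "VARCHAR": "varchar",
    "JSON": "json",
    "BINARY_VECTOR": "bit[]",
    "FLOAT_VECTOR": "real[]",
}


def build_create_table(table, fields):
    """Build a CREATE TABLE statement from a list of field descriptions."""
    columns = []
    for field in fields:
        pg_type = MILVUS_TO_PG[field["dtype"]]
        # VARCHAR fields carry their maximum length into the column type.
        if field["dtype"] == "VARCHAR" and "max_length" in field:
            pg_type += f"({field['max_length']})"
        column = f"{field['name']} {pg_type}"
        if field.get("is_primary"):
            column += " PRIMARY KEY"
        columns.append(column)
    return f"CREATE TABLE {table} ({', '.join(columns)});"


# Hypothetical collection schema used only for this example.
example_fields = [
    {"name": "id", "dtype": "INT64", "is_primary": True},
    {"name": "title", "dtype": "VARCHAR", "max_length": 512},
    {"name": "embedding", "dtype": "FLOAT_VECTOR"},
]
print(build_create_table("articles", example_fields))
# CREATE TABLE articles (id bigint PRIMARY KEY, title varchar(512), embedding real[]);
```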
Migration considerations
Be aware of the following limitations before you start:
| Limitation | Details |
|---|---|
| Live writes during migration | The export script captures a point-in-time snapshot. Writes to Milvus after the export starts are not captured. Either pause writes before exporting, or log new writes separately and replay them after the import completes. |
| No automatic index creation | The import loads raw data only. Vector indexes are not created automatically. Without indexes, similarity searches fall back to sequential scans. Build indexes after import before running production queries. |
| Unsupported data types | Only the data types listed in the mapping table above are supported. Collections that contain other data types may fail during export. |
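One way to catch the unsupported-type limitation early is to compare a collection's field types with the mapping table before exporting. The sketch below assumes that you have the field names and type names as plain string pairs. The helper `unsupported_fields` and the sample schema are hypothetical, and `ARRAY` stands in for a Milvus type that the mapping table does not list.

```python
# Type names from the mapping table, written as plain strings so that the
# sketch runs without pymilvus installed.
SUPPORTED_TYPES = {
    "BOOL", "INT8", "INT16", "INT32", "INT64", "FLOAT", "DOUBLE",
    "STRING", "VARCHAR", "JSON", "BINARY_VECTOR", "FLOAT_VECTOR",
}


def unsupported_fields(fields):
    """Return (name, type) pairs whose type is missing from the mapping table."""
    return [(name, dtype) for name, dtype in fields if dtype not in SUPPORTED_TYPES]


# Hypothetical schema with one field type that the mapping table does not cover.
schema = [("id", "INT64"), ("tags", "ARRAY"), ("embedding", "FLOAT_VECTOR")]
print(unsupported_fields(schema))  # [('tags', 'ARRAY')]
```

An empty result suggests that the export script can map every field. With pymilvus, the pairs could probably be built from `collection.schema.fields` by reading each field's `name` and `dtype.name`.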
Step 1: Export data from Milvus
The export script connects to Milvus, reads each collection using a query iterator (batch size: 1,000 rows), and writes the data to numbered CSV files. It also generates a create_table.sql file per collection that contains the corresponding CREATE TABLE statement for PostgreSQL.
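The batching behavior described above can be sketched without a Milvus connection. In this simplified illustration, in-memory lists stand in for the batches that the query iterator returns. The function name `write_numbered_csv` and the sample rows are assumptions made for the example, and the real script additionally converts each value to its CSV representation.

```python
import os
import tempfile


def write_numbered_csv(batches, out_dir, header, max_rows_per_file, delimiter="|"):
    """Buffer rows from an iterable of batches and flush them to numbered CSV files."""
    buffer = []
    written = []

    def flush():
        if not buffer:
            return
        # File names are zero-padded to 10 digits, as in export.py.
        path = os.path.join(out_dir, f"{len(written):010d}.csv")
        with open(path, "w", newline="") as fd:
            fd.write(delimiter.join(header) + "\n")
            for row in buffer:
                fd.write(delimiter.join(row) + "\n")
        written.append(path)
        buffer.clear()

    for batch in batches:
        for row in batch:
            buffer.append(row)
            if len(buffer) >= max_rows_per_file:
                flush()
    flush()  # write the remaining rows
    return written


# Seven sample rows arriving in batches of three, with at most five rows per file.
batches = [
    [["1", "a"], ["2", "b"], ["3", "c"]],
    [["4", "d"], ["5", "e"], ["6", "f"]],
    [["7", "g"]],
]
with tempfile.TemporaryDirectory() as tmp:
    files = write_numbered_csv(batches, tmp, ["id", "name"], max_rows_per_file=5)
    line_counts = []
    for path in files:
        with open(path) as fd:
            line_counts.append(sum(1 for _ in fd))

print([os.path.basename(p) for p in files])  # ['0000000000.csv', '0000000001.csv']
print(line_counts)  # [6, 3] (header plus 5 rows, header plus 2 rows)
```

The batch size and the rows-per-file limit are independent: a file is closed whenever the buffer reaches the limit, regardless of where the batch boundaries fall.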
Create the following two files and one empty folder in the same working directory:
export.py

    import json
    import os

    import yaml
    from pymilvus import (
        connections,
        DataType,
        Collection,
    )
    from tqdm import tqdm

    with open("./milvus2csv.yaml", "r") as f:
        config = yaml.safe_load(f)

    print("configuration:")
    print(config)

    milvus_config = config["milvus"]

    # Mapping from Milvus data types to AnalyticDB for PostgreSQL column types.
    milvus_type_to_adbpg_type = {
        DataType.BOOL: "bool",
        DataType.INT8: "smallint",
        DataType.INT16: "smallint",
        DataType.INT32: "integer",
        DataType.INT64: "bigint",
        DataType.FLOAT: "real",
        DataType.DOUBLE: "double precision",
        DataType.STRING: "text",
        DataType.VARCHAR: "varchar",
        DataType.JSON: "json",
        DataType.BINARY_VECTOR: "bit[]",
        DataType.FLOAT_VECTOR: "real[]",
    }


    def convert_to_binary(binary_data):
        # Expand raw bytes into a comma-separated string of bits.
        decimal_value = int.from_bytes(binary_data, byteorder='big')
        binary_string = bin(decimal_value)[2:].zfill(len(binary_data) * 8)
        return ','.join(list(binary_string))


    def data_convert_to_str(data, dtype, delimiter):
        if dtype == DataType.BOOL:
            return "1" if data else "0"
        elif dtype in [DataType.INT8, DataType.INT16,
                       DataType.INT32, DataType.INT64,
                       DataType.FLOAT, DataType.DOUBLE]:
            return str(data)
        elif dtype in [DataType.STRING, DataType.VARCHAR, DataType.JSON]:
            # Escape the delimiter, double quotes, and line breaks.
            return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"").replace("\n", "\\n")
        elif dtype == DataType.BINARY_VECTOR:
            return "{" + ','.join([convert_to_binary(d) for d in data]) + "}"
        elif dtype == DataType.FLOAT_VECTOR:
            # Already formatted as an array literal in dump_collection.
            return data

        raise Exception(f"Unsupported DataType {dtype}")


    def csv_write_rows(datum, fd, fields_types, delimiter="|"):
        for data in datum:
            for i in range(len(data)):
                ftype = fields_types[i]
                data[i] = data_convert_to_str(data[i], ftype, delimiter)
            fd.write(delimiter.join(data) + "\n")


    def csv_write_header(headers, fd, delimiter="|"):
        fd.write(delimiter.join(headers) + "\n")


    def dump_collection(collection_name: str):
        results = []
        file_cnt = 0
        print("connecting to milvus...")
        connections.connect("default", **milvus_config)

        export_config = config["export"]
        collection = Collection(collection_name)
        collection.load()
        tmp_path = os.path.join(export_config["output_path"], collection_name)
        os.makedirs(tmp_path, exist_ok=True)

        # Collect the column names and types, and build the column list for CREATE TABLE.
        fields_meta_str = ""
        fields_types = []
        headers = []
        for schema in collection.schema.fields:
            print(schema)
            fields_types.append(schema.dtype)
            headers.append(schema.name)
            if len(fields_meta_str) != 0:
                fields_meta_str += ","
            fields_meta_str += f"{schema.name} {milvus_type_to_adbpg_type[schema.dtype]}"
            if schema.dtype == DataType.VARCHAR and "max_length" in schema.params.keys():
                fields_meta_str += f"({schema.params['max_length']})"
            if schema.is_primary:
                fields_meta_str += " PRIMARY KEY"

        create_table_sql = f"CREATE TABLE {collection.name} " \
                           f" ({fields_meta_str});"

        with open(os.path.join(tmp_path, "create_table.sql"), "w") as f:
            f.write(create_table_sql)

        print(create_table_sql)
        print(headers)

        total_num = collection.num_entities
        query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)

        def write_to_csv_file(col_names, data):
            if len(results) == 0:
                return
            nonlocal file_cnt
            assert file_cnt <= 1e9
            output_file_name = os.path.join(tmp_path, f"{str(file_cnt).zfill(10)}.csv")
            with open(output_file_name, "w", newline="") as csv_file:
                # write header
                csv_write_header(col_names, csv_file)
                # write data
                csv_write_rows(data, csv_file, fields_types)
            file_cnt += 1
            results.clear()

        with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
            while True:
                res = query_iterator.next()
                if len(res) == 0:
                    print("query iteration finished, close")
                    # close the iterator
                    query_iterator.close()
                    break
                for row in res:
                    row_list = []
                    for i in range(len(headers)):
                        field = row[headers[i]]
                        if isinstance(field, list) and fields_types[i] != DataType.BINARY_VECTOR:
                            row_list.append("{" + ", ".join(str(x) for x in field) + "}")
                        elif isinstance(field, dict):
                            row_list.append(json.dumps(field, ensure_ascii=False))
                        else:
                            row_list.append(field)
                    results.append(row_list)
                    if len(results) >= export_config["max_line_in_file"]:
                        write_to_csv_file(headers, data=results)
                    pbar.update(1)

        # Flush the rows that remain after the last full file.
        write_to_csv_file(headers, data=results)


    if __name__ == "__main__":
        for name in config["export"]["collections"]:
            dump_collection(name)

milvus2csv.yaml

    milvus:
      host: '<localhost>'         # The host address of the Milvus service.
      port: 19530                 # The service port of Milvus.
      user: '<user_name>'         # The username.
      password: '<password>'      # The password.
      db_name: '<database_name>'  # The database name.
      token: '<token_id>'         # The access token.

    export:
      collections:
        - 'test'
        - 'medium_articles_with_json'
        # - 'hello_milvus'
        # - 'car'
        # - 'medium_articles_with_dynamic'
        # Specify the names of all collections that you want to export.
      max_line_in_file: 40000     # The number of rows per output file. Reduce this value if you have limited memory.
      output_path: './output'     # The destination folder for export. This topic uses ./output as an example.

output/: an empty folder for the exported data.

Update milvus2csv.yaml with your Milvus connection details and the names of the collections to export.

Run the export script:

    python export.py

After the script finishes, the output folder contains one subdirectory per collection, each with numbered CSV files and a create_table.sql file:

    .
    ├── export.py
    ├── milvus2csv.yaml
    └── output
        ├── medium_articles_with_json
        │   ├── 0000000000.csv
        │   ├── 0000000001.csv
        │   ├── 0000000002.csv
        │   └── create_table.sql
        └── test
            ├── 0000000000.csv
            └── create_table.sql
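To get a feel for what a row in the exported CSV files looks like, the following sketch applies the same conversion rules as export.py to a few sample values. The helper names `bits_from_bytes` and `escape_text` and the sample values are made up for the illustration. The rules themselves (BOOL as 1 or 0, backslash escaping, bit expansion, brace-delimited arrays) are taken from the script.

```python
def bits_from_bytes(raw: bytes) -> str:
    """Expand raw bytes into comma-separated bits, most significant bit first."""
    value = int.from_bytes(raw, byteorder="big")
    return ",".join(bin(value)[2:].zfill(len(raw) * 8))


def escape_text(value: str, delimiter: str = "|") -> str:
    """Escape the delimiter, double quotes, and line breaks with backslashes."""
    return (
        value.replace(delimiter, f"\\{delimiter}")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )


row = [
    "1",                                                # BOOL True is written as 1
    escape_text('a|b "quoted"\nnext'),                  # VARCHAR with special characters
    "{" + bits_from_bytes(b"\x05") + "}",               # BINARY_VECTOR with 8 dimensions
    "{" + ", ".join(str(x) for x in [0.1, 0.2]) + "}",  # FLOAT_VECTOR
]
print("|".join(row))
# 1|a\|b \"quoted\"\nnext|{0,0,0,0,0,1,0,1}|{0.1, 0.2}
```

Because line breaks inside text values are written as the two characters `\n`, each record occupies exactly one line of the file.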
Step 2: Import data to AnalyticDB for PostgreSQL
The import script reads the exported CSV files and loads them into AnalyticDB for PostgreSQL using the PostgreSQL COPY command. It creates the schema (if it doesn't exist) and table before each import.
In a new working directory, place the following two files alongside the data folder (rename the output folder from Step 1 to data, or update data_path in the configuration):

import.py

    import glob
    import os

    import psycopg2
    import psycopg2.errors
    import yaml

    if __name__ == "__main__":
        with open('csv2adbpg.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)

        print("current config:" + str(config))

        db_host = config['database']['host']
        db_port = config['database']['port']
        db_name = config['database']['name']
        schema_name = config['database']['schema']
        db_user = config['database']['user']
        db_password = config['database']['password']
        data_path = config['data_path']

        conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
            options=f'-c search_path={schema_name},public'
        )

        cur = conn.cursor()

        # check schema
        cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
        existing_schema = cur.fetchone()
        if existing_schema:
            print(f"Schema {schema_name} already exists.")
        else:
            # create schema
            cur.execute(f"CREATE SCHEMA {schema_name}")
            print(f"Created schema: {schema_name}")
        # Commit now so that a later rollback does not undo the schema creation.
        conn.commit()

        # Each subfolder of data_path holds one table: create_table.sql plus CSV files.
        for table_name in os.listdir(data_path):
            table_folder = os.path.join(data_path, table_name)
            print(f"Begin Process table: {table_name}")
            if os.path.isdir(table_folder):
                create_table_file = os.path.join(table_folder, 'create_table.sql')
                with open(create_table_file, 'r') as file:
                    create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    # Skip tables that already exist.
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")

                cnt = 0
                csv_files = sorted(glob.glob(os.path.join(table_folder, '*.csv')))
                for csv_file in csv_files:
                    with open(csv_file, 'r') as file:
                        copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                        cur.copy_expert(copy_command, file)
                        cnt += 1
                        print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
                conn.commit()
                print(f"Finished import table: {table_name}")
                print(' # '*60)

        cur.close()
        conn.close()

csv2adbpg.yaml

    database:
      host: "192.16.XX.XX"     # The public endpoint of the AnalyticDB for PostgreSQL instance.
      port: 5432               # The port of the AnalyticDB for PostgreSQL instance.
      name: "vector_database"  # The name of the destination database.
      user: "username"         # The database account of the AnalyticDB for PostgreSQL instance.
      password: ""             # The password of the account.
      schema: "public"         # The name of the schema to import into. If the schema does not exist, it is automatically created.

    data_path: "./data"        # The data source to import.

The directory structure should look like this:

    .
    ├── csv2adbpg.yaml
    ├── data
    │   ├── medium_articles_with_json
    │   │   ├── 0000000000.csv
    │   │   ├── 0000000001.csv
    │   │   ├── 0000000002.csv
    │   │   └── create_table.sql
    │   └── test
    │       ├── 0000000000.csv
    │       └── create_table.sql
    └── import.py

Update csv2adbpg.yaml with your AnalyticDB for PostgreSQL connection details.

Run the import script:

    python import.py

Verify the import by connecting to your AnalyticDB for PostgreSQL instance and checking row counts:

    SELECT COUNT(*) FROM <table_name>;

The count should match the number of rows exported from the corresponding Milvus collection.
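The expected count can also be derived from the exported files themselves. The sketch below is one possible approach, not part of the import script. It assumes that every CSV file starts with one header line and that each record occupies a single line, which holds as long as no field contains a line break that the export left unescaped. The function name `count_exported_rows` and the sample files are hypothetical.

```python
import glob
import os
import tempfile


def count_exported_rows(table_folder):
    """Count data rows across all CSV files in one collection folder."""
    total = 0
    for path in sorted(glob.glob(os.path.join(table_folder, "*.csv"))):
        with open(path, "r") as fd:
            lines = sum(1 for _ in fd)
        total += max(lines - 1, 0)  # do not count the header line
    return total


# Build two small sample files to show the calculation.
with tempfile.TemporaryDirectory() as folder:
    with open(os.path.join(folder, "0000000000.csv"), "w") as fd:
        fd.write("id|name\n1|a\n2|b\n")
    with open(os.path.join(folder, "0000000001.csv"), "w") as fd:
        fd.write("id|name\n3|c\n")
    sample_count = count_exported_rows(folder)

print(sample_count)  # 3
```

Pointing the function at a collection folder such as ./data/test would give the number to compare with the result of the SQL query.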
What's next
After the import, the tables contain data but have no vector indexes. Without indexes, similarity searches fall back to a sequential scan, which is significantly slower at scale. Build vector indexes on the migrated tables before running production queries.
For instructions on creating vector indexes, see Create a vector index.