Migrate data from a self-managed Milvus cluster to an AnalyticDB for PostgreSQL instance

Migrate your Milvus collections, including vector and scalar fields, to an AnalyticDB for PostgreSQL instance by using two Python scripts: one exports data from Milvus as CSV files, and the other imports those files into AnalyticDB for PostgreSQL.

Prerequisites

Before you begin, make sure you have the following:

Milvus (source)

  • A Milvus cluster running version 2.3.x or later

  • Connection credentials: host, port, username, password, database name, and access token

AnalyticDB for PostgreSQL (destination)

  • An AnalyticDB for PostgreSQL instance with a public endpoint

  • A database account with permissions to create schemas and tables

Environment

  • Python 3.8 or later

  • The following Python libraries:

    pip install psycopg2
    pip install pymilvus==2.3.0
    pip install pyyaml
    pip install tqdm

Data type mapping

The export script automatically maps Milvus data types to the corresponding PostgreSQL types. The following table shows how each type is converted.

Milvus data type    PostgreSQL type
BOOL                bool
INT8                smallint
INT16               smallint
INT32               integer
INT64               bigint
FLOAT               real
DOUBLE              double precision
STRING              text
VARCHAR             varchar
JSON                json
BINARY_VECTOR       bit[]
FLOAT_VECTOR        real[]
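
As an illustration of how this mapping turns a collection schema into DDL, the following minimal sketch builds a CREATE TABLE statement from a list of fields. It uses plain strings for the type names so that it runs without pymilvus, and the names Field and build_create_table are hypothetical; they are not part of the export script.

```python
from typing import List, NamedTuple, Optional

# Mapping from the table above, keyed by type name.
# Plain strings are an assumption made so the sketch runs without pymilvus.
MILVUS_TO_PG = {
    "BOOL": "bool",
    "INT8": "smallint",
    "INT16": "smallint",
    "INT32": "integer",
    "INT64": "bigint",
    "FLOAT": "real",
    "DOUBLE": "double precision",
    "STRING": "text",
    "VARCHAR": "varchar",
    "JSON": "json",
    "BINARY_VECTOR": "bit[]",
    "FLOAT_VECTOR": "real[]",
}


class Field(NamedTuple):
    """Hypothetical stand-in for a Milvus field schema."""
    name: str
    dtype: str
    is_primary: bool = False
    max_length: Optional[int] = None


def build_create_table(table: str, fields: List[Field]) -> str:
    """Build a CREATE TABLE statement using the type mapping."""
    columns = []
    for field in fields:
        if field.dtype not in MILVUS_TO_PG:
            raise ValueError(f"Unsupported Milvus type: {field.dtype}")
        column = f"{field.name} {MILVUS_TO_PG[field.dtype]}"
        # VARCHAR fields carry their maximum length into the column type.
        if field.dtype == "VARCHAR" and field.max_length is not None:
            column += f"({field.max_length})"
        if field.is_primary:
            column += " PRIMARY KEY"
        columns.append(column)
    return f"CREATE TABLE {table} ({', '.join(columns)});"


ddl = build_create_table(
    "articles",
    [
        Field("id", "INT64", is_primary=True),
        Field("title", "VARCHAR", max_length=512),
        Field("embedding", "FLOAT_VECTOR"),
    ],
)
print(ddl)
# CREATE TABLE articles (id bigint PRIMARY KEY, title varchar(512), embedding real[]);
```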

Migration considerations

Be aware of the following limitations before you start:

  • Live writes during migration: The export script captures a point-in-time snapshot. Writes to Milvus after the export starts are not captured. Either pause writes before exporting, or log new writes separately and replay them after the import completes.

  • No automatic index creation: The import loads raw data only. Vector indexes are not created automatically. Without indexes, similarity searches fall back to sequential scans. Build indexes after the import and before you run production queries.

  • Unsupported data types: Only the data types listed in the mapping table above are supported. Collections that contain other data types may fail during export.
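
Because unsupported types only surface as failures during export, it can help to check a schema up front. The following is a minimal sketch of such a pre-flight check. It assumes that you have already collected each field's type name into a dictionary (for example, from the collection schema), and find_unsupported_fields is a hypothetical helper, not part of the export script.

```python
# Type names from the data type mapping table.
SUPPORTED_TYPES = {
    "BOOL", "INT8", "INT16", "INT32", "INT64",
    "FLOAT", "DOUBLE", "STRING", "VARCHAR", "JSON",
    "BINARY_VECTOR", "FLOAT_VECTOR",
}


def find_unsupported_fields(schema):
    """Return (field name, type name) pairs that the export cannot map.

    schema is a dict of field name -> Milvus type name (an assumed input format).
    """
    return [(name, dtype) for name, dtype in schema.items()
            if dtype not in SUPPORTED_TYPES]


# Example schema: ARRAY is a Milvus type that is not in the mapping table.
schema = {"id": "INT64", "embedding": "FLOAT_VECTOR", "tags": "ARRAY"}
problems = find_unsupported_fields(schema)
for name, dtype in problems:
    print(f"Field {name!r} has unsupported type {dtype}")
```

An empty result means that every field in the schema has an entry in the mapping table.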

Step 1: Export data from Milvus

The export script connects to Milvus, reads each collection using a query iterator (batch size: 1,000 rows), and writes the data to numbered CSV files. It also generates a create_table.sql file per collection that contains the corresponding CREATE TABLE statement for PostgreSQL.
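
The chunking behavior described above can be sketched in isolation. The following example is a simplified illustration, not the export script itself: it takes rows that are already converted to strings, splits them into files of at most max_rows rows, and names the files with a zero-padded counter. The function name write_chunks and its parameters are assumptions made for this sketch.

```python
import os
import tempfile
from typing import Iterable, List


def write_chunks(rows: Iterable[List[str]], out_dir: str, header: List[str],
                 max_rows: int, delimiter: str = "|") -> List[str]:
    """Write rows to numbered CSV files, each with a header line."""
    os.makedirs(out_dir, exist_ok=True)
    paths: List[str] = []
    chunk: List[List[str]] = []

    def flush() -> None:
        # Skip empty chunks so that no header-only file is created.
        if not chunk:
            return
        path = os.path.join(out_dir, f"{len(paths):010d}.csv")
        with open(path, "w", newline="") as fd:
            fd.write(delimiter.join(header) + "\n")
            for row in chunk:
                fd.write(delimiter.join(row) + "\n")
        paths.append(path)
        chunk.clear()

    for row in rows:
        chunk.append(row)
        if len(chunk) >= max_rows:
            flush()
    flush()  # write the final, possibly partial, chunk
    return paths


with tempfile.TemporaryDirectory() as tmp:
    rows = ([str(i), f"title {i}"] for i in range(5))
    paths = write_chunks(rows, tmp, ["id", "title"], max_rows=2)
    names = [os.path.basename(p) for p in paths]
    print(names)
```

With five rows and max_rows=2, the sketch produces three files that hold two, two, and one row.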

  1. Create the following three items in the same working directory: the export.py script, the milvus2csv.yaml configuration file, and an empty output folder.

    export.py

    import yaml
    import json
    from pymilvus import (
        connections,
        DataType,
        Collection,
    )
    import os
    from tqdm import tqdm
    
    with open("./milvus2csv.yaml", "r") as f:
        config = yaml.safe_load(f)
    
    print("configuration:")
    print(config)
    
    milvus_config = config["milvus"]
    
    milvus_type_to_adbpg_type = {
        DataType.BOOL: "bool",
        DataType.INT8: "smallint",
        DataType.INT16: "smallint",
        DataType.INT32: "integer",
        DataType.INT64: "bigint",
    
        DataType.FLOAT: "real",
        DataType.DOUBLE: "double precision",
    
        DataType.STRING: "text",
        DataType.VARCHAR: "varchar",
        DataType.JSON: "json",
    
        DataType.BINARY_VECTOR: "bit[]",
        DataType.FLOAT_VECTOR: "real[]",
    }
    
    
    def convert_to_binary(binary_data):
        decimal_value = int.from_bytes(binary_data, byteorder='big')
        binary_string = bin(decimal_value)[2:].zfill(len(binary_data)*8)
        return ','.join(list(binary_string))
    
    
    def data_convert_to_str(data, dtype, delimiter):
        # Convert one field value to its text representation in the CSV file.
        if dtype == DataType.BOOL:
            return "1" if data else "0"
        elif dtype in [DataType.INT8, DataType.INT16,
                       DataType.INT32, DataType.INT64,
                       DataType.FLOAT, DataType.DOUBLE]:
            return str(data)
        elif dtype in [DataType.STRING, DataType.VARCHAR, DataType.JSON]:
            # Escape the delimiter, double quotes, and newlines.
            return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"").replace("\n", "\\n")
        elif dtype == DataType.BINARY_VECTOR:
            return "{" + ','.join([convert_to_binary(d) for d in data]) + "}"
        elif dtype == DataType.FLOAT_VECTOR:
            # Float vectors are already formatted as "{...}" strings by the caller.
            return data
    
        raise ValueError(f"Unsupported DataType {dtype}")
    
    
    def csv_write_rows(datum, fd, fields_types, delimiter="|"):
        for data in datum:
            for i in range(len(data)):
                ftype = fields_types[i]
                data[i] = data_convert_to_str(data[i], ftype, delimiter)
            fd.write(delimiter.join(data) + "\n")
    
    
    def csv_write_header(headers, fd, delimiter="|"):
        fd.write(delimiter.join(headers) + "\n")
    
    
    def dump_collection(collection_name: str):
        results = []
        file_cnt = 0
        print("connecting to milvus...")
        connections.connect("default", **milvus_config)
    
        export_config = config["export"]
        collection = Collection(collection_name)
        collection.load()
        tmp_path = os.path.join(export_config["output_path"], collection_name)
        # Create the collection folder (and output_path itself, if it is missing).
        os.makedirs(tmp_path, exist_ok=True)
    
        fields_meta_str = ""
        fields_types = []
        headers = []
        for schema in collection.schema.fields:
            print(schema)
            fields_types.append(schema.dtype)
            headers.append(schema.name)
            if len(fields_meta_str) != 0:
                fields_meta_str += ","
            fields_meta_str += f"{schema.name} {milvus_type_to_adbpg_type[schema.dtype]}"
            if schema.dtype == DataType.VARCHAR and "max_length" in schema.params.keys():
                fields_meta_str += f"({schema.params['max_length']})"
            if schema.is_primary:
                fields_meta_str += " PRIMARY KEY"
    
        create_table_sql = f"CREATE TABLE {collection.name} " \
                           f" ({fields_meta_str});"
    
        with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f:
            f.write(create_table_sql)
    
        print(create_table_sql)
    
        print(headers)
    
        total_num = collection.num_entities
        collection.load()
        query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)
    
        def write_to_csv_file(col_names, data):
            if len(results) == 0:
                return
            nonlocal file_cnt
            assert(file_cnt <= 1e9)
            output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
            with open(output_file_name, "w", newline="") as csv_file:
                # write header
                csv_write_header(col_names, csv_file)
                # write data
                csv_write_rows(data, csv_file, fields_types)
                file_cnt += 1
                results.clear()
    
        with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
            while True:
                res = query_iterator.next()
                if len(res) == 0:
                    print("query iteration finished, close")
                    # close the iterator
                    query_iterator.close()
                    break
                for row in res:
                    row_list = []
                    for i in range(len(headers)):
                        field = row[headers[i]]
                        if isinstance(field, list) and fields_types[i] != DataType.BINARY_VECTOR:
                            row_list.append("{" + ", ".join(str(x) for x in field) + "}")
                        elif isinstance(field, dict):
                            row_list.append(json.dumps(field, ensure_ascii=False))
                        else:
                            row_list.append(field)
                    results.append(row_list)
                    if len(results) >= export_config["max_line_in_file"]:
                        write_to_csv_file(headers, data=results)
                    pbar.update(1)
    
        write_to_csv_file(headers, data=results)
    
    if __name__ == "__main__":
        for name in config["export"]["collections"]:
            dump_collection(name)

    milvus2csv.yaml

    milvus:
      host: '<localhost>'        # The host address of the Milvus service.
      port: 19530                # The service port of Milvus.
      user: '<user_name>'        # The username.
      password: '<password>'     # The password.
      db_name: '<database_name>' # The database name.
      token: '<token_id>'        # The access token.
    
    export:
      collections:
        - 'test'
        - 'medium_articles_with_json'
        # - 'hello_milvus'
        # - 'car'
        # - 'medium_articles_with_dynamic'
        # Specify the names of all collections that you want to export.
      max_line_in_file: 40000     # The number of rows per output file. Reduce this value if you have limited memory.
      output_path: './output'     # The destination folder for export. This topic uses ./output as an example.

    output/: an empty folder that receives the exported data.

  2. Update milvus2csv.yaml with your Milvus connection details and the names of the collections to export.

  3. Run the export script:

    python export.py

    After the script finishes, the output folder contains one subdirectory per collection, each with numbered CSV files and a create_table.sql file:

    .
    ├── export.py
    ├── milvus2csv.yaml
    └── output
        ├── medium_articles_with_json
        │   ├── 0000000000.csv
        │   ├── 0000000001.csv
        │   ├── 0000000002.csv
        │   └── create_table.sql
        └── test
            ├── 0000000000.csv
            └── create_table.sql
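
To make the contents of the exported CSV files more concrete, the following sketch shows how the bytes of a binary vector become a bit[] array literal, using the same conversion idea as convert_to_binary in export.py. It is a simplified illustration: it handles a single bytes value, and the name bytes_to_bit_array is hypothetical.

```python
def bytes_to_bit_array(binary_data: bytes) -> str:
    """Render bytes as a PostgreSQL array literal of individual bits."""
    value = int.from_bytes(binary_data, byteorder="big")
    # Pad to 8 bits per byte so that leading zero bits are preserved.
    bits = bin(value)[2:].zfill(len(binary_data) * 8)
    return "{" + ",".join(bits) + "}"


literal = bytes_to_bit_array(b"\x05\x80")
print(literal)
# {0,0,0,0,0,1,0,1,1,0,0,0,0,0,0,0}
```

The two input bytes 0x05 and 0x80 expand to 16 comma-separated bits, which is the form that the import later loads into a bit[] column.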

Step 2: Import data to AnalyticDB for PostgreSQL

The import script reads the exported CSV files and loads them into AnalyticDB for PostgreSQL using the PostgreSQL COPY command. It creates the schema (if it doesn't exist) and table before each import.
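
COPY reads the files in its text format, where a backslash escapes the character that follows it. The sketch below illustrates that round trip with two hypothetical helpers, escape_field and unescape_field. It is a simplified model under stated assumptions: the reader handles only the \n, \r, and \t sequences and treats any other escaped character as itself, and unlike export.py the writer also escapes backslashes and carriage returns.

```python
def escape_field(value: str, delimiter: str = "|") -> str:
    """Escape a string so that it fits on one line of a delimited text file."""
    return (
        value.replace("\\", "\\\\")              # escape backslashes first
        .replace(delimiter, "\\" + delimiter)    # then the column delimiter
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def unescape_field(text: str) -> str:
    """Simplified reader for backslash escapes (not the full COPY grammar)."""
    specials = {"n": "\n", "r": "\r", "t": "\t"}
    out = []
    i = 0
    while i < len(text):
        ch = text[i]
        if ch == "\\" and i + 1 < len(text):
            nxt = text[i + 1]
            # Known sequences map to control characters; anything else is literal.
            out.append(specials.get(nxt, nxt))
            i += 2
        else:
            out.append(ch)
            i += 1
    return "".join(out)


sample = 'a|b\nc\\d "q"'
escaped = escape_field(sample)
restored = unescape_field(escaped)
print(escaped)
print(restored == sample)
```

If your string or JSON fields can contain backslashes, consider checking how they are written to the CSV files before you run the import.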

  1. In a new working directory, place the following two files alongside the data folder. Either rename the output folder from Step 1 to data, or update data_path in the configuration.

    import.py

    import psycopg2
    import yaml
    import glob
    import os
    
    if __name__ == "__main__":
        with open('csv2adbpg.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)
    
        print("current config:" + str(config))
    
        db_host = config['database']['host']
        db_port = config['database']['port']
        db_name = config['database']['name']
        schema_name = config['database']['schema']
        db_user = config['database']['user']
        db_password = config['database']['password']
        data_path = config['data_path']
    
        conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
            options=f'-c search_path={schema_name},public'
        )
    
        cur = conn.cursor()
    
        # check schema
        cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
        existing_schema = cur.fetchone()
        if existing_schema:
            print(f"Schema {schema_name} already exists.")
        else:
            # create schema
            cur.execute(f"CREATE SCHEMA {schema_name}")
            print(f"Created schema: {schema_name}")
    
        for table_name in os.listdir(data_path):
            table_folder = os.path.join(data_path, table_name)
            print(f"Begin Process table: {table_name}")
            if os.path.isdir(table_folder):
                create_table_file = os.path.join(table_folder, 'create_table.sql')
                with open(create_table_file, 'r') as file:
                    create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")
    
                cnt = 0
                csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
                for csv_file in csv_files:
                    with open(csv_file, 'r') as file:
                        copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                        cur.copy_expert(copy_command, file)
                    cnt += 1
                    print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
    
            conn.commit()
            print(f"Finished import table: {table_name}")
            print(' # '*60)
    
        cur.close()
        conn.close()

    csv2adbpg.yaml

    database:
      host: "192.16.XX.XX"         # The public endpoint of the AnalyticDB for PostgreSQL instance.
      port: 5432                   # The port of the AnalyticDB for PostgreSQL instance.
      name: "vector_database"      # The name of the destination database.
      user: "username"             # The database account of the AnalyticDB for PostgreSQL instance.
      password: ""                 # The password of the account.
      schema: "public"             # The name of the destination schema. If the schema does not exist, it is automatically created.
    
    data_path: "./data"            # The data source to import.

    The directory structure should look like this:

    .
    ├── csv2adbpg.yaml
    ├── data
    │   ├── medium_articles_with_json
    │   │   ├── 0000000000.csv
    │   │   ├── 0000000001.csv
    │   │   ├── 0000000002.csv
    │   │   └── create_table.sql
    │   └── test
    │       ├── 0000000000.csv
    │       └── create_table.sql
    └── import.py

  2. Update csv2adbpg.yaml with your AnalyticDB for PostgreSQL connection details.

  3. Run the import script:

    python import.py

  4. Verify the import by connecting to your AnalyticDB for PostgreSQL instance and checking row counts:

    SELECT COUNT(*) FROM <table_name>;

    The count should match the number of rows exported from the corresponding Milvus collection.
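
To obtain the expected number for this comparison, you can count the rows in the exported files. The following is a minimal sketch, assuming that every CSV file starts with one header line and that each row occupies exactly one line (the export script escapes newlines inside string values). The function name count_exported_rows is hypothetical, and the demo writes sample files to a temporary folder instead of reading real export output.

```python
import glob
import os
import tempfile


def count_exported_rows(collection_dir: str) -> int:
    """Count data rows across all CSV files of one exported collection."""
    total = 0
    for path in sorted(glob.glob(os.path.join(collection_dir, "*.csv"))):
        # newline="\n" makes only line feeds terminate a line.
        with open(path, "r", newline="\n") as fd:
            lines = sum(1 for _ in fd)
        total += max(lines - 1, 0)  # subtract the header line
    return total


# Demo with two small sample files.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "0000000000.csv"), "w", newline="") as fd:
        fd.write("id|title\n1|first\n2|second\n")
    with open(os.path.join(tmp, "0000000001.csv"), "w", newline="") as fd:
        fd.write("id|title\n3|third\n")
    expected = count_exported_rows(tmp)
    print(expected)
```

For a real check, point the function at a collection folder such as ./data/test and compare the result with the SELECT COUNT(*) output for the matching table.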

What's next

After the import, the tables contain data but have no vector indexes. Without indexes, similarity searches fall back to a sequential scan, which is significantly slower at scale. Build vector indexes on the migrated tables before running production queries.

For instructions on creating vector indexes, see Create a vector index.
