
Migrate On-premises Milvus to Relyt

This guide provides best practices for using Python to migrate data from an on-premises Milvus cluster to a DW service unit of the Vector edition.


Prerequisites

  • Your Milvus cluster is version 2.3.x or later.

  • Python 3.8 or higher has been installed.

  • The following Python libraries have been installed:

    • psycopg2

    • pymilvus (version 2.3.0)

    • pyaml

    • tqdm

    If not, run the following commands to install them:

    pip install psycopg2
    pip install pymilvus==2.3.0
    pip install pyaml
    pip install tqdm
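
    To confirm that the libraries are available in the Python environment you will use, you can run a quick check such as the following (a minimal sketch; it only prints the installed versions, and pymilvus should report 2.3.0):

    # check_libs.py -- optional sanity check of the required libraries (sketch)
    import psycopg2
    import pymilvus
    import yaml   # the yaml module is provided by PyYAML, a dependency of pyaml
    import tqdm

    print("psycopg2:", psycopg2.__version__)
    print("pymilvus:", pymilvus.__version__)
    print("yaml:", yaml.__version__)
    print("tqdm:", tqdm.__version__)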

Migrate a Milvus database to Relyt

Step 1. Export Milvus data

  1. Create an export script named export.py and an export configuration file named milvus2csv.yaml, along with an output directory named output.

    info

    The names of the export script, configuration file, and output directory are for reference only and can be adjusted as needed.

    The following is the content of the export script export.py:

    import yaml
    import json
    from pymilvus import (
        connections,
        DataType,
        Collection,
    )
    import os
    from tqdm import tqdm

    with open("./milvus2csv.yaml", "r") as f:
        config = yaml.safe_load(f)

    print("configuration:")
    print(config)

    milvus_config = config["milvus"]

    # Mapping from Milvus data types to Relyt column types
    milvus_type_to_relyt_type = {
        DataType.BOOL: "bool",
        DataType.INT8: "smallint",
        DataType.INT16: "smallint",
        DataType.INT32: "integer",
        DataType.INT64: "bigint",

        DataType.FLOAT: "real",
        DataType.DOUBLE: "double precision",

        DataType.STRING: "text",
        DataType.VARCHAR: "varchar",
        DataType.JSON: "json",

        DataType.BINARY_VECTOR: "bvector",
        DataType.FLOAT_VECTOR: "vector",
        DataType.FLOAT16_VECTOR: "vecf16",
        DataType.SPARSE_FLOAT_VECTOR: "svector",
    }


    def convert_to_binary(binary_data):
        decimal_value = int.from_bytes(binary_data, byteorder='big')
        binary_string = bin(decimal_value)[2:].zfill(len(binary_data) * 8)
        return ','.join(list(binary_string))


    def data_convert_to_str(data, dtype, delimiter):
        if dtype == DataType.BOOL:
            return "1" if data else "0"
        elif dtype in [DataType.INT8, DataType.INT16,
                       DataType.INT32, DataType.INT64,
                       DataType.FLOAT, DataType.DOUBLE]:
            return str(data)
        elif dtype in [DataType.STRING, DataType.VARCHAR]:
            return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"")
        elif dtype == DataType.JSON:
            return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"")
        elif dtype in [DataType.FLOAT_VECTOR, DataType.SPARSE_FLOAT_VECTOR, DataType.FLOAT16_VECTOR, DataType.BINARY_VECTOR]:
            return "[" + data[1:-1] + "]"

        raise Exception(f"Unsupported DataType {dtype}")


    def csv_write_rows(datum, fd, fields_types, delimiter="|"):
        for data in datum:
            for i in range(len(data)):
                ftype = fields_types[i]
                data[i] = data_convert_to_str(data[i], ftype, delimiter)
            fd.write(delimiter.join(data) + "\n")


    def csv_write_header(headers, fd, delimiter="|"):
        fd.write(delimiter.join(headers) + "\n")


    def dump_collection(collection_name: str):
        results = []
        file_cnt = 0
        print("connecting to milvus...")
        connections.connect("default", **milvus_config)

        export_config = config["export"]
        collection = Collection(collection_name)
        collection.load()
        tmp_path = os.path.join(export_config["output_path"], collection_name)
        if not os.path.exists(tmp_path):
            os.mkdir(tmp_path)
        enable_dynamic_field = collection.schema.enable_dynamic_field
        if enable_dynamic_field:
            print("Dynamic field is enabled, there may be additional schemas for data.")
            res = collection.query(expr='', output_fields=["*"], limit=1)
            if len(res[0]) != len(collection.schema.fields):
                print("The schema of the collection is not consistent with the schema of the data. "
                      "Please check the schema of the collection and the schema of the data.")
                print(f"The schema of the collection is: {res}")

        # Build the column definitions for the CREATE TABLE statement
        fields_meta_str = ""
        fields_types = []
        headers = []
        for schema in collection.schema.fields:
            print(schema)
            fields_types.append(schema.dtype)
            headers.append(schema.name)
            if len(fields_meta_str) != 0:
                fields_meta_str += ","
            fields_meta_str += f"{schema.name} {milvus_type_to_relyt_type[schema.dtype]}"
            if schema.dtype == DataType.VARCHAR and "max_length" in schema.params.keys():
                fields_meta_str += f"({schema.params['max_length']})"
            if schema.dtype in [DataType.FLOAT_VECTOR, DataType.SPARSE_FLOAT_VECTOR, DataType.FLOAT16_VECTOR, DataType.BINARY_VECTOR]:
                fields_meta_str += f"({schema.params['dim']})"
            if schema.is_primary:
                fields_meta_str += " PRIMARY KEY"

        create_table_sql = f"CREATE TABLE {collection.name} " \
                           f" ({fields_meta_str}) using heap;"

        with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f:
            f.write(create_table_sql)

        print(create_table_sql)

        print(headers)

        total_num = collection.num_entities
        collection.load()
        query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)

        def write_to_csv_file(col_names, data):
            if len(results) == 0:
                return
            nonlocal file_cnt
            assert(file_cnt <= 1e9)
            output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
            with open(output_file_name, "w", newline="") as csv_file:
                # write header
                csv_write_header(col_names, csv_file)
                # write data
                csv_write_rows(data, csv_file, fields_types)
            file_cnt += 1
            results.clear()

        with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
            while True:
                res = query_iterator.next()
                if len(res) == 0:
                    print("Query iteration finished, closing...")
                    query_iterator.close()
                    break
                for row in res:
                    row_list = []
                    for i in range(len(headers)):
                        field = row[headers[i]]
                        if isinstance(field, list) and fields_types[i] != DataType.BINARY_VECTOR:
                            row_list.append("{" + ", ".join(str(x) for x in field) + "}")
                        elif isinstance(field, dict):
                            row_list.append(json.dumps(field, ensure_ascii=False))
                        else:
                            row_list.append(field)
                    results.append(row_list)
                    if len(results) >= export_config["max_line_in_file"]:
                        write_to_csv_file(headers, data=results)
                    pbar.update(1)

        # Flush any remaining rows into a final CSV file
        write_to_csv_file(headers, data=results)


    if __name__ == "__main__":
        for name in config["export"]["collections"]:
            dump_collection(name)

    The following is the content of the export configuration file milvus2csv.yaml:

    milvus:
      uri: 'http://<host_name>'
      user: '<username>'       # Username
      password: '<password>'   # Password
      db_name: '<db_name>'     # Milvus database name
      token: '<your_token>'    # Access token

    export:
      collections:
        - 'demo_collection'
        - 'customized_setup'
        - 'demo_collection2'
        # - 'hello_milvus'
        # - 'car'
        # - 'medium_articles_with_dynamic'
        # List all collections to export
      max_line_in_file: 40000   # The max number of lines per file for export
      output_path: './output'   # Export destination directory, e.g., ./output
  2. Save the export script export.py, the export configuration file milvus2csv.yaml, and the output directory output in the same directory.

    The directory structure is as follows:

    .
    ├── export.py
    ├── milvus2csv.yaml
    └── output/

  3. Modify the milvus2csv.yaml configuration file based on your Milvus cluster details.

  4. Run the Python script and check the output.

    The output directory contains one subdirectory per exported collection; each subdirectory holds a create_table.sql file and one or more numbered CSV files (0000000000.csv, 0000000001.csv, and so on). A sketch for sanity-checking the exported row counts is shown below.
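
    To sanity-check the export before importing, you can compare the number of data rows in each collection's CSV files with the entity count reported by Milvus. The following is a minimal sketch that reads the same milvus2csv.yaml configuration and uses only the Python standard library plus the yaml module; the script name check_export.py is only an example, and the line count assumes exported fields contain no embedded newlines:

    # check_export.py -- rough row-count check of the exported CSV files (sketch)
    import glob
    import os

    import yaml

    with open("./milvus2csv.yaml", "r") as f:
        config = yaml.safe_load(f)

    output_path = config["export"]["output_path"]
    for collection_name in config["export"]["collections"]:
        csv_files = sorted(glob.glob(os.path.join(output_path, collection_name, "*.csv")))
        row_count = 0
        for csv_file in csv_files:
            with open(csv_file, "r") as csv_fd:
                # Subtract the header line written at the top of each file.
                row_count += sum(1 for _ in csv_fd) - 1
        print(f"{collection_name}: {len(csv_files)} file(s), {row_count} data row(s)")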

Step 2. Import data into a Relyt Vector database

  1. Create an import script named import.py and an import configuration file named csv2relyt.yaml, and prepare the data to be imported (the output directory obtained in Step 1).

    info

    The names of the import script and configuration file are just examples and can be adjusted as needed.

    The following is the content of the import script import.py:

    import psycopg2
    import yaml
    import glob
    import os

    if __name__ == "__main__":
        with open('csv2relyt.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)

        print("current config:" + str(config))

        db_host = config['database']['host']
        db_port = config['database']['port']
        db_name = config['database']['name']
        schema_name = config['database']['schema']
        db_user = config['database']['user']
        db_password = config['database']['password']
        data_path = config['data_path']

        conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
            options=f'-c search_path={schema_name},public'
        )

        cur = conn.cursor()

        # check schema
        cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
        existing_schema = cur.fetchone()
        if existing_schema:
            print(f"Schema {schema_name} already exists.")
        else:
            # create schema
            cur.execute(f"CREATE SCHEMA {schema_name}")
            print(f"Created schema: {schema_name}")

        for table_name in os.listdir(data_path):
            table_folder = os.path.join(data_path, table_name)
            print(f"Begin processing table: {table_name}")
            if os.path.isdir(table_folder):
                create_table_file = os.path.join(table_folder, 'create_table.sql')
                with open(create_table_file, 'r') as file:
                    create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")

                cnt = 0
                csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
                for csv_file in csv_files:
                    with open(csv_file, 'r') as file:
                        copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                        cur.copy_expert(copy_command, file)
                        cnt += 1
                        print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")

                conn.commit()
                print(f"Finished importing table: {table_name}")
            print(' # ' * 60)

        cur.close()
        conn.close()

    The following is the content of the import configuration file csv2relyt.yaml:

    database:
      host: "<host_name>"       # The host name of the target DW service unit of the Vector edition for public access.
      port: 5432                # The port number.
      name: "<database_name>"   # The name of the Relyt vector database.
      user: "<username>"        # The username of the Relyt cloud account or DW user.
      password: "<password>"    # The password.
      schema: "public"          # The schema in the Relyt vector database to import data into, for example public. If the specified schema does not exist, it will be created automatically.

    data_path: "./output"       # The path to the data to import.
  2. Configure the csv2relyt.yaml configuration file based on your Relyt DW service unit information.

  3. Run the Python script.

  4. Check the database in the DW service unit of the Vector edition to ensure the data has been imported correctly, for example by comparing table row counts with the exported data (see the sketch after this list).

  5. Recreate the required indexes. For detailed instructions, see Indexing.
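
A quick way to perform the check in step 4 is to compare the row count of each imported table with the number of data rows exported for the corresponding collection. The following is a minimal sketch that reuses the csv2relyt.yaml configuration; the script name check_import.py is only an example:

    # check_import.py -- print the row count of each imported table (sketch)
    import os

    import psycopg2
    import yaml

    with open('csv2relyt.yaml', 'r') as config_file:
        config = yaml.safe_load(config_file)

    conn = psycopg2.connect(
        host=config['database']['host'],
        port=config['database']['port'],
        database=config['database']['name'],
        user=config['database']['user'],
        password=config['database']['password'],
        options=f"-c search_path={config['database']['schema']},public"
    )
    cur = conn.cursor()

    # The import script creates one table per subdirectory of the export output directory.
    for table_name in sorted(os.listdir(config['data_path'])):
        if not os.path.isdir(os.path.join(config['data_path'], table_name)):
            continue
        cur.execute(f"SELECT count(*) FROM {table_name}")
        print(f"{table_name}: {cur.fetchone()[0]} row(s)")

    cur.close()
    conn.close()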