Migrate On-premises Milvus to Relyt
This guide describes best practices for using Python to migrate data from an on-premises Milvus cluster to a Relyt DW service unit of the Vector edition.
Prerequisites
- Your Milvus cluster is version 2.3.x or later.
- Python 3.8 or later is installed.
- The following Python libraries are installed:
  - psycopg2
  - pymilvus 2.3.0
  - pyaml
  - tqdm

  If they are not installed yet, run the following commands to install them:

  pip install psycopg2
  pip install pymilvus==2.3.0
  pip install pyaml
  pip install tqdm
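To confirm that the libraries are importable before you start, you can run a quick check such as the following sketch, which only imports the packages and prints their installed versions:

# Optional: confirm the required libraries are importable and print their versions
import psycopg2
import pymilvus
import tqdm
import yaml  # PyYAML, installed as a dependency of pyaml

print("pymilvus:", pymilvus.__version__)   # expected: 2.3.0
print("psycopg2:", psycopg2.__version__)
print("tqdm:", tqdm.__version__)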
Migrate a Milvus database to Relyt
Step 1. Export Milvus data
- Create an export script named export.py and an export configuration file named milvus2csv.yaml, along with an output directory named output.

Note: The names of the export script, configuration file, and output directory are for reference only and can be adjusted as needed.

Following is the content of the export script export.py:
import yaml
import json
from pymilvus import (
    connections,
    DataType,
    Collection,
)
import os
from tqdm import tqdm

with open("./milvus2csv.yaml", "r") as f:
    config = yaml.safe_load(f)

print("configuration:")
print(config)

milvus_config = config["milvus"]

# Mapping from Milvus data types to Relyt column types
milvus_type_to_relyt_type = {
    DataType.BOOL: "bool",
    DataType.INT8: "smallint",
    DataType.INT16: "smallint",
    DataType.INT32: "integer",
    DataType.INT64: "bigint",
    DataType.FLOAT: "real",
    DataType.DOUBLE: "double precision",
    DataType.STRING: "text",
    DataType.VARCHAR: "varchar",
    DataType.JSON: "json",
    DataType.BINARY_VECTOR: "bvector",
    DataType.FLOAT_VECTOR: "vector",
    DataType.FLOAT16_VECTOR: "vecf16",
    DataType.SPARSE_FLOAT_VECTOR: "svector",
}


def convert_to_binary(binary_data):
    decimal_value = int.from_bytes(binary_data, byteorder='big')
    binary_string = bin(decimal_value)[2:].zfill(len(binary_data) * 8)
    return ','.join(list(binary_string))


def data_convert_to_str(data, dtype, delimiter):
    if dtype == DataType.BOOL:
        return "1" if data else "0"
    elif dtype in [DataType.INT8, DataType.INT16,
                   DataType.INT32, DataType.INT64,
                   DataType.FLOAT, DataType.DOUBLE]:
        return str(data)
    elif dtype in [DataType.STRING, DataType.VARCHAR, DataType.JSON]:
        # Escape the column delimiter and double quotation marks
        return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"")
    elif dtype in [DataType.FLOAT_VECTOR, DataType.SPARSE_FLOAT_VECTOR,
                   DataType.FLOAT16_VECTOR, DataType.BINARY_VECTOR]:
        return "[" + data[1:-1] + "]"
    raise Exception(f"Unsupported DataType {dtype}")


def csv_write_rows(datum, fd, fields_types, delimiter="|"):
    for data in datum:
        for i in range(len(data)):
            ftype = fields_types[i]
            data[i] = data_convert_to_str(data[i], ftype, delimiter)
        fd.write(delimiter.join(data) + "\n")


def csv_write_header(headers, fd, delimiter="|"):
    fd.write(delimiter.join(headers) + "\n")


def dump_collection(collection_name: str):
    results = []
    file_cnt = 0
    print("connecting to milvus...")
    connections.connect("default", **milvus_config)

    export_config = config["export"]
    collection = Collection(collection_name)
    collection.load()
    tmp_path = os.path.join(export_config["output_path"], collection_name)
    if not os.path.exists(tmp_path):
        os.mkdir(tmp_path)

    enable_dynamic_field = collection.schema.enable_dynamic_field
    if enable_dynamic_field:
        print("Dynamic field is enabled, there may be additional schemas for data.")
        res = collection.query(expr='', output_fields=["*"], limit=1)
        if len(res[0]) != len(collection.schema.fields):
            print("The schema of the collection is not consistent with the schema of the data. "
                  "Please check the schema of the collection and the schema of the data.")
            print(f"The schema of the collection is: {res}")

    # Build the CREATE TABLE statement from the collection schema
    fields_meta_str = ""
    fields_types = []
    headers = []
    for schema in collection.schema.fields:
        print(schema)
        fields_types.append(schema.dtype)
        headers.append(schema.name)
        if len(fields_meta_str) != 0:
            fields_meta_str += ","
        fields_meta_str += f"{schema.name} {milvus_type_to_relyt_type[schema.dtype]}"
        if schema.dtype == DataType.VARCHAR and "max_length" in schema.params.keys():
            fields_meta_str += f"({schema.params['max_length']})"
        if schema.dtype in [DataType.FLOAT_VECTOR, DataType.SPARSE_FLOAT_VECTOR,
                            DataType.FLOAT16_VECTOR, DataType.BINARY_VECTOR]:
            fields_meta_str += f"({schema.params['dim']})"
        if schema.is_primary:
            fields_meta_str += " PRIMARY KEY"

    create_table_sql = f"CREATE TABLE {collection.name} " \
                       f" ({fields_meta_str}) using heap;"

    with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f:
        f.write(create_table_sql)

    print(create_table_sql)
    print(headers)

    total_num = collection.num_entities
    collection.load()
    query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)

    def write_to_csv_file(col_names, data):
        if len(results) == 0:
            return
        nonlocal file_cnt
        assert file_cnt <= 1e9
        output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
        with open(output_file_name, "w", newline="") as csv_file:
            # write header
            csv_write_header(col_names, csv_file)
            # write data
            csv_write_rows(data, csv_file, fields_types)
        file_cnt += 1
        results.clear()

    with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
        while True:
            res = query_iterator.next()
            if len(res) == 0:
                print("Query iteration finished, closing...")
                query_iterator.close()
                break
            for row in res:
                row_list = []
                for i in range(len(headers)):
                    field = row[headers[i]]
                    if isinstance(field, list) and fields_types[i] != DataType.BINARY_VECTOR:
                        row_list.append("{" + ", ".join(str(x) for x in field) + "}")
                    elif isinstance(field, dict):
                        row_list.append(json.dumps(field, ensure_ascii=False))
                    else:
                        row_list.append(field)
                results.append(row_list)
                if len(results) >= export_config["max_line_in_file"]:
                    write_to_csv_file(headers, data=results)
                pbar.update(1)

    # Flush any remaining rows
    write_to_csv_file(headers, data=results)


if __name__ == "__main__":
    for name in config["export"]["collections"]:
        dump_collection(name)

Following is the content of the export configuration file milvus2csv.yaml:
milvus:
  uri: 'http://<host_name>'
  user: '<username>'        # Username
  password: '<password>'    # Password
  db_name: '<db_name>'      # Name of the Milvus database
  token: '<your_token>'     # Access token

export:
  collections:              # List all collections to export
    - 'demo_collection'
    - 'customized_setup'
    - 'demo_collection2'
    # - 'hello_milvus'
    # - 'car'
    # - 'medium_articles_with_dynamic'
  max_line_in_file: 40000   # The maximum number of lines per exported file
  output_path: './output'   # Export destination directory, for example ./output
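For reference, the export script writes each CSV file with the field names as the header row and | as the column delimiter, and vector columns are rendered as bracketed text. For a hypothetical collection with an id field, a title field, and a 3-dimensional float vector field, an exported file would look roughly like this (illustrative values only):

id|title|vector
1|first article|[0.1, 0.2, 0.3]
2|second article|[0.4, 0.5, 0.6]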
- Save the export script export.py, the export configuration file milvus2csv.yaml, and the output directory output in the same directory.

The directory structure is as shown below.
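A sketch of that layout, assuming the example names used in this guide:

.
├── export.py
├── milvus2csv.yaml
└── output/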
- Modify the milvus2csv.yaml configuration file based on your Milvus cluster details. Optionally, verify the connection settings with the snippet below before running the export.
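The following optional pre-flight check is a sketch that reuses the milvus section of milvus2csv.yaml and lists the collections visible to the configured account, so you can confirm the credentials before exporting:

# Optional pre-flight check: connect with the settings from milvus2csv.yaml
import yaml
from pymilvus import connections, utility

with open("./milvus2csv.yaml", "r") as f:
    milvus_cfg = yaml.safe_load(f)["milvus"]

connections.connect("default", **milvus_cfg)
# The collections listed under export.collections should appear in this list
print(utility.list_collections())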
- Run the Python script and check the output.

The output structure is as shown below.
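Assuming the example names above, run the script from the directory that contains milvus2csv.yaml:

python export.py

For a collection named demo_collection, the resulting layout would look roughly like the following sketch (the number of CSV files depends on the collection size and max_line_in_file):

output/
└── demo_collection/
    ├── create_table.sql
    ├── 0000000000.csv
    └── 0000000001.csv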
Step 2. Import data into a Relyt Vector database
- Create an import script named import.py and an import configuration file named csv2relyt.yaml, and prepare the data to be imported (the output directory obtained in Step 1).

Note: The names of the import script and configuration file are examples only and can be adjusted as needed.

Following is the content of the import script import.py:
import psycopg2
import yaml
import glob
import os

if __name__ == "__main__":
    with open('csv2relyt.yaml', 'r') as config_file:
        config = yaml.safe_load(config_file)
        print("current config:" + str(config))

    db_host = config['database']['host']
    db_port = config['database']['port']
    db_name = config['database']['name']
    schema_name = config['database']['schema']
    db_user = config['database']['user']
    db_password = config['database']['password']

    data_path = config['data_path']

    conn = psycopg2.connect(
        host=db_host,
        port=db_port,
        database=db_name,
        user=db_user,
        password=db_password,
        options=f'-c search_path={schema_name},public'
    )
    cur = conn.cursor()

    # Check whether the target schema exists; create it if it does not
    cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
    existing_schema = cur.fetchone()
    if existing_schema:
        print(f"Schema {schema_name} already exists.")
    else:
        cur.execute(f"CREATE SCHEMA {schema_name}")
        print(f"Created schema: {schema_name}")

    # Each subdirectory of data_path holds one exported collection:
    # a create_table.sql file plus one or more CSV files
    for table_name in os.listdir(data_path):
        table_folder = os.path.join(data_path, table_name)
        print(f"Begin processing table: {table_name}")
        if os.path.isdir(table_folder):
            create_table_file = os.path.join(table_folder, 'create_table.sql')
            with open(create_table_file, 'r') as file:
                create_table_sql = file.read()
            try:
                cur.execute(create_table_sql)
            except psycopg2.errors.DuplicateTable as e:
                print(e)
                conn.rollback()
                continue
            print(f"Created table: {table_name}")

            cnt = 0
            csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
            for csv_file in csv_files:
                with open(csv_file, 'r') as file:
                    copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                    cur.copy_expert(copy_command, file)
                    cnt += 1
                    print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
            conn.commit()
            print(f"Finished importing table: {table_name}")
        print(' # ' * 60)

    cur.close()
    conn.close()

Following is the content of the import configuration file csv2relyt.yaml:
database:
  host: "<host_name>"       # The host name of the target DW service unit of the Vector edition for public access
  port: 5432                # The port number
  name: "<database_name>"   # The name of the Relyt vector database
  user: "<username>"        # The username of the Relyt cloud account or DW user
  password: "<password>"    # The password
  schema: "public"          # The schema in the Relyt vector database into which data is imported, for example public. If the specified schema does not exist, it is created automatically.

data_path: "./output"       # The path to the data to import
- Configure the csv2relyt.yaml configuration file based on your Relyt DW service unit information.
- Run the Python script.
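For example, assuming the script name used above, run it from the directory that contains csv2relyt.yaml and the output directory:

python import.py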
- Check the database in the DW service unit of the Vector edition to ensure that the data has been imported correctly. A quick sanity check is sketched below.
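One way to do this, sketched here under the assumption that a collection named demo_collection was migrated, is to compare the row count in Relyt with the num_entities value that Milvus reported during the export:

# Optional sanity check: count the rows imported into a migrated table
import psycopg2
import yaml

with open("csv2relyt.yaml", "r") as f:
    cfg = yaml.safe_load(f)["database"]

conn = psycopg2.connect(
    host=cfg["host"], port=cfg["port"], database=cfg["name"],
    user=cfg["user"], password=cfg["password"],
    options=f"-c search_path={cfg['schema']},public"
)
with conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM demo_collection")  # replace with your table name
    print("row count:", cur.fetchone()[0])
conn.close()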
- Recreate the required indexes. For detailed instructions, see Indexing.