Database Reference
bear.db
get_milvus_client(db_name=config.MILVUS_DB_NAME)
Create a Milvus client connected to the given database.
Source code in bear/db.py
def get_milvus_client(db_name: str = config.MILVUS_DB_NAME) -> MilvusClient:
    """Build a Milvus client and switch it to ``db_name``.

    A new ``MilvusClient`` is constructed on every call. The endpoint is
    assembled from ``config.MILVUS_HOST`` / ``config.MILVUS_PORT`` and the
    token is read from ``config.MILVUS_TOKEN``.

    Args:
        db_name: Database the returned client is switched to.

    Returns:
        The client, after ``use_database(db_name)`` has been called on it.
    """
    endpoint = f"http://{config.MILVUS_HOST}:{config.MILVUS_PORT}"
    # A falsy (unset) token is passed to the client as an empty string.
    auth_token = str(config.MILVUS_TOKEN or "")
    milvus = MilvusClient(uri=endpoint, token=auth_token)
    milvus.use_database(db_name)
    return milvus
create_milvus_collection(client, model, auto_id=False, enable_dynamic_field=True)
Create a Milvus collection for the given model.
Source code in bear/db.py
def create_milvus_collection(client: MilvusClient, model: type[Resource], auto_id: bool = False, enable_dynamic_field: bool = True) -> None:
    """Create the Milvus collection backing ``model`` unless it already exists.

    The collection is named after the lower-cased class name of ``model``.
    Each entry of ``model.model_fields`` contributes one schema field; a field
    whose metadata contains an ``"index_configs"`` key additionally
    contributes one index built from that value.

    Args:
        client: Milvus client used to inspect and create the collection.
        model: Resource class (not an instance) to create a collection for.
        auto_id: Forwarded to ``client.create_schema``.
        enable_dynamic_field: Forwarded to ``client.create_schema``.

    Raises:
        ValueError: If ``model`` is not contained in ``ALL_RESOURCES``.
        AssertionError: If a field does not carry exactly one metadata entry.
    """
    if model not in ALL_RESOURCES:
        # NOTE(review): the message names ALL_MODELS while the check uses
        # ALL_RESOURCES -- confirm which name is the intended public one.
        raise ValueError(f"Model {model} is not registered in bear.model.ALL_MODELS.")

    collection_name = model.__name__.lower()  # Not instantiated, just use the class name
    if client.has_collection(collection_name):
        logger.info(f"Collection '{collection_name}' already exists. Skipping creation.")
        return

    # Initialize collection with schema and index parameters
    schema = client.create_schema(auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
    index_params = client.prepare_index_params()

    for field_name, field_info in model.model_fields.items():
        assert len(field_info.metadata) == 1, f"Field {field_name} should have exactly one metadata entry."
        # Work on a shallow copy: the metadata object hangs off the model
        # class, so popping "index_configs" from it directly would strip the
        # index definition for every later call with the same model.
        # (Assumes json_schema is a mapping -- TODO confirm.)
        milvus_metadata = dict(field_info.metadata[0].json_schema)
        if "index_configs" in milvus_metadata:
            index_config = milvus_metadata.pop("index_configs")
            logger.info(f"Adding index for field {field_name} with config {index_config}")
            index_params.add_index(field_name=field_name, **index_config)
        # Whatever remains after removing the index config is passed as the
        # field's schema keyword arguments.
        logger.info(f"Adding field {field_name} with schema {milvus_metadata}")
        schema.add_field(field_name=field_name, **milvus_metadata)

    logger.info(f"Creating collection '{collection_name}' with schema and index parameters.")
    client.create_collection(collection_name=collection_name, schema=schema, index_params=index_params)
init(db_name=config.MILVUS_DB_NAME, wipe=False)
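Initialize the Milvus database and create a collection for every registered resource model, optionally wiping the existing database first.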
Source code in bear/db.py
def init(db_name: str = config.MILVUS_DB_NAME, wipe: bool = False) -> None:
    """Ensure database ``db_name`` exists and has a collection per resource model.

    Args:
        db_name: Name of the Milvus database to initialize.
        wipe: When true and the database already exists, drop its collections
            and then the database itself before recreating it.
    """
    client = get_milvus_client(db_name=db_name)

    if wipe and db_name in client.list_databases():
        logger.info(f"Wiping database: {db_name}")
        # Collections are dropped one by one before the database is dropped.
        for collection_name in client.list_collections():
            client.drop_collection(collection_name)
        client.drop_database(db_name=db_name)

    # Re-query the server: the database may just have been dropped above.
    if db_name not in client.list_databases():
        logger.info(f"Creating database: {db_name}")
        client.create_database(db_name=db_name)

    client.use_database(db_name)
    for model in ALL_RESOURCES:
        create_milvus_collection(client=client, model=model)
push(resources, db_name=config.MILVUS_DB_NAME)
Upsert resources into Milvus. This method is slower but ensures no duplicate IDs.
Source code in bear/db.py
def push(resources: list[ResourceType], db_name: str = config.MILVUS_DB_NAME) -> None:
    """Upsert resources into Milvus. This method is slower but ensures no duplicate IDs.

    The target collection is taken from the first resource's ``_name``, so
    all resources are presumably expected to be of the same type -- verify
    against callers.

    Args:
        resources: Resources to write; each is converted with ``to_milvus()``.
            An empty list is a no-op.
        db_name: Milvus database that holds the target collection.

    Raises:
        ValueError: If the target collection does not exist.
    """
    if not resources:
        # Nothing to write; also avoids indexing into an empty list below.
        logger.info("No resources to push. Skipping.")
        return

    client = get_milvus_client(db_name=db_name)
    collection_name = resources[0]._name
    if not client.has_collection(collection_name):
        raise ValueError(f"Collection '{collection_name}' does not exist. Please create it first.")

    data = [resource.to_milvus() for resource in resources]
    # Upsert rather than insert, so rows with an existing primary key are
    # replaced instead of duplicated, as the documented contract promises.
    client.upsert(collection_name=collection_name, data=data)
    logger.info(f"Upserted {len(resources)} resources into collection '{collection_name}'.")
Database Management
The database module handles connections and operations with both PostgreSQL and the Milvus vector database.
Features
- PostgreSQL connection management
- Milvus vector database operations
- Connection pooling
- Error handling and retries
Database Schema
Information about the database schema and table structures used by BEAR.