Skip to content

Commit

Permalink
Add filter parameter to delete (#1779)
Browse files Browse the repository at this point in the history
Starting with Milvus 2.3.3, data can be deleted based on a filter expression.
The new "filter" parameter is the entry point for this feature.

All rows that conform to the specified filter expression will be eliminated.

The deletion condition must be either a list of primary keys to delete
or a filter expression.
If both are specified, or if neither is specified, an exception is raised.

Signed-off-by: zhenshan.cao <[email protected]>
  • Loading branch information
czs007 authored Nov 10, 2023
1 parent 3f68f7d commit 1428201
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 39 deletions.
74 changes: 74 additions & 0 deletions examples/hello_milvus_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import time
import numpy as np
from pymilvus import (
MilvusClient,
exceptions
)

# Banner template for the section headings printed by this example.
fmt = "\n=== {:30} ===\n"
dim = 8
collection_name = "hello_milvus"

# Connect, then drop any collection of the same name before recreating it.
milvus_client = MilvusClient("http://localhost:19530")
milvus_client.drop_collection(collection_name)
milvus_client.create_collection(
    collection_name,
    dim,
    consistency_level="Strong",
    metric_type="L2",
)

print("collections:", milvus_client.list_collections())
print(f"{collection_name} :", milvus_client.describe_collection(collection_name))

# Seeded generator: the vectors drawn below are the same on every run.
rng = np.random.default_rng(seed=19530)

# Six rows with ids 1..6. Besides "id" and "vector", each row carries one
# extra key ("a" through "f") whose value equals the row's id.
rows = [
    {"id": pk, "vector": rng.random((1, dim))[0], extra_key: pk}
    for pk, extra_key in enumerate("abcdef", start=1)
]

print(fmt.format("Start inserting entities"))
pks = milvus_client.insert(collection_name, rows, progress_bar=True)
# A single dict is inserted as well; the keys it returns are appended to the
# keys returned by the batch insert above.
pks2 = milvus_client.insert(
    collection_name,
    {"id": 7, "vector": rng.random((1, dim))[0], "g": 1},
)
pks.extend(pks2)


def fetch_data_by_pk(pk):
    """Fetch the entity with primary key ``pk`` and print what was found.

    Reads the module-level ``milvus_client`` and ``collection_name``. Prints
    the first returned record, or a notice that the result is empty.
    """
    print(f"get primary key {pk} from {collection_name}")
    found = milvus_client.get(collection_name, pk)

    # Guard clause: nothing came back for this key.
    if not found:
        print(f"data of primary key {pk} is empty")
        return

    print(f"data of primary key {pk} is", found[0])

# --- Delete by primary key -------------------------------------------------
# Show the entity, delete it by its key, then show that the lookup is empty.
fetch_data_by_pk(pks[2])

print(f"start to delete primary key {pks[2]} in collection {collection_name}")
milvus_client.delete(collection_name, pks=pks[2])

fetch_data_by_pk(pks[2])


# --- Delete by filter expression -------------------------------------------
# The name ``filter_expr`` avoids shadowing the ``filter`` builtin; the
# keyword argument of ``delete`` is still called ``filter``.
fetch_data_by_pk(pks[4])
filter_expr = "e == 5 or f == 6"
print(f"start to delete by expr {filter_expr} in collection {collection_name}")
milvus_client.delete(collection_name, filter=filter_expr)

fetch_data_by_pk(pks[4])

# --- Both conditions at once: rejected --------------------------------------
# Passing primary keys together with a filter is ambiguous, and the client
# raises ParamError. Catch exactly that type rather than asserting inside a
# broad handler: asserts vanish under ``python -O`` and would otherwise hide
# the real traceback of any unexpected error.
print(f"start to delete by expr '{filter_expr}' or by primary 4 in collection {collection_name}, expect get exception")
try:
    milvus_client.delete(collection_name, pks=4, filter=filter_expr)
except exceptions.ParamError as e:
    print("catch exception", e)

# --- No condition at all: rejected ------------------------------------------
# The exception type for this case is not pinned down by this example, so the
# handler stays broad and only reports what was raised.
print(f"start to delete without specify any expr '{filter_expr}' or any primary key in collection {collection_name}, expect get exception")
try:
    milvus_client.delete(collection_name)
except Exception as e:
    print("catch exception", e)

# Report how many entities remain, then clean up the demo collection.
result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
print(f"final entities in {collection_name} is {result[0]['count(*)']}")

milvus_client.drop_collection(collection_name)
3 changes: 3 additions & 0 deletions pymilvus/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,6 @@ class ExceptionsMessage:
"Attempt to insert an unexpected field to collection without enabling dynamic field"
)
UpsertAutoIDTrue = "Upsert don't support autoid == true"
AmbiguousDeleteFilterParam = (
"Ambiguous filter parameter, only one deletion condition can be specified."
)
69 changes: 30 additions & 39 deletions pymilvus/milvus_client/milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pymilvus.exceptions import (
DataTypeNotMatchException,
MilvusException,
ParamError,
PrimaryKeyException,
)
from pymilvus.orm import utility
Expand Down Expand Up @@ -58,9 +59,7 @@ def __init__(
self._using = self._create_connection(
uri, user, password, db_name, token, timeout=timeout, **kwargs
)
self.is_self_hosted = bool(
utility.get_server_type(using=self._using) == "milvus",
)
self.is_self_hosted = bool(utility.get_server_type(using=self._using) == "milvus")

def create_collection(
self,
Expand Down Expand Up @@ -104,10 +103,7 @@ def create_collection(
except Exception as ex:
logger.error("Failed to create collection: %s", collection_name)
raise ex from ex
index_params = {
"metric_type": metric_type,
"params": {},
}
index_params = {"metric_type": metric_type, "params": {}}
self._create_index(collection_name, vector_field_name, index_params, timeout=timeout)
self._load(collection_name, timeout=timeout)

Expand All @@ -121,21 +117,10 @@ def _create_index(
"""Create a index on the collection"""
conn = self._get_connection()
try:
conn.create_index(
collection_name,
vec_field_name,
index_params,
timeout=timeout,
)
logger.debug(
"Successfully created an index on collection: %s",
collection_name,
)
conn.create_index(collection_name, vec_field_name, index_params, timeout=timeout)
logger.debug("Successfully created an index on collection: %s", collection_name)
except Exception as ex:
logger.error(
"Failed to create an index on collection: %s",
collection_name,
)
logger.error("Failed to create an index on collection: %s", collection_name)
raise ex from ex

def insert(
Expand Down Expand Up @@ -195,9 +180,7 @@ def insert(
pks.extend(res.primary_keys)
except Exception as ex:
logger.error(
"Failed to insert batch starting at entity: %s/%s",
str(i),
str(len(data)),
"Failed to insert batch starting at entity: %s/%s", str(i), str(len(data))
)
raise ex from ex

Expand Down Expand Up @@ -370,8 +353,9 @@ def get(
def delete(
self,
collection_name: str,
pks: Union[list, str, int],
pks: Optional[Union[list, str, int]] = None,
timeout: Optional[float] = None,
filter: Optional[str] = "",
**kwargs,
):
"""Delete entries in the collection by their pk.
Expand All @@ -390,25 +374,35 @@ def delete(
Args:
pks (list, str, int): The pk's to delete. Depending on pk_field type it can be int
or str or alist of either.
or str or alist of either. Default to None.
filter(str, optional): A filter to use for the deletion. Defaults to empty.
timeout (int, optional): Timeout to use, overides the client level assigned at init.
Defaults to None.
"""

if isinstance(pks, (int, str)):
pks = [pks]

if len(pks) == 0:
return []

expr = ""
conn = self._get_connection()
try:
schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
except Exception as ex:
logger.error("Failed to describe collection: %s", collection_name)
raise ex from ex
if pks:
try:
schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
except Exception as ex:
logger.error("Failed to describe collection: %s", collection_name)
raise ex from ex

expr = self._pack_pks_expr(schema_dict, pks)

if filter:
if expr:
raise ParamError(message=ExceptionsMessage.AmbiguousDeleteFilterParam)

if not isinstance(filter, str):
raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))

expr = filter

expr = self._pack_pks_expr(schema_dict, pks)
ret_pks = []
try:
res = conn.delete(collection_name, expr, timeout=timeout, **kwargs)
Expand Down Expand Up @@ -600,8 +594,5 @@ def _load(self, collection_name: str, timeout: Optional[float] = None):
try:
conn.load_collection(collection_name, timeout=timeout)
except MilvusException as ex:
logger.error(
"Failed to load collection: %s",
collection_name,
)
logger.error("Failed to load collection: %s", collection_name)
raise ex from ex

0 comments on commit 1428201

Please sign in to comment.