Reorganized recursion and duplicate entry storage
Signed-off-by: Lalith Kota <[email protected]>
lalithkota committed Oct 18, 2024
1 parent d0d97dd commit 8d3e2a5
Showing 6 changed files with 67 additions and 59 deletions.
2 changes: 2 additions & 0 deletions openg2p-deduplicator/src/openg2p_deduplicator/config.py

@@ -26,5 +26,7 @@ class Settings(BaseSettings):
     index_name_dedupe_requests: str = "g2p_dedupe_requests"
     index_name_duplicates: str = "g2p_dedupe_duplicates"
 
+    duplicate_entry_id_joiner: str = "<->"
+
     dedupe_runner_initial_delay_secs: int = 5
     dedupe_runner_interval_secs: int = 10
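
The new duplicate_entry_id_joiner setting is used by deduplication_service.py (further down) to build a deterministic OpenSearch document ID for each ordered pair of records. A minimal sketch of the ID scheme, with hypothetical record IDs:

# Hypothetical record IDs; the joiner comes from Settings above.
joiner = "<->"
doc_id, dup_id = "rec-001", "rec-042"
forward_id = f"{doc_id}{joiner}{dup_id}"  # "rec-001<->rec-042"
reverse_id = f"{dup_id}{joiner}{doc_id}"  # "rec-042<->rec-001"

The next two hunks update the duplicates controller to consume the new storage shape.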

@@ -1,6 +1,6 @@
 from openg2p_fastapi_common.controller import BaseController
 
-from ..schemas.get_duplicates_response import GetDuplicatesHttpResponse
+from ..schemas.get_duplicates_response import GetDuplicatesHttpResponse, HttpDuplicateEntry
 from ..services.deduplication_service import DeduplicationService
@@ -27,4 +27,12 @@ def deduplication_service(self):
 
     def get_duplicates_by_id(self, doc_id: str):
         res = self.deduplication_service.get_duplicates_by_doc_id(doc_id=doc_id)
-        return GetDuplicatesHttpResponse(duplicates=res.duplicates)
+        res = [
+            HttpDuplicateEntry(
+                id=entry.duplicate_id,
+                match_score=entry.match_score,
+                last_dedupe_request_id=entry.last_dedupe_request_id,
+            )
+            for entry in res
+        ]
+        return GetDuplicatesHttpResponse(duplicates=res)
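
The controller now flattens each stored pairwise entry into the HTTP shape, dropping original_id since it is implied by the requested doc_id. A hedged sketch of the mapping, with illustrative values:

# One stored entry for document "rec-001" (values illustrative) ...
stored = StoredDuplicateEntry(
    original_id="rec-001",
    duplicate_id="rec-042",
    match_score=7.3,
    last_dedupe_request_id="req-9",
)
# ... becomes this wire-facing entry:
HttpDuplicateEntry(id="rec-042", match_score=7.3, last_dedupe_request_id="req-9")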

openg2p-deduplicator/src/openg2p_deduplicator/schemas/deduplicate_request.py
@@ -1,7 +1,7 @@
 from datetime import datetime
 from enum import Enum
 
-from pydantic import BaseModel, field_serializer
+from pydantic import BaseModel
 
 
 class DeduplicationStatus(Enum):
@@ -38,7 +38,3 @@ class DeduplicateRequestEntry(BaseModel):
     wait_before_exec_secs: int
     created_at: datetime
     updated_at: datetime | None = None
-
-    @field_serializer("status")
-    def status_serialize(self, status: DeduplicationStatus):
-        return status.value
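
Dropping the custom @field_serializer is safe because the services now dump models with model_dump(mode="json") (see the config_service and deduplication_service hunks below), and Pydantic v2's JSON mode already serializes Enum members to their values and datetimes to ISO 8601 strings. A quick self-contained illustration:

from datetime import datetime
from enum import Enum

from pydantic import BaseModel


class Status(Enum):
    completed = "completed"


class Entry(BaseModel):
    status: Status
    created_at: datetime


e = Entry(status=Status.completed, created_at=datetime(2024, 10, 18))
print(e.model_dump())             # {'status': <Status.completed: 'completed'>, 'created_at': datetime.datetime(2024, 10, 18, 0, 0)}
print(e.model_dump(mode="json"))  # {'status': 'completed', 'created_at': '2024-10-18T00:00:00'}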

openg2p-deduplicator/src/openg2p_deduplicator/schemas/get_duplicates_response.py
@@ -1,15 +1,18 @@
 from pydantic import BaseModel
 
 
-class DuplicateEntry(BaseModel):
-    id: str
+class StoredDuplicateEntry(BaseModel):
+    original_id: str
+    duplicate_id: str
     match_score: float
     last_dedupe_request_id: str
 
 
-class StoredDuplicates(BaseModel):
-    duplicates: list[DuplicateEntry]
+class HttpDuplicateEntry(BaseModel):
+    id: str
+    match_score: float
+    last_dedupe_request_id: str
 
 
-class GetDuplicatesHttpResponse(StoredDuplicates):
-    pass
+class GetDuplicatesHttpResponse(BaseModel):
+    duplicates: list[HttpDuplicateEntry]

openg2p-deduplicator/src/openg2p_deduplicator/services/config_service.py
@@ -25,7 +25,7 @@ def add_or_update_config(self, config: DedupeConfig):
         self.opensearch_client.index(
             index=_config.index_name_dedupe_configs,
             id=urllib.parse.quote(config.name, safe=""),
-            body=config.model_dump(),
+            body=config.model_dump(mode="json"),
             timeout=_config.opensearch_api_timeout,
         )

openg2p-deduplicator/src/openg2p_deduplicator/services/deduplication_service.py
@@ -9,7 +9,7 @@
 from ..config import Settings
 from ..schemas.config_request import DedupeConfig, DedupeConfigFieldQueryType
 from ..schemas.deduplicate_request import DeduplicateRequestEntry, DeduplicationStatus
-from ..schemas.get_duplicates_response import DuplicateEntry, StoredDuplicates
+from ..schemas.get_duplicates_response import StoredDuplicateEntry
 from ..services.config_service import DedupeConfigService
 from ..services.opensearch_service import OpenSearchClientService
 
@@ -60,7 +60,7 @@ def create_dedupe_request(
                 status_description="Deduplication in progress",
                 wait_before_exec_secs=wait_before_exec_secs,
                 created_at=datetime.now(),
-            ).model_dump(),
+            ).model_dump(mode="json"),
         )
         return status
 
@@ -76,16 +76,17 @@ def update_dedupe_request(self, request_id: str, key_value: dict):
             timeout=_config.opensearch_api_timeout,
         )
 
-    def get_duplicates_by_doc_id(self, doc_id: str) -> StoredDuplicates:
+    def get_duplicates_by_doc_id(self, doc_id: str) -> list[StoredDuplicateEntry]:
         try:
-            res = self.opensearch_client.get_source(
+            res = self.opensearch_client.search(
                 index=_config.index_name_duplicates,
-                id=doc_id,
                 timeout=_config.opensearch_api_timeout,
+                body={"query": {"term": {"original_id": {"value": doc_id}}}},
             )
+            res = res.get("hits", {}).get("hits", [])
         except Exception:
-            res = {"duplicates": []}
-        res = StoredDuplicates.model_validate(res)
+            res = []
+        res = [StoredDuplicateEntry.model_validate(entry.get("_source", {})) for entry in res]
         return res
 
     def run_dedupe_job(self):
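
Lookups now run a term query against the new original_id field instead of a get_source on one aggregated document per record. A hedged sketch of the round trip, with illustrative index contents:

# Stored pairwise entry, indexed under _id "rec-001<->rec-042":
#   {"original_id": "rec-001", "duplicate_id": "rec-042",
#    "match_score": 7.3, "last_dedupe_request_id": "req-9"}
# get_duplicates_by_doc_id("rec-001") sends:
#   {"query": {"term": {"original_id": {"value": "rec-001"}}}}
# and each hit's _source validates directly into a StoredDuplicateEntry.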
@@ -142,15 +143,17 @@ def run_dedupe_task(self):
             )
             return
         # Find nested duplicates with the given config and update their entries
-        no_of_dups = self.find_and_update_duplicates_by_doc_id(
+        duplicates = self.find_duplicates_by_doc_id(
             dedupe_config, dedupe_request.id, dedupe_request.doc_id
         )
+        # Create duplicate entries
+        self.create_duplicate_entries(dedupe_request.doc_id, duplicates, dedupe_request.id)
         # Update request status
         self.update_dedupe_request(
             dedupe_request.id,
             {
                 "status": DeduplicationStatus.completed.value,
-                "status_description": f"Deduplication Complete. {no_of_dups} found.",
+                "status_description": f"Deduplication Complete. {len(duplicates or [])} found.",
                 "updated_at": datetime.now(),
             },
         )
@@ -166,9 +169,7 @@ def run_dedupe_task(self):
             )
             pass
 
-    def find_and_update_duplicates_by_doc_id(
-        self, dedupe_config: DedupeConfig, dedupe_request_id, doc_id, already_updated_docs: list = None
-    ):
+    def find_duplicates_by_doc_id(self, dedupe_config: DedupeConfig, dedupe_request_id, doc_id):
         # Get Record with the given id
         query_time = datetime.now()
         try:
@@ -186,18 +187,18 @@ def find_and_update_duplicates_by_doc_id(
                     "updated_at": query_time,
                 },
             )
-            return -1
+            return []
         # Construct a match query with all fields, and search for other records
         query_list = []
         for field in dedupe_config.fields:
             if field.query_type == DedupeConfigFieldQueryType.match:
-                query = {
+                match_query = {
                     "query": record.get(field.name),
                     "boost": field.boost,
                 }
                 if field.fuzziness:
-                    query["fuzziness"] = field.fuzziness
-                query_list.append({"match": {field.name: query}})
+                    match_query["fuzziness"] = field.fuzziness
+                query_list.append({"match": {field.name: match_query}})
             elif field.query_type == DedupeConfigFieldQueryType.term:
                 query_list.append(
                     {
@@ -218,35 +219,33 @@
         duplicates_res = duplicates_res.get("hits", {}).get("hits", [])
         duplicates_res = [duplicate for duplicate in duplicates_res if duplicate.get("_id") != doc_id]
-        # Update duplicates in the current record and all other records
-        self.opensearch_client.index(
-            index=_config.index_name_duplicates,
-            id=doc_id,
-            body=StoredDuplicates(
-                duplicates=[
-                    DuplicateEntry(
-                        id=duplicate.get("_id"),
-                        match_score=duplicate.get("_score"),
-                        last_dedupe_request_id=dedupe_request_id,
-                    )
-                    for duplicate in duplicates_res
-                    if (not dedupe_config.score_threshold)
-                    or dedupe_config.score_threshold <= duplicate.get("_score")
-                ]
-            ).model_dump(),
-            timeout=_config.opensearch_api_timeout,
-        )
-        already_updated_docs = already_updated_docs or []
-        already_updated_docs.append(doc_id)
-        for duplicate in duplicates_res:
-            if (not dedupe_config.score_threshold) or (
-                dedupe_config.score_threshold <= duplicate.get("_score")
-            ):
-                duplicate_id = duplicate.get("_id")
-                if duplicate_id not in already_updated_docs:
-                    self.find_and_update_duplicates_by_doc_id(
-                        dedupe_config, dedupe_request_id, duplicate_id, already_updated_docs
-                    )
-        return len(duplicates_res)
+        return duplicates_res
+
+    def create_duplicate_entries(self, doc_id: str, duplicates: list, request_id: str):
+        for dup in duplicates:
+            # Create to-and-fro duplicate entries
+            self.opensearch_client.index(
+                index=_config.index_name_duplicates,
+                id=f"{doc_id}{_config.duplicate_entry_id_joiner}{dup.get('_id')}",
+                timeout=_config.opensearch_api_timeout,
+                body=StoredDuplicateEntry(
+                    original_id=doc_id,
+                    duplicate_id=dup.get("_id"),
+                    match_score=dup.get("_score"),
+                    last_dedupe_request_id=request_id,
+                ).model_dump(mode="json"),
+            )
+            self.opensearch_client.index(
+                index=_config.index_name_duplicates,
+                id=f"{dup.get('_id')}{_config.duplicate_entry_id_joiner}{doc_id}",
+                timeout=_config.opensearch_api_timeout,
+                body=StoredDuplicateEntry(
+                    original_id=dup.get("_id"),
+                    duplicate_id=doc_id,
+                    match_score=dup.get("_score"),
+                    last_dedupe_request_id=request_id,
+                ).model_dump(mode="json"),
+            )
 
     def is_runner_thread_alive(self):
         return self.dedupe_runner.is_alive()
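
Storing each match twice, once per direction, is what lets get_duplicates_by_doc_id answer from either side with a single term query and no recursion. For a match between two records under one request (illustrative values), the index would hold:

# _id "rec-001<->rec-042": {"original_id": "rec-001", "duplicate_id": "rec-042", ...}
# _id "rec-042<->rec-001": {"original_id": "rec-042", "duplicate_id": "rec-001", ...}

Because the composite _ids are deterministic, re-running deduplication for either record overwrites these entries in place, replacing the old recursive fan-out through already_updated_docs.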
