Skip to content

Commit

Permalink
Changed entity comparison to use the DeepDiff library and search entities per batch
Browse files Browse the repository at this point in the history
  • Loading branch information
yaelibarg committed Jan 12, 2025
1 parent a5d976b commit e277e26
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 27 deletions.
53 changes: 39 additions & 14 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,50 @@ async def _register_resource_raw(
results: list[dict[Any, Any]],
user_agent_type: UserAgentType,
parse_all: bool = False,
send_raw_data_examples_amount: int = 0,
entities_at_port_with_properties: list[Entity] | None = None,
send_raw_data_examples_amount: int = 0
) -> CalculationResult:
objects_diff = await self._calculate_raw(
[(resource, results)], parse_all, send_raw_data_examples_amount
)
query = {
"combinator": "and",
"rules": [
{
"combinator": "or",
"rules": [
{
"property": "$identifier",
"operator": "=",
"value": entity.identifier,
}
for entity in objects_diff[0].entity_selector_diff.passed
]
}
]
}
entities_at_port_with_properties = await ocean.port_client.search_entities(
user_agent_type,
include_params=["blueprint", "identifier"] + [
f"properties.{prop}" for prop in resource.port.entity.mappings.properties
],
query=query
)
unique_entities = get_unique_entities(objects_diff[0].entity_selector_diff.passed, entities_at_port_with_properties)
modified_objects = []

if unique_entities:
logger.bind(
changed_entities=len(unique_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed),
).info("Upserting changed entities")
modified_objects = await self.entities_state_applier.upsert(
unique_entities, user_agent_type
)
else:
logger.bind(
total_entities=len(objects_diff[0].entity_selector_diff.passed),
).info("no changed entities, not upserting")

return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=modified_objects),
errors=objects_diff[0].errors,
Expand Down Expand Up @@ -177,7 +208,7 @@ async def _unregister_resource_raw(
return entities_selector_diff.passed, errors

async def _register_in_batches(
self, resource_config: ResourceConfig, user_agent_type: UserAgentType, entities_at_port_with_properties: list[Entity]
self, resource_config: ResourceConfig, user_agent_type: UserAgentType
) -> tuple[list[Entity], list[Exception]]:
results, errors = await self._get_resource_raw_results(resource_config)
async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = []
Expand All @@ -191,14 +222,14 @@ async def _register_in_batches(
send_raw_data_examples_amount = (
SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0
)

passed_entities = []
if raw_results:
all_entities, register_errors,_ = await self._register_resource_raw(
resource_config,
raw_results,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
entities_at_port_with_properties=entities_at_port_with_properties,
send_raw_data_examples_amount=send_raw_data_examples_amount
)
errors.extend(register_errors)
passed_entities = list(all_entities.passed)
Expand All @@ -215,8 +246,7 @@ async def _register_in_batches(
resource_config,
items,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
entities_at_port_with_properties=entities_at_port_with_properties,
send_raw_data_examples_amount=send_raw_data_examples_amount
)
errors.extend(register_errors)
passed_entities.extend(entities.passed)
Expand Down Expand Up @@ -475,15 +505,10 @@ async def sync_raw_all(
for resource in app_config.resources:
# create resource context per resource kind, so resync method could have access to the resource
# config as we might have multiple resources in the same event
entities_at_port_with_properties = await ocean.port_client.search_entities(
user_agent_type,
include_params=["blueprint", "identifier"] + [
f"properties.{prop}" for prop in resource.port.entity.mappings.properties
]
)

async with resource_context(resource):
task = asyncio.get_event_loop().create_task(
self._register_in_batches(resource, user_agent_type, entities_at_port_with_properties)
self._register_in_batches(resource, user_agent_type)
)

event.on_abort(lambda: task.cancel())
Expand Down
21 changes: 8 additions & 13 deletions port_ocean/core/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import asyncio
from typing import Iterable, Any, TypeVar, Callable, Awaitable
import hashlib
import json

from loguru import logger
from pydantic import parse_obj_as, ValidationError
from deepdiff import DeepDiff

from port_ocean.clients.port.client import PortClient
from port_ocean.core.models import Entity, Runtime
Expand Down Expand Up @@ -110,14 +109,14 @@ def get_port_diff(before: Iterable[Entity], after: Iterable[Entity]) -> EntityPo

def are_entities_equal(first_entity: Entity, second_entity: Entity) -> bool:
"""
Compare two entities by their identifier, blueprint, and a hash of their properties.
Compare two entities by their identifier, blueprint, and properties using DeepDiff.
Args:
first_entity: First entity to compare
second_entity: Second entity to compare
Returns:
bool: True if entities have same identifier, blueprint and properties hash
bool: True if entities have same identifier, blueprint and properties
"""
# First check identifiers and blueprints
if (
Expand All @@ -126,15 +125,11 @@ def are_entities_equal(first_entity: Entity, second_entity: Entity) -> bool:
):
return False

# Create deterministic JSON strings of properties
first_props = json.dumps(first_entity.properties, sort_keys=True)
second_props = json.dumps(second_entity.properties, sort_keys=True)

# Create hashes
first_hash = hashlib.sha256(first_props.encode()).hexdigest()
second_hash = hashlib.sha256(second_props.encode()).hexdigest()

return first_hash == second_hash
# Compare properties using DeepDiff
diff = DeepDiff(
first_entity.properties, second_entity.properties, ignore_order=True
)
return not diff


def get_unique_entities(
Expand Down

0 comments on commit e277e26

Please sign in to comment.