Skip to content

Commit

Permalink
feat: fix elasticsearch flowrun get_by_pk
Browse files Browse the repository at this point in the history
  • Loading branch information
helllllllder committed May 13, 2024
1 parent b00a0c1 commit c86b51b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
11 changes: 6 additions & 5 deletions flowrun/storage/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ class FlowRunElasticSearch(GenericStorage):

def get_by_pk(self, identifier: str) -> FlowRunElasticSearchDTO:
try:
es_flow_run = get_connection().get(index=self._index_name, uuid=identifier)[
"_source"
]
except (AttributeError, TypeError) as err:
es_flow_run = get_connection().search(
index=self._index_name,
body={"size": 1, "query": {"terms": {"uuid": identifier}}},
)["hits"]["hits"][0]["_source"]
except (AttributeError, TypeError, IndexError) as err:
print("[-] Elasticsearch Get error: ", type(err), err)
return None
return es_flow_run # FlowRunElasticSearchDTO.from_dict(es_flow_run)

def insert(self, new_obj: dict) -> bool:
es_flow_run = get_connection().index(index=self._index_name, body=new_obj)
return es_flow_run.get("ok")
return es_flow_run["result"] == "created"
14 changes: 8 additions & 6 deletions shared/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ def execute(self, object_identifier: str):
return True

# [E]xtract the obj instance from the From Storage
from_obj = self.storage_from.get_by_pk(object_identifier)
if from_obj is None:
return False
from_obj: dict = self.storage_from.get_by_pk(object_identifier)
if (
from_obj is None
): # if the object with the indetifier does not exist, do nothing with it
return True

# [T]ransform the object to be saved in the new storage
transformed_obj = self.object_transformer(from_obj)
transformed_obj: dict = self.object_transformer(from_obj)

# [L]oad the treated object into the new storage
new_obj = self.storage_to.insert(transformed_obj)
is_inserted: bool = self.storage_to.insert(transformed_obj)

return bool(new_obj) # if None will return False else True
return is_inserted

0 comments on commit c86b51b

Please sign in to comment.