diff --git a/flowrun/storage/elasticsearch.py b/flowrun/storage/elasticsearch.py
index 4d671d0..bead1ca 100644
--- a/flowrun/storage/elasticsearch.py
+++ b/flowrun/storage/elasticsearch.py
@@ -10,14 +10,15 @@ class FlowRunElasticSearch(GenericStorage):
 
     def get_by_pk(self, identifier: str) -> FlowRunElasticSearchDTO:
         try:
-            es_flow_run = get_connection().get(index=self._index_name, uuid=identifier)[
-                "_source"
-            ]
-        except (AttributeError, TypeError) as err:
+            es_flow_run = get_connection().search(
+                index=self._index_name,
+                body={"size": 1, "query": {"terms": {"uuid": [identifier]}}},
+            )["hits"]["hits"][0]["_source"]
+        except (AttributeError, TypeError, IndexError) as err:
             print("[-] Elasticsearch Get error: ", type(err), err)
             return None
         return es_flow_run  # FlowRunElasticSearchDTO.from_dict(es_flow_run)
 
     def insert(self, new_obj: dict) -> bool:
         es_flow_run = get_connection().index(index=self._index_name, body=new_obj)
-        return es_flow_run.get("ok")
+        return es_flow_run["result"] == "created"
diff --git a/shared/processors.py b/shared/processors.py
index 01b2dc4..9c48de0 100644
--- a/shared/processors.py
+++ b/shared/processors.py
@@ -16,14 +16,16 @@ def execute(self, object_identifier: str):
         return True
 
         # [E]xtract the obj instance from the From Storage
-        from_obj = self.storage_from.get_by_pk(object_identifier)
-        if from_obj is None:
-            return False
+        from_obj: dict = self.storage_from.get_by_pk(object_identifier)
+        if (
+            from_obj is None
+        ):  # if the object with the identifier does not exist, do nothing with it
+            return True
 
         # [T]ransform the object to be saved in the new storage
-        transformed_obj = self.object_transformer(from_obj)
+        transformed_obj: dict = self.object_transformer(from_obj)
 
         # [L]oad the treated object into the new storage
-        new_obj = self.storage_to.insert(transformed_obj)
+        is_inserted: bool = self.storage_to.insert(transformed_obj)
 
-        return bool(new_obj)  # if None will return False else True
+        return is_inserted