diff --git a/brandenburg/models/batch.py b/brandenburg/models/batch.py
index 7a71380..1a556f7 100644
--- a/brandenburg/models/batch.py
+++ b/brandenburg/models/batch.py
@@ -3,6 +3,7 @@
 from typing import List, Dict, Union, Optional
 
 # Third party imports
+from dateutil.parser import parse
 from pydantic import BaseModel, PrivateAttr, Field, validator
 
 # Local application imports
@@ -54,6 +55,9 @@ class BatchModel(BaseModel):
     action: str = Field(
         title="This will always be upsert.", choices=(("upsert", "batch")), default="upsert",
     )
+    sequence_pointer_field: str = Field(
+        ..., title="""The datetime/timestamp field to be used as the data pointer. E.g.: updated_at"""
+    )
 
     _sdc_received_at: str = PrivateAttr()
     _sdc_sequence: int = PrivateAttr()
@@ -61,13 +65,16 @@ def __init__(self, **kwargs) -> None:
         """
         :param service_id:
         :param table_name:
+        :param data:
+        :param key_names:
+        :param schema_mapping:
         :param action:
+        :param sequence_pointer_field:
         :param sdc_sequence: An integer that tells the Import API the order in which data points in the request body
             should be considered for loading. This data will be stored in the destination table in the _sequence column.
             This API uses a Unix epoch (in milliseconds) as the value for this property.
             Note: This value cannot exceed the maximum of 9223372036854775807.
         """
-        # import ipdb; ipdb.set_trace()
         super().__init__(**kwargs)
         NOW: datetime = datetime.now()
         self._sdc_received_at = NOW.strftime("%y-%m-%d %H:%M:%S")
@@ -101,3 +108,13 @@ def action_validator(cls, value, values):
     #     if not len([key in keys for key in value]):
     #         raise ValueError("Fields on key_names must be into data.")
     #     return value
+
+    @property
+    def last_updated_at(self) -> int:
+        """
+        Return the most recent `sequence_pointer_field` value found in `data` as a Unix timestamp.
+        """
+        last_ran: int = 0
+        if self.sequence_pointer_field is not None and self.data:
+            last_ran = int(max(parse(str(row[self.sequence_pointer_field])) for row in self.data).timestamp())
+        return last_ran
diff --git a/brandenburg/providers/gcp.py b/brandenburg/providers/gcp.py
index e58000f..ae45b55 100644
--- a/brandenburg/providers/gcp.py
+++ b/brandenburg/providers/gcp.py
@@ -85,15 +85,19 @@ def create_topics(self, topics: List[str]) -> None:
         TODO: Add a return statement and handle with exceptions
         """
         GOOGLE_PROJECT_ID: str = settings.GOOGLE_PROJECT_ID
-        client: pubsub_v1.PublisherClient = pubsub_v1.PublisherClient(credentials=self.get_credentials())
-        project = client.project_path(GOOGLE_PROJECT_ID)
+        client: pubsub_v1.PublisherClient = publisher_client
         logger.info(f"Checking if all topics already exists")
-        existing_topics: List[str] = [element.name.split("/")[3] for element in client.list_topics(project)]
+        existing_topics: List[str] = [
+            element.name.split("/")[3] for element in client.list_topics(project=f"projects/{GOOGLE_PROJECT_ID}")
+        ]
         logger.info(f"Existing opics: { existing_topics}")
         for topic in set(topics).difference(existing_topics):
             topic_name: str = client.topic_path(GOOGLE_PROJECT_ID, topic)
             logger.info(f"creating topic: {topic_name}")
-            client.create_topic(topic_name)
+            try:
+                client.create_topic(topic_name)
+            except Exception as ex:
+                logger.error(ex)
 
     def publish(self, topic: str, data: str, **attrs):
         """
diff --git a/brandenburg/services/batch.py b/brandenburg/services/batch.py
index 6b8fb7e..dd8b365 100644
--- a/brandenburg/services/batch.py
+++ b/brandenburg/services/batch.py
@@ -4,28 +4,38 @@
 from typing import Tuple
 
 # Local application imports
+from brandenburg.config import settings
 from brandenburg.models.batch import BatchModel
 from brandenburg.services.publisher import PublisherService
+from brandenburg.toolbox._backends.redis import RedisBackend
 from brandenburg.toolbox.logger import log
 
-logger = log.get_logger(__name__)
+LOGGER = log.get_logger(__name__)
 
 
 class BatchService:
-    @staticmethod
-    async def execute(batch: BatchModel, routing_key: str, action: str = "upsert") -> Tuple[BatchModel, bool]:
+    @classmethod
+    async def execute(cls, batch: BatchModel, routing_key: str, action: str = "upsert") -> Tuple[BatchModel, bool]:
         batch.action = action
         res = await PublisherService.publish(batch.dict(), routing_key)
-        logger.info(f"sent_to_topic: {bool(res)}, batch: {batch}")
+        LOGGER.info(f"sent_to_topic: {bool(res)}, batch: {batch}")
+        await cls._set_last_ran(batch)
         return batch, True
 
     @staticmethod
-    async def upload(name: str, filename: str, file: bytes, hash: str, token: str):
+    async def upload(name: str, filename: str, file: bytes, hash: str, token: str) -> bool:
         path: str = f"{name}/{datetime.now().strftime('%Y/%m/%d')}"
-        logger.info(f"uploading file: {filename} with hash: {hash} to path {path}, token: {token}")
+        LOGGER.info(f"uploading file: {filename} with hash: {hash} to path {path}, token: {token}")
         await PublisherService.upload_file(f"{path}/{filename}", file)
-        logger.info("uploading MD5SUM file")
+        LOGGER.info("uploading MD5SUM file")
         await PublisherService.upload_file(f"{path}/MD5SUM", StringIO(hash))
-        logger.info("all files were uploaded")
+        LOGGER.info("all files were uploaded")
         return True
+
+    @classmethod
+    async def _set_last_ran(cls, batch: BatchModel) -> None:
+        cache = await RedisBackend(settings.REDIS_URL).get_instance()
+        LOGGER.info(f"Setting last-ran timestamp for table {batch.table_name}: {batch.last_updated_at}")
+        cache.set_cache(batch.table_name.lower(), batch.last_updated_at, -1)
+        LOGGER.info("last ran was set successfully")
diff --git a/tests/models/test_batch_model.py b/tests/models/test_batch_model.py
index d678ee5..08d8faa 100644
--- a/tests/models/test_batch_model.py
+++ b/tests/models/test_batch_model.py
@@ -13,7 +13,12 @@
     "service_id": "raw",
     "table_name": "users",
     "action": "upsert",
-    "data": [{"id": 1, "name": "maria", "email": "maria@uol.in"}],
+    "data": [
+        {"id": 1, "name": "maria", "email": "maria@uol.in", "updated_at": "2020-08-05T16:48:39.343"},
+        {"id": 2, "name": "alana", "email": "alana@aon.in", "updated_at": "2021-01-05T10:08:39.000"},
+        {"id": 3, "name": "joana", "email": "joana@aon.in", "updated_at": "2020-12-05T10:08:39.000"},
+    ],
+    "sequence_pointer_field": "updated_at",
 }
 
 
@@ -73,3 +78,8 @@ def test_with_data_over_1000():
     assert json.loads(info.value.json()) == [
         {"loc": ["data"], "msg": "Field data exceed 1000 records.", "type": "value_error",}
     ]
+
+
+def test_get_max_datetime():
+    batch: BatchModel = BatchModel(**BATCH)
+    assert batch.last_updated_at == 1609852119
diff --git a/tests/routers/test_imports_api.py b/tests/routers/test_imports_api.py
index 292cab1..e00f077 100644
--- a/tests/routers/test_imports_api.py
+++ b/tests/routers/test_imports_api.py
@@ -9,11 +9,14 @@
 # Local application imports
 from brandenburg.config import settings
 
+xfail = pytest.mark.xfail
+
 DATA: Dict[str, str] = {
     "service_id": "test_brandenburg",
     "table_name": "user",
-    "data": [{"id": 1, "name": "Maria"}],
+    "data": [{"id": 1, "name": "Maria", "updated_at": "2020-12-03T19:35:30.0494511"}],
     "action": "upsert",
+    "sequence_pointer_field": "updated_at",
 }
 
 
@@ -39,12 +42,20 @@ def test_api_get_401_without_auth(client):
     assert res.status_code == 401
 
 
-@pytest.mark.xfail
+def test_api_with_sequence_field_request(client):
+    data: Dict[str, str] = copy.deepcopy(DATA)
+    data.update({"sequence_pointer_field": "updated_at"})
+    res = client.post("/v1/import/push/", json=data, headers=HEADERS)
+    assert res.status_code == 201
+
+
 def test_full_fields(client):
     full: Dict[str, str] = copy.deepcopy(DATA)
     full["key_names"] = ["id"]
-    full["schema_mapping"] = [{"a": "s"}]
-    pass
+    full["schema_mapping"] = [{"name": "id", "type": "int", "is_nullable": True}]
+    full["action"] = "batch"
+    res = client.post("/v1/import/batch/", json=full, headers=HEADERS)
+    assert res.status_code == 201
 
 
 @pytest.mark.xfail
@@ -83,3 +94,8 @@ def test_send_file_and_check_background_function(client):
     """
     Check if the files was uploaded
     """
     pass
+
+
+@xfail
+def test_check_last_datetime_field(client):
+    pass
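
Note on the new `last_updated_at` property: it parses the `sequence_pointer_field` of every record and returns the largest value as an integer Unix timestamp. A minimal standalone sketch of that computation, using the fixture rows from tests/models/test_batch_model.py. One caveat: `parse()` yields naive datetimes for these strings, so `.timestamp()` resolves them against the local timezone; the value `test_get_max_datetime` expects (1609852119, i.e. 2021-01-05T13:08:39 UTC) assumes the test runs at UTC-3.

from dateutil.parser import parse

# Fixture rows from tests/models/test_batch_model.py
data = [
    {"id": 1, "updated_at": "2020-08-05T16:48:39.343"},
    {"id": 2, "updated_at": "2021-01-05T10:08:39.000"},
    {"id": 3, "updated_at": "2020-12-05T10:08:39.000"},
]

# Same expression as BatchModel.last_updated_at: parse every pointer value,
# take the maximum, and convert it to an integer Unix timestamp.
last_ran = int(max(parse(str(row["updated_at"])) for row in data).timestamp())
print(last_ran)  # 1609852119 when the local timezone is UTC-3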
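
Note the `await` on `cls._set_last_ran(batch)` in `BatchService.execute`: `_set_last_ran` is an async classmethod, and calling it without `await` only creates a coroutine object whose body never runs (Python emits a "coroutine ... was never awaited" RuntimeWarning when it is garbage-collected). A minimal illustration; `Service` here is a hypothetical stand-in, not the real BatchService:

import asyncio

class Service:
    @classmethod
    async def _set_last_ran(cls, table: str) -> None:
        print(f"checkpoint stored for {table}")

async def main() -> None:
    Service._set_last_ran("users")        # coroutine created but never executed
    await Service._set_last_ran("users")  # actually runs and prints

asyncio.run(main())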