Adding a new field to indicate which field is the pointer (#27)
* Adding a new field to indicate which field should be used as the pointer

* Fixing topic listing for the new pubsub library version
jesuejunior authored Jan 7, 2021
1 parent 0a249b0 commit 3f5aa2e
Showing 5 changed files with 75 additions and 18 deletions.
19 changes: 18 additions & 1 deletion brandenburg/models/batch.py
@@ -3,6 +3,7 @@
from typing import List, Dict, Union, Optional

# Third party imports
from dateutil.parser import parse
from pydantic import BaseModel, PrivateAttr, Field, validator

# Local application imports
@@ -54,20 +55,26 @@ class BatchModel(BaseModel):
action: str = Field(
title="This will always be upsert.", choices=(("upsert", "batch")), default="upsert",
)
sequence_pointer_field: str = Field(
..., title="""The datetime/timestamp field to be used as data pointer. E.g: updated_at"""
)
_sdc_received_at: str = PrivateAttr()
_sdc_sequence: int = PrivateAttr()

def __init__(self, **kwargs) -> None:
"""
:param service_id:
:param table_name:
:param data:
:param key_names:
:param schema_mapping:
:param action:
:param sequence_pointer_field:
:param sdc_sequence: An integer that tells the Import API the order in which data points in the request body should be
considered for loading. This data will be stored in the destination table in the _sequence column.
This API uses a Unix epoch (in milliseconds) as the value for this property.
Note: This value cannot exceed the maximum of 9223372036854775807.
"""
# import ipdb; ipdb.set_trace()
super().__init__(**kwargs)
NOW: datetime = datetime.now()
self._sdc_received_at = NOW.strftime("%y-%m-%d %H:%M:%S")
@@ -101,3 +108,13 @@ def action_validator(cls, value, values):
# if not len([key in keys for key in value]):
# raise ValueError("Fields on key_names must be into data.")
# return value

@property
def last_updated_at(self) -> int:
"""
Return the most recent value of the sequence pointer column across the batch data, as a Unix epoch timestamp in seconds (0 if there is no data).
"""
last_ran: int = 0
if self.sequence_pointer_field is not None and self.data:
last_ran = int(max(parse(str(column[self.sequence_pointer_field])) for column in self.data).timestamp())
return last_ran
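
For reference, a minimal usage sketch of the new sequence_pointer_field and the last_updated_at property added above; the values below are illustrative and mirror the test fixtures rather than real data:

from brandenburg.models.batch import BatchModel

batch = BatchModel(
    service_id="raw",
    table_name="users",
    data=[
        {"id": 1, "updated_at": "2020-08-05T16:48:39"},
        {"id": 2, "updated_at": "2021-01-05T10:08:39"},
    ],
    sequence_pointer_field="updated_at",  # new required field introduced by this commit
)

# last_updated_at parses the pointer column of every record and returns the
# latest value as a Unix epoch in seconds (0 when data is empty).
print(batch.last_updated_at)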
12 changes: 8 additions & 4 deletions brandenburg/providers/gcp.py
@@ -85,15 +85,19 @@ def create_topics(self, topics: List[str]) -> None:
TODO: Add a return statement and handle exceptions
"""
GOOGLE_PROJECT_ID: str = settings.GOOGLE_PROJECT_ID
client: pubsub_v1.PublisherClient = pubsub_v1.PublisherClient(credentials=self.get_credentials())
project = client.project_path(GOOGLE_PROJECT_ID)
client: pubsub_v1.PublisherClient = publisher_client
logger.info(f"Checking if all topics already exists")
existing_topics: List[str] = [element.name.split("/")[3] for element in client.list_topics(project)]
existing_topics: List[str] = [
element.name.split("/")[3] for element in client.list_topics(project=f"projects/{GOOGLE_PROJECT_ID}")
]
logger.info(f"Existing opics: { existing_topics}")
for topic in set(topics).difference(existing_topics):
topic_name: str = client.topic_path(GOOGLE_PROJECT_ID, topic)
logger.info(f"creating topic: {topic_name}")
client.create_topic(topic_name)
try:
client.create_topic(topic_name)
except Exception as ex:
logger.error(ex)

def publish(self, topic: str, data: str, **attrs):
"""
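
For context, the list_topics change above follows the keyword-argument style of recent google-cloud-pubsub releases; a minimal sketch of that call style, with a placeholder project id and default credentials assumed:

from google.cloud import pubsub_v1

client = pubsub_v1.PublisherClient()  # assumes ambient default credentials
project_path = "projects/my-project"  # placeholder project id

# Newer library versions take the project as a keyword argument instead of
# building it with client.project_path().
existing = [topic.name.split("/")[-1] for topic in client.list_topics(project=project_path)]

topic_path = client.topic_path("my-project", "raw")
if "raw" not in existing:
    client.create_topic(name=topic_path)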
26 changes: 18 additions & 8 deletions brandenburg/services/batch.py
@@ -4,28 +4,38 @@
from typing import Tuple

# Local application imports
from brandenburg.config import settings
from brandenburg.models.batch import BatchModel
from brandenburg.services.publisher import PublisherService
from brandenburg.toolbox._backends.redis import RedisBackend
from brandenburg.toolbox.logger import log

logger = log.get_logger(__name__)
LOGGER = log.get_logger(__name__)


class BatchService:
@staticmethod
async def execute(batch: BatchModel, routing_key: str, action: str = "upsert") -> Tuple[BatchModel, bool]:
@classmethod
async def execute(cls, batch: BatchModel, routing_key: str, action: str = "upsert") -> Tuple[BatchModel, bool]:
batch.action = action
res = await PublisherService.publish(batch.dict(), routing_key)
logger.info(f"sent_to_topic: {bool(res)}, batch: {batch}")
LOGGER.info(f"sent_to_topic: {bool(res)}, batch: {batch}")
await cls._set_last_ran(batch)
return batch, True

@staticmethod
async def upload(name: str, filename: str, file: bytes, hash: str, token: str):
async def upload(name: str, filename: str, file: bytes, hash: str, token: str) -> bool:

path: str = f"{name}/{datetime.now().strftime('%Y/%m/%d')}"
logger.info(f"uploading file: {filename} with hash: {hash} to path {path}, token: {token}")
LOGGER.info(f"uploading file: {filename} with hash: {hash} to path {path}, token: {token}")
await PublisherService.upload_file(f"{path}/{filename}", file)
logger.info("uploading MD5SUM file")
LOGGER.info("uploading MD5SUM file")
await PublisherService.upload_file(f"{path}/MD5SUM", StringIO(hash))
logger.info("all files were uploaded")
LOGGER.info("all files were uploaded")
return True

@classmethod
async def _set_last_ran(cls, batch: BatchModel) -> None:
cache = await RedisBackend(settings.REDIS_URL).get_instance()
LOGGER.info(f"Configuring last ran date to table: {batch.table_name}, timestamp: {batch.last_updated_at}")
cache.set_cache(batch.table_name.lower(), batch.last_updated_at, -1)
LOGGER.info("last ran was set successfully")
12 changes: 11 additions & 1 deletion tests/models/test_batch_model.py
Expand Up @@ -13,7 +13,12 @@
"service_id": "raw",
"table_name": "users",
"action": "upsert",
"data": [{"id": 1, "name": "maria", "email": "[email protected]"}],
"data": [
{"id": 1, "name": "maria", "email": "[email protected]", "updated_at": "2020-08-05T16:48:39.343"},
{"id": 2, "name": "alana", "email": "[email protected]", "updated_at": "2021-01-05T10:08:39.000"},
{"id": 3, "name": "joana", "email": "[email protected]", "updated_at": "2020-12-05T10:08:39.000"},
],
"sequence_pointer_field": "updated_at",
}


@@ -73,3 +78,8 @@ def test_with_data_over_1000():
assert json.loads(info.value.json()) == [
{"loc": ["data"], "msg": "Field data exceed 1000 records.", "type": "value_error",}
]


def test_get_max_datetime():
batch: BatchModel = BatchModel(**BATCH)
assert batch.last_updated_at == 1609852119
24 changes: 20 additions & 4 deletions tests/routers/test_imports_api.py
@@ -9,11 +9,14 @@
# Local application imports
from brandenburg.config import settings

xfail = pytest.mark.xfail

DATA: Dict[str, str] = {
"service_id": "test_brandenburg",
"table_name": "user",
"data": [{"id": 1, "name": "Maria"}],
"data": [{"id": 1, "name": "Maria", "updated_at": "2020-12-03T19:35:30.0494511"}],
"action": "upsert",
"sequence_pointer_field": "updated_at",
}


@@ -39,12 +42,20 @@ def test_api_get_401_without_auth(client):
assert res.status_code == 401


@pytest.mark.xfail
def test_api_with_sequence_field_request(client):
data: Dict[str, str] = copy.deepcopy(DATA)
data.update({"sequence_pointer_field": "updated_at"})
res = client.post(f"/v1/import/push/", json=data, headers=HEADERS)
assert res.status_code == 201


def test_full_fields(client):
full: Dict[str, str] = copy.deepcopy(DATA)
full["key_names"] = ["id"]
full["schema_mapping"] = [{"a": "s"}]
pass
full["schema_mapping"] = [{"name": "id", "type": "int", "is_nullable": True}]
full["action"] = "batch"
res = client.post(f"/v1/import/batch/", json=full, headers=HEADERS)
assert res.status_code == 201


@pytest.mark.xfail
@@ -83,3 +94,8 @@ def test_send_file_and_check_background_function(client):
Check if the files were uploaded
"""
pass


@xfail
def test_check_last_datetime_field(client):
pass
