Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync fix #12

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ singer-python = "^5.9.1"
sendgrid = "^6.7.0"
boto3 = "^1.17.79"

[tool.poetry.dev-dependencies]
[tool.poetry.dev.dependencies]
coverage = "^5.4"
pylint = "^2.6.0"
pytest = "^6.2.2"
Expand Down
8 changes: 1 addition & 7 deletions tap_ask_nicely/schemas/historical_stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,7 @@
"type": ["null", "object"],
"additionalProperties": false,
"properties": {
"year": {
"type": ["null", "string"]
},
"month": {
"type": ["null", "string"]
},
"day": {
"date": {
"type": ["null", "string"]
},
"weekday": {
Expand Down
13 changes: 9 additions & 4 deletions tap_ask_nicely/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def sync(self, **kwargs) -> Generator[dict, None, None]:
)
records = res.get("data", [])
for record in records:
record["sent"] = datetime.fromtimestamp(int(record["sent"]))
record["opened"] = datetime.fromtimestamp(int(record["opened"]))
record["responded"] = datetime.fromtimestamp(int(record["responded"]))
record["lastemailed"] = datetime.fromtimestamp(int(record["lastemailed"]))
record["created"] = datetime.fromtimestamp(int(record["created"]))
yield record
contact_ids.add(record["contact_id"])
page = page + 1
Expand Down Expand Up @@ -120,7 +125,7 @@ def sync(self) -> Generator[dict, None, None]:

class HistoricalStats(Stream):
tap_stream_id = "historical_stats"
key_properties = []
key_properties = ["date"]
replication_key = ""
object_type = "HISTORICAL_STATS"
replication_method = "INCREMENTAL"
Expand All @@ -140,9 +145,9 @@ def sync(self) -> Generator[dict, None, None]:
while start_from != datetime.strftime(datetime.now(), "%Y-%m-%d"):
response = self.client.fetch_historical_stats(date=start_from)
sent_stats = response["data"]
if sent_stats != []:
for stat in sent_stats:
yield stat
for stat in sent_stats:
stat["date"] = datetime(int(stat.pop("year")), int(stat.pop("month")), int(stat.pop("day"))).strftime("%Y-%m-%d")
yield stat
start_from = increment_date_by_day(start_from)

singer.write_bookmark(
Expand Down
23 changes: 3 additions & 20 deletions tap_ask_nicely/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from singer import Transformer, metadata
from tap_ask_nicely.client import AskNicelyClient
from tap_ask_nicely.streams import STREAMS
from tap_ask_nicely.utils import AuditLogs, SendgridMessenger, SlackMessenger, GmailMessenger
from tap_ask_nicely.utils import SendgridMessenger, SlackMessenger, GmailMessenger
from datetime import date, datetime
import time

Expand Down Expand Up @@ -63,27 +63,10 @@ def sync(config, state, catalog):
singer.write_state(state, tap_stream_id)

batch_stop = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")
AuditLogs.write_audit_log(
run_id=run_id,
stream_name=tap_stream_id,
batch_start=batch_start,
batch_end=batch_stop,
records_synced=record_count,
run_time=(time.perf_counter() - start_time),
)

except Exception as e:
stream_comments.append(f"{tap_stream_id.upper}: {e}")
batch_stop = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")
AuditLogs.write_audit_log(
run_id=run_id,
stream_name=tap_stream_id,
batch_start=batch_start,
batch_end=batch_stop,
records_synced=record_count,
run_time=(time.perf_counter() - start_time),
comments=e,
)

state = singer.set_currently_syncing(state, None)
singer.write_state(state)
Expand All @@ -99,7 +82,7 @@ def sync(config, state, catalog):
}

# Comment out for local runs
if config["slack_notifications"] == True:
if config.get("slack_notifications"):
SlackMessenger.send_message(
run_id=run_id,
start_time=pipeline_start,
Expand All @@ -108,6 +91,6 @@ def sync(config, state, catalog):
comments='\n'.join(stream_comments),
)

if config["email_notifications"] == True:
if config.get("email_notifications"):
sg = SendgridMessenger(notification_data)
sg.send_message()
137 changes: 19 additions & 118 deletions tap_ask_nicely/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
import os
import json
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()


class SlackMessenger:
Expand Down Expand Up @@ -55,121 +52,6 @@ def send_message(
def build_url() -> str:
return f'{SlackMessenger.slack_base}/{os.getenv("SLACK_WEBHOOK_ADDRESS")}'


class AuditLogs:
def audit_schema() -> dict:
schema_base = {
"type": ["null", "object"],
"additionalProperties": False,
"properties": {
"run_id": {"type": ["null", "integer"]},
"stream_name": {"type": ["null", "string"]},
"batch_start": {"type": ["null", "string"], "format": "date-time"},
"batch_end": {"type": ["null", "string"], "format": "date-time"},
"records_synced": {"type": ["null", "integer"]},
"run_time": {"type": ["null", "number"]},
"comments": {"type": ["null", "string"]},
},
}
return schema_base

def schema_metadata():
metadata = [
{
"breadcrumb": [],
"metadata": {
"table-key-properties": [],
"forced-replication-method": "FULL_TABLE",
"inclusion": "available",
},
},
{
"breadcrumb": ["properties", "run_id"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "stream_stream"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "batch_start"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "batch_end"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "records_synced"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "run_time"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "comments"],
"metadata": {"inclusion": "available"},
},
]

return metadata

def audit_record(
run_id: int,
stream_name: str,
batch_start: str,
run_time: int,
batch_end: str = datetime.now(),
records_synced: int = 0,
comments: str = "",
) -> dict:

audit = {
"run_id": run_id,
"stream_name": stream_name,
"batch_start": batch_start,
"batch_end": batch_end,
"records_synced": records_synced,
"run_time": run_time,
"comments": comments,
}
return audit

def write_audit_log(
run_id: int,
stream_name: str,
batch_start: str,
batch_end: str = datetime.now(),
records_synced: int = 0,
run_time=int,
comments: str = "",
):
singer.write_schema(
"audit_log",
AuditLogs.audit_schema(),
[],
"",
)
audit_log = Transformer().transform(
AuditLogs.audit_record(
run_id=run_id,
stream_name=stream_name,
batch_start=batch_start,
batch_end=batch_end,
records_synced=records_synced,
run_time=run_time,
comments=comments,
),
AuditLogs.audit_schema(),
metadata.to_map(AuditLogs.schema_metadata()),
)
singer.write_record(
"audit_log",
audit_log,
)


class EmailMessenger:
def __init__(self, sync_data: dict) -> None:
self.data = sync_data
Expand Down Expand Up @@ -306,3 +188,22 @@ def send_message(self):
LOGGER.error(f"There was an issue sending the email: {error}")

return f"There was an issue sending the email: {error}"

def write_audit_log(run_id: int, stream_name: str, batch_start: str,
batch_end: str = datetime.now(),
records_synced: int = 0, run_time=int,
comments: str = ""):
with Counter('audit_log_written') as counter:
with Timer('audit_log_write_time'):
audit_data = {
"run_id": run_id,
"stream_name": stream_name,
"batch_start": batch_start,
"batch_end": batch_end,
"records_synced": records_synced,
"run_time": run_time,
"comments": comments,
}
email_messenger = SendgridMessenger(audit_data)
email_messenger.send_message()
counter.increment()
3 changes: 0 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
from tap_ask_nicely.storage import StorageHandler

import os
from dotenv import load_dotenv

load_dotenv()


@pytest.fixture
Expand Down