Skip to content

Commit

Permalink
Make reports run incrementally on the specified period length
Browse files Browse the repository at this point in the history
or fetch all data in one request if period is not provided.

Ensure that "click_performance_report" is always split by 1 day.
  • Loading branch information
nikitamaruniak committed Aug 8, 2024
1 parent d880663 commit 30bd558
Showing 1 changed file with 78 additions and 8 deletions.
86 changes: 78 additions & 8 deletions tap_google_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from requests.exceptions import ReadTimeout
import backoff

from dateutil.relativedelta import relativedelta

from tap_google_ads.api_version import API_VERSION
from . import report_definitions

Expand All @@ -26,6 +28,12 @@
]
)

# Report streams that are always requested one day at a time: for a stream
# listed here, sync() does not read "split_by_period" from the config and uses
# a fixed 1-day period instead.
REPORTS_ONE_DAY_ONLY = frozenset(
[
"click_performance_report",
]
)

DEFAULT_CONVERSION_WINDOW = 30
DEFAULT_REQUEST_TIMEOUT = 900 # in seconds

Expand Down Expand Up @@ -56,6 +64,52 @@ def get_request_timeout(config):
request_timeout = DEFAULT_REQUEST_TIMEOUT
return request_timeout


def get_split_by_period(config) -> "relativedelta | None":
    """Build the length of a single report request from the tap config.

    The ``split_by_period`` config value has the following schema:
    ```
    "split_by_period": {
        "days": <int>,
        "months": <int>,
        "years": <int>
    }
    ```
    Every key is optional, but no component may be negative and at least one
    of them must be greater than 0. When the value is valid, the requested
    period is split into a series of date ranges of this length and each date
    range is requested separately.

    Args:
        config: The tap configuration mapping.

    Returns:
        The period as a ``relativedelta``, or ``None`` when ``split_by_period``
        is not set or is invalid (not a mapping, non-integer components,
        negative components, or a zero-length period). ``None`` means the
        whole requested period is fetched in one request.
    """
    # NOTE: the return annotation is quoted so it is not evaluated at
    # definition time; `relativedelta | None` fails on Python < 3.10.
    split_by_period = config.get("split_by_period")

    if split_by_period is None:
        return None

    try:
        # `.get` raises AttributeError when the value is not a mapping
        # (e.g. a bare string or number), which is handled like any other
        # malformed value below.
        days = int(split_by_period.get("days") or 0)
        months = int(split_by_period.get("months") or 0)
        years = int(split_by_period.get("years") or 0)
    except (AttributeError, ValueError, TypeError):
        LOGGER.warning(f"The provided split_by_period value {split_by_period} is invalid; it will be set to None.")
        return None

    # A zero or negative period would never advance the start date of the
    # next request, so the caller's date loop would not terminate.
    has_negative_component = min(days, months, years) < 0
    is_zero_length = not (days or months or years)
    if has_negative_component or is_zero_length:
        LOGGER.warning(f"The provided split_by_period value {split_by_period} is invalid; it will be set to None.")
        return None

    return relativedelta(days=days, months=months, years=years)


def create_nested_resource_schema(resource_schema, fields):
new_schema = {
"type": ["null", "object"],
Expand Down Expand Up @@ -156,11 +210,12 @@ def create_core_stream_query(resource_name, selected_fields, last_pk_fetched, fi
return core_query


def create_report_query(resource_name, selected_fields, start_date, end_date):
    """Build the report query for one date range.

    Args:
        resource_name: Name of the resource placed in the FROM clause.
        selected_fields: Iterable of field names; they are joined with commas
            to form the SELECT list.
        start_date: First day of the requested range.
        end_date: Last day of the requested range.

    Returns:
        The query string, restricted with
        ``segments.date BETWEEN '<start>' AND '<end>'`` (both dates formatted
        as ``YYYY-MM-DD``) and followed by the output of
        ``build_parameters()``.
    """
    format_str = "%Y-%m-%d"
    formatted_start_date = utils.strftime(start_date, format_str=format_str)
    formatted_end_date = utils.strftime(end_date, format_str=format_str)
    fields = ",".join(selected_fields)
    report_query = f"SELECT {fields} FROM {resource_name} WHERE segments.date BETWEEN '{formatted_start_date}' AND '{formatted_end_date}' {build_parameters()}"

    return report_query

Expand Down Expand Up @@ -745,9 +800,21 @@ def sync(self, sdk_client, customer, stream, config, state, query_limit):
if selected_fields == {'segments.date'}:
raise Exception(f"Selected fields is currently limited to {', '.join(selected_fields)}. Please select at least one attribute and metric in order to replicate {stream_name}.")

if stream_name in REPORTS_ONE_DAY_ONLY:
LOGGER.info(f"Stream: {stream_name} supports quering by one day only. Setting split_by_period value to 1 day.")
split_by_period = relativedelta(days=1)
else:
split_by_period = get_split_by_period(config)

while query_date <= end_date:
query = create_report_query(resource_name, selected_fields, query_date)
LOGGER.info(f"Requesting {stream_name} data for {utils.strftime(query_date, '%Y-%m-%d')}.")
if split_by_period is None:
# If split_by_period is not provided - fetch the whole period
# from start_date to end_date in one request.
query_last_date = end_date
else:
query_last_date = min(end_date, query_date + split_by_period - timedelta(days=1))
query = create_report_query(resource_name, selected_fields, query_date, query_last_date)
LOGGER.info(f"Requesting {stream_name} data for {utils.strftime(query_date, '%Y-%m-%d')}...{utils.strftime(query_last_date, '%Y-%m-%d')}.")

try:
response = make_request(gas, query, customer["customerId"], config)
Expand All @@ -768,12 +835,15 @@ def sync(self, sdk_client, customer, stream, config, state, query_limit):

singer.write_record(stream_name, record)

new_bookmark_value = {replication_key: utils.strftime(query_date)}
new_bookmark_value = {replication_key: utils.strftime(query_last_date)}
singer.write_bookmark(state, stream["tap_stream_id"], customer["customerId"], new_bookmark_value)

singer.write_state(state)

query_date += timedelta(days=1)
if split_by_period is None:
break
else:
query_date += split_by_period


def initialize_core_streams(resource_schema):
Expand Down

0 comments on commit 30bd558

Please sign in to comment.