Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/appsflyer #44

Closed
wants to merge 10 commits into from
Closed
11 changes: 4 additions & 7 deletions docs/supported-sources/appsflyer.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@ An API token is required to retrieve reports from the AppsFlyer API. Please foll

Once you complete the guide, you should have an API key. Let's say your API key is ey123, here's a sample command that will copy the data from AppsFlyer into a duckdb database:

ingestr ingest --source-uri 'appsflyer://?api_key=ey123' --source-table 'campaigns' --dest-uri duckdb:///appsflyer.duckdb --dest-table 'appsflyer.output' --interval-start '2024-08-01' --interval-end '2024-08-28'
ingestr ingest --source-uri 'appsflyer://?api_key=ey123' --source-table 'cacreativesmpaigns' --dest-uri duckdb:///appsflyer.duckdb --dest-table 'dest.creatives' --interval-start '2024-08-01' --interval-end '2024-08-28'

The result of this command will be a table in the appsflyer.duckdb database

Available Source Table:
AppsFlyer source allows ingesting the following source into separate tables:
The AppsFlyer source allows ingesting the "Creatives" data into a table:

-Campaigns: Retrieves data for campaigns, detailing the app's costs, loyal users, total installs, and revenue over multiple days.
- Creatives: Retrieves data on creative assets, including revenue and cost.

-Creatives: Retrieves data for a creative asset, including revenue and cost.

Use these as `--source-table` parameter in the `ingestr ingest` command.
Use creatives as the `--source-table` parameter in the `ingestr ingest` command.
26 changes: 19 additions & 7 deletions ingestr/src/appsflyer/_init_.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Iterable

import dlt
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TDataItem
from dlt.sources import DltResource

Expand All @@ -11,14 +12,25 @@
def appsflyer_source(
api_key: str, start_date: str, end_date: str
) -> Iterable[DltResource]:
start_date_obj = ensure_pendulum_datetime(start_date)
client = AppsflyerClient(api_key)

@dlt.resource(write_disposition="merge", merge_key="install_time")
def campaigns() -> Iterable[TDataItem]:
yield from client.fetch_campaigns(start_date, end_date)
#disable due to cohort metrics
'''@dlt.resource(write_disposition="merge", merge_key="Install Time")
def campaigns(
updated=dlt.sources.incremental('["Install Time"]', start_date_obj.isoformat()),
) -> Iterable[TDataItem]:
yield from client.fetch_campaigns(
start_date=updated.start_value, end_date=end_date
)
'''

@dlt.resource(write_disposition="merge", merge_key="install_time")
def creatives() -> Iterable[TDataItem]:
yield from client.fetch_creatives(start_date, end_date)
@dlt.resource(write_disposition="merge", merge_key="Install Time")
def creatives(
updated=dlt.sources.incremental('["Install Time"]', start_date_obj.isoformat()),
) -> Iterable[TDataItem]:
yield from client.fetch_creatives(
start_date=updated.start_value, end_date=end_date
)

return campaigns, creatives
return creatives
96 changes: 58 additions & 38 deletions ingestr/src/appsflyer/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta
from typing import Optional

import requests
Expand Down Expand Up @@ -37,46 +38,59 @@ def _fetch_data(
dimensions=DEFAULT_GROUPING,
metrics=DEFAULT_KPIS,
):
params = {
"from": from_date,
"to": to_date,
"groupings": ",".join(dimensions),
"kpis": ",".join(metrics),
"format": "json",
"maximum_rows": maximum_rows,
}
current_start_time = datetime.fromisoformat(from_date).date()
end_date_time = datetime.fromisoformat(to_date).date()

def retry_on_limit(
response: Optional[requests.Response], exception: Optional[BaseException]
) -> bool:
return (
isinstance(response, requests.Response) and response.status_code == 429
while current_start_time < end_date_time:
current_end_time = min(
(current_start_time + timedelta(days=30)), end_date_time
)

request_client = Client(
request_timeout=10.0,
raise_for_status=False,
retry_condition=retry_on_limit,
request_max_attempts=12,
request_backoff_factor=2,
).session
params = {
"from": current_start_time.isoformat(),
"to": current_end_time.isoformat(),
"groupings": ",".join(dimensions),
"kpis": ",".join(metrics),
"format": "json",
"maximum_rows": maximum_rows,
}

try:
response = request_client.get(
url=self.uri, headers=self.__get_headers(), params=params
)
def retry_on_limit(
response: Optional[requests.Response],
exception: Optional[BaseException],
) -> bool:
return (
isinstance(response, requests.Response)
and response.status_code == 429
)

if response.status_code == 200:
result = response.json()
yield result
else:
raise HTTPError(
f"Request failed with status code: {response.status_code}"
request_client = Client(
request_timeout=10.0,
raise_for_status=False,
retry_condition=retry_on_limit,
request_max_attempts=12,
request_backoff_factor=2,
).session

try:
response = request_client.get(
url=self.uri, headers=self.__get_headers(), params=params
)

except requests.RequestException as e:
raise HTTPError(f"Request failed: {e}")
if response.status_code == 200:
result = response.json()
yield result
else:
raise HTTPError(
f"Request failed with status code: {response.status_code}"
)

except requests.RequestException as e:
raise HTTPError(f"Request failed: {e}")

current_start_time = current_end_time

'''
def fetch_campaigns(
self,
start_date: str,
Expand All @@ -89,18 +103,24 @@ def fetch_campaigns(
"cohort_day_3_total_revenue_per_user",
"cohort_day_7_total_revenue_per_user",
"cohort_day_7_revenue_per_user",
"cohort_day_14_total_revenue_per_user",
"cohort_day_14_revenue_per_user",
"cohort_day_21_total_revenue_per_user",
"cohort_day_21_revenue_per_user",
"retention_day_7",
]
return self._fetch_data(start_date, end_date, metrics=metrics)
max_cohort_duration = 7
max_allowed_end_date = (
datetime.now() - timedelta(days=max_cohort_duration)
).strftime("%Y-%m-%d")
adjusted_end_date = min(end_date, max_allowed_end_date)
return self._fetch_data(
from_date=start_date, to_date=adjusted_end_date, metrics=metrics
)
'''

def fetch_creatives(
self,
start_date: str,
end_date: str,
):
dimensions = DEFAULT_GROUPING + ["af_adset_id", "af_adset", "af_ad_id"]
return self._fetch_data(start_date, end_date, dimensions=dimensions)
return self._fetch_data(
from_date=start_date, to_date=end_date, dimensions=dimensions
)
27 changes: 19 additions & 8 deletions ingestr/src/sources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import base64
import csv
import json
from datetime import date, datetime
from datetime import date, datetime, timedelta
from typing import Any, Callable, Optional
from urllib.parse import parse_qs, urlparse

Expand Down Expand Up @@ -724,20 +724,32 @@ def dlt_source(self, uri: str, table: str, **kwargs):
raise ValueError("api_key in the URI is required to connect to Appsflyer")

resource = None
if table in ["campaigns", "creatives"]:
if table == "creatives":
resource = table
else:
raise ValueError(
f"Resource '{table}' is not supported for Appsflyer source yet, if you are interested in it please create a GitHub issue at https://github.com/bruin-data/ingestr"
)
interval_start = kwargs.get("interval_start")
interval_end = kwargs.get("interval_end")

start_date = kwargs.get("interval_start") or "2024-01-02"
end_date = kwargs.get("interval_end") or "2024-01-29"

start_date = (
interval_start if interval_start else datetime(2001,1,1)
)

end_date = (
interval_end
if interval_end
else datetime.now()
)
min_diff_90_days = (end_date - start_date).days
if min_diff_90_days < 90:
start_date = end_date - timedelta(90)

return appsflyer_source(
api_key=api_key[0],
start_date=start_date,
end_date=end_date,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
).with_resources(resource)


Expand All @@ -757,7 +769,6 @@ def dlt_source(self, uri: str, table: str, **kwargs):
interval_start.strftime("%Y-%m-%d") if interval_start else "2000-01-01"
)
end_date = interval_end.strftime("%Y-%m-%d") if interval_end else None

source_fields = urlparse(uri)
subdomain = source_fields.hostname
if not subdomain:
Expand Down
Loading