Skip to content

Commit

Permalink
Upgrade api version 202403 video ads (#71)
Browse files Browse the repository at this point in the history
* added video-ads implementation
* fixed pylint issues
* added handler for missing permission, removed test for skip condition
* updated message
* fstring -> format for 3.6 compatibility
  • Loading branch information
Vi6hal authored Apr 14, 2024
1 parent 93d6cf5 commit 1ec1015
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 66 deletions.
180 changes: 180 additions & 0 deletions tap_linkedin_ads/schemas/video_ads.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,186 @@
"string"
]
},
"lifecycle_state": {
"type": [
"null",
"string"
]
},
"visibility": {
"type": [
"null",
"string"
]
},
"published_at": {
"type": [
"null",
"string"
],
"format": "date-time"
},
"author": {
"type": [
"null",
"string"
]
},
"content_call_to_action_label": {
"type": [
"null",
"string"
]
},
"distribution": {
"type": [
"null",
"object"
],
"additionalProperties": false,
"properties": {
"feed_distribution": {
"type": [
"null",
"string"
]
},
"third_party_distribution_channels": {
"type": [
"null",
"array"
],
"items": {
"type": [
"null",
"string"
]
}
}
}
},
"content": {
"type": [
"null",
"object"
],
"additionalProperties": false,
"properties": {
"media": {
"type": [
"null",
"object"
],
"additionalProperties": false,
"properties": {
"title": {
"type": [
"null",
"string"
]
},
"id": {
"type": [
"null",
"string"
]
}
}
}
}
},
"content_landing_page": {
"type": [
"null",
"string"
]
},
"lifecycle_state_info": {
"type": [
"null",
"object"
],
"additionalProperties": false,
"properties": {
"is_edited_by_author": {
"type": [
"null",
"boolean"
]
}
}
},
"is_reshare_disabled_by_author": {
"type": [
"null",
"boolean"
]
},
"created_at": {
"type": [
"null",
"string"
],
"format": "date-time"
},
"last_modified_at": {
"type": [
"null",
"string"
],
"format": "date-time"
},
"id": {
"type": [
"null",
"string"
]
},
"commentary": {
"type": [
"null",
"string"
]
},
"ad_context": {
"type": [
"null",
"object"
],
"additionalProperties": false,
"properties": {
"dsc_status": {
"type": [
"null",
"string"
]
},
"dsc_name": {
"type": [
"null",
"string"
]
},
"dsc_ad_type": {
"type": [
"null",
"string"
]
},
"is_dsc": {
"type": [
"null",
"boolean"
]
},
"dsc_ad_account": {
"type": [
"null",
"string"
]
}
}
},
"account_id": {
"type": [
"null",
Expand Down
44 changes: 25 additions & 19 deletions tap_linkedin_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,11 @@ def get_next_url(stream_name, next_url, data):
next_page_token = data.get('metadata', {}).get('nextPageToken', None)
if next_page_token:
if 'pageToken=' in next_url:
next_url = re.sub(r'pageToken=[^&]+', f'pageToken={next_page_token}', next_url)
next_url = re.sub(r'pageToken=[^&]+', 'pageToken={}'.format(next_page_token), next_url)
else:
next_url = f"{next_url}&pageToken={next_page_token}"
next_url = next_url + "&pageToken={}".format(next_page_token)
else:
next_url = None
return next_url
else:
next_url = None
links = data.get('paging', {}).get('links', [])
Expand All @@ -112,7 +111,7 @@ def get_next_url(stream_name, next_url, data):
return 'https://api.linkedin.com{}'.format(href)
# Prepare next page URL
next_url = 'https://api.linkedin.com{}'.format(urllib.parse.unquote(href))
return next_url
return next_url

def shift_sync_window(params, today, date_window_size, forced_window_size=None):
"""
Expand All @@ -130,7 +129,6 @@ def shift_sync_window(params, today, date_window_size, forced_window_size=None):
'dateRange.start.day': current_end.day,
'dateRange.start.month': current_end.month,
'dateRange.start.year': current_end.year,

'dateRange.end.day': new_end.day,
'dateRange.end.month': new_end.month,
'dateRange.end.year': new_end.year,}
Expand Down Expand Up @@ -279,7 +277,7 @@ def process_records(self,

return max_bookmark_value, counter.value

# pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals
# pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals,too-many-nested-blocks
def sync_endpoint(self,
client,
catalog,
Expand Down Expand Up @@ -344,10 +342,13 @@ def sync_endpoint(self,
urllist = []
if self.tap_stream_id in NEW_PATH_STREAMS:
for account in account_list:
url = f"{BASE_URL}/adAccounts/{account}/{self.path}?{querystring}"
url = "{}/adAccounts/{}/{}?{}".format(BASE_URL, account, self.path, querystring)
urllist.append((account, url))
else:
url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring)
if self.path == 'posts':
url = '{}/{}?{}&dscAdAccount=urn%3Ali%3AsponsoredAccount%3A{}'.format(BASE_URL, self.path, querystring, parent_id)
else:
url = '{}/{}?{}'.format(BASE_URL, self.path, querystring)
urllist.append((None, url))

for acct_id, next_url in urllist:
Expand Down Expand Up @@ -403,14 +404,6 @@ def sync_endpoint(self,
# Add children filter params based on parent IDs
if self.tap_stream_id == 'accounts':
account = 'urn:li:sponsoredAccount:{}'.format(parent_id)
owner_id = record.get('reference_organization_id', None)
owner = 'urn:li:organization:{}'.format(owner_id)
if child_stream_name == 'video_ads' and owner_id is not None:
child_stream_params['account'] = account
child_stream_params['owner'] = owner
else:
LOGGER.warning("Skipping video_ads call for %s account as reference_organization_id is not found.", account)
continue
elif self.tap_stream_id == 'campaigns':
campaign = 'urn:li:sponsoredCampaign:{}'.format(parent_id)
if child_stream_name == 'creatives':
Expand Down Expand Up @@ -616,12 +609,25 @@ class VideoAds(LinkedInAds):
replication_method = "INCREMENTAL"
key_properties = ["content_reference"]
foreign_key = "id"
path = "adDirectSponsoredContents"
path = "posts"
data_key = "elements"
parent = "accounts"
params = {
"q": "account"
"q": "dscAdAccount",
"dscAdTypes": "List(VIDEO)"
}
headers = {'X-Restli-Protocol-Version': "2.0.0"}

def sync_endpoint(self, *args, **kwargs):
try:
return super().sync_endpoint(*args, **kwargs)
except Exception as error:
if "Not enough permissions to access: partnerApiPostsExternal" in str(error):
LOGGER.info("Access to the video-ads API is denied due to insufficient permissions. Please reauthenticate or verify the required permissions.")
LOGGER.error(error)
# total record count (zero), initial bookmark returned to supress this failure
return 0, self.get_bookmark(kwargs.get("state"), kwargs.get("start_date"))
raise error

class AccountUsers(LinkedInAds):
"""
Expand Down Expand Up @@ -736,7 +742,7 @@ class AdAnalyticsByCreative(LinkedInAds):
# Dictionary of the stream classes
STREAMS = {
"accounts": Accounts,
# "video_ads": VideoAds,
"video_ads": VideoAds,
"account_users": AccountUsers,
"campaign_groups": CampaignGroups,
"campaigns": Campaigns,
Expand Down
7 changes: 3 additions & 4 deletions tap_linkedin_ads/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,14 @@ def sync(client, config, catalog, state):
params = stream_obj.params
if account_filter == 'search_id_values_param':
# Convert account IDs to URN format
urn_list = [f"urn%3Ali%3AsponsoredAccount%3A{account_id}" for account_id in account_list]
urn_list = ["urn%3Ali%3AsponsoredAccount%3A{}".format(account_id) for account_id in account_list]
# Create the query parameter string
param_value = f"(id:(values:List({','.join(urn_list)})))"
param_value = "(id:(values:List({})))".format(','.join(urn_list))
params['search'] = param_value
elif account_filter == 'accounts_param':
for idx, account in enumerate(account_list):
params['accounts[{}]'.format(idx)] = \
'urn:li:sponsoredAccount:{}'.format(account)

# Update params of specific stream
stream_obj.params = params

Expand All @@ -129,7 +128,7 @@ def sync(client, config, catalog, state):
stream_obj.write_schema(catalog)

total_records, max_bookmark_value = stream_obj.sync_endpoint(
client=client, catalog=catalog,
client=client, catalog=catalog,
state=state, page_size=page_size,
start_date=start_date,
selected_streams=selected_streams,
Expand Down
21 changes: 21 additions & 0 deletions tap_linkedin_ads/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,25 @@ def transform_urn(data_dict):
return data_dict


def transform_video_ads(data_dict):
# pylint: disable=fixme
# TODO: To be removed in next major version release
if 'author' in data_dict:
data_dict['owner'] = data_dict["author"]
if 'id' in data_dict:
data_dict['content_reference'] = data_dict["id"]
if 'ad_context' in data_dict:
if 'dsc_name' in data_dict['ad_context']:
data_dict['name'] = data_dict["ad_context"]['dsc_name']
if 'dsc_ad_type' in data_dict['ad_context']:
data_dict['type'] = data_dict["ad_context"]['dsc_ad_type']
if 'dsc_ad_account' in data_dict['ad_context']:
data_dict['account'] = data_dict["ad_context"]['dsc_ad_account']
if 'last_modified_at' in data_dict:
data_dict['last_modified_time'] = data_dict["last_modified_at"]
if 'created_at' in data_dict:
data_dict['created_time'] = data_dict["created_at"]
return data_dict

def transform_data(data_dict, stream_name):
new_dict = data_dict
Expand All @@ -308,6 +327,8 @@ def transform_data(data_dict, stream_name):
this_dict = transform_campaigns(this_dict)
elif stream_name == 'creatives':
this_dict = transform_creatives(this_dict)
elif stream_name == 'video_ads':
this_dict = transform_video_ads(this_dict)
this_dict = transform_urn(this_dict)
this_dict = transform_audit_fields(this_dict)

Expand Down
14 changes: 7 additions & 7 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def get_credentials(self):
def expected_check_streams():
return {
'accounts',
# 'video_ads',
'video_ads',
'account_users',
'campaign_groups',
'campaigns',
Expand All @@ -100,12 +100,12 @@ def expected_metadata(self):
self.OBEYS_START_DATE: True,
self.REPLICATION_KEYS: {'last_modified_time'}
},
# 'video_ads': {
# self.PRIMARY_KEYS: {'content_reference'},
# self.REPLICATION_METHOD: self.INCREMENTAL,
# self.OBEYS_START_DATE: True,
# self.REPLICATION_KEYS: {'last_modified_time'}
# },
'video_ads': {
self.PRIMARY_KEYS: {'content_reference'},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.OBEYS_START_DATE: True,
self.REPLICATION_KEYS: {'last_modified_time'}
},
'account_users': {
self.PRIMARY_KEYS: {'account_id', 'user_person_id'},
self.REPLICATION_METHOD: self.INCREMENTAL,
Expand Down
1 change: 1 addition & 0 deletions tests/test_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"video_ads": {
"content_reference_share_id",
"content_reference_ucg_post_id",
"change_audit_stamps"
},
"accounts": {
"total_budget_ends_at",
Expand Down
Loading

0 comments on commit 1ec1015

Please sign in to comment.