Skip to content

Commit

Permalink
exclude insights stream from the other tests as well
Browse files Browse the repository at this point in the history
  • Loading branch information
sgandhi1311 committed Jan 9, 2025
1 parent 54c49b8 commit ff1f23b
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 130 deletions.
256 changes: 130 additions & 126 deletions tests/test_facebook_attribution_window.py
Original file line number Diff line number Diff line change
@@ -1,132 +1,136 @@
import base
import os
# TODO: https://jira.talendforge.org/browse/TDL-26640
# The test is commented as all the insights streams are not having the data.
# Please refer to the JIRA ticket for more information.

from tap_tester import runner, connections
# import base
# import os

from base import FacebookBaseTest
# from tap_tester import runner, connections

# from base import FacebookBaseTest

class FacebookAttributionWindow(FacebookBaseTest):

is_done = None
# class FacebookAttributionWindow(FacebookBaseTest):

# is_done = None

# TODO: https://jira.talendforge.org/browse/TDL-26640
EXCLUDE_STREAMS = {
'ads_insights_hourly_advertiser', # TDL-24312, TDL-26640
'ads_insights_platform_and_device', # TDL-26640
'ads_insights', # TDL-26640
'ads_insights_age_and_gender', # TDL-26640
'ads_insights_country', # TDL-26640
'ads_insights_dma', # TDL-26640
'ads_insights_region' # TDL-26640
}

@staticmethod
def name():
return "tap_tester_facebook_attribution_window"

def streams_to_test(self):
""" 'attribution window' is only supported for 'ads_insights' streams """

# Fail the test when the JIRA card is done to allow stream to be re-added and tested
if self.is_done is None:
self.is_done = base.JIRA_CLIENT.get_status_category("TDL-24312") == 'done'
self.assert_message = ("JIRA ticket has moved to done, re-add the "
"applicable EXCLUDE_STREAMS to the test.")
self.is_done_2 = base.JIRA_CLIENT.get_status_category("TDL-26640") == 'done'
# if either card is done, fail & update the test to include more streams
self.is_done = self.is_done or self.is_done_2
assert self.is_done != True, self.assert_message

# return [stream for stream in self.expected_streams() if self.is_insight(stream)]
return [stream for stream in self.expected_streams()
if self.is_insight(stream)
and stream != 'ads_insights_hourly_advertiser']

def get_properties(self, original: bool = True):
"""Configuration properties required for the tap."""
return_value = {
'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
'start_date': self.start_date,
'end_date': self.end_date,
'insights_buffer_days': str(self.ATTRIBUTION_WINDOW)
}
if original:
return return_value

return_value["start_date"] = self.start_date
return return_value

def test_run(self):
"""
For the test ad set up in facebook ads manager we see data
on April 7th, start date is based on this data
"""
# attrribution window = 7
self.ATTRIBUTION_WINDOW = 7
self.start_date = '2021-04-14T00:00:00Z'
self.end_date = '2021-04-15T00:00:00Z'
self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# attribution window = 28
self.ATTRIBUTION_WINDOW = 28
self.start_date = '2021-04-30T00:00:00Z'
self.end_date = '2021-05-01T00:00:00Z'
self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# attribution window = 1
self.ATTRIBUTION_WINDOW = 1
self.start_date = '2021-04-08T00:00:00Z'
self.end_date = '2021-04-09T00:00:00Z'
self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

def run_test(self, attr_window, start_date, end_date):
"""
Test to check the attribution window
"""

expected_streams = self.streams_to_test()

conn_id = connections.ensure_connection(self)

# calculate start date with attribution window
start_date_with_attribution_window = self.timedelta_formatted(
start_date, days=-attr_window, date_format=self.START_DATE_FORMAT
)

# Run in check mode
found_catalogs = self.run_and_verify_check_mode(conn_id)

# Select only the expected streams tables
catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, select_all_fields=True)

# Run a sync job using orchestrator
self.run_and_verify_sync(conn_id)
sync_records = runner.get_records_from_target_output()

expected_replication_keys = self.expected_replication_keys()

for stream in expected_streams:
with self.subTest(stream=stream):

replication_key = next(iter(expected_replication_keys[stream]))

# get records
records = [record.get('data') for record in sync_records.get(stream).get('messages')]

# check for the record is between attribution date and start date
is_between = False

for record in records:
replication_key_value = record.get(replication_key)

# Verify the sync records respect the attribution window
self.assertGreaterEqual(self.parse_date(replication_key_value), self.parse_date(start_date_with_attribution_window),
msg="The record does not respect the attribution window.")

# verify if the record's bookmark value is between start date and attribution window
if self.parse_date(start_date_with_attribution_window) <= self.parse_date(replication_key_value) <= self.parse_date(start_date):
is_between = True

self.assertTrue(is_between)
# # TODO: https://jira.talendforge.org/browse/TDL-26640
# EXCLUDE_STREAMS = {
# 'ads_insights_hourly_advertiser', # TDL-24312, TDL-26640
# 'ads_insights_platform_and_device', # TDL-26640
# 'ads_insights', # TDL-26640
# 'ads_insights_age_and_gender', # TDL-26640
# 'ads_insights_country', # TDL-26640
# 'ads_insights_dma', # TDL-26640
# 'ads_insights_region' # TDL-26640
# }

# @staticmethod
# def name():
# return "tap_tester_facebook_attribution_window"

# def streams_to_test(self):
# """ 'attribution window' is only supported for 'ads_insights' streams """

# # Fail the test when the JIRA card is done to allow stream to be re-added and tested
# if self.is_done is None:
# self.is_done = base.JIRA_CLIENT.get_status_category("TDL-24312") == 'done'
# self.assert_message = ("JIRA ticket has moved to done, re-add the "
# "applicable EXCLUDE_STREAMS to the test.")
# self.is_done_2 = base.JIRA_CLIENT.get_status_category("TDL-26640") == 'done'
# # if either card is done, fail & update the test to include more streams
# self.is_done = self.is_done or self.is_done_2
# assert self.is_done != True, self.assert_message

# # return [stream for stream in self.expected_streams() if self.is_insight(stream)]
# return [stream for stream in self.expected_streams()
# if self.is_insight(stream)
# and stream != 'ads_insights_hourly_advertiser']

# def get_properties(self, original: bool = True):
# """Configuration properties required for the tap."""
# return_value = {
# 'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
# 'start_date': self.start_date,
# 'end_date': self.end_date,
# 'insights_buffer_days': str(self.ATTRIBUTION_WINDOW)
# }
# if original:
# return return_value

# return_value["start_date"] = self.start_date
# return return_value

# def test_run(self):
# """
# For the test ad set up in facebook ads manager we see data
# on April 7th, start date is based on this data
# """
# # attrribution window = 7
# self.ATTRIBUTION_WINDOW = 7
# self.start_date = '2021-04-14T00:00:00Z'
# self.end_date = '2021-04-15T00:00:00Z'
# self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# # attribution window = 28
# self.ATTRIBUTION_WINDOW = 28
# self.start_date = '2021-04-30T00:00:00Z'
# self.end_date = '2021-05-01T00:00:00Z'
# self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# # attribution window = 1
# self.ATTRIBUTION_WINDOW = 1
# self.start_date = '2021-04-08T00:00:00Z'
# self.end_date = '2021-04-09T00:00:00Z'
# self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# def run_test(self, attr_window, start_date, end_date):
# """
# Test to check the attribution window
# """

# expected_streams = self.streams_to_test()

# conn_id = connections.ensure_connection(self)

# # calculate start date with attribution window
# start_date_with_attribution_window = self.timedelta_formatted(
# start_date, days=-attr_window, date_format=self.START_DATE_FORMAT
# )

# # Run in check mode
# found_catalogs = self.run_and_verify_check_mode(conn_id)

# # Select only the expected streams tables
# catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
# self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, select_all_fields=True)

# # Run a sync job using orchestrator
# self.run_and_verify_sync(conn_id)
# sync_records = runner.get_records_from_target_output()

# expected_replication_keys = self.expected_replication_keys()

# for stream in expected_streams:
# with self.subTest(stream=stream):

# replication_key = next(iter(expected_replication_keys[stream]))

# # get records
# records = [record.get('data') for record in sync_records.get(stream).get('messages')]

# # check for the record is between attribution date and start date
# is_between = False

# for record in records:
# replication_key_value = record.get(replication_key)

# # Verify the sync records respect the attribution window
# self.assertGreaterEqual(self.parse_date(replication_key_value), self.parse_date(start_date_with_attribution_window),
# msg="The record does not respect the attribution window.")

# # verify if the record's bookmark value is between start date and attribution window
# if self.parse_date(start_date_with_attribution_window) <= self.parse_date(replication_key_value) <= self.parse_date(start_date):
# is_between = True

# self.assertTrue(is_between)
28 changes: 26 additions & 2 deletions tests/test_facebook_automatic_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,43 @@
import os

from tap_tester import runner, connections

import base
from base import FacebookBaseTest


class FacebookAutomaticFields(FacebookBaseTest):
"""Test that with no fields selected for a stream automatic fields are still replicated"""

is_done = None

# TODO: https://jira.talendforge.org/browse/TDL-26640
EXCLUDE_STREAMS = {
'ads_insights_hourly_advertiser', # TDL-24312, TDL-26640
'ads_insights_platform_and_device', # TDL-26640
'ads_insights', # TDL-26640
'ads_insights_age_and_gender', # TDL-26640
'ads_insights_country', # TDL-26640
'ads_insights_dma', # TDL-26640
'ads_insights_region' # TDL-26640
}

@staticmethod
def name():
return "tap_tester_facebook_automatic_fields"

def streams_to_test(self):
return self.expected_streams()
# return set(self.expected_metadata().keys())
# Fail the test when the JIRA card is done to allow stream to be re-added and tested
if self.is_done is None:
self.is_done = base.JIRA_CLIENT.get_status_category("TDL-24312") == 'done'
self.assert_message = ("JIRA ticket has moved to done, re-add the "
"applicable EXCLUDE_STREAMS to the test.")
self.is_done_2 = base.JIRA_CLIENT.get_status_category("TDL-26640") == 'done'
# if either card is done, fail & update the test to include more streams
self.is_done = self.is_done or self.is_done_2
assert self.is_done != True, self.assert_message

return self.expected_metadata().keys() - self.EXCLUDE_STREAMS

def get_properties(self, original: bool = True):
"""Configuration properties required for the tap."""
Expand Down
3 changes: 2 additions & 1 deletion tests/test_facebook_bookmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ def test_run(self):
# Testing against ads insights objects
self.start_date = self.get_properties()['start_date']
self.end_date = self.get_properties()['end_date']
self.bookmarks_test(insight_streams)
# TODO: https://jira.talendforge.org/browse/TDL-26640
# self.bookmarks_test(insight_streams)

# Testing against core objects
self.end_date = '2021-02-09T00:00:00Z'
Expand Down
27 changes: 26 additions & 1 deletion tests/test_facebook_table_reset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import dateutil.parser
import datetime
import base
from base_new_frmwrk import FacebookBaseTest
from tap_tester.base_suite_tests.table_reset_test import TableResetTest

Expand All @@ -9,12 +10,36 @@ class FacebookTableResetTest(TableResetTest, FacebookBaseTest):
"""tap-salesforce Table reset test implementation
Currently tests only the stream with Incremental replication method"""

is_done = None

@staticmethod
def name():
return "tt_facebook_table_reset"

# TODO: https://jira.talendforge.org/browse/TDL-26640
EXCLUDE_STREAMS = {
'ads_insights_hourly_advertiser', # TDL-24312, TDL-26640
'ads_insights_platform_and_device', # TDL-26640
'ads_insights', # TDL-26640
'ads_insights_age_and_gender', # TDL-26640
'ads_insights_country', # TDL-26640
'ads_insights_dma', # TDL-26640
'ads_insights_region' # TDL-26640
}

def streams_to_test(self):
return self.expected_stream_names()
# return set(self.expected_metadata().keys())
# Fail the test when the JIRA card is done to allow stream to be re-added and tested
if self.is_done is None:
self.is_done = base.JIRA_CLIENT.get_status_category("TDL-24312") == 'done'
self.assert_message = ("JIRA ticket has moved to done, re-add the "
"applicable EXCLUDE_STREAMS to the test.")
self.is_done_2 = base.JIRA_CLIENT.get_status_category("TDL-26640") == 'done'
# if either card is done, fail & update the test to include more streams
self.is_done = self.is_done or self.is_done_2
assert self.is_done != True, self.assert_message

return self.expected_metadata().keys() - self.EXCLUDE_STREAMS

@property
def reset_stream(self):
Expand Down

0 comments on commit ff1f23b

Please sign in to comment.