tdl-17121, 24442, 24575 #174

Merged · 3 commits · Jan 16, 2024
32 changes: 27 additions & 5 deletions tests/sfbase.py
@@ -928,11 +928,9 @@ def setUpClass(cls):
     def get_custom_fields(self, found_catalogs, conn_id):
         """ List all the custom_fields for each stream"""
         custom_fields = {}
-        for stream in self.streams_to_test():
-
-            catalog = [catalog for catalog in found_catalogs
-                       if catalog["stream_name"] == stream][0]
+        for catalog in found_catalogs:
             schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"]
+            stream = catalog['stream_name']
             custom_fields[stream] = {key for key in schema['properties'].keys()
                                      if key.endswith("__c")}
         return custom_fields
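A note on the filter above: Salesforce gives custom fields an API name ending in "__c", which is what the key.endswith("__c") check relies on. A minimal sketch of how the helper behaves, assuming a hypothetical annotated-schema fragment:

    # Hypothetical annotated-schema fragment; 'Notes__c' is a custom field,
    # 'Id' and 'Name' are standard fields.
    schema = {'properties': {'Id': {}, 'Name': {}, 'Notes__c': {}}}
    custom = {key for key in schema['properties'].keys() if key.endswith("__c")}
    assert custom == {'Notes__c'}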
@@ -949,6 +947,19 @@ def get_non_custom_fields(self, found_catalogs, conn_id):
                                 and schema['properties'][key]['inclusion'] != "unsupported"}
         return non_custom_fields
 
+    def get_select_by_default_fields(self, found_catalogs, conn_id):
+        """ List all the selected_by_default fields for each stream"""
+
+        select_by_default_fields = {}
+        other_fields = {}
+        for catalog in found_catalogs:
+            schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])['metadata']
+            stream = catalog['stream_name']
+            select_by_default_fields[stream] = {item['breadcrumb'][-1] for item in schema
+                                                if item['breadcrumb'] != [] and
+                                                item['metadata'].get('selected-by-default') == True}
+        return select_by_default_fields
+
     @staticmethod
     def count_custom_non_custom_fields(fields):
         custom = 0
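For context on the new get_select_by_default_fields helper: the 'metadata' list returned by menagerie is assumed to follow the standard Singer catalog shape, where each entry pairs a breadcrumb with a metadata dict and the empty breadcrumb denotes the stream-level entry. A minimal sketch with hypothetical field names:

    # Hypothetical Singer metadata list; the breadcrumb != [] check skips
    # the stream-level entry.
    metadata = [
        {'breadcrumb': [], 'metadata': {'selected': True}},
        {'breadcrumb': ['properties', 'Id'],
         'metadata': {'inclusion': 'automatic', 'selected-by-default': True}},
        {'breadcrumb': ['properties', 'IsDeleted'],
         'metadata': {'inclusion': 'available', 'selected-by-default': False}},
    ]
    selected = {item['breadcrumb'][-1] for item in metadata
                if item['breadcrumb'] != [] and
                item['metadata'].get('selected-by-default') == True}
    assert selected == {'Id'}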
@@ -1008,7 +1019,7 @@ def get_streams_with_data():
             'LoginGeo',
             'FlowDefinitionView',
             'LightningToggleMetrics',
-            'LightningExitByPageMetrics',
+            # 'LightningExitByPageMetrics',  # removed from the list as it is not getting any data
             'PermissionSetTabSetting',
             'MilestoneType',
             'Period',
@@ -1172,6 +1183,17 @@ def get_full_table_streams(self):
         }
         return full_table_streams
 
+    def get_custom_fields_streams(self):
+        custom_field_streams = {
+            'Account',
+            'Case',
+            'Contact',
+            'Lead',
+            'Opportunity',
+            'TapTester__c',
+        }
+        return custom_field_streams
+
     def switchable_streams(self):
         streams = self.expected_stream_names().difference(self.get_full_table_streams())
         final_list = streams.intersection(self.get_streams_with_data())
9 changes: 7 additions & 2 deletions tests/test_salesforce_all_fields_custom.py
@@ -12,7 +12,8 @@ class SFCustomFieldsTest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_custom"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return self.get_custom_fields_streams()
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
@@ -21,7 +22,11 @@ def streams_to_selected_fields(self):
         return custom_fields
 
     def test_all_fields_for_streams_are_replicated(self):
-        for stream in self.streams_to_test():
+        selected_streams = self.streams_to_test()
+        actual_custom_field_streams = {key for key in self.selected_fields.keys() if self.selected_fields.get(key, set())}
+        self.assertSetEqual(selected_streams, actual_custom_field_streams,
+                            msg=f"More streams have custom fields: {actual_custom_field_streams.difference(selected_streams)}")
+        for stream in selected_streams:
             with self.subTest(stream=stream):
                 automatic_fields = self.expected_automatic_fields(stream)
                 expected_custom_fields = self.selected_fields.get(stream, set()).union(automatic_fields)
10 changes: 8 additions & 2 deletions tests/test_salesforce_all_fields_custom_rest.py
@@ -12,7 +12,8 @@ class SFCustomFieldsTestRest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_custom_rest"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return self.get_custom_fields_streams()
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
@@ -21,7 +22,12 @@ def streams_to_selected_fields(self):
         return custom_fields
 
     def test_all_fields_for_streams_are_replicated(self):
-        for stream in self.streams_to_test():
+
+        selected_streams = self.streams_to_test()
+        actual_custom_field_streams = {key for key in self.selected_fields.keys() if self.selected_fields.get(key, set())}
+        self.assertSetEqual(selected_streams, actual_custom_field_streams,
+                            msg=f"More streams have custom fields: {actual_custom_field_streams.difference(selected_streams)}")
+        for stream in selected_streams:
             with self.subTest(stream=stream):
                 automatic_fields = self.expected_automatic_fields(stream)
                 expected_custom_fields = self.selected_fields.get(stream, set()).union(automatic_fields)
13 changes: 11 additions & 2 deletions tests/test_salesforce_all_fields_non_custom.py
@@ -13,7 +13,16 @@ class SFNonCustomFieldsTest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_non_custom"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return {
+            'Account',
+            'ActiveProfileMetric',
+            'Calendar',
+            'ContentWorkspacePermission',
+            'CampaignMemberStatus',
+            'Community',
+        }
+
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
@@ -22,7 +31,7 @@ def streams_to_selected_fields(self):
         return non_custom_fields
 
     def test_non_custom_fields(self):
-        for stream in self.streams_to_selected_fields():
+        for stream in self.streams_to_test():
             with self.subTest(stream=stream):
                 expected_non_custom_fields = self.selected_fields.get(stream, set())
                 replicated_non_custom_fields = self.actual_fields.get(stream, set())
12 changes: 10 additions & 2 deletions tests/test_salesforce_all_fields_non_custom_rest.py
@@ -14,7 +14,15 @@ class SFNonCustomFieldsTestRest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_non_custom_rest"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return {
+            'Case',
+            'PricebookEntry',
+            'Profile',
+            'PermissionSet',
+            'Product2',
+            'PromptAction',
+        }
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
return non_custom_fields

def test_non_custom_fields(self):
for stream in self.streams_to_selected_fields():
for stream in self.streams_to_test():
with self.subTest(stream=stream):
expected_non_custom_fields = self.selected_fields.get(stream,set())
replicated_non_custom_fields = self.actual_fields.get(stream, set())
182 changes: 29 additions & 153 deletions tests/test_salesforce_bookmarks.py
@@ -1,165 +1,41 @@
-import unittest
-import datetime
-import dateutil.parser
-import pytz
+from tap_tester.base_suite_tests.bookmark_test import BookmarkTest
+from sfbase import SFBaseTest
 
-from tap_tester import runner, menagerie, connections
-
-from base import SalesforceBaseTest
-
-
-class SalesforceBookmarks(SalesforceBaseTest):
-    salesforce_api = 'BULK'
+
+class SFBookmarkTest(BookmarkTest, SFBaseTest):
+
     @staticmethod
     def name():
-        return "tap_tester_salesforce_bookmarks"
+        return "tt_sf_bookmarks"
 
     @staticmethod
-    def expected_sync_streams():
+    def streams_to_test():
         return {
             'Account',
             'Contact',
-            # 'Lead', # TODO grab the dates that exist
-            # 'Opportunity', # cannot test, dates are 1 s apart
             'User',
+            'Publisher',
+            'AppDefinition',
         }

-    @staticmethod
-    def convert_state_to_utc(date_str):
-        """
-        Convert a saved bookmark value of the form '2020-08-25T13:17:36-07:00' to
-        a string formatted utc datetime,
-        in order to compare against json formatted datetime values
-        """
-        date_object = dateutil.parser.parse(date_str)
-        date_object_utc = date_object.astimezone(tz=pytz.UTC)
-        return datetime.datetime.strftime(date_object_utc, "%Y-%m-%dT%H:%M:%SZ")
-
-    def calculated_states_by_stream(self, current_state):
-        """
-        Look at the bookmarks from a previous sync and set a new bookmark
-        value that is 1 day prior. This ensures the subsequent sync will replicate
-        at least 1 record but fewer records than the previous sync.
-        """
-
-        stream_to_current_state = {stream: bookmark.get(self.expected_replication_keys()[stream].pop())
-                                   for stream, bookmark in current_state['bookmarks'].items()}
-        stream_to_calculated_state = {stream: "" for stream in self.expected_sync_streams()}
-
-        timedelta_by_stream = {stream: [1, 0, 0]  # {stream_name: [days, hours, minutes], ...}
-                               for stream in self.expected_streams()}
-        timedelta_by_stream['Account'] = [0, 0, 2]
-
-        for stream, state in stream_to_current_state.items():
-            days, hours, minutes = timedelta_by_stream[stream]
-
-            # convert state from string to datetime object
-            state_as_datetime = dateutil.parser.parse(state)
-            calculated_state_as_datetime = state_as_datetime - datetime.timedelta(days=days, hours=hours, minutes=minutes)
-            # convert back to string and format
-            calculated_state = datetime.datetime.strftime(calculated_state_as_datetime, "%Y-%m-%dT%H:%M:%S.000000Z")
-            stream_to_calculated_state[stream] = calculated_state
-
-        return stream_to_calculated_state

-    def test_run(self):
-        self.salesforce_api = 'BULK'
-
-        replication_keys = self.expected_replication_keys()
-
-        # SYNC 1
-        conn_id = connections.ensure_connection(self)
-
-        # Run in check mode
-        found_catalogs = self.run_and_verify_check_mode(conn_id)
-
-        # Select only the expected streams tables
-        expected_streams = self.expected_sync_streams()
-        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
-        self.select_all_streams_and_fields(conn_id, catalog_entries)
-        streams_replication_methods = {stream: self.INCREMENTAL
-                                       for stream in expected_streams}
-        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)
-
-        # Run a sync job using orchestrator
-        first_sync_record_count = self.run_and_verify_sync(conn_id)
-        first_sync_records = runner.get_records_from_target_output()
-        first_sync_bookmarks = menagerie.get_state(conn_id)
-
-        # UPDATE STATE BETWEEN SYNCS
-        new_states = {'bookmarks': dict()}
-        for stream, new_state in self.calculated_states_by_stream(first_sync_bookmarks).items():
-            replication_key = list(replication_keys[stream])[0]
-            new_states['bookmarks'][stream] = {replication_key: new_state}
-        menagerie.set_state(conn_id, new_states)
-
-        # SYNC 2
-        second_sync_record_count = self.run_and_verify_sync(conn_id)
-        second_sync_records = runner.get_records_from_target_output()
-        second_sync_bookmarks = menagerie.get_state(conn_id)
-
-        # Test by stream
-        for stream in expected_streams:
-            with self.subTest(stream=stream):
-                # record counts
-                first_sync_count = first_sync_record_count.get(stream, 0)
-                second_sync_count = second_sync_record_count.get(stream, 0)
-
-                # data from record messages
-                first_sync_messages = [record.get('data') for record in
-                                       first_sync_records.get(stream).get('messages')
-                                       if record.get('action') == 'upsert']
-                second_sync_messages = [record.get('data') for record in
-                                        second_sync_records.get(stream).get('messages')
-                                        if record.get('action') == 'upsert']
-
-                # replication key for comparing data
-                self.assertEqual(1, len(list(replication_keys[stream])),
-                                 msg="Compound primary keys require a change to test expectations")
-                replication_key = list(replication_keys[stream])[0]
-
-                # bookmarked states (top level objects)
-                first_bookmark_key_value = first_sync_bookmarks.get('bookmarks').get(stream)
-                second_bookmark_key_value = second_sync_bookmarks.get('bookmarks').get(stream)
-
-                # Verify the first sync sets a bookmark of the expected form
-                self.assertIsNotNone(first_bookmark_key_value)
-                self.assertIsNotNone(first_bookmark_key_value.get(replication_key))
-
-                # Verify the second sync sets a bookmark of the expected form
-                self.assertIsNotNone(second_bookmark_key_value)
-                self.assertIsNotNone(second_bookmark_key_value.get(replication_key))
-
-                # bookmarked states (actual values)
-                first_bookmark_value = first_bookmark_key_value.get(replication_key)
-                second_bookmark_value = second_bookmark_key_value.get(replication_key)
-                # bookmarked values as utc for comparing against records
-                first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value)
-                second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value)
-
-                # Verify the second sync bookmark is equal to the first sync bookmark
-                self.assertEqual(second_bookmark_value, first_bookmark_value)  # assumes no changes to data during test
-
-                # Verify the second sync records respect the previous (simulated) bookmark value
-                simulated_bookmark_value = new_states['bookmarks'][stream][replication_key]
-                for record in second_sync_messages:
-                    replication_key_value = record.get(replication_key)
-                    self.assertGreaterEqual(replication_key_value, simulated_bookmark_value,
-                                            msg="Second sync records do not respect the previous bookmark.")
-
-                # Verify the first sync bookmark value is the max replication key value for a given stream
-                for record in first_sync_messages:
-                    replication_key_value = record.get(replication_key)
-                    self.assertLessEqual(replication_key_value, first_bookmark_value_utc,
-                                         msg="First sync bookmark was set incorrectly, a record with a greater rep key value was synced")
-
-                # Verify the second sync bookmark value is the max replication key value for a given stream
-                for record in second_sync_messages:
-                    replication_key_value = record.get(replication_key)
-                    self.assertLessEqual(replication_key_value, second_bookmark_value_utc,
-                                         msg="Second sync bookmark was set incorrectly, a record with a greater rep key value was synced")
-
-                # Verify the number of records in the 2nd sync is less than the first
-                self.assertLess(second_sync_count, first_sync_count)
+    bookmark_format = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+    initial_bookmarks = {}
+    streams_replication_method = {}
+    def streams_replication_methods(self):
Review comment (Contributor): Wondering if these two methods are specific to the bookmarks test or should be moved to sfbase.

Reply (Contributor, author): For now I think it's better to leave them in the bookmark test; I don't see another test that requires these methods for now.

+        streams_to_set_rep_method = [catalog['tap_stream_id'] for catalog in BookmarkTest.test_catalogs
+                                     if 'forced-replication-method' not in catalog['metadata'].keys()]
+        if len(streams_to_set_rep_method) > 0:
+            self.streams_replication_method = {stream: 'INCREMENTAL'
+                                               for stream in streams_to_set_rep_method}
+        return self.streams_replication_method

+    def adjusted_expected_replication_method(self):
+        streams_to_set_rep_method = [catalog['tap_stream_id'] for catalog in BookmarkTest.test_catalogs
+                                     if 'forced-replication-method' not in catalog['metadata'].keys()]
+        expected_replication_methods = self.expected_replication_method()
+        if self.streams_replication_method:
+            for stream in streams_to_set_rep_method:
+                expected_replication_methods[stream] = self.streams_replication_method[stream]
+            return expected_replication_methods
+        return expected_replication_methods

-                # Verify at least 1 record was replicated in the second sync
-                self.assertGreater(second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream))
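For context on the two methods added above: streams_replication_methods pins every catalog without 'forced-replication-method' metadata to INCREMENTAL, and adjusted_expected_replication_method folds that override back into the expected replication methods. A minimal sketch of the catalog shape they are assumed to iterate, with hypothetical entries:

    # Hypothetical BookmarkTest.test_catalogs entries:
    test_catalogs = [
        {'tap_stream_id': 'Account',
         'metadata': {'forced-replication-method': 'FULL_TABLE'}},  # left as-is
        {'tap_stream_id': 'Publisher', 'metadata': {}},             # pinned to INCREMENTAL
    ]
    streams_to_set = [catalog['tap_stream_id'] for catalog in test_catalogs
                      if 'forced-replication-method' not in catalog['metadata'].keys()]
    assert streams_to_set == ['Publisher']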