-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tdl-17121, 24442, 24575 #174
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,165 +1,41 @@ | ||
import unittest | ||
import datetime | ||
import dateutil.parser | ||
import pytz | ||
from tap_tester.base_suite_tests.bookmark_test import BookmarkTest | ||
from sfbase import SFBaseTest | ||
|
||
from tap_tester import runner, menagerie, connections | ||
|
||
from base import SalesforceBaseTest | ||
class SFBookmarkTest(BookmarkTest, SFBaseTest): | ||
|
||
|
||
class SalesforceBookmarks(SalesforceBaseTest): | ||
salesforce_api = 'BULK' | ||
@staticmethod | ||
def name(): | ||
return "tap_tester_salesforce_bookmarks" | ||
return "tt_sf_bookmarks" | ||
|
||
@staticmethod | ||
def expected_sync_streams(): | ||
def streams_to_test(): | ||
return { | ||
'Account', | ||
'Contact', | ||
# 'Lead', # TODO grab the dates that exist | ||
# 'Opportunity', # cannot test, dates are 1 s apart | ||
'User', | ||
'Publisher', | ||
'AppDefinition', | ||
} | ||
|
||
@staticmethod | ||
def convert_state_to_utc(date_str): | ||
""" | ||
Convert a saved bookmark value of the form '2020-08-25T13:17:36-07:00' to | ||
a string formatted utc datetime, | ||
in order to compare aginast json formatted datetime values | ||
""" | ||
date_object = dateutil.parser.parse(date_str) | ||
date_object_utc = date_object.astimezone(tz=pytz.UTC) | ||
return datetime.datetime.strftime(date_object_utc, "%Y-%m-%dT%H:%M:%SZ") | ||
|
||
def calculated_states_by_stream(self, current_state): | ||
""" | ||
Look at the bookmarks from a previous sync and set a new bookmark | ||
value that is 1 day prior. This ensures the subsequent sync will replicate | ||
at least 1 record but, fewer records than the previous sync. | ||
""" | ||
|
||
stream_to_current_state = {stream : bookmark.get(self.expected_replication_keys()[stream].pop()) | ||
for stream, bookmark in current_state['bookmarks'].items()} | ||
stream_to_calculated_state = {stream: "" for stream in self.expected_sync_streams()} | ||
|
||
timedelta_by_stream = {stream: [1,0,0] # {stream_name: [days, hours, minutes], ...} | ||
for stream in self.expected_streams()} | ||
timedelta_by_stream['Account'] = [0, 0, 2] | ||
|
||
for stream, state in stream_to_current_state.items(): | ||
days, hours, minutes = timedelta_by_stream[stream] | ||
|
||
# convert state from string to datetime object | ||
state_as_datetime = dateutil.parser.parse(state) | ||
calculated_state_as_datetime = state_as_datetime - datetime.timedelta(days=days, hours=hours, minutes=minutes) | ||
# convert back to string and format | ||
calculated_state = datetime.datetime.strftime(calculated_state_as_datetime, "%Y-%m-%dT%H:%M:%S.000000Z") | ||
stream_to_calculated_state[stream] = calculated_state | ||
|
||
return stream_to_calculated_state | ||
|
||
def test_run(self): | ||
self.salesforce_api = 'BULK' | ||
|
||
replication_keys = self.expected_replication_keys() | ||
|
||
# SYNC 1 | ||
conn_id = connections.ensure_connection(self) | ||
|
||
# Run in check mode | ||
found_catalogs = self.run_and_verify_check_mode(conn_id) | ||
|
||
# Select only the expected streams tables | ||
expected_streams = self.expected_sync_streams() | ||
catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams] | ||
self.select_all_streams_and_fields(conn_id, catalog_entries) | ||
streams_replication_methods = {stream: self.INCREMENTAL | ||
for stream in expected_streams} | ||
self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods) | ||
|
||
# Run a sync job using orchestrator | ||
first_sync_record_count = self.run_and_verify_sync(conn_id) | ||
first_sync_records = runner.get_records_from_target_output() | ||
first_sync_bookmarks = menagerie.get_state(conn_id) | ||
|
||
# UPDATE STATE BETWEEN SYNCS | ||
new_states = {'bookmarks': dict()} | ||
for stream, new_state in self.calculated_states_by_stream(first_sync_bookmarks).items(): | ||
replication_key = list(replication_keys[stream])[0] | ||
new_states['bookmarks'][stream] = {replication_key: new_state} | ||
menagerie.set_state(conn_id, new_states) | ||
|
||
# SYNC 2 | ||
second_sync_record_count = self.run_and_verify_sync(conn_id) | ||
second_sync_records = runner.get_records_from_target_output() | ||
second_sync_bookmarks = menagerie.get_state(conn_id) | ||
|
||
# Test by stream | ||
for stream in expected_streams: | ||
with self.subTest(stream=stream): | ||
# record counts | ||
first_sync_count = first_sync_record_count.get(stream, 0) | ||
second_sync_count = second_sync_record_count.get(stream, 0) | ||
|
||
# data from record messages | ||
first_sync_messages = [record.get('data') for record in | ||
first_sync_records.get(stream).get('messages') | ||
if record.get('action') == 'upsert'] | ||
second_sync_messages = [record.get('data') for record in | ||
second_sync_records.get(stream).get('messages') | ||
if record.get('action') == 'upsert'] | ||
|
||
# replication key for comparing data | ||
self.assertEqual(1, len(list(replication_keys[stream])), | ||
msg="Compound primary keys require a change to test expectations") | ||
replication_key = list(replication_keys[stream])[0] | ||
|
||
# bookmarked states (top level objects) | ||
first_bookmark_key_value = first_sync_bookmarks.get('bookmarks').get(stream) | ||
second_bookmark_key_value = second_sync_bookmarks.get('bookmarks').get(stream) | ||
|
||
# Verify the first sync sets a bookmark of the expected form | ||
self.assertIsNotNone(first_bookmark_key_value) | ||
self.assertIsNotNone(first_bookmark_key_value.get(replication_key)) | ||
|
||
# Verify the second sync sets a bookmark of the expected form | ||
self.assertIsNotNone(second_bookmark_key_value) | ||
self.assertIsNotNone(second_bookmark_key_value.get(replication_key)) | ||
|
||
# bookmarked states (actual values) | ||
first_bookmark_value = first_bookmark_key_value.get(replication_key) | ||
second_bookmark_value = second_bookmark_key_value.get(replication_key) | ||
# bookmarked values as utc for comparing against records | ||
first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value) | ||
second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value) | ||
|
||
# Verify the second sync bookmark is Equal to the first sync bookmark | ||
self.assertEqual(second_bookmark_value, first_bookmark_value) # assumes no changes to data during test | ||
|
||
# Verify the second sync records respect the previous (simulated) bookmark value | ||
simulated_bookmark_value = new_states['bookmarks'][stream][replication_key] | ||
for record in second_sync_messages: | ||
replication_key_value = record.get(replication_key) | ||
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, | ||
msg="Second sync records do not repect the previous bookmark.") | ||
|
||
# Verify the first sync bookmark value is the max replication key value for a given stream | ||
for record in first_sync_messages: | ||
replication_key_value = record.get(replication_key) | ||
self.assertLessEqual(replication_key_value, first_bookmark_value_utc, | ||
msg="First sync bookmark was set incorrectly, a record with a greater rep key value was synced") | ||
|
||
# Verify the second sync bookmark value is the max replication key value for a given stream | ||
for record in second_sync_messages: | ||
replication_key_value = record.get(replication_key) | ||
self.assertLessEqual(replication_key_value, second_bookmark_value_utc, | ||
msg="Second sync bookmark was set incorrectly, a record with a greater rep key value was synced") | ||
|
||
# Verify the number of records in the 2nd sync is less then the first | ||
self.assertLess(second_sync_count, first_sync_count) | ||
bookmark_format ="%Y-%m-%dT%H:%M:%S.%fZ" | ||
|
||
initial_bookmarks = {} | ||
streams_replication_method = {} | ||
def streams_replication_methods(self): | ||
streams_to_set_rep_method = [catalog['tap_stream_id'] for catalog in BookmarkTest.test_catalogs | ||
if 'forced-replication-method' not in catalog['metadata'].keys()] | ||
if len(streams_to_set_rep_method) > 0: | ||
self.streams_replication_method = {stream: 'INCREMENTAL' | ||
for stream in streams_to_set_rep_method} | ||
return self.streams_replication_method | ||
|
||
def adjusted_expected_replication_method(self): | ||
streams_to_set_rep_method = [catalog['tap_stream_id'] for catalog in BookmarkTest.test_catalogs | ||
if 'forced-replication-method' not in catalog['metadata'].keys()] | ||
expected_replication_methods = self.expected_replication_method() | ||
if self.streams_replication_method: | ||
for stream in streams_to_set_rep_method : | ||
expected_replication_methods[stream] = self.streams_replication_method[stream] | ||
return expected_replication_methods | ||
return expected_replication_methods | ||
|
||
# Verify at least 1 record was replicated in the second sync | ||
self.assertGreater(second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream)) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether these two methods are specific to the bookmarks test or should be moved to sfbase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now I think it's better to leave them in the bookmark test; I don't see any other test that requires these methods.