Skip to content

Commit

Permalink
TDL-24356 switch replication methods
Browse files Browse the repository at this point in the history
  • Loading branch information
JYOTHINARAYANSETTY committed Nov 8, 2023
1 parent 8f2a106 commit 9829b9e
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 0 deletions.
46 changes: 46 additions & 0 deletions tests/sfbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,52 @@ def get_unsupported_by_bulk_api(self):

return unsupported_streams_bulk_only | unsupported_streams_rest

def get_full_table_streams(self):
    """Return the set of Salesforce stream names that only support
    FULL_TABLE replication (i.e. streams with no usable replication key)."""
    return {
        'EventBusSubscriber',
        'ContentFolderLink',
        'TabDefinition',
        'ReportEvent',
        'FormulaFunctionCategory',
        'FormulaFunction',
        'UserSetupEntityAccess',
        'AuraDefinitionBundleInfo',
        'DatacloudAddress',
        'ContentTagSubscription',
        'FeedAttachment',
        'EmbeddedServiceDetail',
        'UriEvent',
        'DashboardComponent',
        'RecentlyViewed',
        'IdpEventLog',
        'PlatformEventUsageMetric',
        'UserPermissionAccess',
        'LightningUriEvent',
        'Publisher',
        'CronJobDetail',
        'EmbeddedServiceLabel',
        'DatacloudDandBCompany',
        'ContentDocumentSubscription',
        'ThirdPartyAccountLink',
        'ContentUserSubscription',
        'LogoutEvent',
        'ContentWorkspaceSubscription',
        'LoginEvent',
        'UserAppMenuItem',
        'AppDefinition',
        'DatacloudContact',
        'SalesStore',
        'DatacloudCompany',
        'FormulaFunctionAllowedType',
        'ApexPageInfo',
    }

def switchable_streams(self):
    """Return the streams eligible for replication-method switch tests.

    A stream qualifies when it is an expected stream, is not restricted
    to FULL_TABLE replication, and actually has data to sync.
    """
    non_full_table = self.expected_stream_names().difference(
        self.get_full_table_streams())
    return non_full_table.intersection(self.get_streams_with_data())

def is_unsupported_by_rest_api(self, stream):
"""returns True if stream is unsupported by REST API"""

Expand Down
106 changes: 106 additions & 0 deletions tests/test_salesforce_switch_rep_method_ft_incrmntl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import unittest
import datetime
import dateutil.parser
import pytz
import copy
from datetime import datetime, timedelta

from tap_tester import runner, menagerie, connections

from sfbase import SFBaseTest


class SFSwitchRepMethodIncrmntl(SFBaseTest):
    """Verify switching streams from FULL_TABLE to INCREMENTAL replication.

    Runs a full-table sync, flips every selected stream to INCREMENTAL,
    runs a second sync, and asserts record coverage, table-version
    progression, and that bookmarks appear after the switch.
    """

    # Very early start date to get max data available for testing.
    start_date = '2000-11-11T00:00:00Z'

    @staticmethod
    def name():
        return "tt_sf_table_switch_rep_method_ft_incrmntl"

    def expected_sync_streams(self):
        """Return the switchable streams to sync, excluding streams known
        to misbehave in this scenario, partitioned for parallel runs."""
        streams = self.switchable_streams() - {'FlowDefinitionView', 'EntityDefinition'}
        return self.partition_streams(streams)

    def test_run(self):
        self.salesforce_api = 'BULK'

        replication_keys = self.expected_replication_keys()
        primary_keys = self.expected_primary_keys()

        # SYNC 1
        conn_id = connections.ensure_connection(self)

        # Run in check mode
        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # Select only the expected streams tables
        expected_streams = self.expected_sync_streams()
        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
        self.select_all_streams_and_fields(conn_id, catalog_entries)
        streams_replication_methods = {stream: self.FULL_TABLE
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # Run a sync job using orchestrator
        self.run_and_verify_sync_mode(conn_id)
        fulltbl_sync_records = runner.get_records_from_target_output()

        # Switch the replication method from full table to Incremental
        streams_replication_methods = {stream: self.INCREMENTAL
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # SYNC 2
        incrmntl_sync_record_count = self.run_and_verify_sync_mode(conn_id)
        incrmntl_sync_records = runner.get_records_from_target_output()
        incrmntl_sync_bookmarks = menagerie.get_state(conn_id)

        # Test by stream
        for stream in expected_streams:
            with self.subTest(stream=stream):
                # record counts
                incrmntl_sync_count = incrmntl_sync_record_count.get(stream, 0)
                replication_key = list(replication_keys[stream])[0]

                # Verify at least 1 record was replicated in the Incremental sync
                self.assertGreater(incrmntl_sync_count, 0,
                                   msg="We are not fully testing bookmarking for {}".format(stream))

                # data from record messages
                primary_key = list(primary_keys[stream])[0]
                fulltbl_sync_messages = [record['data'] for record in
                                         fulltbl_sync_records.get(stream).get('messages')
                                         if record.get('action') == 'upsert']
                fulltbl_primary_keys = {message[primary_key] for message in fulltbl_sync_messages}
                incrmntl_sync_messages = [record['data'] for record in
                                          incrmntl_sync_records.get(stream).get('messages')
                                          if record.get('action') == 'upsert']
                incrmntl_primary_keys = {message[primary_key] for message in incrmntl_sync_messages}

                # Verify all records are synced in the second sync
                self.assertTrue(fulltbl_primary_keys.issubset(incrmntl_primary_keys))

                # Verify that the last message is not an activate_version message
                # for the incremental sync
                self.assertNotEqual('activate_version', incrmntl_sync_records[stream]['messages'][-1]['action'])

                # Verify that the table version incremented after every sync
                self.assertGreater(
                    incrmntl_sync_records[stream]['table_version'], fulltbl_sync_records[stream]['table_version'])

                # bookmarked states (top level objects)
                incrmntl_bookmark_key_value = incrmntl_sync_bookmarks.get('bookmarks').get(stream)

                # Verify the incremental sync sets a bookmark of the expected form
                self.assertIsNotNone(incrmntl_bookmark_key_value)

                # Verify that bookmarks are present after switching to the
                # Incremental replication method
                self.assertIsNotNone(incrmntl_bookmark_key_value.get(replication_key))
103 changes: 103 additions & 0 deletions tests/test_salesforce_switch_rep_method_incrmntl_ft.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import unittest
import datetime
import dateutil.parser
import pytz
import copy
from datetime import datetime, timedelta

from tap_tester import runner, menagerie, connections

from sfbase import SFBaseTest


class SFSwitchRepMethodFulltable(SFBaseTest):
    """Verify switching streams from INCREMENTAL to FULL_TABLE replication.

    Runs an incremental sync, flips every selected stream to FULL_TABLE,
    runs a second sync, and asserts record coverage, the terminal
    activate_version message, table-version progression, and that no
    replication-key bookmark remains for the full-table sync.
    """

    start_date = '2000-01-23T00:00:00Z'

    @staticmethod
    def name():
        return "tt_sf_table_switch_rep_method_incrmntl_ft"

    def expected_sync_streams(self):
        """Return the switchable streams to sync, excluding streams known
        to misbehave in this scenario, partitioned for parallel runs."""
        streams = self.switchable_streams() - {'FlowDefinitionView', 'EntityDefinition'}
        return self.partition_streams(streams)

    def test_run(self):
        self.salesforce_api = 'BULK'

        replication_keys = self.expected_replication_keys()
        primary_keys = self.expected_primary_keys()

        # SYNC 1
        conn_id = connections.ensure_connection(self)

        # Run in check mode
        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # Select only the expected streams tables
        expected_streams = self.expected_sync_streams()
        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
        self.select_all_streams_and_fields(conn_id, catalog_entries)
        streams_replication_methods = {stream: self.INCREMENTAL
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # Run a sync job using orchestrator
        incrmntl_sync_record_count = self.run_and_verify_sync_mode(conn_id)
        incrmntl_sync_records = runner.get_records_from_target_output()

        # Switch the replication method from incremental to full table
        streams_replication_methods = {stream: self.FULL_TABLE
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # SYNC 2
        fulltbl_sync_record_count = self.run_and_verify_sync_mode(conn_id)
        fulltbl_sync_records = runner.get_records_from_target_output()
        fulltbl_sync_bookmarks = menagerie.get_state(conn_id)

        # Test by stream
        for stream in expected_streams:
            with self.subTest(stream=stream):
                # record counts
                incrmntl_sync_count = incrmntl_sync_record_count.get(stream, 0)
                fulltbl_sync_count = fulltbl_sync_record_count.get(stream, 0)
                replication_key = list(replication_keys[stream])[0]

                # Verify at least 1 record was replicated in the full-table sync
                self.assertGreater(fulltbl_sync_count, 0,
                                   msg="We are not fully testing bookmarking for {}".format(stream))

                # data from record messages
                primary_key = list(primary_keys[stream])[0]
                incrmntl_sync_messages = [record['data'] for record in
                                          incrmntl_sync_records.get(stream).get('messages')
                                          if record.get('action') == 'upsert']
                incrmntl_primary_keys = {message[primary_key] for message in incrmntl_sync_messages}
                fulltbl_sync_messages = [record['data'] for record in
                                         fulltbl_sync_records.get(stream).get('messages')
                                         if record.get('action') == 'upsert']
                fulltbl_primary_keys = {message[primary_key] for message in fulltbl_sync_messages}

                # Verify all records are synced in the second sync
                self.assertTrue(incrmntl_primary_keys.issubset(fulltbl_primary_keys))

                # Verify that the full-table sync count is greater than or equal
                # to the incremental sync count
                self.assertGreaterEqual(fulltbl_sync_count, incrmntl_sync_count,
                                        msg="Full table sync didn't fetch all the records")

                # Verify that the last message of every stream is the
                # activate_version message. Index with [-1] rather than a
                # positional offset so the check doesn't depend on how many
                # activate_version/state messages precede the records.
                self.assertEqual('activate_version',
                                 fulltbl_sync_records[stream]['messages'][-1]['action'])

                # Verify that a table version is present for a full-table sync
                self.assertIsNotNone(fulltbl_sync_records[stream]['table_version'])

                # Verify that the table version is incremented after every sync
                self.assertGreater(
                    fulltbl_sync_records[stream]['table_version'], incrmntl_sync_records[stream]['table_version'])

                # bookmarked states (top level objects)
                fulltbl_bookmark_key_value = fulltbl_sync_bookmarks.get('bookmarks').get(stream)

                # Verify no replication-key bookmark is present for the
                # full-table sync
                fulltbl_bookmark_value = fulltbl_bookmark_key_value.get(replication_key)
                self.assertIsNone(fulltbl_bookmark_value)

0 comments on commit 9829b9e

Please sign in to comment.