From 4de7153c48eb46be721a58c7b0cdc9309042dd65 Mon Sep 17 00:00:00 2001 From: lakshmi2506 Date: Fri, 12 Jan 2024 15:45:53 +0530 Subject: [PATCH 1/3] load-data with new format --- cumulusci/tasks/bulkdata/load.py | 69 ++++++------- .../tasks/bulkdata/query_transformers.py | 5 +- cumulusci/tasks/bulkdata/utils.py | 71 ++++++++++++-- datasets/mapping.yml | 23 +++-- datasets/sample.sql | 97 +++++++++++-------- 5 files changed, 166 insertions(+), 99 deletions(-) diff --git a/cumulusci/tasks/bulkdata/load.py b/cumulusci/tasks/bulkdata/load.py index 9fa66c2a4f..cddf93bc91 100644 --- a/cumulusci/tasks/bulkdata/load.py +++ b/cumulusci/tasks/bulkdata/load.py @@ -124,6 +124,7 @@ def _init_options(self, kwargs): self.options["enable_rollback"] = process_bool_arg( self.options.get("enable_rollback", False) ) + self._id_generators = {} def _init_dataset(self): """Find the dataset paths to use with the following sequence: @@ -205,10 +206,11 @@ def _run_task(self): "No data will be loaded because this is a persistent org and no dataset was specified." ) return + self.ID_TABLE_NAME = "cumulusci_id_table" self._init_mapping() with self._init_db(): self._expand_mapping() - + self._initialize_id_table(self.reset_oids) start_step = self.options.get("start_step") started = False results = {} @@ -359,7 +361,8 @@ def check_simple_upsert(self, mapping): def _stream_queried_data(self, mapping, local_ids, query): """Get data from the local db""" - statics = self._get_statics(mapping) + # statics = self._get_statics(mapping) + staticizer = self._add_statics_to_row(mapping) total_rows = 0 if mapping.anchor_date: @@ -372,13 +375,13 @@ def _stream_queried_data(self, mapping, local_ids, query): batch_size = mapping.batch_size or DEFAULT_BULK_BATCH_SIZE for row in query.yield_per(batch_size): total_rows += 1 - # Add static values to row - pkey = row[0] - row = list(row[1:]) + statics + if mapping.anchor_date and (date_context[0] or date_context[1]): row = adjust_relative_dates( mapping, date_context, row, DataOperationType.INSERT ) + pkey = row[0] # FIXME: This is a local-DB ordering assumption. + row = staticizer(list(row[1:])) if mapping.action is DataOperationType.UPDATE: if len(row) > 1 and all([f is None for f in row[1:]]): # Skip update rows that contain no values @@ -389,7 +392,7 @@ def _stream_queried_data(self, mapping, local_ids, query): yield row self.logger.info( - f"Prepared {total_rows} rows for {mapping['action']} to {mapping['sf_object']}." + f"Prepared {total_rows} rows for {mapping.action.value} to {mapping.sf_object}." ) def _load_record_types(self, sobjects, conn): @@ -400,10 +403,9 @@ def _load_record_types(self, sobjects, conn): sobject, table_name, conn, self.org_config.is_person_accounts_enabled ) - def _get_statics(self, mapping): - """Return the static values (not column names) to be appended to - records for this mapping.""" + def _add_statics_to_row(self, mapping): statics = list(mapping.static.values()) + if mapping.record_type: query = ( f"SELECT Id FROM RecordType WHERE SObjectType='{mapping.sf_object}'" @@ -416,7 +418,10 @@ def _get_statics(self, mapping): raise BulkDataException(f"Cannot find RecordType with query `{query}`") statics.append(record_type_id) - return statics + def add_statics(row): + return row + statics + + return add_statics def _query_db(self, mapping): """Build a query to retrieve data from the local db. 
@@ -479,10 +484,8 @@ def _process_job_results(self, mapping, step, local_ids):
             DataOperationType.UPSERT,
             DataOperationType.ETL_UPSERT,
         )
-        if is_insert_or_upsert:
-            id_table_name = self._initialize_id_table(mapping, self.reset_oids)
-            conn = self.session.connection()
 
+        conn = self.session.connection()
         sf_id_results = self._generate_results_id_map(step, local_ids)
 
         # If we know we have no successful inserts, don't attempt to persist Ids.
@@ -490,9 +493,10 @@ def _process_job_results(self, mapping, step, local_ids):
         if is_insert_or_upsert and (
             step.job_result.records_processed - step.job_result.total_row_errors
         ):
+            table = self.metadata.tables[self.ID_TABLE_NAME]
             sql_bulk_insert_from_records(
                 connection=conn,
-                table=self.metadata.tables[id_table_name],
+                table=table,
                 columns=("id", "sf_id"),
                 record_iterable=sf_id_results,
             )
@@ -510,7 +514,7 @@ def _process_job_results(self, mapping, step, local_ids):
             if account_id_lookup:
                 sql_bulk_insert_from_records(
                     connection=conn,
-                    table=self.metadata.tables[id_table_name],
+                    table=self.metadata.tables[self.ID_TABLE_NAME],
                     columns=("id", "sf_id"),
                     record_iterable=self._generate_contact_id_map_for_person_accounts(
                         mapping, account_id_lookup, conn
@@ -554,7 +558,7 @@ def _generate_results_id_map(self, step, local_ids):
             CreateRollback.prepare_for_rollback(self, step, created_results)
         return sf_id_results
 
-    def _initialize_id_table(self, mapping, should_reset_table):
+    def _initialize_id_table(self, should_reset_table):
         """initialize or find table to hold the inserted SF Ids
 
         The table has a name like xxx_sf_ids and has just two columns, id and sf_id.
 
         If the table already exists, should_reset_table determines whether to drop
         and recreate it or not.
""" - id_table_name = f"{mapping['table']}_sf_ids" - already_exists = id_table_name in self.metadata.tables + already_exists = self.ID_TABLE_NAME in self.metadata.tables if already_exists and not should_reset_table: - return id_table_name - - if not hasattr(self, "_initialized_id_tables"): - self._initialized_id_tables = set() - if id_table_name not in self._initialized_id_tables: - if already_exists: - self.metadata.remove(self.metadata.tables[id_table_name]) - id_table = Table( - id_table_name, - self.metadata, - Column("id", Unicode(255), primary_key=True), - Column("sf_id", Unicode(18)), - ) - if self.inspector.has_table(id_table_name): - id_table.drop() - id_table.create() - self._initialized_id_tables.add(id_table_name) - return id_table_name + return + id_table = Table( + self.ID_TABLE_NAME, + self.metadata, + Column("id", Unicode(255), primary_key=True), + Column("sf_id", Unicode(18)), + ) + if id_table.exists(): + id_table.drop() + id_table.create() def _sqlite_load(self): """Read a SQLite script and initialize the in-memory database.""" @@ -655,7 +650,7 @@ def _init_mapping(self): mapping=self.mapping, sf=self.sf, namespace=self.project_config.project__package__namespace, - data_operation=DataOperationType.INSERT, + data_operation=DataOperationType.QUERY, inject_namespaces=self.options["inject_namespaces"], drop_missing=self.options["drop_missing_schema"], ) diff --git a/cumulusci/tasks/bulkdata/query_transformers.py b/cumulusci/tasks/bulkdata/query_transformers.py index cbc50e389a..cb80b182fa 100644 --- a/cumulusci/tasks/bulkdata/query_transformers.py +++ b/cumulusci/tasks/bulkdata/query_transformers.py @@ -7,6 +7,7 @@ from cumulusci.core.exceptions import BulkDataException Criterion = T.Any +ID_TABLE_NAME = "cumulusci_id_table" class LoadQueryExtender: @@ -59,9 +60,7 @@ def __init__(self, mapping, metadata, model) -> None: @cached_property def columns_to_add(self): for lookup in self.lookups: - lookup.aliased_table = aliased( - self.metadata.tables[f"{lookup.table}_sf_ids"] - ) + lookup.aliased_table = aliased(self.metadata.tables[ID_TABLE_NAME]) return [lookup.aliased_table.columns.sf_id for lookup in self.lookups] @cached_property diff --git a/cumulusci/tasks/bulkdata/utils.py b/cumulusci/tasks/bulkdata/utils.py index ea09ba49df..1d51ddc6cf 100644 --- a/cumulusci/tasks/bulkdata/utils.py +++ b/cumulusci/tasks/bulkdata/utils.py @@ -13,6 +13,8 @@ from cumulusci.core.exceptions import BulkDataException from cumulusci.utils.iterators import iterate_in_chunks +ID_TABLE_NAME = "cumulusci_id_table" + class SqlAlchemyMixin: logger: logging.Logger @@ -73,16 +75,60 @@ def _database_url(self): else: return self._temp_database_url() + def _id_generator_for_object(self, sobject: str): + if sobject not in self._id_generators: + + def _generate_ids(): + counter = 0 + while True: + yield f"{sobject}-{counter}" + counter += 1 + + self._id_generators[sobject] = _generate_ids() + + return self._id_generators[sobject] + + def _update_column( + self, *, source_model, target_model, key_field, join_field, target_field + ): + key_attr = getattr(source_model, key_field) + join_attr = getattr(target_model, join_field) + target_attr = getattr(target_model, target_field) + + id_column = inspect(source_model).primary_key[0].name + + try: + self.session.query(source_model).filter( + key_attr.isnot(None), key_attr == join_attr + ).update({key_attr: target_attr}, synchronize_session=False) + except NotImplementedError: + # Some databases, such as SQLite, don't support multitable update + # TODO: review 
memory consumption of this routine.
+            mappings = []
+            for row, lookup_id in self.session.query(source_model, target_attr).join(
+                target_model, key_attr == join_attr
+            ):
+                mappings.append(
+                    {id_column: getattr(row, id_column), key_field: lookup_id}
+                )
+            self.session.bulk_update_mappings(source_model, mappings)
+
+    def _update_sf_id_column(self, model, key_field):
+        self._update_column(
+            source_model=model,
+            target_model=self.models[self.ID_TABLE_NAME],
+            key_field=key_field,
+            join_field="sf_id",
+            target_field="id",
+        )
 
-def _handle_primary_key(mapping, fields):
-    """Provide support for legacy mappings which used the OID as the pk but
-    default to using an autoincrementing int pk and a separate sf_id column"""
+    def _is_autopk_database(self):
+        # If the type of the Id column on a mapping is INTEGER,
+        # this is an autopk database.
 
-    if mapping.get_oid_as_pk():
-        id_column = mapping.fields["Id"]
-        fields.append(Column(id_column, Unicode(255), primary_key=True))
-    else:
-        fields.append(Column("id", Integer(), primary_key=True, autoincrement=True))
+        mapping = next(iter(self.mapping.values()))
+        id_field = mapping.fields["Id"]
+        return isinstance(getattr(self.models[mapping.table], id_field).type, Integer)
 
 
 def create_table(mapping, metadata) -> Table:
@@ -92,10 +138,15 @@ def create_table(mapping, metadata) -> Table:
     Mapping should be a MappingStep instance"""
 
     fields = []
-    _handle_primary_key(mapping, fields)
+    # _handle_primary_key(mapping, fields)
+    id_column = mapping.fields["Id"]  # Guaranteed to be present by mapping parser.
+    fields.append(Column(id_column, Unicode(255), primary_key=True))
 
     # make a field list to create
-    for field, db in mapping.get_complete_field_map().items():
+    # for field, db in mapping.get_complete_field_map().items():
+    for field, db in zip(
+        mapping.get_extract_field_list(), mapping.get_database_column_list()
+    ):
         if field == "Id":
             continue
 
diff --git a/datasets/mapping.yml b/datasets/mapping.yml
index ae7952b22c..878c7fa9f7 100644
--- a/datasets/mapping.yml
+++ b/datasets/mapping.yml
@@ -35,12 +35,23 @@ Contact:
   lookups:
     AccountId:
       table: Account
-Opportunity:
-  sf_object: Opportunity
+Lead:
+  sf_object: Lead
+  api: rest
+  batch_size: 2
+  fields:
+    - LastName
+    - Company
+Event:
+  sf_object: Event
   api: bulk
   batch_size: 2
   fields:
-    - Name
-    - CloseDate
-    - Amount
-    - StageName
+    - Subject
+    - DurationInMinutes
+    - ActivityDateTime
+  lookups:
+    WhoId:
+      table:
+        - Contact
+        - Lead
diff --git a/datasets/sample.sql b/datasets/sample.sql
index dc9bc8fdd6..921b10aadb 100644
--- a/datasets/sample.sql
+++ b/datasets/sample.sql
@@ -1,53 +1,64 @@
 BEGIN TRANSACTION;
 CREATE TABLE "Account" (
-    id INTEGER NOT NULL,
-    "Name" VARCHAR(255),
-    "Description" VARCHAR(255),
-    "NumberOfEmployees" VARCHAR(255),
-    "BillingStreet" VARCHAR(255),
-    "BillingCity" VARCHAR(255),
-    "BillingState" VARCHAR(255),
-    "BillingPostalCode" VARCHAR(255),
-    "BillingCountry" VARCHAR(255),
-    "ShippingStreet" VARCHAR(255),
-    "ShippingCity" VARCHAR(255),
-    "ShippingState" VARCHAR(255),
-    "ShippingPostalCode" VARCHAR(255),
-    "ShippingCountry" VARCHAR(255),
-    "Phone" VARCHAR(255),
-    "Fax" VARCHAR(255),
-    "Website" VARCHAR(255),
-    "AccountNumber" VARCHAR(255),
+    "id" VARCHAR(255) NOT NULL,
+    "Name" VARCHAR(255),
+    "Description" VARCHAR(255),
+    "NumberOfEmployees" VARCHAR(255),
+    "BillingStreet" VARCHAR(255),
+    "BillingCity" VARCHAR(255),
+    "BillingState" VARCHAR(255),
+    "BillingPostalCode" VARCHAR(255),
+    "BillingCountry" VARCHAR(255),
+    "ShippingStreet" VARCHAR(255),
+    "ShippingCity" VARCHAR(255),
+    "ShippingState" VARCHAR(255),
+    "ShippingPostalCode" VARCHAR(255),
+    "ShippingCountry" VARCHAR(255),
+    "Phone" VARCHAR(255),
+    "Fax" VARCHAR(255),
+    "Website" VARCHAR(255),
+    "AccountNumber" VARCHAR(255),
     PRIMARY KEY (id)
 );
-INSERT INTO "Account" VALUES(1,'Sample Account for Entitlements','','','','','','','','','','','','','','','','');
-INSERT INTO "Account" VALUES(2,'The Bluth Company','Solid as a rock','6','','','','','','','','','','','','','','');
-INSERT INTO "Account" VALUES(3,'Camacho PLC','Total logistical task-force','59908','2852 Caleb Village Suite 428','Porterside','Maryland','14525','Canada','6070 Davidson Rapids','Gibsonland','North Dakota','62676','Lithuania','221.285.1033','+1-081-230-6073x31438','http://jenkins.info/category/tag/tag/terms/','2679965');
+INSERT INTO "Account" VALUES('Account-1','alpha','','','Baker St.','','','','','','','','','','','','','');
+INSERT INTO "Account" VALUES('Account-2','beta','','','Baker St.','','','','','','','','','','','','','');
+INSERT INTO "Account" VALUES('Account-3','gamma','','','Baker St.','','','','','','','','','','','','','');
+
 CREATE TABLE "Contact" (
-    id INTEGER NOT NULL,
-    "FirstName" VARCHAR(255),
-    "LastName" VARCHAR(255),
-    "Salutation" VARCHAR(255),
-    "Email" VARCHAR(255),
-    "Phone" VARCHAR(255),
-    "MobilePhone" VARCHAR(255),
-    "Title" VARCHAR(255),
-    "Birthdate" VARCHAR(255),
-    "AccountId" VARCHAR(255),
+    "id" VARCHAR(255) NOT NULL,
+    "FirstName" VARCHAR(255),
+    "LastName" VARCHAR(255),
+    "Salutation" VARCHAR(255),
+    "Email" VARCHAR(255),
+    "Phone" VARCHAR(255),
+    "MobilePhone" VARCHAR(255),
+    "Title" VARCHAR(255),
+    "Birthdate" VARCHAR(255),
+    "AccountId" VARCHAR(255),
     PRIMARY KEY (id)
 );
-INSERT INTO "Contact" VALUES(1,'Michael','Bluth','','','','','','','2');
-INSERT INTO "Contact" VALUES(2,'Jared','Burnett','Ms.','ja-burnett2011@example.net','372.865.5762x5990','033.134.7156x7943','Systems analyst','2000-04-18','3');
-CREATE TABLE "Opportunity" (
-    id INTEGER NOT NULL,
-    "Name" VARCHAR(255),
-    "CloseDate" VARCHAR(255),
-    "Amount" VARCHAR(255),
-    "StageName" VARCHAR(255),
+INSERT INTO "Contact" VALUES('Contact-1','alphass','Mannwereerevhefwingefew','','krithvtffder@example.com','','','','','Account-1');
+INSERT INTO "Contact" VALUES('Contact-2','betasss','Blackefefererf','','kathjvhryn85@exaerfemple.com','','','','','Account-2');
+INSERT INTO "Contact" VALUES('Contact-3','deltasss','Hunteererbhjbefrewererfef','','dfdfvgh@example.com','','','','','Account-3');
+INSERT INTO "Contact" VALUES('Contact-4','gammasss','Carererfbhjhjbrlsonere','','johnmjbbhontddfgfdcsdcsces@example.com','','','','','');
+CREATE TABLE "Event" (
+    "id" VARCHAR(255) NOT NULL,
+    "Subject" VARCHAR(255),
+    "DurationInMinutes" VARCHAR(255),
+    "ActivityDateTime" VARCHAR(255),
+    "WhoId" VARCHAR(255),
     PRIMARY KEY (id)
 );
-INSERT INTO "Opportunity" VALUES(1,'democratic Opportunity','2022-07-27','69.0','In Progress');
-INSERT INTO "Opportunity" VALUES(2,'your Opportunity','2022-10-09','76.0','Closed Won');
-INSERT INTO "Opportunity" VALUES(3,'heart Opportunity','2022-11-04','32.0','Closed Won');
-INSERT INTO "Opportunity" VALUES(4,'treat Opportunity','2022-12-12','137.0','Closed Won');
+INSERT INTO "Event" VALUES('Event-1','helllo','60','2024-01-10T05:30:00.000Z','Contact-1');
+INSERT INTO "Event" VALUES('Event-2','newer','60','2024-01-10T05:30:00.000Z','Lead-1');
+
+CREATE TABLE "Lead" (
+    "id" VARCHAR(255) NOT NULL,
+    "LastName" VARCHAR(255),
+    "Company" VARCHAR(255),
+    PRIMARY KEY (id)
+);
+INSERT INTO "Lead" VALUES('Lead-1','deltassssds','Farmers Coop. of Florida');
+INSERT INTO "Lead" VALUES('Lead-2','gauramm','Abbott Insurance');
+
 COMMIT;

From 18947ae788acc897244816e982be92133ea5410b Mon Sep 17 00:00:00 2001
From: lakshmi2506
Date: Fri, 12 Jan 2024 17:19:36 +0530
Subject: [PATCH 2/3] backward compatibility enabled

---
 cumulusci/tasks/bulkdata/load.py               | 11 ++++++++++-
 cumulusci/tasks/bulkdata/query_transformers.py | 18 +++++++++++++-----
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/cumulusci/tasks/bulkdata/load.py b/cumulusci/tasks/bulkdata/load.py
index cddf93bc91..0eb905ca95 100644
--- a/cumulusci/tasks/bulkdata/load.py
+++ b/cumulusci/tasks/bulkdata/load.py
@@ -125,6 +125,7 @@ def _init_options(self, kwargs):
             self.options.get("enable_rollback", False)
         )
         self._id_generators = {}
+        self._old_format = False
 
     def _init_dataset(self):
         """Find the dataset paths to use with the following sequence:
@@ -444,12 +445,14 @@ def _query_db(self, mapping):
         query = self.session.query(*columns)
 
         classes = [
-            AddLookupsToQuery,
             AddRecordTypesToQuery,
             AddMappingFiltersToQuery,
             AddUpsertsToQuery,
         ]
         transformers = [cls(mapping, self.metadata, model) for cls in classes]
+        transformers.append(
+            AddLookupsToQuery(mapping, self.metadata, model, self._old_format)
+        )
 
         if mapping.sf_object == "Contact" and self._can_load_person_accounts(mapping):
             transformers.append(AddPersonAccountsToQuery(mapping, self.metadata, model))
@@ -488,6 +491,12 @@ def _process_job_results(self, mapping, step, local_ids):
 
         conn = self.session.connection()
         sf_id_results = self._generate_results_id_map(step, local_ids)
+        for i in range(len(sf_id_results)):
+            if str.isdigit(sf_id_results[i][0][0]):
+                self._old_format = True
+                sf_id_results[i][0] = mapping.table + "-" + str(sf_id_results[i][0])
+            else:
+                break
 
         # If we know we have no successful inserts, don't attempt to persist Ids.
         # Do, however, drain the generator to get error-checking behavior.
         if is_insert_or_upsert and (
diff --git a/cumulusci/tasks/bulkdata/query_transformers.py b/cumulusci/tasks/bulkdata/query_transformers.py
index cb80b182fa..84c1e66207 100644
--- a/cumulusci/tasks/bulkdata/query_transformers.py
+++ b/cumulusci/tasks/bulkdata/query_transformers.py
@@ -51,8 +51,9 @@ def add_outerjoins(self, query: Query):
 class AddLookupsToQuery(LoadQueryExtender):
     """Adds columns and joins relating to lookups"""
 
-    def __init__(self, mapping, metadata, model) -> None:
+    def __init__(self, mapping, metadata, model, _old_format) -> None:
         super().__init__(mapping, metadata, model)
+        self._old_format = _old_format
         self.lookups = [
             lookup for lookup in self.mapping.lookups.values() if not lookup.after
         ]
@@ -70,10 +71,17 @@ def outerjoins_to_add(self):
         def join_for_lookup(lookup):
             key_field = lookup.get_lookup_key_field(self.model)
             value_column = getattr(self.model, key_field)
-            return (
-                lookup.aliased_table,
-                lookup.aliased_table.columns.id == value_column,
-            )
+            if self._old_format:
+                return (
+                    lookup.aliased_table,
+                    lookup.aliased_table.columns.id
+                    == str(lookup.table) + "-" + value_column,
+                )
+            else:
+                return (
+                    lookup.aliased_table,
+                    lookup.aliased_table.columns.id == value_column,
+                )
 
         return [join_for_lookup(lookup) for lookup in self.lookups]
 

From 773501fda7ecf47ca2fb19ebfa6a34f541851946 Mon Sep 17 00:00:00 2001
From: lakshmi2506
Date: Tue, 16 Jan 2024 12:18:24 +0530
Subject: [PATCH 3/3] old_format condition changed

---
 cumulusci/tasks/bulkdata/load.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cumulusci/tasks/bulkdata/load.py b/cumulusci/tasks/bulkdata/load.py
index 0eb905ca95..62e8188d09 100644
--- a/cumulusci/tasks/bulkdata/load.py
+++ b/cumulusci/tasks/bulkdata/load.py
@@ -492,7 +492,7 @@ def _process_job_results(self, mapping, step, local_ids):
         sf_id_results = self._generate_results_id_map(step, local_ids)
         for i in range(len(sf_id_results)):
-            if str.isdigit(sf_id_results[i][0][0]):
+            if str(sf_id_results[i][0]).isnumeric():
                 self._old_format = True
                 sf_id_results[i][0] = mapping.table + "-" + str(sf_id_results[i][0])
             else:
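
Note for reviewers: patches 2 and 3 are about tolerating databases created by the old
extract format, whose local ids are plain autoincrement integers rather than the new
"<Table>-<n>" strings used as keys in cumulusci_id_table. The sketch below is only an
illustration of that detection/rewriting step, not code from the patches; it assumes
each row produced by _generate_results_id_map is a mutable [local_id, sf_id] pair, and
the sample values are hypothetical.

# Sketch only -- not part of the patch series above.
def tag_legacy_local_ids(sf_id_results, table_name):
    """Prefix legacy autoincrement local ids (1, 2, ...) with the table name
    so they line up with the "<Table>-<n>" keys kept in cumulusci_id_table."""
    old_format = False
    for row in sf_id_results:
        # Stringify first so integer ids and digit strings are both handled;
        # the earlier check, str.isdigit(row[0][0]), examined only the first
        # character and assumed the local id was already a string.
        if str(row[0]).isnumeric():
            old_format = True
            row[0] = f"{table_name}-{row[0]}"
        else:
            # New-format ids such as "Account-1" already match; stop early,
            # mirroring the patch's assumption that formats are not mixed.
            break
    return old_format


rows = [[1, "001000000000001AAA"], [2, "001000000000002AAA"]]
assert tag_legacy_local_ids(rows, "Account") is True
assert rows[0][0] == "Account-1"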