Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error during upsert on identical DeveloperName for RecordType of same sObject #3702

Merged
merged 10 commits into from
Dec 28, 2023
5 changes: 4 additions & 1 deletion cumulusci/tasks/bulkdata/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ def strip_name_field(record):

if "RecordTypeId" in mapping.fields:
self._extract_record_types(
mapping.sf_object, mapping.get_source_record_type_table(), conn
mapping.sf_object,
mapping.get_source_record_type_table(),
conn,
self.org_config.is_person_accounts_enabled,
)

self.session.commit()
Expand Down
4 changes: 3 additions & 1 deletion cumulusci/tasks/bulkdata/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ def _load_record_types(self, sobjects, conn):
"""Persist record types for the given sObjects into the database."""
for sobject in sobjects:
table_name = sobject + "_rt_target_mapping"
self._extract_record_types(sobject, table_name, conn)
self._extract_record_types(
sobject, table_name, conn, self.org_config.is_person_accounts_enabled
)

def _get_statics(self, mapping):
"""Return the static values (not column names) to be appended to
Expand Down
11 changes: 8 additions & 3 deletions cumulusci/tasks/bulkdata/query_transformers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import typing as T
from functools import cached_property

from sqlalchemy import func, text
from sqlalchemy import and_, func, text
from sqlalchemy.orm import Query, aliased

from cumulusci.core.exceptions import BulkDataException
Expand Down Expand Up @@ -134,10 +134,15 @@ def outerjoins_to_add(self):
rt_source_table.columns.record_type_id
== getattr(self.model, self.mapping.fields["RecordTypeId"]),
),
# Combination of IsPersonType and DeveloperName is unique
(
rt_dest_table,
rt_dest_table.columns.developer_name
== rt_source_table.columns.developer_name,
and_(
rt_dest_table.columns.developer_name
== rt_source_table.columns.developer_name,
rt_dest_table.columns.is_person_type
== rt_source_table.columns.is_person_type,
),
),
]

Expand Down
7 changes: 4 additions & 3 deletions cumulusci/tasks/bulkdata/tests/recordtypes.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
BEGIN TRANSACTION;
CREATE TABLE "Account_rt_mapping" (
record_type_id VARCHAR(18) NOT NULL,
developer_name VARCHAR(255),
developer_name VARCHAR(255),
is_person_type BOOLEAN,
PRIMARY KEY (record_type_id)
);
INSERT INTO "Account_rt_mapping" VALUES('012P0000000bCMdIAM','Organization');
INSERT INTO "Account_rt_mapping" VALUES('012P0000000bCQqIAM','Subsidiary');
INSERT INTO "Account_rt_mapping" VALUES('012P0000000bCMdIAM','Organization',0);
INSERT INTO "Account_rt_mapping" VALUES('012P0000000bCQqIAM','Subsidiary',0);
CREATE TABLE accounts (
sf_id VARCHAR(255) NOT NULL,
"Name" VARCHAR(255),
Expand Down
7 changes: 4 additions & 3 deletions cumulusci/tasks/bulkdata/tests/recordtypes_2.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
BEGIN TRANSACTION;
CREATE TABLE Beta_rt_mapping (
record_type_id VARCHAR(18) NOT NULL,
developer_name VARCHAR(255),
developer_name VARCHAR(255),
is_person_type BOOLEAN,
PRIMARY KEY (record_type_id)
);
INSERT INTO "Beta_rt_mapping" VALUES('012H40000003jCoIAI','recordtype2');
INSERT INTO "Beta_rt_mapping" VALUES('012H40000003jCZIAY','recordtype1');
INSERT INTO "Beta_rt_mapping" VALUES('012H40000003jCoIAI','recordtype2',0);
INSERT INTO "Beta_rt_mapping" VALUES('012H40000003jCZIAY','recordtype1',0);
CREATE TABLE Beta (
id INTEGER NOT NULL,
"Name" VARCHAR(255),
Expand Down
1 change: 1 addition & 0 deletions cumulusci/tasks/bulkdata/tests/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ def test_import_results__record_type_mapping(self):
"Account",
mapping.get_source_record_type_table(),
task.session.connection.return_value,
task.org_config._is_person_accounts_enabled,
)

def test_import_results__person_account_name_stripped(self):
Expand Down
7 changes: 5 additions & 2 deletions cumulusci/tasks/bulkdata/tests/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ def test_query_db__record_type_mapping(self):
FROM accounts
LEFT OUTER JOIN "Account_rt_mapping" ON "Account_rt_mapping".record_type_id = accounts."RecordTypeId"
LEFT OUTER JOIN "Account_rt_target_mapping" ON "Account_rt_target_mapping".developer_name = "Account_rt_mapping".developer_name
AND "account_rt_target_mapping".is_person_type = "account_rt_mapping".is_person_type
""",
)

Expand All @@ -1675,6 +1676,7 @@ def test_query_db__record_type_mapping_table_from_tablename(self):
FROM "Beta"
LEFT OUTER JOIN "Beta_rt_mapping" ON "Beta_rt_mapping".record_type_id = "Beta"."RecordType"
LEFT OUTER JOIN "Account_rt_target_mapping" ON "Account_rt_target_mapping".developer_name = "Beta_rt_mapping".developer_name
AND "Account_rt_target_mapping".is_person_type = "Beta_rt_mapping".is_person_type
""",
)

Expand Down Expand Up @@ -1716,11 +1718,12 @@ def test_load_record_types(self):

conn = mock.Mock()
task._extract_record_types = mock.Mock()
task.org_config._is_person_accounts_enabled = True
task._load_record_types(["Account", "Contact"], conn)
task._extract_record_types.assert_has_calls(
[
mock.call("Account", "Account_rt_target_mapping", conn),
mock.call("Contact", "Contact_rt_target_mapping", conn),
mock.call("Account", "Account_rt_target_mapping", conn, True),
mock.call("Contact", "Contact_rt_target_mapping", conn, True),
]
)

Expand Down
106 changes: 106 additions & 0 deletions cumulusci/tasks/bulkdata/tests/test_upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
import responses
import yaml

from cumulusci.core.exceptions import BulkDataException
from cumulusci.tasks.bulkdata import LoadData
Expand Down Expand Up @@ -738,6 +739,111 @@ def test_simple_upsert_smart__native_field(
}
}

@responses.activate
def test_upsert_recordtype_same_developername_different_ispersontype(
    self, create_task, cumulusci_test_repo_root, org_config, sf
):
    # Regression test: two Record Types share the DeveloperName
    # "PersonAccount" and differ only in IsPersonType. The record-type join
    # must key on (DeveloperName, IsPersonType) so the upsert query does not
    # multiply rows — exactly 3 records should be staged.
    domain = org_config.get_domain()
    ver = CURRENT_SF_API_VERSION
    expected_number_of_records = 3
    # Stub the RecordType query; note the duplicated DeveloperName
    # "PersonAccount" with IsPersonType True vs. False.
    responses.add(
        method="GET",
        url=f"https://{domain}/services/data/v{ver}/query/?q=SELECT+Id%2C+DeveloperName%2C+IsPersonType+FROM+RecordType+WHERE+SObjectType%3D%27Account%27",
        status=200,
        json={
            "totalSize": 4,
            "done": True,
            "records": [
                {
                    "Id": "0125j000000RqVkAAK",
                    "DeveloperName": "HH_Account",
                    "IsPersonType": False,
                },
                {
                    "Id": "0125j000000RqVlAAK",
                    "DeveloperName": "Organization",
                    "IsPersonType": False,
                },
                {
                    "Id": "0125j000000bo4yAAA",
                    "DeveloperName": "PersonAccount",
                    "IsPersonType": True,
                },
                {
                    "Id": "0125j000000bo53AAA",
                    "DeveloperName": "PersonAccount",
                    "IsPersonType": False,
                },
            ],
        },
    )
    # Stub the recordCount limits endpoint used to size the load.
    responses.add(
        method="GET",
        url=f"https://{domain}/services/data/v{ver}/limits/recordCount?sObjects=Account",
        status=200,
        json={"sObjects": [{"count": 3, "name": "Account"}]},
    )
    # Stub the query of existing Accounts matched via the AccountNumber
    # update key.
    responses.add(
        method="GET",
        url=f"https://{domain}/services/data/v{ver}/query/?q=select+Id%2CAccountNumber+from+Account",
        status=200,
        json={
            "totalSize": 3,
            "done": True,
            "records": [
                {"Id": "0015j00001H0q4NAAR", "AccountNumber": "12345"},
                {"Id": "0015j00001H0q4OAAR", "AccountNumber": "456789"},
                {"Id": "0015j00001H0q7bAAB", "AccountNumber": "909098"},
            ],
        },
    )
    # Replay a recorded describe for a person-accounts-enabled Account
    # schema from the cassette file.
    with (
        cumulusci_test_repo_root
        / "cumulusci/tests/cassettes/GET_sobjects_Account_PersonAccount_describe.yaml"
    ).open("r") as f:
        body_accounts = yaml.safe_load(f)["response"]["body"]["string"]
    responses.add(
        method="GET",
        url=f"https://{domain}/services/data/v{ver}/sobjects/Account/describe",
        body=body_accounts,
        status=200,
    )
    # Replay the recorded global describe response.
    with (
        cumulusci_test_repo_root
        / "cumulusci/tests/shared_cassettes/GET_sobjects_Global_describe.yaml"
    ).open("r") as f:
        body_global = yaml.safe_load(f)["response"]["body"]["string"]
    responses.add(
        method="GET",
        url=f"https://{domain}/services/data/v{ver}/sobjects",
        body=body_global,
        status=200,
    )
    # Dataset and mapping fixtures contain the conflicting record types.
    task = create_task(
        LoadData,
        {
            "sql_path": cumulusci_test_repo_root
            / "datasets/upsert/upsert_recordtypes.sql",
            "mapping": cumulusci_test_repo_root
            / "datasets/upsert/upsert_mapping_recordtypes.yml",
            "set_recently_viewed": False,
        },
    )
    task._update_credentials = mock.Mock()
    task.sf = sf
    task.bulk = mock.Mock()
    task._init_mapping()
    with task._init_db():
        task._expand_mapping()
        mapping = task.mapping["Upsert Accounts"]
        if "RecordTypeId" in mapping.fields:
            conn = task.session.connection()
            task._load_record_types([mapping.sf_object], conn)
            task.session.commit()
        _, query = task.configure_step(mapping)
        # Assert no duplicate records are trying to be deployed
        assert len(list(query)) == expected_number_of_records

@responses.activate
def test_simple_upsert_smart__non_native_field(
self, create_task, cumulusci_test_repo_root, org_config
Expand Down
20 changes: 14 additions & 6 deletions cumulusci/tasks/bulkdata/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def test_extract_record_types(self):
util.sf = mock.Mock()
util.sf.query.return_value = {
"totalSize": 1,
"records": [{"Id": "012000000000000", "DeveloperName": "Organization"}],
"records": [
{
"Id": "012000000000000",
"DeveloperName": "Organization",
"IsPersonType": "0",
}
],
}
util.logger = mock.Mock()
util.metadata = mock.MagicMock()
Expand All @@ -60,17 +66,19 @@ def test_extract_record_types(self):
with mock.patch(
"cumulusci.tasks.bulkdata.utils.sql_bulk_insert_from_records"
) as sql_bulk_insert_from_records:
util._extract_record_types("Account", "test_table", conn)
util._extract_record_types("Account", "test_table", conn, True)

util.sf.query.assert_called_once_with(
"SELECT Id, DeveloperName FROM RecordType WHERE SObjectType='Account'"
"SELECT Id, DeveloperName, IsPersonType FROM RecordType WHERE SObjectType='Account'"
)
sql_bulk_insert_from_records.assert_called_once()
sql_bulk_insert_from_records.assert_called()
call = sql_bulk_insert_from_records.call_args_list[0][1]
assert call["connection"] == conn
assert call["table"] == util.metadata.tables["test_table"]
assert call["columns"] == ["record_type_id", "developer_name"]
assert list(call["record_iterable"]) == [["012000000000000", "Organization"]]
assert call["columns"] == ["record_type_id", "developer_name", "is_person_type"]
assert list(call["record_iterable"]) == [
["012000000000000", "Organization", "0"]
]

def test_sql_bulk_insert_from_records__sqlite(self):
engine, metadata = create_db_memory()
Expand Down
19 changes: 12 additions & 7 deletions cumulusci/tasks/bulkdata/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path

from simple_salesforce import Salesforce
from sqlalchemy import Column, Integer, MetaData, Table, Unicode, inspect
from sqlalchemy import Boolean, Column, Integer, MetaData, Table, Unicode, inspect
from sqlalchemy.engine.base import Connection
from sqlalchemy.orm import Session, mapper

Expand All @@ -29,26 +29,31 @@ def _create_record_type_table(self, table_name):
rt_map_fields = [
Column("record_type_id", Unicode(18), primary_key=True),
Column("developer_name", Unicode(255)),
Column("is_person_type", Boolean),
]
rt_map_table = Table(table_name, self.metadata, *rt_map_fields)
mapper(self.models[table_name], rt_map_table)

def _extract_record_types(
    self, sobject, tablename: str, conn, is_person_accounts_enabled: bool
):
    """Query for Record Type information and persist it in the database.

    Args:
        sobject: API name of the sObject whose Record Types are extracted.
        tablename: Local mapping table to insert the results into.
        conn: Database connection used for the bulk insert.
        is_person_accounts_enabled: When True, also select ``IsPersonType``
            so Record Types that share a DeveloperName (person vs. business
            account) remain distinguishable.
    """
    self.logger.info(f"Extracting Record Types for {sobject}")
    # IsPersonType only exists on orgs with Person Accounts enabled;
    # selecting it elsewhere would make the SOQL query fail.
    fields = (
        "Id, DeveloperName, IsPersonType"
        if is_person_accounts_enabled
        else "Id, DeveloperName"
    )
    query = f"SELECT {fields} FROM RecordType WHERE SObjectType='{sobject}'"

    result = self.sf.query(query)

    if result["totalSize"]:
        sql_bulk_insert_from_records(
            connection=conn,
            table=self.metadata.tables[tablename],
            columns=["record_type_id", "developer_name", "is_person_type"],
            record_iterable=(
                # is_person_type defaults to False when the field was not
                # part of the query (non-person-account orgs).
                [rt["Id"], rt["DeveloperName"], rt.get("IsPersonType", False)]
                for rt in result["records"]
            ),
        )

Expand Down

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions datasets/upsert/upsert_mapping_recordtypes.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Mapping for the upsert-with-record-types regression dataset: upserts
# Accounts keyed on AccountNumber, carrying RecordTypeId so record types
# with identical DeveloperNames are resolved by (DeveloperName, IsPersonType).
Upsert Accounts:
    api: rest  # use the REST API rather than Bulk
    sf_object: Account
    action: etl_upsert  # smart upsert: update matched rows, insert the rest
    update_key: AccountNumber
    fields:
        - AccountNumber
        - RecordTypeId
23 changes: 23 additions & 0 deletions datasets/upsert/upsert_recordtypes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
BEGIN TRANSACTION;
-- Fixture for the upsert-with-record-types regression test: three source
-- Accounts referencing record types, two of which share the DeveloperName
-- 'PersonAccount' and differ only in is_person_type.
CREATE TABLE "Account" (
    id INTEGER NOT NULL,
    "Name" VARCHAR(255),
    "AccountNumber" VARCHAR(255),
    "RecordTypeId" VARCHAR(255),
    "IsPersonAccount" VARCHAR(255),
    PRIMARY KEY (id)
);
-- Use single quotes for string literals: double quotes are identifier
-- quotes in standard SQL and only work here via SQLite's quirk.
INSERT INTO "Account" VALUES(1,'Sitwell-Bluth', '12345', '0125j000000bo4yAAA','true');
INSERT INTO "Account" VALUES(2,'John-Doe', '456789', '0125j000000bo53AAA','false');
INSERT INTO "Account" VALUES(3,'Jane-Doe', '422', '0125j000000bo53AAA','false');
CREATE TABLE "Account_rt_mapping" (
    record_type_id VARCHAR(18) NOT NULL,
    developer_name VARCHAR(255),
    is_person_type BOOLEAN,
    PRIMARY KEY (record_type_id)
);
-- Two rows named 'PersonAccount': one person type (1), one business (0).
INSERT INTO "Account_rt_mapping" VALUES('0125j000000RqVkAAK','HH_Account',0);
INSERT INTO "Account_rt_mapping" VALUES('0125j000000RqVlAAK','Organization',0);
INSERT INTO "Account_rt_mapping" VALUES('0125j000000bo4yAAA','PersonAccount',1);
INSERT INTO "Account_rt_mapping" VALUES('0125j000000bo53AAA','PersonAccount',0);
COMMIT;
Loading