Polymorphic Lookup Data Extraction #3741

Merged 37 commits on Feb 5, 2024
Commits
e379f87
Successfully able to retrieve polymorphic lookups
aditya-balachander Jan 9, 2024
2c5d005
Updated id numbering
aditya-balachander Jan 9, 2024
eacb283
Changed id type from Unicode to Integer
aditya-balachander Jan 9, 2024
4a3d898
Fix existing tests
aditya-balachander Jan 10, 2024
3ec6308
Tests written for extract
aditya-balachander Jan 10, 2024
38a6540
Increased Coverage
aditya-balachander Jan 10, 2024
4de7153
load-data with new format
lakshmi2506 Jan 12, 2024
18947ae
backward compatibility enabled
lakshmi2506 Jan 12, 2024
773501f
old_format condition changed
lakshmi2506 Jan 16, 2024
2fc9a52
Merge pull request #3731 from SFDO-Tooling/feature/load_data
aditya-balachander Jan 16, 2024
c563a58
Revert "Polymorphic Reference - Load (merging into another branch)"
aditya-balachander Jan 16, 2024
c3ce2d2
Merge pull request #3732 from SFDO-Tooling/revert-3731-feature/load_data
aditya-balachander Jan 16, 2024
28c2006
Added load changes
aditya-balachander Jan 16, 2024
a484821
person accounts backward compatibility
lakshmi2506 Jan 16, 2024
17bf608
person accounts sf_id table corrected
lakshmi2506 Jan 16, 2024
5d3b7be
Updated load.py
aditya-balachander Jan 17, 2024
17efadd
Update mapping_parser.py
aditya-balachander Jan 17, 2024
b7b22e0
Added validation for mapping
aditya-balachander Jan 17, 2024
cbdebb5
test_cases
lakshmi2506 Jan 17, 2024
9e9a3ed
Fix test cases for extract and e2e
aditya-balachander Jan 17, 2024
007f73c
failed test cases corrected
lakshmi2506 Jan 17, 2024
4714dd8
Merge branch 'feature/polymorphic_reference' of https://github.com/SF…
lakshmi2506 Jan 17, 2024
8d386ef
tests
lakshmi2506 Jan 18, 2024
8708896
Fix tests
aditya-balachander Jan 19, 2024
c7edb21
Update mapping_parser.py
aditya-balachander Jan 19, 2024
a9bdae4
Update test_utils.py
aditya-balachander Jan 19, 2024
94974d3
Lint checks
aditya-balachander Jan 19, 2024
6a96cac
test_load.py covered
lakshmi2506 Jan 19, 2024
e5bd27f
Update test_load.py
lakshmi2506 Jan 19, 2024
5d1f1d9
Fixed Tests
aditya-balachander Jan 19, 2024
6e7b269
Handling for is_person_type column not present
aditya-balachander Jan 21, 2024
a71c22c
Generation of mapping file with polymorphic support
aditya-balachander Jan 31, 2024
59718b3
Update query_transformers.py
aditya-balachander Jan 31, 2024
3efb05a
Test for generation of mapping file
aditya-balachander Feb 1, 2024
d4f3e0b
Resolve pyright failure
aditya-balachander Feb 1, 2024
26a9484
Added comments
aditya-balachander Feb 2, 2024
7249bfd
Merge branch 'main' into feature/polymorphic_reference
aditya-balachander Feb 2, 2024
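
In short, the changes below teach dataset extraction about polymorphic lookups: a lookup's target table can now be a list of sObjects instead of a single name, and extracted rows are keyed by readable local ids such as Account-1 rather than integer autoincrement keys. As a minimal sketch, the mapping step for a polymorphic Event.WhoId lookup ends up shaped like this (taken from the expected mapping asserted in the e2e test further down):

    insert_event = {
        "sf_object": "Event",
        "table": "Event",
        "fields": ["Subject"],
        "lookups": {
            "WhoId": {
                # WhoId can point at a Contact or a Lead, so "table" is now a list.
                "table": ["Contact", "Lead"],
                "key_field": "WhoId",
                "after": "Insert Lead",
            }
        },
    }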
Files changed
@@ -181,8 +181,11 @@ interactions:
include_file: GET_sobjects_Global_describe.yaml
- &id007
include_file: GET_sobjects_Account_describe.yaml
- &id010
include_file: GET_sobjects_Account_describe.yaml
- *id006
- *id007
- *id010
- request:
method: POST
uri: https://orgname.my.salesforce.com/services/data/vxx.0/composite/sobjects
35 changes: 32 additions & 3 deletions cumulusci/core/tests/test_datasets_e2e.py
@@ -225,11 +225,19 @@ def test_datasets_extract_standard_objects(
def test_datasets_read_explicit_extract_declaration(
self, sf, project_config, org_config, delete_data_from_org, ensure_accounts
):
object_counts = {"Account": 6, "Contact": 1, "Opportunity": 5}
object_counts = {
"Account": 6,
"Contact": 1,
"Opportunity": 5,
"Lead": 1,
"Event": 2,
}
obj_describes = (
describe_for("Account"),
describe_for("Contact"),
describe_for("Opportunity"),
describe_for("Lead"),
describe_for("Event"),
)
with patch.object(type(org_config), "is_person_accounts_enabled", False), patch(
"cumulusci.core.datasets.get_org_schema",
@@ -263,12 +271,16 @@ def write_yaml(filename: str, json: Any):
"Contact": {
"fields": ["FirstName", "LastName", "AccountId"]
},
"Event": {"fields": ["Subject", "WhoId"]},
}
},
)
loading_rules = write_yaml(
"loading_rules.load.yml",
[{"sf_object": "Account", "load_after": "Contact"}],
[
{"sf_object": "Account", "load_after": "Contact"},
{"sf_object": "Lead", "load_after": "Event"},
],
)

# Don't actually extract data.
@@ -286,17 +298,34 @@ def write_yaml(filename: str, json: Any):
"fields": ["FirstName", "LastName"],
"lookups": {
"AccountId": {
"table": "Account",
"table": ["Account"],
"key_field": "AccountId",
"after": "Insert Account",
}
},
},
"Insert Event": {
"sf_object": "Event",
"table": "Event",
"fields": ["Subject"],
"lookups": {
"WhoId": {
"table": ["Contact", "Lead"],
"key_field": "WhoId",
"after": "Insert Lead",
}
},
},
"Insert Account": {
"sf_object": "Account",
"table": "Account",
"fields": ["Name"],
},
"Insert Lead": {
"sf_object": "Lead",
"table": "Lead",
"fields": ["Company", "LastName"],
},
}
assert tuple(actual.items()) == tuple(expected.items()), actual.items()

117 changes: 90 additions & 27 deletions cumulusci/tasks/bulkdata/extract.py
@@ -2,10 +2,15 @@
import re
from contextlib import contextmanager

from sqlalchemy import Column, Integer, MetaData, Table, Unicode, create_engine
from sqlalchemy import Column, MetaData, Table, Unicode, create_engine
from sqlalchemy.orm import create_session, mapper

from cumulusci.core.exceptions import BulkDataException, TaskOptionsError
from cumulusci.core.exceptions import (
BulkDataException,
ConfigError,
CumulusCIException,
TaskOptionsError,
)
from cumulusci.core.utils import process_bool_arg
from cumulusci.tasks.bulkdata.dates import adjust_relative_dates
from cumulusci.tasks.bulkdata.mapping_parser import (
@@ -70,6 +75,7 @@ def _init_options(self, kwargs):
self.options["drop_missing_schema"] = process_bool_arg(
self.options.get("drop_missing_schema") or False
)
self._id_generators = {}

def _run_task(self):
self._init_mapping()
@@ -127,7 +133,7 @@ def _init_mapping(self):
def _soql_for_mapping(self, mapping):
"""Return a SOQL query suitable for extracting data for this mapping."""
sf_object = mapping.sf_object
fields = mapping.get_complete_field_map(include_id=True).keys()
fields = mapping.get_extract_field_list()
soql = f"SELECT {', '.join(fields)} FROM {sf_object}"

if mapping.record_type:
@@ -146,7 +152,7 @@ def _run_query(self, soql, mapping):
step = get_query_operation(
sobject=mapping.sf_object,
api=mapping.api,
fields=list(mapping.get_complete_field_map(include_id=True).keys()),
fields=list(mapping.get_extract_field_list()),
api_options={},
context=self,
query=soql,
@@ -225,19 +231,25 @@ def strip_name_field(record):
# If using the autogenerated id field, split out the returned records
# into two separate streams and load into the main table and the sf_id_table
values, ids = itertools.tee(record_iterator)
f_values = (row[1:] for row in values)
f_ids = (row[:1] for row in ids)

# Generate naming format (<sobject>-<number>)
id_source_sobj, id_source_global = itertools.tee(
self._id_generator_for_object(mapping.sf_object)
)

f_ids = ((row[0], next(id_source_global)) for row in ids)
f_values = ([next(id_source_sobj)] + row[1:] for row in values)

values_chunks = sql_bulk_insert_from_records_incremental(
connection=conn,
table=self.metadata.tables[mapping.table],
columns=columns[1:], # Strip off the Id column
columns=["id"] + columns[1:],
record_iterable=f_values,
)
ids_chunks = sql_bulk_insert_from_records_incremental(
connection=conn,
table=self.metadata.tables[mapping.get_sf_id_table()],
columns=["sf_id"],
columns=["sf_id", "id"],
record_iterable=f_ids,
)

@@ -255,6 +267,21 @@ def strip_name_field(record):

self.session.commit()

def _id_generator_for_object(self, sobject: str):
"""Generates strings for local ids in format {sobject}-{counter}
(example: Account-2)"""
if sobject not in self._id_generators:

def _generate_ids():
counter = 1
while True:
yield f"{sobject}-{counter}"
counter += 1

self._id_generators[sobject] = _generate_ids()

return self._id_generators[sobject]
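
A standalone illustration of why the record streams above are built from itertools.tee of a single generator: each copy replays the same id sequence, so the row written to the main table and the matching row written to the sf_id table receive the same local id. A minimal sketch with the generator logic copied from _id_generator_for_object:

    import itertools

    def _generate_ids(sobject: str):
        # Same scheme as _id_generator_for_object: Account-1, Account-2, ...
        counter = 1
        while True:
            yield f"{sobject}-{counter}"
            counter += 1

    source = _generate_ids("Account")
    ids_for_values, ids_for_sf_ids = itertools.tee(source)

    print(next(ids_for_values), next(ids_for_values))  # Account-1 Account-2
    print(next(ids_for_sf_ids), next(ids_for_sf_ids))  # Account-1 Account-2 (same sequence replayed)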

def _map_autopks(self):
# Convert Salesforce Ids to autopks
for m in self.mapping.values():
@@ -270,9 +297,23 @@ def _map_autopks(self):

def _get_mapping_for_table(self, table):
"""Return the first mapping for a table name"""
for mapping in self.mapping.values():
if mapping["table"] == table:
return mapping

# Get all mappings for lookups
mappings = [
mapping
for mapping in self.mapping.values()
if (isinstance(table, str) and mapping["table"] == table)
or (isinstance(table, list) and mapping["table"] in table)
]

# For polymorphic lookups, raise exception if missing mappings
if isinstance(table, list) and len(mappings) != len(table):
missing_tables = set(table) - set(mapping["table"] for mapping in mappings)
raise CumulusCIException(
f"The following tables are missing in the mapping file: {missing_tables}"
)

return mappings
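
A quick, self-contained sketch of the check above with made-up mapping data: when a lookup declares several target tables, every one of them must appear in the mapping file, otherwise the task fails fast rather than silently leaving references unresolved.

    # Hypothetical data, not real CumulusCI mapping objects.
    mapping_steps = [{"table": "Event"}, {"table": "Contact"}]
    lookup_tables = ["Contact", "Lead"]  # e.g. Event.WhoId targets both

    matched = [m for m in mapping_steps if m["table"] in lookup_tables]
    missing = set(lookup_tables) - {m["table"] for m in matched}
    if missing:
        # Mirrors the CumulusCIException raised in _get_mapping_for_table.
        raise RuntimeError(f"The following tables are missing in the mapping file: {missing}")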

def _convert_lookups_to_id(self, mapping, lookup_keys):
"""Rewrite persisted Salesforce Ids to refer to auto-PKs."""
@@ -286,29 +327,51 @@ def throw(string): # pragma: no cover
)
model = self.models.get(mapping.table)

lookup_mapping = self._get_mapping_for_table(lookup_info.table) or throw(
lookup_mappings = self._get_mapping_for_table(lookup_info.table) or throw(
f"Cannot find lookup mapping for {lookup_info.table}"
)

lookup_model = self.models.get(lookup_mapping.get_sf_id_table())

key_field = lookup_info.get_lookup_key_field()

key_attr = getattr(model, key_field, None) or throw(
f"key_field {key_field} not found in table {mapping.table}"
)
try:
self.session.query(model).filter(
key_attr.isnot(None), key_attr == lookup_model.sf_id
).update({key_attr: lookup_model.id}, synchronize_session=False)
except NotImplementedError:
# Some databases such as sqlite don't support multitable update
mappings = []
for row, lookup_id in self.session.query(model, lookup_model.id).join(
lookup_model, key_attr == lookup_model.sf_id
):
mappings.append({"id": row.id, key_field: lookup_id})
self.session.bulk_update_mappings(model, mappings)

# Keep track of total mapping operations
total_mapping_operations = 0

for lookup_mapping in lookup_mappings:
lookup_model = self.models.get(lookup_mapping.get_sf_id_table())
try:
update_query = (
self.session.query(model)
.filter(key_attr.isnot(None), key_attr == lookup_model.sf_id)
.update({key_attr: lookup_model.id}, synchronize_session=False)
)
total_mapping_operations += update_query.rowcount
except NotImplementedError:
# Some databases such as sqlite don't support multitable update
mappings = []
for row, lookup_id in self.session.query(
model, lookup_model.id
).join(lookup_model, key_attr == lookup_model.sf_id):
mappings.append({"id": row.id, key_field: lookup_id})
total_mapping_operations += len(mappings)
self.session.bulk_update_mappings(model, mappings)
# Count the total number of rows excluding those with no entry for that field
total_rows = (
self.session.query(model)
.filter(
key_attr.isnot(None), # Ensure key_attr is not None
key_attr.isnot(""), # Ensure key_attr is not an empty string
)
.count()
)

if total_mapping_operations != total_rows:
raise ConfigError(
f"Total mapping operations ({total_mapping_operations}) do not match total non-empty rows ({total_rows}) for lookup_key: {lookup_key}. Mention all related tables for lookup: {lookup_key}"
)
self.session.commit()
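
The loop above visits one sf_id table per target sObject and counts how many foreign keys it rewrote; if any populated lookup value was left untouched, a related table is missing from the mapping and a ConfigError is raised. A database-free sketch of the same bookkeeping, with made-up ids:

    # Hypothetical local-id lookup tables per target sObject, plus Event rows to rewrite.
    sf_id_tables = {
        "Contact": {"0035f00000AAAAA": "Contact-1"},
        "Lead": {"00Q5f00000BBBBB": "Lead-1"},
    }
    event_rows = [
        {"id": "Event-1", "WhoId": "0035f00000AAAAA"},  # references a Contact
        {"id": "Event-2", "WhoId": "00Q5f00000BBBBB"},  # references a Lead
    ]

    total_mapping_operations = 0
    for sf_ids in sf_id_tables.values():  # one pass per mapped target table
        for row in event_rows:
            if row["WhoId"] in sf_ids:
                row["WhoId"] = sf_ids[row["WhoId"]]  # Salesforce Id -> local id
                total_mapping_operations += 1

    total_rows = sum(1 for row in event_rows if row["WhoId"])
    # Mirrors the ConfigError check: every populated WhoId must have been rewritten.
    assert total_mapping_operations == total_rows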

def _create_tables(self):
@@ -339,7 +402,7 @@ def _create_table(self, mapping):
sf_id_model_name, (object,), {}
)
sf_id_fields = [
Column("id", Integer(), primary_key=True, autoincrement=True),
Column("id", Unicode(255), primary_key=True),
Column("sf_id", Unicode(24)),
]
id_t = Table(mapping.get_sf_id_table(), self.metadata, *sf_id_fields)
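
Because the sf_id table's primary key changes from an autoincrementing Integer to Unicode(255), it now stores the generated local ids directly. A small standalone sketch of the resulting table (table name and ids are made up):

    from sqlalchemy import Column, MetaData, Table, Unicode, create_engine

    engine = create_engine("sqlite://")
    metadata = MetaData()
    account_sf_ids = Table(
        "Account_sf_ids",
        metadata,
        Column("id", Unicode(255), primary_key=True),  # local id, e.g. "Account-1"
        Column("sf_id", Unicode(24)),                  # original Salesforce Id
    )
    metadata.create_all(engine)

    with engine.begin() as conn:
        conn.execute(
            account_sf_ids.insert(),
            [{"id": "Account-1", "sf_id": "0015f00000AAAAAAAA"}],
        )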
@@ -11,7 +11,7 @@

class SObjDependency(T.NamedTuple):
table_name_from: str
table_name_to: str
table_names_to: T.Union[str, T.Tuple[str, ...]]
field_name: str
priority: bool = False
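
For illustration, a dependency for a polymorphic field now carries every possible target in one record; a minimal sketch using the NamedTuple defined above:

    import typing as T

    class SObjDependency(T.NamedTuple):
        table_name_from: str
        table_names_to: T.Union[str, T.Tuple[str, ...]]
        field_name: str
        priority: bool = False

    # Event.WhoId can reference a Contact or a Lead, so both land in a single dependency.
    who_id_dep = SObjDependency("Event", ("Contact", "Lead"), "WhoId", priority=False)
    print(who_id_dep.table_names_to)  # ('Contact', 'Lead')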

Expand Down Expand Up @@ -51,16 +51,25 @@ def _collect_dependencies_for_sobject(
if not field_info.createable: # pragma: no cover
continue
references = field_info.referenceTo
if len(references) == 1 and not references[0] == "RecordType":
target = references[0]

target_disallowed = target in NOT_EXTRACTABLE
field_disallowed = target_disallowed or not field_info.createable
if references:
# Remove RecordType from references
if "RecordType" in references:
references.remove("RecordType")
if not references:
continue

targets = tuple(
target for target in references if target not in NOT_EXTRACTABLE
)
field_disallowed = not targets or not field_info.createable
field_allowed = not (only_required_fields or field_disallowed)
if field_info.requiredOnCreate or field_allowed:
dependencies.setdefault(source_sfobject, []).append(
SObjDependency(
source_sfobject, target, field_name, field_info.requiredOnCreate
source_sfobject,
targets,
field_name,
field_info.requiredOnCreate,
)
)

@@ -80,28 +89,29 @@ def extend_declarations_to_include_referenced_tables(
assert isinstance(sf_object, str)
my_dependencies = dependencies.get(sf_object, ())
for dep in my_dependencies:
target_table = dep.table_name_to
sobj = schema.get(target_table)
target_extractable = (
target_table not in NOT_EXTRACTABLE and sobj and sobj.extractable
)
if target_table not in decls and target_extractable:
required_fields = [
field.name
for field in schema[target_table].fields.values()
if field.requiredOnCreate
]
decls[target_table] = synthesize_declaration_for_sobject(
target_table, required_fields, schema[target_table].fields
target_tables = dep.table_names_to
for target_table in target_tables:
sobj = schema.get(target_table)
target_extractable = (
target_table not in NOT_EXTRACTABLE and sobj and sobj.extractable
)
if target_table not in decls and target_extractable:
required_fields = [
field.name
for field in schema[target_table].fields.values()
if field.requiredOnCreate
]
decls[target_table] = synthesize_declaration_for_sobject(
target_table, required_fields, schema[target_table].fields
)

new_dependencies = _collect_dependencies_for_sobject(
target_table,
decls[target_table].fields,
schema,
only_required_fields=True,
)
dependencies.update(new_dependencies)
to_process.append(target_table)
new_dependencies = _collect_dependencies_for_sobject(
target_table,
decls[target_table].fields,
schema,
only_required_fields=True,
)
dependencies.update(new_dependencies)
to_process.append(target_table)

return list(decls.values())