Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
xmtigra authored Feb 9, 2024
2 parents 1786615 + 63e925e commit 058d9e8
Show file tree
Hide file tree
Showing 41 changed files with 1,429 additions and 255 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/chores.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ jobs:
' cumulusci/cumulusci.yml
- name: Commit changes
run: |
git config user.name github-actions
git config user.email github-actions@github.com
git config user.name github-actions[bot]
git config user.email 41898282+github-actions[bot]@users.noreply.github.com
git switch -c "update-sfdc-api-v$VERSION"
git add cumulusci/cumulusci.yml
git commit -m "Automated update to sfdc API version $VERSION"
Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/pre-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ jobs:
sed -e 's_\(https.*\/\)\([0-9]*\)$_[#\2](\1\2)_' \
-e 's_by @\(.*\) in_by [@\1](https://github.com/\1) in_' >> changelog.md
python utility/update-history.py
- name: Lint history
run: |
npm install prettier
npx prettier --write docs/history.md
- name: Commit changes
run: |
git config user.name github-actions
git config user.email github-actions@github.com
git config user.name github-actions[bot]
git config user.email 41898282+github-actions[bot]@users.noreply.github.com
git switch -c "release-$(hatch version)"
git add docs/history.md cumulusci/__about__.py
git commit -m "Update changelog (automated)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,11 @@ interactions:
include_file: GET_sobjects_Global_describe.yaml
- &id007
include_file: GET_sobjects_Account_describe.yaml
- &id010
include_file: GET_sobjects_Account_describe.yaml
- *id006
- *id007
- *id010
- request:
method: POST
uri: https://orgname.my.salesforce.com/services/data/vxx.0/composite/sobjects
Expand Down
35 changes: 32 additions & 3 deletions cumulusci/core/tests/test_datasets_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,19 @@ def test_datasets_extract_standard_objects(
def test_datasets_read_explicit_extract_declaration(
self, sf, project_config, org_config, delete_data_from_org, ensure_accounts
):
object_counts = {"Account": 6, "Contact": 1, "Opportunity": 5}
object_counts = {
"Account": 6,
"Contact": 1,
"Opportunity": 5,
"Lead": 1,
"Event": 2,
}
obj_describes = (
describe_for("Account"),
describe_for("Contact"),
describe_for("Opportunity"),
describe_for("Lead"),
describe_for("Event"),
)
with patch.object(type(org_config), "is_person_accounts_enabled", False), patch(
"cumulusci.core.datasets.get_org_schema",
Expand Down Expand Up @@ -263,12 +271,16 @@ def write_yaml(filename: str, json: Any):
"Contact": {
"fields": ["FirstName", "LastName", "AccountId"]
},
"Event": {"fields": ["Subject", "WhoId"]},
}
},
)
loading_rules = write_yaml(
"loading_rules.load.yml",
[{"sf_object": "Account", "load_after": "Contact"}],
[
{"sf_object": "Account", "load_after": "Contact"},
{"sf_object": "Lead", "load_after": "Event"},
],
)

# Don't actually extract data.
Expand All @@ -286,17 +298,34 @@ def write_yaml(filename: str, json: Any):
"fields": ["FirstName", "LastName"],
"lookups": {
"AccountId": {
"table": "Account",
"table": ["Account"],
"key_field": "AccountId",
"after": "Insert Account",
}
},
},
"Insert Event": {
"sf_object": "Event",
"table": "Event",
"fields": ["Subject"],
"lookups": {
"WhoId": {
"table": ["Contact", "Lead"],
"key_field": "WhoId",
"after": "Insert Lead",
}
},
},
"Insert Account": {
"sf_object": "Account",
"table": "Account",
"fields": ["Name"],
},
"Insert Lead": {
"sf_object": "Lead",
"table": "Lead",
"fields": ["Company", "LastName"],
},
}
assert tuple(actual.items()) == tuple(expected.items()), actual.items()

Expand Down
117 changes: 90 additions & 27 deletions cumulusci/tasks/bulkdata/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
import re
from contextlib import contextmanager

from sqlalchemy import Column, Integer, MetaData, Table, Unicode, create_engine
from sqlalchemy import Column, MetaData, Table, Unicode, create_engine
from sqlalchemy.orm import create_session, mapper

from cumulusci.core.exceptions import BulkDataException, TaskOptionsError
from cumulusci.core.exceptions import (
BulkDataException,
ConfigError,
CumulusCIException,
TaskOptionsError,
)
from cumulusci.core.utils import process_bool_arg
from cumulusci.tasks.bulkdata.dates import adjust_relative_dates
from cumulusci.tasks.bulkdata.mapping_parser import (
Expand Down Expand Up @@ -70,6 +75,7 @@ def _init_options(self, kwargs):
self.options["drop_missing_schema"] = process_bool_arg(
self.options.get("drop_missing_schema") or False
)
self._id_generators = {}

def _run_task(self):
self._init_mapping()
Expand Down Expand Up @@ -127,7 +133,7 @@ def _init_mapping(self):
def _soql_for_mapping(self, mapping):
"""Return a SOQL query suitable for extracting data for this mapping."""
sf_object = mapping.sf_object
fields = mapping.get_complete_field_map(include_id=True).keys()
fields = mapping.get_extract_field_list()
soql = f"SELECT {', '.join(fields)} FROM {sf_object}"

if mapping.record_type:
Expand All @@ -146,7 +152,7 @@ def _run_query(self, soql, mapping):
step = get_query_operation(
sobject=mapping.sf_object,
api=mapping.api,
fields=list(mapping.get_complete_field_map(include_id=True).keys()),
fields=list(mapping.get_extract_field_list()),
api_options={},
context=self,
query=soql,
Expand Down Expand Up @@ -225,19 +231,25 @@ def strip_name_field(record):
# If using the autogenerated id field, split out the returned records
# into two separate streams and load into the main table and the sf_id_table
values, ids = itertools.tee(record_iterator)
f_values = (row[1:] for row in values)
f_ids = (row[:1] for row in ids)

# Generate naming format (<sobject>-<number>)
id_source_sobj, id_source_global = itertools.tee(
self._id_generator_for_object(mapping.sf_object)
)

f_ids = ((row[0], next(id_source_global)) for row in ids)
f_values = ([next(id_source_sobj)] + row[1:] for row in values)

values_chunks = sql_bulk_insert_from_records_incremental(
connection=conn,
table=self.metadata.tables[mapping.table],
columns=columns[1:], # Strip off the Id column
columns=["id"] + columns[1:],
record_iterable=f_values,
)
ids_chunks = sql_bulk_insert_from_records_incremental(
connection=conn,
table=self.metadata.tables[mapping.get_sf_id_table()],
columns=["sf_id"],
columns=["sf_id", "id"],
record_iterable=f_ids,
)

Expand All @@ -255,6 +267,21 @@ def strip_name_field(record):

self.session.commit()

def _id_generator_for_object(self, sobject: str):
    """Return the local-id generator for ``sobject``, creating it on first use.

    The generator yields strings in the format ``{sobject}-{counter}``
    (example: ``Account-2``), with the counter starting at 1.

    Generators are cached in ``self._id_generators`` keyed by object name,
    so every call for the same ``sobject`` returns the *same* generator and
    numbering continues where the previous consumer left off instead of
    restarting at 1.

    Args:
        sobject: Name of the object the ids are generated for; used both as
            the cache key and as the prefix of every generated id.

    Returns:
        An endless generator of id strings for ``sobject``.
    """
    if sobject not in self._id_generators:
        # itertools.count(1) supplies the unbounded 1, 2, 3, ... sequence;
        # the generator expression is lazy, so nothing is produced until a
        # consumer calls next() on it.
        self._id_generators[sobject] = (
            f"{sobject}-{counter}" for counter in itertools.count(1)
        )

    return self._id_generators[sobject]

def _map_autopks(self):
# Convert Salesforce Ids to autopks
for m in self.mapping.values():
Expand All @@ -270,9 +297,23 @@ def _map_autopks(self):

def _get_mapping_for_table(self, table):
"""Return the first mapping for a table name"""
for mapping in self.mapping.values():
if mapping["table"] == table:
return mapping

# Get all mappings for lookups
mappings = [
mapping
for mapping in self.mapping.values()
if (isinstance(table, str) and mapping["table"] == table)
or (isinstance(table, list) and mapping["table"] in table)
]

# For polymorphic lookups, raise exception if missing mappings
if isinstance(table, list) and len(mappings) != len(table):
missing_tables = set(table) - set(mapping["table"] for mapping in mappings)
raise CumulusCIException(
f"The following tables are missing in the mapping file: {missing_tables}"
)

return mappings

def _convert_lookups_to_id(self, mapping, lookup_keys):
"""Rewrite persisted Salesforce Ids to refer to auto-PKs."""
Expand All @@ -286,29 +327,51 @@ def throw(string): # pragma: no cover
)
model = self.models.get(mapping.table)

lookup_mapping = self._get_mapping_for_table(lookup_info.table) or throw(
lookup_mappings = self._get_mapping_for_table(lookup_info.table) or throw(
f"Cannot find lookup mapping for {lookup_info.table}"
)

lookup_model = self.models.get(lookup_mapping.get_sf_id_table())

key_field = lookup_info.get_lookup_key_field()

key_attr = getattr(model, key_field, None) or throw(
f"key_field {key_field} not found in table {mapping.table}"
)
try:
self.session.query(model).filter(
key_attr.isnot(None), key_attr == lookup_model.sf_id
).update({key_attr: lookup_model.id}, synchronize_session=False)
except NotImplementedError:
# Some databases such as sqlite don't support multitable update
mappings = []
for row, lookup_id in self.session.query(model, lookup_model.id).join(
lookup_model, key_attr == lookup_model.sf_id
):
mappings.append({"id": row.id, key_field: lookup_id})
self.session.bulk_update_mappings(model, mappings)

# Keep track of total mapping operations
total_mapping_operations = 0

for lookup_mapping in lookup_mappings:
lookup_model = self.models.get(lookup_mapping.get_sf_id_table())
try:
update_query = (
self.session.query(model)
.filter(key_attr.isnot(None), key_attr == lookup_model.sf_id)
.update({key_attr: lookup_model.id}, synchronize_session=False)
)
total_mapping_operations += update_query.rowcount
except NotImplementedError:
# Some databases such as sqlite don't support multitable update
mappings = []
for row, lookup_id in self.session.query(
model, lookup_model.id
).join(lookup_model, key_attr == lookup_model.sf_id):
mappings.append({"id": row.id, key_field: lookup_id})
total_mapping_operations += len(mappings)
self.session.bulk_update_mappings(model, mappings)
# Count the total number of rows excluding those with no entry for that field
total_rows = (
self.session.query(model)
.filter(
key_attr.isnot(None), # Ensure key_attr is not None
key_attr.isnot(""), # Ensure key_attr is not an empty string
)
.count()
)

if total_mapping_operations != total_rows:
raise ConfigError(
f"Total mapping operations ({total_mapping_operations}) do not match total non-empty rows ({total_rows}) for lookup_key: {lookup_key}. Mention all related tables for lookup: {lookup_key}"
)
self.session.commit()

def _create_tables(self):
Expand Down Expand Up @@ -339,7 +402,7 @@ def _create_table(self, mapping):
sf_id_model_name, (object,), {}
)
sf_id_fields = [
Column("id", Integer(), primary_key=True, autoincrement=True),
Column("id", Unicode(255), primary_key=True),
Column("sf_id", Unicode(24)),
]
id_t = Table(mapping.get_sf_id_table(), self.metadata, *sf_id_fields)
Expand Down
Loading

0 comments on commit 058d9e8

Please sign in to comment.