
Object Mapping #50

Draft: wants to merge 26 commits into base `main` from `feature/object-mapping`.

Commits (26):
aa767a5  Merge in object mapping changes (davidmreed, Apr 19, 2020)
4316b23  Merge branch 'master' into feature/object-mapping (davidmreed, Apr 19, 2020)
44a7819  Remove obsolete instance variables (davidmreed, Apr 20, 2020)
70d4332  Merge master (davidmreed, Apr 26, 2020)
1188711  Working extraction of mapped sObjects (davidmreed, Apr 28, 2020)
1409ec7  Much of Load-side object mapping complete (davidmreed, Apr 29, 2020)
5b7aa93  WIP: mapping miss handling (davidmreed, May 2, 2020)
ce84e92  Merge branch 'master' into feature/object-mapping (davidmreed, May 5, 2020)
9f97fae  Add mapping miss behaviors, rebuild transformer function (davidmreed, May 8, 2020)
40de811  Make loader create mapper functions (davidmreed, May 8, 2020)
4a04e71  Fix unit test failures and clean up exception message (davidmreed, May 8, 2020)
2893c84  Fix bug in extract operation loader (davidmreed, May 8, 2020)
fe561c2  Fix tests (davidmreed, May 8, 2020)
0ed2864  First working round-trip of object mapping (davidmreed, May 9, 2020)
64f2449  Remove explicit Record Type mapping, update test_data_csv with mappin… (davidmreed, May 10, 2020)
a747b99  Add documentation for mapping feature (davidmreed, May 10, 2020)
2c3f2f5  Add mapping to end-to-end testing with scratch org config (davidmreed, May 11, 2020)
cc72291  Refactor mapper cache to use ExtractOperation (davidmreed, May 13, 2020)
a05ea2f  Implement miss behavior and defaults (davidmreed, Jun 7, 2020)
8ea1d13  Fix precommit issues (davidmreed, May 7, 2021)
bc0d63f  Small fixes, make basic mapping behavior work (davidmreed, May 8, 2021)
c157cf8  Handle cache errors better (davidmreed, May 8, 2021)
381bd62  Update README (davidmreed, May 11, 2021)
6deab33  Merge remote-tracking branch 'origin/master' into feature/object-mapping (davidmreed, May 11, 2021)
855fc62  Merge branch 'main' into feature/object-mapping (davidmreed, Oct 6, 2021)
5adc681  WIP (davidmreed, Oct 7, 2021)
24 changes: 23 additions & 1 deletion README.rst
@@ -30,14 +30,36 @@ Amaxa - a multi-object ETL tool for Salesforce
Introduction
------------

Amaxa is a new data loader and ETL (extract-transform-load) tool for Salesforce, designed to support the extraction and loading of complex networks of records in a single operation. For example, an Amaxa operation can extract a designated set of Accounts, their associated Contacts and Opportunities, their Opportunity Contact Roles, and associated Campaigns, and then load all of that data into another Salesforce org while preserving the connections between the records.
Amaxa is a data loader and ETL (extract-transform-load) tool for Salesforce, designed to support the extraction and loading of complex networks of records in a single operation. For example, an Amaxa operation can extract a designated set of Accounts, their associated Contacts and Opportunities, their Opportunity Contact Roles, and associated Campaigns, and then load all of that data into another Salesforce org while preserving the connections between the records.

Amaxa is designed to replace complex, error-prone workflows that manipulate data exports with spreadsheet functions like ``VLOOKUP()`` to maintain object relationships.

Amaxa is free and open source software, distributed under the BSD License. Amaxa is by `David Reed <https://ktema.org>`_, (c) 2019-2020.

Documentation for Amaxa is available on `ReadTheDocs <https://amaxa.readthedocs.io>`_. The project is developed on `GitHub <https://github.com/davidmreed/amaxa>`_.

Why Amaxa?
----------

Amaxa offers a number of features and design choices that differ from other data loaders and ETL tools.

- Amaxa supports loading whole object networks in a single operation, not one object at a time.
- Amaxa specializes in messy ETL operations, like selecting a coherent subset of complex data in a production Salesforce org and migrating it to a sandbox whose configuration mostly, but not entirely, matches. Amaxa's reference tracing, object mapping, and transform features help tailor operations to highly specific needs.
- Amaxa deemphasizes the use of SOQL queries in favor of defining scope through object relationships.
- Amaxa supports all core Salesforce data model features supported by the Bulk API, including arbitrary-length text fields, polymorphic lookups, and arbitrarily complex models including hierarchies and reference cycles.
- Amaxa focuses heavily on correctness, validation, and error management. Amaxa loads are recoverable and resumable if errors occur, and Amaxa runs extensive checks to ensure the operation is valid before starting. Amaxa itself is heavily tested to ensure correctness.
- Amaxa uses easy-to-consume formats: CSV for all data storage and YAML for specifying data operations.
- Amaxa is free, open source, and written in Python.

Why Not Amaxa?
--------------

There are a few things Amaxa does not, or does not yet, support.

- Amaxa does not support Person Accounts.
- Amaxa does not support Content or binary files.
- Amaxa does not support using the REST API (all operations are done via Bulk).

What Does Amaxa Mean?
---------------------

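The README above mentions object mapping without showing configuration, while the loader changes later in this diff read three keys from each `object-mappings` entry (`sobject`, `key-fields`, `file`) alongside the existing `operation` list. A hypothetical sketch of an operation file using those keys follows; the object names, the `Name` key field, and the `fields` key are illustrative guesses, not taken from this PR:

```yaml
# Hypothetical operation definition; only the `object-mappings` keys
# (sobject, key-fields, file) are grounded in this PR's loader code.
operation:
  - sobject: Account
    file: Account.csv
    fields:
      - Name
object-mappings:
  - sobject: Pricebook2
    key-fields:
      - Name
    file: Pricebook2.csv
```

Per the validation added in `amaxa/loader/core.py`, an sObject may appear in `operation` or `object-mappings`, but not both.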
44 changes: 26 additions & 18 deletions amaxa/amaxa.py
@@ -36,6 +36,12 @@ class OutsideLookupBehavior(StringEnum):
ERROR = "error"


class MappingMissBehavior(StringEnum):
ERROR = "error"
DROP = "drop"
DEFAULT = "default"


class LoadStage(StringEnum):
INSERTS = "inserts"
DEPENDENTS = "dependents"
@@ -126,7 +132,7 @@ def run(self):
self.initialize()
return self.execute()
except Exception as e:
self.logger.error("Unexpected exception {} occurred.".format(str(e)))
self.logger.exception("Unexpected exception {} occurred.".format(str(e)))
return -1
finally:
self.file_store.close()
@@ -243,6 +249,7 @@ def __init__(self, connection):
self.global_id_map = {}
self.success = True
self.stage = LoadStage.INSERTS
self.mapper_cache = None

def register_new_id(self, sobjectname, old_id, new_id):
self.global_id_map[old_id] = new_id
@@ -263,6 +270,15 @@ def execute(self):
self.logger.info(
"Starting load with sObjects %s", ", ".join(self.get_sobject_list())
)

if self.mapper_cache:
self.logger.info("Loading mapped sObjects into cache")
self.mapper_cache.initialize()
result = self.mapper_cache.execute()
if result:
self.logger.error("Failed to load mapped sObjects")
return result

if self.stage is LoadStage.INSERTS:
for s in self.steps:
self.logger.info("%s: starting load", s.sobjectname)
@@ -575,11 +591,7 @@ def add_dependency(self, sobjectname, id):
self.required_ids[sobjectname].add(id)

def get_dependencies(self, sobjectname):
return (
self.required_ids[sobjectname]
if sobjectname in self.required_ids
else set()
)
return self.required_ids.get(sobjectname, set())

def get_sobject_ids_for_reference(self, sobjectname, field):
ids = set()
@@ -593,21 +605,17 @@ def get_sobject_ids_for_reference(self, sobjectname, field):
return ids

def get_extracted_ids(self, sobjectname):
return (
self.extracted_ids[sobjectname]
if sobjectname in self.extracted_ids
else set()
)
return self.extracted_ids.get(sobjectname, set())

def store_result(self, sobjectname, record):
if sobjectname not in self.extracted_ids:
self.extracted_ids[sobjectname] = set()

if SalesforceId(record["Id"]) not in self.extracted_ids[sobjectname]:
self.logger.debug(
"%s: extracting record %s", sobjectname, SalesforceId(record["Id"])
)
self.extracted_ids[sobjectname].add(SalesforceId(record["Id"]))
record_id = SalesforceId(record["Id"])

if record_id not in self.extracted_ids[sobjectname]:
self.logger.debug("%s: extracting record %s", sobjectname, record_id)
self.extracted_ids[sobjectname].add(record_id)
self.file_store.get_csv(sobjectname, FileType.OUTPUT).writerow(
self.mappers[sobjectname].transform_record(record)
if sobjectname in self.mappers
@@ -616,9 +624,9 @@ def store_result(self, sobjectname, record):

if (
sobjectname in self.required_ids
and SalesforceId(record["Id"]) in self.required_ids[sobjectname]
and record_id in self.required_ids[sobjectname]
):
self.required_ids[sobjectname].remove(SalesforceId(record["Id"]))
self.required_ids[sobjectname].remove(record_id)


class ExtractionStep(Step):
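The new `MappingMissBehavior` enum names three ways to handle a reference whose target has no entry in the mapping cache: fail, blank the value, or substitute a default. The PR's actual transformer is not shown in this diff, so the sketch below is a hypothetical illustration of applying the three behaviors when resolving an extracted Id; `map_reference`, its signature, and the `id_map` shape are this sketch's inventions, not Amaxa's API.

```python
import enum


class MappingMissBehavior(enum.Enum):
    """Mirrors the three miss behaviors added in this PR."""

    ERROR = "error"
    DROP = "drop"
    DEFAULT = "default"


def map_reference(old_id, id_map, behavior, default=None):
    """Resolve an extracted Id to its target-org Id, applying the
    configured behavior when no mapping is found (a "miss")."""
    if old_id in id_map:
        return id_map[old_id]
    if behavior is MappingMissBehavior.ERROR:
        raise KeyError(f"No mapping found for Id {old_id}")
    if behavior is MappingMissBehavior.DROP:
        return None  # caller blanks out the reference
    return default  # MappingMissBehavior.DEFAULT
```

A hit returns the mapped Id regardless of the configured behavior; the behavior only matters on a miss.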
32 changes: 31 additions & 1 deletion amaxa/loader/core.py
@@ -1,5 +1,6 @@
import abc
import collections
import itertools
import json
import logging

@@ -200,6 +201,10 @@ def _validate_field_permissions(self, permission=None):
# Validate that all fields designated for steps are real, writeable,
# and of a supported type.
all_sobjects = [step.sobjectname for step in self.result.steps]
if "object-mappings" in self.input: # TODO: is this right for all cases?
all_sobjects.extend(
entry["sobject"] for entry in self.input.get("object-mappings")
)

for step in self.result.steps:
field_map = self.result.get_field_map(step.sobjectname)
@@ -248,6 +253,29 @@ def _validate_field_permissions(self, permission=None):
)

def _validate_sobjects(self, permission):
# Validate that sObjects are listed only once, and that they are not both mapped and included.
sobjects = collections.Counter(
entry["sobject"]
for entry in itertools.chain(
self.input["operation"], self.input.get("object-mappings", [])
)
)

duplicate_objects = list(
filter(
lambda f: sobjects[f] > 1,
sobjects.keys(),
)
)

if duplicate_objects:
sobject_list = "\n".join(duplicate_objects)
self.errors.append(
f"One or more sObjects is specified "
f"multiple times across `operation` and `object-mappings`: {sobject_list}"
)

# Validate that all listed sObjects can be accessed appropriately.
try:
global_describe = {
entry["name"]: entry
@@ -257,7 +285,9 @@ def _validate_sobjects(self, permission):
self.errors.append("Unable to authenticate to Salesforce: {}".format(e))
return

for entry in self.input["operation"]:
for entry in itertools.chain(
self.input["operation"], self.input.get("object-mappings", [])
):
sobject = entry["sobject"]

if (
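The duplicate check added to `_validate_sobjects` counts each sObject name across both the `operation` and `object-mappings` sections and rejects any name seen more than once. The same pattern can be isolated as a small standalone function; the function name is ours, but the `Counter`-over-`chain` logic mirrors the diff:

```python
import collections
import itertools


def find_duplicate_sobjects(operation, object_mappings):
    """Return sObject names that appear more than once across the
    `operation` and `object-mappings` config sections."""
    counts = collections.Counter(
        entry["sobject"]
        for entry in itertools.chain(operation, object_mappings)
    )
    return [name for name, count in counts.items() if count > 1]
```

Because an sObject is either extracted/loaded directly (`operation`) or resolved via key fields (`object-mappings`), any overlap between the two sections is a configuration error.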
25 changes: 20 additions & 5 deletions amaxa/loader/extract_operation.py
@@ -20,6 +20,7 @@ def _initialize(self):
def _load(self):
# Create the core operation
self.result = amaxa.ExtractOperation(self.connection)
self.filenames = []

options = self.input.get("options") or {}

@@ -79,6 +80,22 @@ def _load(self):

self._populate_lookup_behaviors(step, entry)
self.result.add_step(step)
self.filenames.append(entry["file"])

# Handle mapped sObjects
mapped_schema = self.input.get("object-mappings", [])
for mapping in mapped_schema:
sobject = mapping["sobject"]
key_fields = mapping["key-fields"]
step = amaxa.ExtractionStep(
sobject,
amaxa.ExtractionScope.DESCENDENTS,
["Id", *key_fields],
None,
options=options,
)
self.result.add_step(step)
self.filenames.append(mapping["file"])

def _post_load_validate(self):
self._validate_field_permissions()
@@ -125,9 +142,9 @@ def include(f):
def _open_files(self):
# Open all of the output files
# Create DictWriters and populate them in the context
for (step, entry) in zip(self.result.steps, self.input["operation"]):
for (step, filename) in zip(self.result.steps, self.filenames):
try:
file_handle = open(entry["file"], "w", newline="", encoding="utf-8")
file_handle = open(filename, "w", newline="", encoding="utf-8")
if step.sobjectname not in self.result.mappers:
fieldnames = step.field_scope
else:
Expand All @@ -152,7 +169,5 @@ def _open_files(self):
)
except IOError as exp:
self.errors.append(
"Unable to open file {} for writing ({}).".format(
entry["file"], exp
)
"Unable to open file {} for writing ({}).".format(filename, exp)
)
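The `_open_files` change above replaces `zip(self.result.steps, self.input["operation"])` with `zip(self.result.steps, self.filenames)`. Once `_load` appends mapped steps sourced from `object-mappings`, the step list is longer than the `operation` list, so the old zip would silently drop or misalign the mapped steps' files; recording filenames in step order keeps each step paired with its own file. A minimal sketch of that pairing (plain names stand in for the loader's real step objects):

```python
import itertools


def pair_steps_with_files(operation, object_mappings):
    """Build steps and filenames in the same order _load appends them,
    then pair them, mirroring the corrected zip in _open_files."""
    sobject_names = []  # stand-in for self.result.steps
    filenames = []      # stand-in for self.filenames
    for entry in itertools.chain(operation, object_mappings):
        sobject_names.append(entry["sobject"])
        filenames.append(entry["file"])
    return list(zip(sobject_names, filenames))
```

Zipping against `operation` alone would stop at the shorter list and never open the mapped sObjects' files.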