diff --git a/CHANGELOG.md b/CHANGELOG.md
index f81cebb4..fe1dd7c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,29 @@
+### v0.3.2
+
+
+Released 2024-06-14
+
+* feature: Add `DebugOperation` for logging data head, tail, columns, or metadata midrun
+* feature: Add `FlattenOperation` for splitting and exploding string columns into values
+* feature: Add optional `fill_missing_columns` field to `UnionOperation` to fill disjunct columns with nulls, instead of raising an error (default `False`)
+* feature: Add `git_auth_timeout` config when entering Git credentials during package composition
+* feature: [Add `earthmover clean` command that removes local project artifacts](https://github.com/edanalytics/earthmover/pull/87)
+* feature: Only write the compiled YAML file to disk during `earthmover compile` (no longer during `run`)
+* feature: Render full row into JSON lines when `template` is undefined in `FileDestination`
+* internal: Move `FileSource` size-checking and `FtpSource` FTP-connecting from compile to execute
+* internal: Move template-file check from compile to execute in `FileDestination`
+* internal: Allow filepaths to be passed to an optional `FileSource`, and check for file before creating empty dataframe
+* internal: Build an empty dataframe if an empty folder is passed to an optional `FileSource`
+* internal: Fix some examples in README
+* internal: Remove GitPython dependency
+* bugfix: fix bug in `FileDestination` where `linearize: False` resulted in BOM characters
+* bugfix: fix bug where nested JSON would be loaded as a stringified Python dictionary
+* bugfix: [Ensure command list in help menu and log output is always consistent](https://github.com/edanalytics/earthmover/pull/87)
+* bugfix: fix bug in `ModifyColumnsOperation` where `__row_data__` was not exposed in Jinja templating
+
+
+
+
### v0.3.1
@@ -195,4 +221,4 @@
Released 2022-09-22
* initial release
-
\ No newline at end of file
+
diff --git a/README.md b/README.md
index 3e8b8a45..1c3639b3 100644
--- a/README.md
+++ b/README.md
@@ -85,8 +85,8 @@ transA: transA:
- operation: add_columns - operation: add_columns
source: $sources.A
columns: columns:
- - A: "a" - A: "a"
- - B: "b" - B: "b"
+ A: "a" A: "a"
+ B: "b" B: "b"
- operation: union - operation: union
sources: sources:
- $transformations.transA
@@ -133,6 +133,7 @@ config:
parameter_defaults:
SOURCE_DIR: ./sources/
show_progress: True
+ git_auth_timeout: 120
```
* (optional) `output_dir` determines where generated JSONL is stored. The default is `./`.
@@ -148,7 +149,7 @@ config:
* (optional) Specify Jinja `macros` which will be available within any Jinja template content throughout the project. (This can slow performance.)
* (optional) Specify `parameter_defaults` which will be used if the user fails to specify a particular [parameter](#command-line-parameters) or [environment variable](#environment-variable-references).
* (optional) Specify whether to `show_progress` while processing, via a Dask progress bar.
-
+* (optional) Specify the `git_auth_timeout` (in seconds) to wait for the user to enter Git credentials if needed during package installation; default is 60. See [project composition](#project-composition) for more details on package installation.
### **`definitions`**
The `definitions` section of the [YAML configuration](#yaml-configuration) is an optional section you can use to define configurations which are reused throughout the rest of the configuration. `earthmover` does nothing special with this section, it's just interpreted by the YAML parser. However, this can be a very useful way to keep your YAML configuration [DRY](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself) – rather than redefine the same values, Jinja phrases, etc. throughout your config, define them once in this section and refer to them later using [YAML anchors, aliases, and overrides](https://www.linode.com/docs/guides/yaml-anchors-aliases-overrides-extensions/).
@@ -318,7 +319,10 @@ Concatenates the transformation source with one or more sources of the s
- $sources.courses_list_1
- $sources.courses_list_2
- $sources.courses_list_3
+ fill_missing_columns: False
```
+By default, unioning sources with different columns raises an error.
+Set `fill_missing_columns` to `True` to union all columns into the output dataframe.
@@ -371,10 +375,10 @@ Adds columns with specified values.
```yaml
- operation: add_columns
columns:
- - new_column_1: value_1
- - new_column_2: "{%raw%}{% if True %}Jinja works here{% endif %}{%endraw%}"
- - new_column_3: "{%raw%}Reference values from {{AnotherColumn}} in this new column{%endraw%}"
- - new_column_4: "{%raw%}{% if col1>col2 %}{{col1|float + col2|float}}{% else %}{{col1|float - col2|float}}{% endif %}{%endraw%}"
+ new_column_1: value_1
+ new_column_2: "{%raw%}{% if True %}Jinja works here{% endif %}{%endraw%}"
+ new_column_3: "{%raw%}Reference values from {{AnotherColumn}} in this new column{%endraw%}"
+ new_column_4: "{%raw%}{% if col1>col2 %}{{col1|float + col2|float}}{% else %}{{col1|float - col2|float}}{% endif %}{%endraw%}"
```
Use Jinja: `{{value}}` refers to this column's value; `{{AnotherColumn}}` refers to another column's value. Any [Jinja filters](https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters) and [math operations](https://jinja.palletsprojects.com/en/3.0.x/templates/#math) should work. Reference the current row number with `{{__row_number__}}` or a dictionary containing the row data with `{{__row_data__['column_name']}}`. *You must wrap Jinja expressions* in `{%raw%}...{%endraw%}` to avoid them being parsed at YAML load time.
@@ -565,6 +569,59 @@ By default, rows are sorted ascendingly. Set `descending: True` to reverse this
+
+flatten
+
+Split values in a column and create a copy of the row for each value.
+```yaml
+ - operation: flatten
+ flatten_column: my_column
+ left_wrapper: '["' # characters to trim from the left of values in `flatten_column`
+ right_wrapper: '"]' # characters to trim from the right of values in `flatten_column`
+ separator: "," # the string by which to split values in `flatten_column`
+ value_column: my_value # name of the new column to create with flattened values
+ trim_whitespace: " \t\r\n\"" # characters to trim from `value_column` _after_ flattening
+```
+The defaults above are designed to allow flattening JSON arrays (in a string) with simply:
+```yaml
+ - operation: flatten
+ flatten_column: my_column
+ value_column: my_value
+```
+Note that for empty string values or empty arrays, a row will still be preserved. These can be removed in a second step with a `filter_rows` operation. Example:
+```yaml
+# Given a dataframe like this:
+# foo bar to_flatten
+# --- --- ----------
+# foo1 bar1 "[\"test1\",\"test2\",\"test3\"]"
+# foo2 bar2 ""
+# foo3 bar3 "[]"
+# foo4 bar4 "[\"test4\",\"test5\",\"test6\"]"
+#
+# a flatten operation like this:
+ - operation: flatten
+ flatten_column: to_flatten
+ value_column: my_value
+# will produce a dataframe like this:
+# foo bar my_value
+# --- --- --------
+# foo1 bar1 test1
+# foo1 bar1 test2
+# foo1 bar1 test3
+# foo2 bar2 ""
+# foo3 bar3 ""
+# foo4 bar4 test4
+# foo4 bar4 test5
+# foo4 bar4 test6
+#
+# and you can remove the blank rows if needed with a further operation:
+ - operation: filter_rows
+ query: my_value == ''
+ behavior: exclude
+```
+
+
+
#### Group operations
@@ -608,6 +665,23 @@ Note the difference between `min()`/`max()` and `str_min()`/`str_max()`: given a
+#### Debug operation
+
+
+debug
+
+Log the head, tail, column list, or summary statistics of the dataframe midrun.
+```yaml
+ - operation: debug
+ function: head | tail | describe | columns # default=head
+ rows: 10 # (optional, default=5; ignored if function=describe|columns)
+ transpose: True # (default=False; ignored when function=columns)
+ skip_columns: [a, b, c] # to avoid logging PII
+ keep_columns: [x, y, z] # to look only at specific columns
+```
+`function=head|tail` displays the `rows` first or last rows of the dataframe, respectively. (Note that on large dataframes, these may not truly be the first/last rows, due to Dask's memory optimizations.) `function=describe` shows statistics about the values in the dataframe. `function=columns` shows the column names in the dataframe. `transpose` can be helpful with very wide dataframes. `keep_columns` defaults to all columns, `skip_columns` defaults to no columns.
+
+
### **`destinations`**
The `destinations` section of the [YAML configuration](#yaml-configuration) specifies how transformed data is materialized to files.
@@ -700,7 +774,7 @@ transformations:
operations:
- operation: add_columns
columns:
- - source_file: {{i}}
+ source_file: {{i}}
{% endfor %}
stacked:
source: $transformations.source1
@@ -891,7 +965,7 @@ Some details of the design of this tool are discussed below.
Note that due to step (3) above, *runtime* Jinja expressions (such as column definitions for `add_columns` or `modify_columns` operations) should be wrapped with `{%raw%}...{%endraw%}` to avoid being parsed when the YAML is being loaded.
-The parsed YAML is written to a file called `earthmover_compiled.yaml` in your working directory during a `compile` or `run` command. This file can be used to debug issues related to compile-time Jinja or [project composition](#project-composition).
+The parsed YAML is written to a file called `earthmover_compiled.yaml` in your working directory during a `compile` command. This file can be used to debug issues related to compile-time Jinja or [project composition](#project-composition).
## DAG
diff --git a/earthmover/VERSION.txt b/earthmover/VERSION.txt
index 9e11b32f..d15723fb 100644
--- a/earthmover/VERSION.txt
+++ b/earthmover/VERSION.txt
@@ -1 +1 @@
-0.3.1
+0.3.2
diff --git a/earthmover/__main__.py b/earthmover/__main__.py
index f24b92c6..dd33fb7c 100644
--- a/earthmover/__main__.py
+++ b/earthmover/__main__.py
@@ -5,6 +5,13 @@
from earthmover.earthmover import Earthmover
+# Any new command should be added to this list
+RUN = "run"
+COMPILE = "compile"
+DEPS = "deps"
+CLEAN = "clean"
+ALLOWED_COMMANDS = [RUN, COMPILE, DEPS, CLEAN]
+command_list = ", ".join(f"`{c}`" for c in ALLOWED_COMMANDS)
class ExitOnExceptionHandler(logging.StreamHandler):
"""
@@ -50,7 +57,7 @@ def main(argv=None):
parser.add_argument('command',
nargs="?",
type=str,
- help='the command to run: `run`, `compile`, `visualize`'
+ help=f'the command to run. One of: {command_list}'
)
parser.add_argument("-c", "--config-file",
nargs="?",
@@ -103,9 +110,17 @@ def main(argv=None):
})
### Parse the user-inputs and run Earthmover, depending on the command and subcommand passed.
- args, remaining_argv = parser.parse_known_args()
+ args, unknown_args = parser.parse_known_args()
+ if len(unknown_args) > 0:
+ unknown_args_str = ', '.join(f"`{c}`" for c in unknown_args)
+ print(f"unknown arguments {unknown_args_str} passed, use -h flag for help")
+ exit(1)
+
+ if args.command is not None and args.command not in ALLOWED_COMMANDS:
+ print(f"unknown command '{args.command}' passed, use -h flag for help")
+ exit(1)
- # Command: Version
+ # -v / --version
if args.version:
em_dir = os.path.dirname(os.path.abspath(__file__))
version_file = os.path.join(em_dir, 'VERSION.txt')
@@ -114,7 +129,7 @@ def main(argv=None):
print(f"earthmover, version {VERSION}")
exit(0)
- # Command: Test
+ # -t / --test
if args.test:
tests_dir = os.path.join( os.path.realpath(os.path.dirname(__file__)), "tests" )
@@ -130,7 +145,7 @@ def main(argv=None):
em.logger.info('tests passed successfully.')
exit(0)
- ### Otherwise, initialize Earthmover for a main run.
+ ### Otherwise, initialize Earthmover to execute a command.
if not args.config_file:
for file in DEFAULT_CONFIG_FILES:
test_file = os.path.join(".", file)
@@ -162,15 +177,15 @@ def main(argv=None):
force=args.force,
skip_hashing=args.skip_hashing,
cli_state_configs=cli_state_configs,
- results_file=args.results_file
+ results_file=args.results_file,
)
except Exception as err:
logger.exception(err, exc_info=True)
raise # Avoids linting error
- # Subcommand: deps (parse Earthmover YAML and compile listed packages)
- if args.command == 'deps':
+ # Command: deps (parse Earthmover YAML and compile listed packages)
+ if args.command == DEPS:
em.logger.info(f"installing packages...")
if args.selector != '*':
em.logger.info("selector is ignored for package install.")
@@ -182,23 +197,35 @@ def main(argv=None):
logger.exception(e, exc_info=em.state_configs['show_stacktrace'])
raise
- # Subcommand: compile (parse Earthmover YAML and build graph)
- elif args.command == 'compile':
+ # Command: compile (parse Earthmover YAML and build graph)
+ elif args.command == COMPILE:
em.logger.info(f"compiling project...")
if args.selector != '*':
em.logger.info("selector is ignored for compile-only run.")
try:
- em.compile()
+ em.compile(to_disk=True)
em.logger.info("looks ok")
except Exception as e:
logger.exception(e, exc_info=em.state_configs['show_stacktrace'])
raise
- # Subcommand: run (compile + execute)
+ elif args.command == CLEAN:
+ em.logger.info(f"removing local artifacts...")
+ if args.selector != '*':
+ em.logger.info("selector is ignored for project cleaning.")
+
+ try:
+ em.clean()
+ em.logger.info("done!")
+ except Exception as e:
+ logger.exception(e, exc_info=em.state_configs['show_stacktrace'])
+ raise
+
+ # Command: run (compile + execute)
# This is the default if none is specified.
- elif args.command == 'run' or not args.command:
+ elif args.command == RUN or not args.command:
if not args.command:
em.logger.warning("[no command specified; proceeding with `run` but we recommend explicitly giving a command]")
@@ -212,7 +239,7 @@ def main(argv=None):
raise
else:
- logger.exception("unknown command, use -h flag for help")
+ logger.exception(f"unknown command '{args.command}', use -h flag for help")
raise
diff --git a/earthmover/earthmover.py b/earthmover/earthmover.py
index feef1bb6..58b6fae9 100644
--- a/earthmover/earthmover.py
+++ b/earthmover/earthmover.py
@@ -5,6 +5,7 @@
import tempfile
import networkx as nx
import os
+import shutil
import time
import datetime
import pandas as pd
@@ -22,6 +23,7 @@
from typing import List, Optional
+COMPILED_YAML_FILE = "./earthmover_compiled.yaml"
class Earthmover:
"""
@@ -38,6 +40,7 @@ class Earthmover:
"show_stacktrace": False,
"tmp_dir": tempfile.gettempdir(),
"show_progress": False,
+ "git_auth_timeout": 60
}
sources: List[Source] = []
@@ -59,6 +62,7 @@ def __init__(self,
self.results_file = results_file
self.config_file = config_file
+ self.compiled_yaml_file = COMPILED_YAML_FILE
self.error_handler = ErrorHandler(file=self.config_file)
# Set a directory for installing packages
@@ -86,11 +90,6 @@ def __init__(self,
# Prepare the output directory for destinations.
self.state_configs['output_dir'] = os.path.expanduser(self.state_configs['output_dir'])
- if not os.path.isdir(self.state_configs['output_dir']):
- self.logger.info(
- f"creating output directory {self.state_configs['output_dir']}"
- )
- os.makedirs(self.state_configs['output_dir'], exist_ok=True)
# Set the temporary directory in cases of disk-spillage.
dask.config.set({'temporary_directory': self.state_configs['tmp_dir']})
@@ -128,7 +127,7 @@ def load_project_configs(self, filepath: str):
return configs
- def compile(self):
+ def compile(self, to_disk: bool = False):
"""
Parse optional packages, iterate the node configs, compile each Node, and build the graph.
Save the Nodes to their `Earthmover.{node_type}` objects.
@@ -141,7 +140,8 @@ def compile(self):
### Optionally merge packages to update user-configs and write the composed YAML to disk.
self.user_configs = self.merge_packages() or self.user_configs
- self.user_configs.to_disk("./earthmover_compiled.yaml")
+ if to_disk:
+ self.user_configs.to_disk(self.compiled_yaml_file)
### Compile the nodes and add to the graph type-by-type.
self.sources = self.compile_node_configs(
@@ -224,6 +224,12 @@ def execute(self, graph: Graph):
Iterate subgraphs in `Earthmover.graph` and execute each Node in order.
:return:
"""
+ if not os.path.isdir(self.state_configs['output_dir']):
+ self.logger.info(
+ f"creating output directory {self.state_configs['output_dir']}"
+ )
+ os.makedirs(self.state_configs['output_dir'], exist_ok=True)
+
for idx, component in enumerate(nx.weakly_connected_components(graph)):
self.logger.debug(f"processing component {idx}")
@@ -517,7 +523,7 @@ def build_package_graph(self, root_node: str, package_subgraph: Graph, packages_
# Install packages if necessary, or retrieve path to package yaml file
package_node = self.package_graph.nodes[package_name]
if install:
- installed_package_yaml = package_node['package'].install(packages_dir)
+ installed_package_yaml = package_node['package'].install(packages_dir, git_auth_timeout=self.state_configs['git_auth_timeout'])
else:
package_node['package'].package_path = os.path.join(packages_dir, package_name)
installed_package_yaml = package_node['package'].get_installed_config_file()
@@ -539,3 +545,22 @@ def build_package_graph(self, root_node: str, package_subgraph: Graph, packages_
nested_package_dir = os.path.join(package_node['package'].package_path, 'packages')
nested_package_subgraph = nx.ego_graph(self.package_graph, package_name)
self.build_package_graph(root_node=package_name, package_subgraph=nested_package_subgraph, packages_dir=nested_package_dir, install=install)
+
+ def clean(self):
+ """
+ Removes local artifacts created by `earthmover run`
+ :return:
+ """
+
+ was_noop = True
+ if os.path.isdir(self.state_configs['output_dir']):
+ shutil.rmtree(self.state_configs['output_dir'], ignore_errors = True)
+ was_noop = False
+
+ if os.path.isfile(self.compiled_yaml_file):
+ os.remove(self.compiled_yaml_file)
+ was_noop = False
+
+ if was_noop:
+ self.logger.warning("Nothing to remove!")
+ exit(1)
diff --git a/earthmover/nodes/destination.py b/earthmover/nodes/destination.py
index 5ad05607..5199beab 100644
--- a/earthmover/nodes/destination.py
+++ b/earthmover/nodes/destination.py
@@ -1,5 +1,3 @@
-import csv
-import jinja2
import os
import pandas as pd
import re
@@ -39,57 +37,54 @@ class FileDestination(Destination):
EXP = re.compile(r"\s+")
TEMPLATED_COL = "____OUTPUT____"
+ DEFAULT_TEMPLATE = """{ {% for col, val in __row_data__.items() %}"{{ col }}": {{ val | tojson }}{% if not loop.last %}, {% endif %}{% endfor %} }"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self.template = self.error_handler.assert_get_key(self.config, 'template', dtype=str)
- self.header = self.config.get("header")
- self.footer = self.config.get("footer")
+ self.template = self.error_handler.assert_get_key(self.config, 'template', dtype=str, required=False, default=None)
+ self.header = self.error_handler.assert_get_key(self.config, 'header', dtype=str, required=False, default=None)
+ self.footer = self.error_handler.assert_get_key(self.config, 'footer', dtype=str, required=False, default=None)
+ self.linearize = self.error_handler.assert_get_key(self.config, 'linearize', dtype=bool, required=False, default=True)
+ self.extension = self.error_handler.assert_get_key(self.config, 'extension', dtype=str, required=False, default='')
+ self.jinja_template = None # Defined in execute()
#config->extension is optional: if not present, we assume the destination name has an extension
- extension = ""
- if "extension" in self.config:
- extension = f".{self.config['extension']}"
-
- self.file = os.path.join(
- self.earthmover.state_configs['output_dir'],
- f"{self.name}{extension}"
- )
+ filename = f"{self.name}.{self.extension}" if self.extension else self.name
+ self.file = os.path.join(self.earthmover.state_configs['output_dir'], filename)
+
+ def execute(self, **kwargs):
+ """
+
+ :return:
+ """
+ super().execute(**kwargs)
- #
+ # Prepare the Jinja template for rendering rows.
try:
- with open(self.template, 'r', encoding='utf-8') as fp:
- template_string = fp.read()
+ if self.template:
+ with open(self.template, 'r', encoding='utf-8') as fp:
+ template_string = fp.read()
+ else:
+ template_string = self.DEFAULT_TEMPLATE
- except Exception as err:
+ # Replace multiple spaces with a single space to flatten templates.
+ if self.linearize:
+ template_string = self.EXP.sub(" ", template_string)
+
+ self.jinja_template = util.build_jinja_template(template_string, macros=self.earthmover.macros)
+
+ except OSError as err:
self.error_handler.throw(
f"`template` file {self.template} cannot be opened ({err})"
)
raise
- #
- if self.config.get('linearize', True):
- template_string = self.EXP.sub(" ", template_string) # Replace multiple spaces with a single space.
-
- #
- try:
- self.jinja_template = util.build_jinja_template(template_string, macros=self.earthmover.macros)
-
except Exception as err:
- self.earthmover.error_handler.throw(
+ self.error_handler.throw(
f"syntax error in Jinja template in `template` file {self.template} ({err})"
)
raise
- def execute(self, **kwargs):
- """
- There is a bug in Dask where `dd.to_csv(mode='a', single_file=True)` fails.
- This is resolved in 2023.8.1: https://docs.dask.org/en/stable/changelog.html#id7
-
- :return:
- """
- super().execute(**kwargs)
-
# this renders each row without having to itertuples() (which is much slower)
# (meta=... below is how we prevent dask warnings that it can't infer the output data type)
self.data = (
@@ -103,15 +98,15 @@ def execute(self, **kwargs):
# Verify the output directory exists.
os.makedirs(os.path.dirname(self.file), exist_ok=True)
- # Write the optional header, the JSON lines as CSV (for performance), and the optional footer.
- self.data.to_csv(
- filename=self.file, single_file=True, mode='wt', index=False,
- header=[self.header] if self.header else False, # We must write the header directly due to aforementioned bug.
- escapechar="\x01", sep="\x02", quoting=csv.QUOTE_NONE, # Pretend to be CSV to improve performance
- )
+ # Write the optional header, each line, and the optional footer.
+ with open(self.file, 'w+', encoding='utf-8') as fp:
+
+ if self.header:
+ fp.write(self.header)
+
+ self.data.apply(lambda row: fp.write(row + '\n'), meta=pd.Series('string')).compute()
- if self.footer:
- with open(self.file, 'a', encoding='utf-8') as fp:
+ if self.footer:
fp.write(self.footer)
self.logger.debug(f"output `{self.file}` written")
diff --git a/earthmover/nodes/node.py b/earthmover/nodes/node.py
index c7644849..6afb64f4 100644
--- a/earthmover/nodes/node.py
+++ b/earthmover/nodes/node.py
@@ -164,7 +164,7 @@ def check_expectations(self, expectations: List[str]):
num_failed = len(result.query(f"{expectation_result_col}=='False'").index)
if num_failed > 0:
self.error_handler.throw(
- f"Source `${self.type}s.{self.name}` failed expectation `{expectation}` ({num_failed} rows fail)"
+ f"Source `{self.full_name}` failed expectation `{expectation}` ({num_failed} rows fail)"
)
else:
self.logger.info(
diff --git a/earthmover/nodes/source.py b/earthmover/nodes/source.py
index 5d0c437a..0fae440a 100644
--- a/earthmover/nodes/source.py
+++ b/earthmover/nodes/source.py
@@ -1,3 +1,4 @@
+import dask.config as dask_config
import dask.dataframe as dd
import ftplib
import io
@@ -8,7 +9,7 @@
from earthmover.nodes.node import Node
from earthmover import util
-from typing import Callable, List, Optional, Tuple
+from typing import List, Optional, Tuple
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from dask.dataframe.core import DataFrame
@@ -119,10 +120,12 @@ def __init__(self, *args, **kwargs):
)
raise
- #
- if not self.file and self.optional and ('columns' not in self.config or not isinstance(self.config['columns'], list)):
+ # Columns are required if a source is optional.
+ self.columns_list = self.error_handler.assert_get_key(self.config, 'columns', dtype=list, required=False)
+
+ if self.optional and not self.columns_list:
self.error_handler.throw(
- f"source `{self.name}` is optional and missing, but does not specify `columns` (which are required in this case)"
+ f"source `{self.name}` is optional, but does not specify `columns` (which are required in this case)"
)
raise
@@ -137,22 +140,10 @@ def __init__(self, *args, **kwargs):
)
raise
- #
- self.columns_list = self.error_handler.assert_get_key(self.config, 'columns', dtype=list, required=False)
-
- #
+ # Remote files cannot be size-checked in execute.
if "://" in self.file:
self.is_remote = True
- elif self.file and not self.optional:
- try:
- self.size = os.path.getsize(self.file)
- except FileNotFoundError:
- self.error_handler.throw(
- f"Source file {self.file} not found"
- )
- raise
-
def execute(self):
"""
@@ -164,10 +155,14 @@ def execute(self):
self._verify_packages(self.file_type)
try:
- if not self.file and self.optional:
+ # Build an empty dataframe if the path is not populated or if an empty directory is passed (for Parquet files).
+ if self.optional and not os.path.exists(self.file) or (os.path.isdir(self.file) and not os.listdir(self.file)):
self.data = pd.DataFrame(columns=self.columns_list, dtype="string")
else:
+ dask_config.set({'dataframe.convert-string': False})
self.data = self.read_lambda(self.file, self.config)
+ if not self.is_remote:
+ self.size = os.path.getsize(self.file)
# Verify the column list provided matches the number of columns in the dataframe.
if self.columns_list:
@@ -313,7 +308,16 @@ class FtpSource(Source):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.connection = self.error_handler.assert_get_key(self.config, 'connection', dtype=str)
+ self.ftp = None # FTP connection is made during execute.
+
+ def execute(self):
+ """
+ ftp://user:pass@host:port/path/to/file.ext
+ :return:
+ """
+ super().execute()
+ ### Parse the connection string and attempt connection to FTP.
# There's probably a network builtin to simplify this.
user, passwd, host, port, self.file = re.match(
r"ftp://(.*?):?(.*?)@?([^:/]*):?(.*?)/(.*)",
@@ -335,13 +339,6 @@ def __init__(self, *args, **kwargs):
f"source file {self.connection} could not be accessed: {err}"
)
- def execute(self):
- """
- ftp://user:pass@host:port/path/to/file.ext
- :return:
- """
- super().execute()
-
try:
# TODO: Can Dask read from FTP directly without this workaround?
flo = io.BytesIO()
diff --git a/earthmover/operations/dataframe.py b/earthmover/operations/dataframe.py
index ac6d2df0..b729a04b 100644
--- a/earthmover/operations/dataframe.py
+++ b/earthmover/operations/dataframe.py
@@ -146,12 +146,13 @@ class UnionOperation(Operation):
"""
allowed_configs: Tuple[str] = (
- 'operation', 'repartition', 'sources',
+ 'operation', 'repartition', 'sources', 'fill_missing_columns',
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sources = self.error_handler.assert_get_key(self.config, 'sources', dtype=list)
+ self.fill_missing_columns = self.error_handler.assert_get_key(self.config, 'fill_missing_columns', dtype=bool, required=False, default=False)
def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) -> 'DataFrame':
"""
@@ -164,8 +165,11 @@ def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) ->
source_data = data_mapping[source].data
if set(source_data.columns) != set(data.columns):
- self.error_handler.throw('dataframes to union do not share identical columns')
- raise
+ if self.fill_missing_columns:
+ self.logger.debug('Dataframes to union do not share identical columns. Missing columns will be filled with nulls.')
+ else:
+ self.error_handler.throw('dataframes to union do not share identical columns')
+ raise
try:
data = dd.concat([data, source_data], ignore_index=True)
@@ -177,3 +181,62 @@ def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) ->
raise
return data
+
+
+class DebugOperation(Operation):
+ """
+ """
+ allowed_configs: Tuple[str] = (
+ 'operation', 'function', 'rows', 'transpose', 'skip_columns', 'keep_columns'
+ )
+
+ DEBUG_FUNCTIONS = ['head', 'tail', 'describe', 'columns']
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.func = self.error_handler.assert_get_key(self.config, 'function', dtype=str, required=False, default="head")
+ self.rows = self.error_handler.assert_get_key(self.config, 'rows', dtype=int, required=False, default=5)
+ self.skip_columns = self.error_handler.assert_get_key(self.config, 'skip_columns', dtype=list, required=False, default=[])
+ self.keep_columns = self.error_handler.assert_get_key(self.config, 'keep_columns', dtype=list, required=False, default=None)
+ self.transpose = self.error_handler.assert_get_key(self.config, 'transpose', dtype=bool, required=False, default=False)
+
+ if self.func not in self.DEBUG_FUNCTIONS:
+ self.error_handler.throw(f"debug type `{self.func}` not defined")
+
+ def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) -> 'DataFrame':
+ """
+ :return:
+ """
+ super().execute(data, data_mapping=data_mapping, **kwargs)
+
+ # construct log message, removing reference to the debug operation
+ transformation_name = self.full_name.replace('.operations:debug', '')
+ rows_str = ' ' + str(self.rows) if self.func in ['head', 'tail'] else ''
+ transpose_str = ', Transpose' if self.transpose else ''
+ self.logger.info(f"debug ({self.func}{rows_str}{transpose_str}) for {transformation_name}:")
+
+ # `columns` debug does not require column selection or compute
+ if self.func == 'columns':
+ print(list(data.columns))
+ return data # do not actually transform the data
+
+ # otherwise, subset to desired columns
+ if not self.keep_columns:
+ self.keep_columns = list(data.columns)
+
+ selected_columns = [col for col in list(data.columns) if col in self.keep_columns and col not in self.skip_columns]
+ debug_data = data[selected_columns]
+
+ # call function, and display debug info
+ if self.func == 'head':
+ debug_data = debug_data.head(self.rows)
+ elif self.func == 'tail':
+ debug_data = debug_data.tail(self.rows)
+ elif self.func == 'describe':
+ debug_data = debug_data.compute().describe()
+
+ if self.transpose:
+ debug_data = debug_data.transpose().reset_index(names="column")
+
+ print(debug_data.to_string(index=False))
+ return data # do not actually transform the data
diff --git a/earthmover/operations/operation.py b/earthmover/operations/operation.py
index b01d2a7d..0b3e1553 100644
--- a/earthmover/operations/operation.py
+++ b/earthmover/operations/operation.py
@@ -30,6 +30,7 @@ def __new__(cls, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover'):
operation_mapping = {
'join': dataframe_operations.JoinOperation,
'union': dataframe_operations.UnionOperation,
+ 'debug': dataframe_operations.DebugOperation,
'add_columns': column_operations.AddColumnsOperation,
'modify_columns': column_operations.ModifyColumnsOperation,
@@ -45,6 +46,7 @@ def __new__(cls, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover'):
'distinct_rows': row_operations.DistinctRowsOperation,
'filter_rows': row_operations.FilterRowsOperation,
'sort_rows': row_operations.SortRowsOperation,
+ 'flatten': row_operations.FlattenOperation,
'group_by_with_rank': groupby_operations.GroupByWithRankOperation,
'group_by': groupby_operations.GroupByOperation,
diff --git a/earthmover/operations/row.py b/earthmover/operations/row.py
index e2d96638..67e56b72 100644
--- a/earthmover/operations/row.py
+++ b/earthmover/operations/row.py
@@ -1,8 +1,6 @@
-import dask
-
from earthmover.operations.operation import Operation
-from typing import List, Tuple
+from typing import Tuple
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from dask.dataframe.core import DataFrame
@@ -124,3 +122,66 @@ def execute(self, data: 'DataFrame', **kwargs):
raise
return data.sort_values(by=self.columns_list, ascending=(not self.descending))
+
+
+class FlattenOperation(Operation):
+ """
+
+ """
+ allowed_configs: Tuple[str] = (
+ 'operation', 'repartition',
+ 'flatten_column', 'left_wrapper', 'right_wrapper', 'separator', 'value_column', 'trim_whitespace'
+ )
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.flatten_column = self.error_handler.assert_get_key(self.config, 'flatten_column', dtype=str, required=True)
+ self.left_wrapper = self.error_handler.assert_get_key(self.config, 'left_wrapper', dtype=str, required=False, default="[\"'")
+ self.right_wrapper = self.error_handler.assert_get_key(self.config, 'right_wrapper', dtype=str, required=False, default="\"']")
+ self.separator = self.error_handler.assert_get_key(self.config, 'separator', dtype=str, required=False, default=',')
+ self.value_column = self.error_handler.assert_get_key(self.config, 'value_column', dtype=str, required=True)
+ self.trim_whitespace = self.error_handler.assert_get_key(self.config, 'trim_whitespace', dtype=str, required=False, default=" \t\r\n\"'")
+
+ def execute(self, data: 'DataFrame', **kwargs) -> 'DataFrame':
+ """
+
+ :return:
+ """
+ super().execute(data, **kwargs)
+
+ # Update the meta to reflect the flattened column.
+ target_dtypes = data.dtypes.to_dict()
+ target_dtypes.update({self.value_column: target_dtypes[self.flatten_column]})
+ del target_dtypes[self.flatten_column]
+
+ return data.map_partitions(self.flatten_partition, meta=target_dtypes)
+
+ def flatten_partition(self, df):
+
+ flattened_values_df = (df[self.flatten_column]
+ # force to a string before splitting
+ .astype("string")
+
+ # trim off `left_wrapper` and `right_wrapper` characters
+ .str.lstrip(self.left_wrapper)
+ .str.rstrip(self.right_wrapper)
+
+ # split by `separator` and explode rows
+ .str.split(self.separator, expand=True)
+ .stack()
+
+ # trim off `trim_whitespace` characters from each of the split values
+ .str.strip(self.trim_whitespace)
+
+ # remove the hierarchical index and set the `value_column` name
+ .reset_index(level=1)
+ .drop('level_1', axis=1)
+ .rename(columns={0: self.value_column})
+ )
+
+ # join the exploded df to the original and drop `flatten_column` which is no longer needed
+ return (df
+ .join(flattened_values_df)
+ .drop(self.flatten_column, axis=1)
+ .reset_index(drop=True)
+ )
diff --git a/earthmover/package.py b/earthmover/package.py
index b110227b..6958608e 100644
--- a/earthmover/package.py
+++ b/earthmover/package.py
@@ -1,6 +1,6 @@
-import git
import os
import shutil
+import subprocess
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from earthmover.earthmover import Earthmover
@@ -137,7 +137,7 @@ class LocalPackage(Package):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- def install(self, packages_dir):
+ def install(self, packages_dir, **kwargs):
"""
Makes a copy of a local package directory into /packages.
:return:
@@ -166,7 +166,7 @@ class GitHubPackage(Package):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- def install(self, packages_dir):
+ def install(self, packages_dir, git_auth_timeout):
"""
Clones a GitHub repository into /packages.
If a subdirectory is specified, clone the repository into a temporary folder and copy the desired subdirectory into /packages.
@@ -179,19 +179,30 @@ def install(self, packages_dir):
branch = self.error_handler.assert_get_key(self.config, 'branch', dtype=str, required=False, default=None)
tmp_package_path = os.path.join(packages_dir, 'tmp_git')
- os.mkdir(tmp_package_path)
+ os.makedirs(tmp_package_path, exist_ok=True)
+
+ try:
+ if branch:
+ command = ["git", "clone", "-b", branch, source_path, "."]
+            else:  # If branch is not specified, the default working branch is used
+ command = ["git", "clone", source_path, "."]
+
+ subprocess.run(command, cwd=tmp_package_path, timeout=git_auth_timeout)
- if branch:
- repo = git.Repo.clone_from(source_path, tmp_package_path, branch=branch)
- else: #If branch is not specified, default working branch is used
- repo = git.Repo.clone_from(source_path, tmp_package_path)
+ # Timeouts are implemented to prevent automated runs from hanging if the git clone command is prompting for credentials
+ except subprocess.TimeoutExpired:
+ shutil.rmtree(tmp_package_path)
+ self.error_handler.throw(
+ f"Git clone command timed out for the {self.name} package ({source_path}). Are git credentials correctly configured?"
+ )
+ raise
if subdirectory: # Avoids the package being nested in folders
- subdirectory_path = os.path.join(repo.working_tree_dir, subdirectory)
+ subdirectory_path = os.path.join(tmp_package_path, subdirectory)
shutil.copytree(subdirectory_path, self.package_path)
else:
- shutil.copytree(repo.working_tree_dir, self.package_path)
+ shutil.copytree(tmp_package_path, self.package_path)
- git.rmtree(repo.working_tree_dir)
+ shutil.rmtree(tmp_package_path)
return super().get_installed_config_file()
\ No newline at end of file
diff --git a/earthmover/util.py b/earthmover/util.py
index 35719896..aeefc8f7 100644
--- a/earthmover/util.py
+++ b/earthmover/util.py
@@ -80,7 +80,9 @@ def render_jinja_template(row: 'Series', template: jinja2.Template, template_str
:return:
"""
try:
- return template.render(row)
+ row_data = row.to_dict()
+ row_data.update({"__row_data__": row.to_dict()})
+ return template.render(row_data)
except Exception as err:
error_handler.ctx.remove('line')
diff --git a/example_projects/.gitignore b/example_projects/.gitignore
new file mode 100644
index 00000000..d0a0e8d9
--- /dev/null
+++ b/example_projects/.gitignore
@@ -0,0 +1 @@
+*/earthmover_compiled.yaml
diff --git a/requirements.txt b/requirements.txt
index 0a0ca928..d3231ffa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,6 @@
wheel
aiohttp>=3.8.1
dask[dataframe]~=2023.5.0
-GitPython>=3.1.40
Jinja2>=2.11.3
networkx>=2.6.3
pandas>=1.3.5