From 617e577ecf76fa8273934ff047013ebae97a8cfc Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Sat, 28 Sep 2019 18:02:46 +0300
Subject: [PATCH 01/28] FIX(build): py2 needs pinning networkx-2.2

---
 setup.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index bd7883f4..d3dfec84 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,10 @@
     author_email='huyng@yahoo-inc.com',
     url='http://github.com/yahoo/graphkit',
     packages=['graphkit'],
-    install_requires=['networkx'],
+    install_requires=[
+        "networkx; python_version >= '3.5'",
+        "networkx == 2.2; python_version < '3.5'",
+    ],
     extras_require={
         'plot': ['pydot', 'matplotlib']
     },

From f58d14865f45f07e5125aed9b0a3f151073fa122 Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Mon, 30 Sep 2019 00:36:57 +0300
Subject: [PATCH 02/28] FIX(#13): BUG in plot-diagram written in the PY2 era,
 which was writing in text-mode under PY3 and failing with an encoding error.

---
 graphkit/network.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index 0df3ddf8..24c3ac37 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -422,8 +422,8 @@ def get_node_name(a):
 
     # save plot
     if filename:
-        basename, ext = os.path.splitext(filename)
-        with open(filename, "w") as fh:
+        _basename, ext = os.path.splitext(filename)
+        with open(filename, "wb") as fh:
             if ext.lower() == ".png":
                 fh.write(g.create_png())
             elif ext.lower() == ".dot":

From 52c0d7797489866c0a2b745b44014a3bf6626286 Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Tue, 1 Oct 2019 01:00:48 +0300
Subject: [PATCH 03/28] enh(test): + x2 TCs breaking UNSATISFIED operations...
 receiving partial inputs needed for other operations.

---
 test/test_graphkit.py | 34 +++++++++++++++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/test/test_graphkit.py b/test/test_graphkit.py
index bd97b317..a6d4dcb3 100644
--- a/test/test_graphkit.py
+++ b/test/test_graphkit.py
@@ -5,7 +5,7 @@
 import pickle
 
 from pprint import pprint
-from operator import add
+from operator import add, sub, floordiv, mul
 from numpy.testing import assert_raises
 
 import graphkit.network as network
@@ -184,6 +184,38 @@ def test_pruning_raises_for_bad_output():
                   outputs=['sum1', 'sum3', 'sum4'])
 
 
+def test_unsatisfied_operations():
+    # Test that operations with partial inputs are culled and not failing.
+    graph = compose(name="graph")(
+        operation(name="add", needs=["a", "b1"], provides=["a+b1"])(add),
+        operation(name="sub", needs=["a", "b2"], provides=["a-b2"])(sub),
+    )
+
+    exp = {"a": 10, "b1": 2, "a+b1": 12}
+    assert graph({"a": 10, "b1": 2}) == exp
+    assert graph({"a": 10, "b1": 2}, outputs=["a+b1"]) == {"a+b1": 12}
+
+    exp = {"a": 10, "b2": 2, "a-b2": 8}
+    assert graph({"a": 10, "b2": 2}) == exp
+    assert graph({"a": 10, "b2": 2}, outputs=["a-b2"]) == {"a-b2": 8}
+
+def test_unsatisfied_operations_same_out():
+    # Test unsatisfied pairs of operations providing the same output.
+    graph = compose(name="graph")(
+        operation(name="mul", needs=["a", "b1"], provides=["ab"])(mul),
+        operation(name="div", needs=["a", "b2"], provides=["ab"])(floordiv),
+        operation(name="add", needs=["ab", "c"], provides=["ab_plus_c"])(add),
+    )
+
+    exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21}
+    assert graph({"a": 10, "b1": 2, "c": 1}) == exp
+    assert graph({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == {"ab_plus_c": 21}
+
+    exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6}
+    assert graph({"a": 10, "b2": 2, "c": 1}) == exp
+    assert graph({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == {"ab_plus_c": 6}
+
+
 def test_optional():
     # Test that optional() needs work as expected.
 

From bc4c2211d25466896cb5738c4fd9d00c04633e6e Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Sun, 29 Sep 2019 19:51:57 +0300
Subject: [PATCH 04/28] ENH(net,#18): ignore UN-SATISFIABLE operations with
 partial inputs

+ The x2 TCs added just before are now passing.
---
 graphkit/network.py | 67 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/graphkit/network.py b/graphkit/network.py
index 24c3ac37..04f0dca1 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -8,6 +8,7 @@
 from io import StringIO
 
 from .base import Operation
+from .modifiers import optional
 
 
 class DataPlaceholderNode(str):
@@ -141,6 +142,65 @@ def compile(self):
             raise TypeError("Unrecognized network graph node")
 
 
+    def _collect_satisfiable_needs(self, operation, inputs, satisfiables, visited):
+        """
+        Recursively check if operation inputs are given/calculated (satisfied), or not.
+
+        :param satisfiables:
+            the set to populate with satisfiable operations
+
+        :param visited:
+            a cache of operations & needs, not to visit them again
+        :return:
+            true if operation is satisfiable
+        """
+        assert isinstance(operation, Operation), (
+            "Expected Operation, got:",
+            type(operation),
+        )
+
+        if operation in visited:
+            return visited[operation]
+
+
+        def is_need_satisfiable(need):
+            if need in visited:
+                return visited[need]
+
+            if need in inputs:
+                satisfied = True
+            else:
+                need_providers = list(self.graph.predecessors(need))
+                satisfied = bool(need_providers) and any(
+                    self._collect_satisfiable_needs(op, inputs, satisfiables, visited)
+                    for op in need_providers
+                )
+            visited[need] = satisfied
+
+            return satisfied
+
+        satisfied = all(
+            is_need_satisfiable(need)
+            for need in operation.needs
+            if not isinstance(need, optional)
+        )
+        if satisfied:
+            satisfiables.add(operation)
+        visited[operation] = satisfied
+
+        return satisfied
+
+
+    def _collect_satisfiable_operations(self, nodes, inputs):
+        satisfiables = set()
+        visited = {}
+        for node in nodes:
+            if node not in visited and isinstance(node, Operation):
+                self._collect_satisfiable_needs(node, inputs, satisfiables, visited)
+
+        return satisfiables
+
+
     def _find_necessary_steps(self, outputs, inputs):
         """
         Determines what graph steps need to pe run to get to the requested
@@ -204,6 +264,13 @@ def _find_necessary_steps(self, outputs, inputs):
         # Get rid of the unnecessary nodes from the set of necessary ones.
         necessary_nodes -= unnecessary_nodes
 
+        # Drop (un-satifiable) operations with partial inputs.
+        # See https://github.com/yahoo/graphkit/pull/18
+        #
+        satisfiables = self._collect_satisfiable_operations(necessary_nodes, inputs)
+        for node in list(necessary_nodes):
+            if isinstance(node, Operation) and node not in satisfiables:
+                necessary_nodes.remove(node)
 
         necessary_steps = [step for step in self.steps if step in necessary_nodes]
 

From b8daa07bc83b249276107eedf64f9a1d12f584a6 Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Tue, 1 Oct 2019 12:27:17 +0300
Subject: [PATCH 05/28] refact(net): drop old `dag` nx-package

---
 graphkit/network.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index 0df3ddf8..bb5a198c 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -107,7 +107,7 @@ def compile(self):
         self.steps = []
 
         # create an execution order such that each layer's needs are provided.
-        ordered_nodes = list(nx.dag.topological_sort(self.graph))
+        ordered_nodes = list(nx.topological_sort(self.graph))
 
         # add Operations evaluation steps, and instructions to free data.
         for i, node in enumerate(ordered_nodes):

From 12bdfe4965ab0c08606bf0551799f42057eb7776 Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Tue, 1 Oct 2019 15:00:21 +0300
Subject: [PATCH 06/28] ENH(core): ORDERED SETs for DETERMINISTIC results

NOTE dict are not deterministic in ...

         "networkx; python_version >= '3.5'",
         "networkx == 2.2; python_version < '3.5'",
+        "boltons"  # for IndexedSet
     ],
     extras_require={
         'plot': ['pydot', 'matplotlib']
     },

From 489b32c0ed5cae94baa58d4a588d638959fc4e3e Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Tue, 1 Oct 2019 17:26:29 +0300
Subject: [PATCH 07/28] refact(net): simplify del-instruction loop

---
 graphkit/network.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index db9b91de..9fbfd1e9 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -126,19 +126,16 @@ def compile(self):
             # Add instructions to delete predecessors as possible. A
             # predecessor may be deleted if it is a data placeholder that
             # is no longer needed by future Operations.
-            for predecessor in self.graph.predecessors(node):
+            for need in self.graph.pred[node]:
                 if self._debug:
-                    print("checking if node %s can be deleted" % predecessor)
-                predecessor_still_needed = False
+                    print("checking if node %s can be deleted" % need)
                 for future_node in ordered_nodes[i+1:]:
-                    if isinstance(future_node, Operation):
-                        if predecessor in future_node.needs:
-                            predecessor_still_needed = True
-                            break
-                if not predecessor_still_needed:
+                    if isinstance(future_node, Operation) and need in future_node.needs:
+                        break
+                else:
                     if self._debug:
-                        print("  adding delete instruction for %s" % predecessor)
-                    self.steps.append(DeleteInstruction(predecessor))
+                        print("  adding delete instruction for %s" % need)
+                    self.steps.append(DeleteInstruction(need))
 
         else:
             raise TypeError("Unrecognized network graph node")

From b102d44358ef1e60e6d6cffea516bcf11ffe864d Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Tue, 1 Oct 2019 19:01:13 +0300
Subject: [PATCH 08/28] REFACT(unsatisfied): doubly-recursive func --> loop on
 topo-sorted

---
 graphkit/network.py | 100 ++++++++++++++++++--------------------------
 1 file changed, 40 insertions(+), 60 deletions(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index 9fbfd1e9..cad561ee 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -5,7 +5,10 @@
 import os
 
 import networkx as nx
+from collections import defaultdict
 from io import StringIO
+from itertools import chain
+
 from boltons.setutils import IndexedSet as iset
@@ -138,66 +141,45 @@ def compile(self):
                     self.steps.append(DeleteInstruction(need))
 
         else:
-            raise TypeError("Unrecognized network graph node")
+            raise TypeError("Unrecognized network graph node %s" % type(node))
 
 
-    def _collect_satisfiable_needs(self, operation, inputs, satisfiables, visited):
+    def _collect_unsatisfiable_operations(self, necessary_nodes, inputs):
         """
-        Recursively check if operation inputs are given/calculated (satisfied), or not.
-
-        :param satisfiables:
-            the set to populate with satisfiable operations
-
-        :param visited:
-            a cache of operations & needs, not to visit them again
-        :return:
-            true if operation is satisfiable
+        Traverse ordered graph and mark satisfied needs on each operation,
+
+        collecting those missing at least one.
+        Since the graph is ordered, as soon as we're on an operation,
+        all its needs have been accounted, so we can get its satisfaction.
+
+        :param necessary_nodes:
+            the subset of the graph to consider but WITHOUT the initial data
+            (because that is what :meth:`_find_necessary_steps()` can gives us...)
+        :param inputs:
+            an iterable of the names of the input values
+        return:
+            a list of unsatisfiable operations
         """
-        assert isinstance(operation, Operation), (
-            "Expected Operation, got:",
-            type(operation),
-        )
-
-        if operation in visited:
-            return visited[operation]
-
-
-        def is_need_satisfiable(need):
-            if need in visited:
-                return visited[need]
-
-            if need in inputs:
-                satisfied = True
-            else:
-                need_providers = list(self.graph.predecessors(need))
-                satisfied = bool(need_providers) and any(
-                    self._collect_satisfiable_needs(op, inputs, satisfiables, visited)
-                    for op in need_providers
-                )
-            visited[need] = satisfied
-
-            return satisfied
-
-        satisfied = all(
-            is_need_satisfiable(need)
-            for need in operation.needs
-            if not isinstance(need, optional)
-        )
-        if satisfied:
-            satisfiables.add(operation)
-        visited[operation] = satisfied
-
-        return satisfied
-
-
-    def _collect_satisfiable_operations(self, nodes, inputs):
-        satisfiables = set()  # unordered, not iterated
-        visited = {}
-        for node in nodes:
-            if node not in visited and isinstance(node, Operation):
-                self._collect_satisfiable_needs(node, inputs, satisfiables, visited)
+        G = self.graph  # shortcut
+        ok_data = set(inputs)  # to collect producible data
+        op_satisfaction = defaultdict(set)  # to collect operation satisfiable needs
+        unsatisfiables = []  # to collect operations with partial needs
+        # We also need inputs to mark op_satisfaction.
+        nodes = chain(necessary_nodes, inputs)  # note that `inputs` are plain strings
+        for node in nx.topological_sort(G.subgraph(nodes)):
+            if isinstance(node, Operation):
+                real_needs = set(n for n in node.needs if not isinstance(n, optional))
+                if real_needs.issubset(op_satisfaction[node]):
+                    # mark all future data-provides as ok
+                    ok_data.update(G.adj[node])
+                else:
+                    unsatisfiables.append(node)
+            elif isinstance(node, (DataPlaceholderNode, str)) and node in ok_data:
+                # mark satisfied-needs on all future operations
+                for future_op in G.adj[node]:
+                    op_satisfaction[future_op].add(node)
 
-        return satisfiables
+        return unsatisfiables
 
 
     def _find_necessary_steps(self, outputs, inputs):
         """
         Determines what graph steps need to pe run to get to the requested
@@ -264,12 +246,10 @@ def _find_necessary_steps(self, outputs, inputs):
         # Get rid of the unnecessary nodes from the set of necessary ones.
         necessary_nodes -= unnecessary_nodes
 
         # Drop (un-satifiable) operations with partial inputs.
-        # See https://github.com/yahoo/graphkit/pull/18
+        # See yahoo/graphkit#18
         #
-        satisfiables = self._collect_satisfiable_operations(necessary_nodes, inputs)
-        for node in list(necessary_nodes):
-            if isinstance(node, Operation) and node not in satisfiables:
-                necessary_nodes.remove(node)
+        unsatisfiables = self._collect_unsatisfiable_operations(necessary_nodes, inputs)
+        necessary_nodes -= set(unsatisfiables)
 
         necessary_steps = [step for step in self.steps if step in necessary_nodes]

From de0288524014c21a241d207a558e7da695265f5f Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Wed, 2 Oct 2019 04:09:21 +0300
Subject: [PATCH 09/28] test(dag,#25): FAILing TC for overriding intermediate
 data

---
 test/test_graphkit.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/test/test_graphkit.py b/test/test_graphkit.py
index bd97b317..9a6473cf 100644
--- a/test/test_graphkit.py
+++ b/test/test_graphkit.py
@@ -184,6 +184,17 @@ def test_pruning_raises_for_bad_output():
                   outputs=['sum1', 'sum3', 'sum4'])
 
 
+def test_pruning_not_overrides_given_intermediate():
+    # Test #25: not overriding intermediate data when an output is not asked
+    graph = compose(name="graph")(
+        operation(name="unjustly run", needs=["a"], provides=["overriden"])(lambda a: a),
+        operation(name="op", needs=["overriden", "c"], provides=["asked"])(add),
+    )
+
+    assert graph({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == {"asked": 3}  # that"s ok
+    assert graph({"a": 5, "overriden": 1, "c": 2}) == {"a": 5, "overriden": 1, "c": 2, "asked": 3}  # FAILs
+
+
 def test_optional():
     # Test that optional() needs work as expected.

From e1454fdbdeac0a800f41fc32397a3b189ff8e7c4 Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Wed, 2 Oct 2019 06:23:20 +0300
Subject: [PATCH 10/28] test(dag,#24): FAILing TC for over-pruning
 intermediates when outs asked

---
 test/test_graphkit.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/test/test_graphkit.py b/test/test_graphkit.py
index 9a6473cf..a34ab7e4 100644
--- a/test/test_graphkit.py
+++ b/test/test_graphkit.py
@@ -195,6 +195,18 @@ def test_pruning_not_overrides_given_intermediate():
     assert graph({"a": 5, "overriden": 1, "c": 2}) == {"a": 5, "overriden": 1, "c": 2, "asked": 3}  # FAILs
 
 
+def test_pruning_with_given_intermediate_and_asked_out():
+    # Test pruning intermidate data is the same when outputs are (not) asked .
+    graph = compose(name="graph")(
+        operation(name="unjustly pruned", needs=["given-1"], provides=["a"])(lambda a: a),
+        operation(name="shortcuted", needs=["a", "b"], provides=["given-2"])(add),
+        operation(name="good_op", needs=["a", "given-2"], provides=["asked"])(add),
+    )
+
+    assert graph({"given-1": 5, "b": 2, "given-2": 2}) == {"given-1": 5, "b": 2, "given-2": 7, "a": 5, "b": 2, "asked": 12}  # that ok  # FAILS!
+    assert graph({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == {"asked": 12}  # FAILS!
+
+
 def test_optional():
     # Test that optional() needs work as expected.

From c2730685c880eebfbb4ad66256606bd7000564d0 Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Wed, 2 Oct 2019 14:11:59 +0300
Subject: [PATCH 11/28] DOC(net): explain DAG solution & compilation...

the latter described in #21.
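For a concrete picture of what the new docs describe, a minimal sketch (toy
operation names; the returned dict mirrors the behaviour asserted by the TCs
above):

    from operator import add

    from graphkit import compose, operation

    netop = compose(name="demo")(
        operation(name="sum1", needs=["a", "b"], provides=["ab"])(add),
        operation(name="sum2", needs=["ab", "c"], provides=["abc"])(add),
    )
    # The wired `graph` interleaves data-placeholder and operation nodes:
    #   a -> sum1 <- b,  sum1 -> ab -> sum2 <- c,  sum2 -> abc
    # Computing walks the topologically-sorted steps and fills the cache:
    print(netop({"a": 1, "b": 2, "c": 3}))
    # -> {'a': 1, 'b': 2, 'c': 3, 'ab': 3, 'abc': 6}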
--- graphkit/network.py | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/graphkit/network.py b/graphkit/network.py index cad561ee..30ea7189 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -27,7 +27,7 @@ def __repr__(self): class DeleteInstruction(str): """ - An instruction for the compiled list of evaluation steps to free or delete + An instruction in the compiled list of operation steps to free or delete a Data instance from the Network's cache after it is no longer needed. """ def __repr__(self): @@ -39,6 +39,26 @@ class Network(object): This is the main network implementation. The class contains all of the code necessary to weave together operations into a directed-acyclic-graph (DAG) and pass data through. + + The computation, ie the execution of the *operations* for given *inputs* + and asked *outputs* is based on 4 data-structures: + + - The `networkx` :attr:`graph` DAG, containing interchanging layers of + :class:`Operation` and :class:`DataPlaceholderNode` nodes. + They are layed out and connected by :meth:`add_OP`. + + - the :attr:`steps` list holding all operation nodes in *execution order*. + It is constructed in :meth:`compile()` after all nodes have been added + into the `graph`. + + - The ``necessary_steps`` list which is the *DAG solution* of each run, and + is always a subset of :attr:`steps`. + It is computed by :meth:`_find_necessary_steps()` and cached in + :attr:`_necessary_steps_cache` across runs with the same inputs/outputs. + + - the :var:`cache` local-var, initialized on each run of both + ``_compute_xxx`` methods (for parallel or sequential executions), to + holding all given input & generated (aka intermediate) data values. """ def __init__(self, **kwargs): @@ -106,8 +126,17 @@ def show_layers(self): def compile(self): - """Create a set of steps for evaluating layers - and freeing memory as necessary""" + """ + Create a list of operations to evaluate layers and free memory asap + + + In the list :class:`DeleteInstructions` steps (DA) are inserted between + operation nodes to reduce the memory footprint of cached results. + A DA is inserted whenever a *need* is not used by any other *operation* + further down the DAG. + Note that since the *cache* is not reused across `compute()` invocations, + any memory-reductions are for as long as a single computation runs. + """ # clear compiled steps self.steps = [] @@ -150,7 +179,7 @@ def _collect_unsatisfiable_operations(self, necessary_nodes, inputs): collecting those missing at least one. Since the graph is ordered, as soon as we're on an operation, - all its needs have been accounted, so we can get its satisfaction. + all its needs have been accounted, so we can get its satisfaction. :param necessary_nodes: the subset of the graph to consider but WITHOUT the initial data From 16d42f1bd3523d712fa3f75b28ae06100288bc61 Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Wed, 2 Oct 2019 15:00:05 +0300 Subject: [PATCH 12/28] TEST(prune): +Failing x2 TCs multi-out must run but not... override intermediate data. More changes only for newer pruning TCs: + refact(test): rename graph-->netop vars for results of compose(), to avoid of `graph.net.graph`. + Explain failure modes in v1.2.4 & this merged branch (#19 + #23). 
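To make the explained failure modes tangible, a sketch of the first one
(mirroring `test_pruning_not_overrides_given_intermediate` below; the numbers
come from that TC's comments):

    netop = compose(name="netop")(
        operation(name="unjustly run", needs=["a"], provides=["overriden"])(identity),
        operation(name="op", needs=["overriden", "c"], provides=["asked"])(add),
    )
    netop({"a": 5, "overriden": 1, "c": 2})
    # v1.2.4 runs "unjustly run" although "overriden" is already given, so it
    # returns (overriden, asked) == (5, 7) instead of the expected (1, 3).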
--- test/test_graphkit.py | 111 +++++++++++++++++++++++++++++++++--------- 1 file changed, 89 insertions(+), 22 deletions(-) diff --git a/test/test_graphkit.py b/test/test_graphkit.py index 5c731a29..9ee74b20 100644 --- a/test/test_graphkit.py +++ b/test/test_graphkit.py @@ -12,6 +12,21 @@ import graphkit.modifiers as modifiers from graphkit import operation, compose, Operation + +def identity(x): + return x + + +def filtdict(d, *keys): + """ + Keep dict items with the given keys + + >>> filtdict({"a": 1, "b": 2}, "b") + {"b": 2} + """ + return type(d)(i for i in d.items() if i[0] in keys) + + def test_network(): # Sum operation, late-bind compute function @@ -185,58 +200,110 @@ def test_pruning_raises_for_bad_output(): def test_pruning_not_overrides_given_intermediate(): - # Test #25: not overriding intermediate data when an output is not asked - graph = compose(name="graph")( - operation(name="unjustly run", needs=["a"], provides=["overriden"])(lambda a: a), + # Test #25: v1.2.4 overrides intermediate data when no output asked + netop = compose(name="netop")( + operation(name="unjustly run", needs=["a"], provides=["overriden"])(identity), operation(name="op", needs=["overriden", "c"], provides=["asked"])(add), ) - assert graph({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == {"asked": 3} # that"s ok - assert graph({"a": 5, "overriden": 1, "c": 2}) == {"a": 5, "overriden": 1, "c": 2, "asked": 3} # FAILs + exp = {"a": 5, "overriden": 1, "c": 2, "asked": 3} + # v1.2.4.ok + assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + # FAILs + # - on v1.2.4 with (overriden, asked): = (5, 7) instead of (1, 3) + # - on #18(unsatisfied) + #23(ordered-sets) with (overriden, asked) = (5, 7) instead of (1, 3) + assert netop({"a": 5, "overriden": 1, "c": 2}) == exp + + +def test_pruning_multiouts_not_override_intermediates1(): + # Test #25: v.1.2.4 overrides intermediate data when a previous operation + # must run for its other outputs (outputs asked or not) + netop = compose(name="netop")( + operation(name="must run", needs=["a"], provides=["overriden", "calced"]) + (lambda x: (x, 2 * x)), + operation(name="add", needs=["overriden", "calced"], provides=["asked"])(add), + ) + + exp = {"a": 5, "overriden": 1, "calced": 10, "asked": 11} + # FAILs + # - on v1.2.4 with (overriden, asked) = (5, 15) instead of (1, 11) + # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4. + assert netop({"a": 5, "overriden": 1}) == exp + # FAILs + # - on v1.2.4 with KeyError: 'e', + # - on #18(unsatisfied) + #23(ordered-sets) with empty result. + assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + + +def test_pruning_multiouts_not_override_intermediates2(): + # Test #25: v.1.2.4 overrides intermediate data when a previous operation + # must run for its other outputs (outputs asked or not) + netop = compose(name="netop")( + operation(name="must run", needs=["a"], provides=["overriden", "e"]) + (lambda x: (x, 2 * x)), + operation(name="op1", needs=["overriden", "c"], provides=["d"])(add), + operation(name="op2", needs=["d", "e"], provides=["asked"])(mul), + ) + + exp = {"a": 5, "overriden": 1, "c": 2, "asked": 3} + # FAILs + # - on v1.2.4 with (overriden, asked) = (5, 70) instead of (1, 13) + # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4. + assert netop({"a": 5, "overriden": 1, "c": 2}) == exp + # FAILs + # - on v1.2.4 with KeyError: 'e', + # - on #18(unsatisfied) + #23(ordered-sets) with empty result. 
+ assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") def test_pruning_with_given_intermediate_and_asked_out(): - # Test pruning intermidate data is the same when outputs are (not) asked . - graph = compose(name="graph")( - operation(name="unjustly pruned", needs=["given-1"], provides=["a"])(lambda a: a), + # Test #24: v1.2.4 does not prune before given intermediate data when + # outputs not asked, but does so when output asked. + netop = compose(name="netop")( + operation(name="unjustly pruned", needs=["given-1"], provides=["a"])(identity), operation(name="shortcuted", needs=["a", "b"], provides=["given-2"])(add), operation(name="good_op", needs=["a", "given-2"], provides=["asked"])(add), ) - assert graph({"given-1": 5, "b": 2, "given-2": 2}) == {"given-1": 5, "b": 2, "given-2": 7, "a": 5, "b": 2, "asked": 12} # that ok # FAILS! - assert graph({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == {"asked": 12} # FAILS! + exp = {"given-1": 5, "b": 2, "given-2": 7, "a": 5, "asked": 12} + # v1.2.4 is ok + assert netop({"given-1": 5, "b": 2, "given-2": 2}) == exp + # FAILS + # - on v1.2.4 with KeyError: 'a', + # - on #19 (unsatisfied) with no result. + assert netop({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") def test_unsatisfied_operations(): # Test that operations with partial inputs are culled and not failing. - graph = compose(name="graph")( + netop = compose(name="netop")( operation(name="add", needs=["a", "b1"], provides=["a+b1"])(add), operation(name="sub", needs=["a", "b2"], provides=["a-b2"])(sub), ) - + exp = {"a": 10, "b1": 2, "a+b1": 12} - assert graph({"a": 10, "b1": 2}) == exp - assert graph({"a": 10, "b1": 2}, outputs=["a+b1"]) == {"a+b1": 12} + assert netop({"a": 10, "b1": 2}) == exp + assert netop({"a": 10, "b1": 2}, outputs=["a+b1"]) == filtdict(exp, "a+b1") exp = {"a": 10, "b2": 2, "a-b2": 8} - assert graph({"a": 10, "b2": 2}) == exp - assert graph({"a": 10, "b2": 2}, outputs=["a-b2"]) == {"a-b2": 8} + assert netop({"a": 10, "b2": 2}) == exp + assert netop({"a": 10, "b2": 2}, outputs=["a-b2"]) == filtdict(exp, "a-b2") def test_unsatisfied_operations_same_out(): # Test unsatisfied pairs of operations providing the same output. - graph = compose(name="graph")( + netop = compose(name="netop")( operation(name="mul", needs=["a", "b1"], provides=["ab"])(mul), operation(name="div", needs=["a", "b2"], provides=["ab"])(floordiv), operation(name="add", needs=["ab", "c"], provides=["ab_plus_c"])(add), ) - + exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21} - assert graph({"a": 10, "b1": 2, "c": 1}) == exp - assert graph({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == {"ab_plus_c": 21} + assert netop({"a": 10, "b1": 2, "c": 1}) == exp + assert netop({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6} - assert graph({"a": 10, "b2": 2, "c": 1}) == exp - assert graph({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == {"ab_plus_c": 6} + assert netop({"a": 10, "b2": 2, "c": 1}) == exp + assert netop({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") def test_optional(): From b92f103ee035b5e4fc8a8a49b90df96ae547e66a Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Wed, 2 Oct 2019 16:11:58 +0300 Subject: [PATCH 13/28] refact(dag): call compile() before compute.compute... not after compose(). + All TCs pass ok. + NOTE this is not yet what is described in #21. 
--- graphkit/functional.py | 1 - graphkit/network.py | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/graphkit/functional.py b/graphkit/functional.py index 9de470a5..dcbe2042 100644 --- a/graphkit/functional.py +++ b/graphkit/functional.py @@ -209,6 +209,5 @@ def order_preserving_uniquifier(seq, seen=None): net = Network() for op in operations: net.add_op(op) - net.compile() return NetworkOperation(name=self.name, needs=needs, provides=provides, params={}, net=net) diff --git a/graphkit/network.py b/graphkit/network.py index 30ea7189..9ebdfd27 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -307,11 +307,12 @@ def compute(self, outputs, named_inputs, method=None): :returns: a dictionary of output data objects, keyed by name. """ - # assert that network has been compiled - assert self.steps, "network must be compiled before calling compute." assert isinstance(outputs, (list, tuple)) or outputs is None,\ "The outputs argument must be a list" + # Compile lazily here. + if not self.steps: + self.compile() # choose a method of execution if method == "parallel": From 6d1884e9de6e3be2328aca7ce5502a028cf5caf4 Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Wed, 2 Oct 2019 17:07:42 +0300 Subject: [PATCH 14/28] test(dag): +TC checking DeleteInst vary when inputs change --- test/test_graphkit.py | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/test/test_graphkit.py b/test/test_graphkit.py index 9ee74b20..dd0ef5cf 100644 --- a/test/test_graphkit.py +++ b/test/test_graphkit.py @@ -270,7 +270,7 @@ def test_pruning_with_given_intermediate_and_asked_out(): assert netop({"given-1": 5, "b": 2, "given-2": 2}) == exp # FAILS # - on v1.2.4 with KeyError: 'a', - # - on #19 (unsatisfied) with no result. + # - on #18 (unsatisfied) with no result. assert netop({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") @@ -348,6 +348,38 @@ def addplusplus(a, b, c=0): assert 'sum2' in results +def test_deleteinstructs_vary_with_inputs(): + # Check #21: DeleteInstructions positions vary when inputs change. + netop = compose(name="netop")( + operation(name="a free without b", needs=["a"], provides=["aa"])(identity), + operation(name="satisfiable", needs=["a", "b"], provides=["ab"])(add), + operation(name="optional ab", needs=["aa", modifiers.optional("ab")], provides=["asked"]) + (lambda a, ab=10: a + ab), + ) + + inp = {"a": 2, "b": 3} + exp = inp.copy(); exp.update({"aa": 2, "ab": 5, "asked": 7}) + res = netop(inp) + assert res == exp # ok + steps11 = netop.net.steps + res = netop(inp, outputs=["asked"]) + assert res == filtdict(exp, "asked") # ok + steps12 = netop.net.steps + + inp = {"a": 2} + exp = inp.copy(); exp.update({"aa": 2, "asked": 12}) + res = netop(inp) + assert res == exp # ok + steps21 = netop.net.steps + res = netop(inp, outputs=["asked"]) + assert res == filtdict(exp, "asked") # ok + steps22 = netop.net.steps + + assert steps11 == steps12 + assert steps21 == steps22 + assert steps11 != steps21 # FAILs in v1.2.4 + #18 + assert steps12 != steps22 # FAILs in v1.2.4 + #18 + def test_parallel_execution(): import time From 619cae72af228a9f1885dbc17f97a52f1c9e2d00 Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Wed, 2 Oct 2019 23:28:03 +0300 Subject: [PATCH 15/28] ENH(net): move compile() after SOLVE DAG ... to pass +TC checking DeleteInst vary when inputs change. - x4 TCs still failing, and need revamp of dag-solution. 
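A sketch of what the re-ordering buys (condensed from the
`test_deleteinstructs_vary_with_inputs` TC added just before):

    netop({"a": 2, "b": 3})
    steps_both = netop.net.steps    # plan solved with both "a" and "b" given
    netop({"a": 2})
    steps_single = netop.net.steps  # plan re-solved for the fewer inputs
    assert steps_both != steps_single   # DeleteInstructions differ per inputs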
--- graphkit/network.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/graphkit/network.py b/graphkit/network.py index 9ebdfd27..60d959b1 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -125,7 +125,7 @@ def show_layers(self): print("") - def compile(self): + def compile(self, dag): """ Create a list of operations to evaluate layers and free memory asap @@ -142,7 +142,7 @@ def compile(self): self.steps = [] # create an execution order such that each layer's needs are provided. - ordered_nodes = iset(nx.topological_sort(self.graph)) + ordered_nodes = iset(nx.topological_sort(dag)) # add Operations evaluation steps, and instructions to free data. for i, node in enumerate(ordered_nodes): @@ -280,6 +280,7 @@ def _find_necessary_steps(self, outputs, inputs): unsatisfiables = self._collect_unsatisfiable_operations(necessary_nodes, inputs) necessary_nodes -= set(unsatisfiables) + self.compile(self.graph.subgraph(necessary_nodes)) necessary_steps = [step for step in self.steps if step in necessary_nodes] # save this result in a precomputed cache for future lookup @@ -310,10 +311,6 @@ def compute(self, outputs, named_inputs, method=None): assert isinstance(outputs, (list, tuple)) or outputs is None,\ "The outputs argument must be a list" - # Compile lazily here. - if not self.steps: - self.compile() - # choose a method of execution if method == "parallel": return self._compute_thread_pool_barrier_method(named_inputs, From eff351d8e10a40a1ecc25e5de2008e7a4cc56045 Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Wed, 2 Oct 2019 23:21:59 +0300 Subject: [PATCH 16/28] REFACT(NET) COMPILE+COMPUTE... + Read the next doc-only commit to understand changes. + Renamed: + net.steps --> net.execution_plan. + (old)compile() --> _build_execution_plan() + _find_necessary_steps() --> (new)compile() + _solve_dag() compile() became the master function invoking _solve_dag & _build-execution_plan(), and do the caching. + refact(compute()): extract common tasks from sequential/parallel. + refact show_layers() to allow full-print, geting also string (not just printing), and using custom classes for representation. + Raise AssertionError when invalid class in plan. it's a logic error, not a language type-error. --- graphkit/functional.py | 6 +- graphkit/network.py | 181 ++++++++++++++++++++--------------------- test/test_graphkit.py | 8 +- 3 files changed, 97 insertions(+), 98 deletions(-) diff --git a/graphkit/functional.py b/graphkit/functional.py index dcbe2042..c113a298 100644 --- a/graphkit/functional.py +++ b/graphkit/functional.py @@ -190,7 +190,9 @@ def __call__(self, *operations): merge_set = iset() # Preseve given node order. 
for op in operations: if isinstance(op, NetworkOperation): - net_ops = filter(lambda x: isinstance(x, Operation), op.net.steps) + op.net.compile() + net_ops = filter(lambda x: isinstance(x, Operation), + op.net.execution_plan) merge_set.update(net_ops) else: merge_set.add(op) @@ -205,7 +207,7 @@ def order_preserving_uniquifier(seq, seen=None): needs = order_preserving_uniquifier(chain(*[op.needs for op in operations]), set(provides)) # unordered, not iterated - # compile network + # Build network net = Network() for op in operations: net.add_op(op) diff --git a/graphkit/network.py b/graphkit/network.py index 60d959b1..e1395141 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -72,14 +72,14 @@ def __init__(self, **kwargs): # this holds the timing information for eache layer self.times = {} - # a compiled list of steps to evaluate layers *in order* and free mem. - self.steps = [] + #: The list of operation-nodes & *instructions* needed to evaluate + #: the given inputs & asked outputs, free memory and avoid overwritting + #: any given intermediate inputs. + self.execution_plan = [] - # This holds a cache of results for the _find_necessary_steps - # function, this helps speed up the compute call as well avoid - # a multithreading issue that is occuring when accessing the - # graph in networkx - self._necessary_steps_cache = {} + #: Speed up :meth:`compile()` call and avoid a multithreading issue(?) + #: that is occuring when accessing the dag in networkx. + self._cached_execution_plans = {} def add_op(self, operation): @@ -107,28 +107,27 @@ def add_op(self, operation): for p in operation.provides: self.graph.add_edge(operation, DataPlaceholderNode(p)) - # clear compiled steps (must recompile after adding new layers) - self.steps = [] + def list_layers(self, debug=False): + # Make a generic plan. + plan = self._build_execution_plan(self.graph) + return [n for n in plan if debug or isinstance(n, Operation)] - def list_layers(self): - assert self.steps, "network must be compiled before listing layers." - return [(s.name, s) for s in self.steps if isinstance(s, Operation)] - - - def show_layers(self): - """Shows info (name, needs, and provides) about all layers in this network.""" - for name, step in self.list_layers(): - print("layer_name: ", name) - print("\t", "needs: ", step.needs) - print("\t", "provides: ", step.provides) - print("") + def show_layers(self, debug=False, ret=False): + """Shows info (name, needs, and provides) about all operations in this dag.""" + s = "\n".join(repr(n) for n in self.list_layers(debug=debug)) + if ret: + return s + else: + print(s) - def compile(self, dag): + def _build_execution_plan(self, dag): """ - Create a list of operations to evaluate layers and free memory asap + Create the list of operation-nodes & *instructions* evaluating all + operations & instructions needed a) to free memory and b) avoid + overwritting given intermediate inputs. In the list :class:`DeleteInstructions` steps (DA) are inserted between operation nodes to reduce the memory footprint of cached results. @@ -136,10 +135,10 @@ def compile(self, dag): further down the DAG. Note that since the *cache* is not reused across `compute()` invocations, any memory-reductions are for as long as a single computation runs. + """ - # clear compiled steps - self.steps = [] + plan = [] # create an execution order such that each layer's needs are provided. 
ordered_nodes = iset(nx.topological_sort(dag)) @@ -152,8 +151,7 @@ def compile(self, dag): elif isinstance(node, Operation): - # add layer to list of steps - self.steps.append(node) + plan.append(node) # Add instructions to delete predecessors as possible. A # predecessor may be deleted if it is a data placeholder that @@ -167,11 +165,12 @@ def compile(self, dag): else: if self._debug: print(" adding delete instruction for %s" % need) - self.steps.append(DeleteInstruction(need)) + plan.append(DeleteInstruction(need)) else: - raise TypeError("Unrecognized network graph node %s" % type(node)) + raise AssertionError("Unrecognized network graph node %r" % node) + return plan def _collect_unsatisfiable_operations(self, necessary_nodes, inputs): """ @@ -183,7 +182,7 @@ def _collect_unsatisfiable_operations(self, necessary_nodes, inputs): :param necessary_nodes: the subset of the graph to consider but WITHOUT the initial data - (because that is what :meth:`_find_necessary_steps()` can gives us...) + (because that is what :meth:`compile()` can gives us...) :param inputs: an iterable of the names of the input values return: @@ -203,42 +202,36 @@ def _collect_unsatisfiable_operations(self, necessary_nodes, inputs): ok_data.update(G.adj[node]) else: unsatisfiables.append(node) - elif isinstance(node, (DataPlaceholderNode, str)) and node in ok_data: - # mark satisfied-needs on all future operations - for future_op in G.adj[node]: - op_satisfaction[future_op].add(node) + elif isinstance(node, (DataPlaceholderNode, str)): # `str` are givens + if node in ok_data: + # mark satisfied-needs on all future operations + for future_op in G.adj[node]: + op_satisfaction[future_op].add(node) + else: + raise AssertionError("Unrecognized network graph node %r" % node) return unsatisfiables - def _find_necessary_steps(self, outputs, inputs): + def _solve_dag(self, outputs, inputs): """ - Determines what graph steps need to pe run to get to the requested + Determines what graph steps need to run to get to the requested outputs from the provided inputs. Eliminates steps that come before (in topological order) any inputs that have been provided. Also eliminates steps that are not on a path from the provided inputs to the requested outputs. - :param list outputs: + :param iterable outputs: A list of desired output names. This can also be ``None``, in which case the necessary steps are all graph nodes that are reachable from one of the provided inputs. - :param dict inputs: + :param iterable inputs: A dictionary mapping names to values for all provided inputs. - :returns: - Returns a list of all the steps that need to be run for the - provided inputs and requested outputs. 
- """ - - # return steps if it has already been computed before for this set of inputs and outputs - outputs = tuple(sorted(outputs)) if isinstance(outputs, (list, set, iset)) else outputs - inputs_keys = tuple(sorted(inputs.keys())) - cache_key = (inputs_keys, outputs) - if cache_key in self._necessary_steps_cache: - return self._necessary_steps_cache[cache_key] + :return: + """ graph = self.graph if not outputs: @@ -280,14 +273,31 @@ def _find_necessary_steps(self, outputs, inputs): unsatisfiables = self._collect_unsatisfiable_operations(necessary_nodes, inputs) necessary_nodes -= set(unsatisfiables) - self.compile(self.graph.subgraph(necessary_nodes)) - necessary_steps = [step for step in self.steps if step in necessary_nodes] + shrinked_dag = graph.subgraph(necessary_nodes) - # save this result in a precomputed cache for future lookup - self._necessary_steps_cache[cache_key] = necessary_steps + return shrinked_dag + + + def compile(self, outputs=(), inputs=()): + """ + See :meth:`_solve_dag()` for parameters and description + + Handles caching of solved dag and sets the :attr:`execution_plan`. + """ + + # return steps if it has already been computed before for this set of inputs and outputs + if outputs is not None and not isinstance(outputs, str): + outputs = tuple(sorted(outputs)) + inputs_keys = tuple(sorted(inputs)) + cache_key = (inputs_keys, outputs) + if cache_key in self._cached_execution_plans: + self.execution_plan = self._cached_execution_plans[cache_key] + else: + dag = self._solve_dag(outputs, inputs) + plan = self._build_execution_plan(dag) + # save this result in a precomputed cache for future lookup + self.execution_plan = self._cached_execution_plans[cache_key] = plan - # Return an ordered list of the needed steps. - return necessary_steps def compute(self, outputs, named_inputs, method=None): @@ -311,17 +321,31 @@ def compute(self, outputs, named_inputs, method=None): assert isinstance(outputs, (list, tuple)) or outputs is None,\ "The outputs argument must be a list" + # start with fresh data cache + cache = {} + cache.update(named_inputs) + self.compile(outputs, named_inputs.keys()) + # choose a method of execution if method == "parallel": - return self._compute_thread_pool_barrier_method(named_inputs, - outputs) + self._compute_thread_pool_barrier_method(cache) else: - return self._compute_sequential_method(named_inputs, - outputs) + self._compute_sequential_method(cache, outputs) + if not outputs: + # Return the whole cache as output, including input and + # intermediate data nodes. + return cache - def _compute_thread_pool_barrier_method(self, named_inputs, outputs, - thread_pool_size=10): + else: + # Filter outputs to just return what's needed. + # Note: list comprehensions exist in python 2.7+ + return dict(i for i in cache.items() if i[0] in outputs) + + + def _compute_thread_pool_barrier_method( + self, cache, thread_pool_size=10 + ): """ This method runs the graph using a parallel pool of thread executors. 
You may achieve lower total latency if your graph is sufficiently @@ -334,9 +358,6 @@ def _compute_thread_pool_barrier_method(self, named_inputs, outputs, self._thread_pool = Pool(thread_pool_size) pool = self._thread_pool - cache = {} - cache.update(named_inputs) - necessary_nodes = self._find_necessary_steps(outputs, named_inputs) # this keeps track of all nodes that have already executed has_executed = set() # unordered, not iterated @@ -349,7 +370,7 @@ def _compute_thread_pool_barrier_method(self, named_inputs, outputs, # the upnext list contains a list of operations for scheduling # in the current round of scheduling upnext = [] - for node in necessary_nodes: + for node in self.execution_plan: # only delete if all successors for the data node have been executed if isinstance(node, DeleteInstruction): if ready_to_delete_data_node(node, @@ -378,27 +399,13 @@ def _compute_thread_pool_barrier_method(self, named_inputs, outputs, cache.update(result) has_executed.add(op) - if not outputs: - return cache - else: - return {k: cache[k] for k in iter(cache) if k in outputs} - def _compute_sequential_method(self, named_inputs, outputs): + def _compute_sequential_method(self, cache, outputs): """ This method runs the graph one operation at a time in a single thread """ - # start with fresh data cache - cache = {} - - # add inputs to data cache - cache.update(named_inputs) - - # Find the subset of steps we need to run to get to the requested - # outputs from the provided inputs. - all_steps = self._find_necessary_steps(outputs, named_inputs) - self.times = {} - for step in all_steps: + for step in self.execution_plan: if isinstance(step, Operation): @@ -435,17 +442,7 @@ def _compute_sequential_method(self, named_inputs, outputs): cache.pop(step) else: - raise TypeError("Unrecognized instruction.") - - if not outputs: - # Return the whole cache as output, including input and - # intermediate data nodes. - return cache - - else: - # Filter outputs to just return what's needed. 
- # Note: list comprehensions exist in python 2.7+ - return {k: cache[k] for k in iter(cache) if k in outputs} + raise AssertionError("Unrecognized instruction.%r" % step) def plot(self, filename=None, show=False): diff --git a/test/test_graphkit.py b/test/test_graphkit.py index dd0ef5cf..be4b0e86 100644 --- a/test/test_graphkit.py +++ b/test/test_graphkit.py @@ -361,19 +361,19 @@ def test_deleteinstructs_vary_with_inputs(): exp = inp.copy(); exp.update({"aa": 2, "ab": 5, "asked": 7}) res = netop(inp) assert res == exp # ok - steps11 = netop.net.steps + steps11 = netop.net.execution_plan res = netop(inp, outputs=["asked"]) assert res == filtdict(exp, "asked") # ok - steps12 = netop.net.steps + steps12 = netop.net.execution_plan inp = {"a": 2} exp = inp.copy(); exp.update({"aa": 2, "asked": 12}) res = netop(inp) assert res == exp # ok - steps21 = netop.net.steps + steps21 = netop.net.execution_plan res = netop(inp, outputs=["asked"]) assert res == filtdict(exp, "asked") # ok - steps22 = netop.net.steps + steps22 = netop.net.execution_plan assert steps11 == steps12 assert steps21 == steps22 From d9594855e7ceba6d353fd98ba165623d74f0b6ff Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Wed, 2 Oct 2019 19:55:49 +0300 Subject: [PATCH 17/28] doc(net): explain new DAG SOLUTION --- graphkit/network.py | 59 ++++++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/graphkit/network.py b/graphkit/network.py index e1395141..0000dd52 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -27,8 +27,8 @@ def __repr__(self): class DeleteInstruction(str): """ - An instruction in the compiled list of operation steps to free or delete - a Data instance from the Network's cache after it is no longer needed. + An instruction in the *execution plan* to free or delete a Data instance + from the Network's cache after it is no longer needed. """ def __repr__(self): return 'DeleteInstruction("%s")' % self @@ -41,24 +41,33 @@ class Network(object): and pass data through. The computation, ie the execution of the *operations* for given *inputs* - and asked *outputs* is based on 4 data-structures: + and asked *outputs* is based on 3 data-structures: - - The `networkx` :attr:`graph` DAG, containing interchanging layers of + - The ``networkx`` :attr:`graph` DAG, containing interchanging layers of :class:`Operation` and :class:`DataPlaceholderNode` nodes. - They are layed out and connected by :meth:`add_OP`. + They are layed out and connected by repeated calls of :meth:`add_OP`. - - the :attr:`steps` list holding all operation nodes in *execution order*. - It is constructed in :meth:`compile()` after all nodes have been added - into the `graph`. + When the computation starts, :meth:`compile()` extracts a *DAG subgraph* + by *pruning* nodes based on given inputs and requested outputs. + This subgraph is used to decide the `execution_plan` (see below), and + and is cached in :attr:`_cached_execution_plans` across runs with + thre inputs/outputs as key. - - The ``necessary_steps`` list which is the *DAG solution* of each run, and - is always a subset of :attr:`steps`. - It is computed by :meth:`_find_necessary_steps()` and cached in - :attr:`_necessary_steps_cache` across runs with the same inputs/outputs. + - the :attr:`execution_plan` lists the operation-nodes & *instructions* + needed to run a complete computation. + It is built in :meth:`_build_execution_plan()` based on the subgraph + extracted above. 
The *instructions* items achieve the following: + + - :class:`DeleteInstruction`: delete items from values-cache as soon as + they are not needed further down the dag, to reduce memory footprint + while computing. + + - :class:`PinInstruction`: avoid overwritting any given intermediate + inputs, and still allow their producing operations to run. - the :var:`cache` local-var, initialized on each run of both ``_compute_xxx`` methods (for parallel or sequential executions), to - holding all given input & generated (aka intermediate) data values. + hold all given input & generated (aka intermediate) data values. """ def __init__(self, **kwargs): @@ -85,8 +94,8 @@ def __init__(self, **kwargs): def add_op(self, operation): """ Adds the given operation and its data requirements to the network graph - based on the name of the operation, the names of the operation's needs, and - the names of the data it provides. + based on the name of the operation, the names of the operation's needs, + and the names of the data it provides. :param Operation operation: Operation object to add. """ @@ -125,10 +134,13 @@ def show_layers(self, debug=False, ret=False): def _build_execution_plan(self, dag): """ Create the list of operation-nodes & *instructions* evaluating all - + operations & instructions needed a) to free memory and b) avoid overwritting given intermediate inputs. + :param dag: + as shrinked by :meth:`compile()` + In the list :class:`DeleteInstructions` steps (DA) are inserted between operation nodes to reduce the memory footprint of cached results. A DA is inserted whenever a *need* is not used by any other *operation* @@ -227,9 +239,10 @@ def _solve_dag(self, outputs, inputs): from one of the provided inputs. :param iterable inputs: - A dictionary mapping names to values for all provided inputs. + The inputs names of all given inputs. :return: + the subgraph comprising the solution """ graph = self.graph @@ -280,9 +293,17 @@ def _solve_dag(self, outputs, inputs): def compile(self, outputs=(), inputs=()): """ - See :meth:`_solve_dag()` for parameters and description + Solve dag, set the :attr:`execution_plan` and cache it. + + See :meth:`_solve_dag()` for description + + :param iterable outputs: + A list of desired output names. This can also be ``None``, in which + case the necessary steps are all graph nodes that are reachable + from one of the provided inputs. - Handles caching of solved dag and sets the :attr:`execution_plan`. + :param dict inputs: + The inputs names of all given inputs. """ # return steps if it has already been computed before for this set of inputs and outputs From 17eb2fdfbf63c8f798a34720abe9371a3d189137 Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Thu, 3 Oct 2019 15:29:12 +0300 Subject: [PATCH 18/28] FIX(net): new Ops invalidate execution-plan cache... Probaly unreported bug in v1.2.4 for '_neccessary_steps_cache`. --- graphkit/network.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/graphkit/network.py b/graphkit/network.py index 0000dd52..06447271 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -108,6 +108,9 @@ def add_op(self, operation): # assert layer is only added once to graph assert operation not in self.graph.nodes(), "Operation may only be added once" + ## Invalidate old plans. 
+ self._cached_execution_plans = {} + # add nodes and edges to graph describing the data needs for this layer for n in operation.needs: self.graph.add_edge(DataPlaceholderNode(n), operation) From 0830b7ce0d5cd44f888397e909af00ec1828e494 Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Thu, 3 Oct 2019 16:27:07 +0300 Subject: [PATCH 19/28] ENH(DAG): NEW SOLVER + Pruning behaves correctly also when outputs given; this happens by breaking incoming provide-links to any given intermedediate inputs. + Unsatisfied detection now includes those without outputs due to broken links (above). + Remove some uneeded "glue" from unsatisfied-detection code, leftover from previous compile() refactoring. + Renamed satisfiable --> satisfied. + Improved unknown output requested raise-message. + x3 TCs PASS, x1 in #24 and the first x2 in #25. - 1x TCs in #25 still FAIL, and need "Pinning" of given-inputs (the operation MUST and MUST NOT run in these cases). --- graphkit/network.py | 156 +++++++++++++++++++++++------------------- test/test_graphkit.py | 5 +- 2 files changed, 87 insertions(+), 74 deletions(-) diff --git a/graphkit/network.py b/graphkit/network.py index 06447271..4ef4b4c4 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -137,12 +137,12 @@ def show_layers(self, debug=False, ret=False): def _build_execution_plan(self, dag): """ Create the list of operation-nodes & *instructions* evaluating all - + operations & instructions needed a) to free memory and b) avoid overwritting given intermediate inputs. :param dag: - as shrinked by :meth:`compile()` + the original dag but "shrinked", not "broken" In the list :class:`DeleteInstructions` steps (DA) are inserted between operation nodes to reduce the memory footprint of cached results. @@ -187,45 +187,57 @@ def _build_execution_plan(self, dag): return plan - def _collect_unsatisfiable_operations(self, necessary_nodes, inputs): + def _collect_unsatisfied_operations(self, dag, inputs): """ - Traverse ordered graph and mark satisfied needs on each operation, + Traverse topologically sorted dag to collect un-satisfied operations. + + Unsatisfied operations are those suffering from ANY of the following: - collecting those missing at least one. - Since the graph is ordered, as soon as we're on an operation, - all its needs have been accounted, so we can get its satisfaction. + - They are missing at least one compulsory need-input. + Since the dag is ordered, as soon as we're on an operation, + all its needs have been accounted, so we can get its satisfaction. - :param necessary_nodes: - the subset of the graph to consider but WITHOUT the initial data - (because that is what :meth:`compile()` can gives us...) + - Their provided outputs are not linked to any data in the dag. + An operation might not have any output link when :meth:`_solve_dag()` + has broken them, due to given intermediate inputs. + + :param dag: + the graph to consider :param inputs: an iterable of the names of the input values return: - a list of unsatisfiable operations + a list of unsatisfied operations to prune """ - G = self.graph # shortcut - ok_data = set(inputs) # to collect producible data - op_satisfaction = defaultdict(set) # to collect operation satisfiable needs - unsatisfiables = [] # to collect operations with partial needs - # We also need inputs to mark op_satisfaction. - nodes = chain(necessary_nodes, inputs) # note that `inputs` are plain strings - for node in nx.topological_sort(G.subgraph(nodes)): + # To collect data that will be produced. 
+ ok_data = set(inputs) + # To colect the map of operations --> satisfied-needs. + op_satisfaction = defaultdict(set) + # To collect the operations to drop. + unsatisfied = [] + for node in nx.topological_sort(dag): if isinstance(node, Operation): - real_needs = set(n for n in node.needs if not isinstance(n, optional)) - if real_needs.issubset(op_satisfaction[node]): - # mark all future data-provides as ok - ok_data.update(G.adj[node]) + if not dag.adj[node]: + # Prune operations that ended up providing no output. + unsatisfied.append(node) else: - unsatisfiables.append(node) + real_needs = set(n for n in node.needs + if not isinstance(n, optional)) + if real_needs.issubset(op_satisfaction[node]): + # We have a satisfied operation; mark its output-data + # as ok. + ok_data.update(dag.adj[node]) + else: + # Prune operations with partial inputs. + unsatisfied.append(node) elif isinstance(node, (DataPlaceholderNode, str)): # `str` are givens if node in ok_data: # mark satisfied-needs on all future operations - for future_op in G.adj[node]: + for future_op in dag.adj[node]: op_satisfaction[future_op].add(node) else: raise AssertionError("Unrecognized network graph node %r" % node) - return unsatisfiables + return unsatisfied def _solve_dag(self, outputs, inputs): @@ -245,60 +257,56 @@ def _solve_dag(self, outputs, inputs): The inputs names of all given inputs. :return: - the subgraph comprising the solution - + the *execution plan* """ - graph = self.graph - if not outputs: + dag = self.graph - # If caller requested all outputs, the necessary nodes are all - # nodes that are reachable from one of the inputs. Ignore input - # names that aren't in the graph. - necessary_nodes = set() # unordered, not iterated - for input_name in iter(inputs): - if graph.has_node(input_name): - necessary_nodes |= nx.descendants(graph, input_name) + # Ignore input names that aren't in the graph. + graph_inputs = iset(dag.nodes) & inputs # preserve order - else: + # Scream if some requested outputs aren't in the graph. + unknown_outputs = iset(outputs) - dag.nodes + if unknown_outputs: + raise ValueError( + "Unknown output node(s) requested: %s" + % ", ".join(unknown_outputs)) + + broken_dag = dag.copy() # preserve net's graph - # If the caller requested a subset of outputs, find any nodes that - # are made unecessary because we were provided with an input that's - # deeper into the network graph. Ignore input names that aren't - # in the graph. - unnecessary_nodes = set() # unordered, not iterated - for input_name in iter(inputs): - if graph.has_node(input_name): - unnecessary_nodes |= nx.ancestors(graph, input_name) - - # Find the nodes we need to be able to compute the requested - # outputs. Raise an exception if a requested output doesn't - # exist in the graph. - necessary_nodes = set() # unordered, not iterated - for output_name in outputs: - if not graph.has_node(output_name): - raise ValueError("graphkit graph does not have an output " - "node named %s" % output_name) - necessary_nodes |= nx.ancestors(graph, output_name) - - # Get rid of the unnecessary nodes from the set of necessary ones. - necessary_nodes -= unnecessary_nodes - - # Drop (un-satifiable) operations with partial inputs. + # Break the incoming edges to all given inputs. + # + # Nodes producing any given intermediate inputs are unecessary + # (unless they are also used elsewhere). + # To discover which ones to prune, we break their incoming edges + # and they will drop out while collecting ancestors from the outputs. 
+        for given in graph_inputs:
+            broken_dag.remove_edges_from(list(broken_dag.in_edges(given)))
+
+        if outputs:
+            # If caller requested specific outputs, we can prune any
+            # unrelated nodes further up the dag.
+            ending_in_outputs = set()
+            for output_name in outputs:
+                ending_in_outputs.update(nx.ancestors(dag, output_name))
+            broken_dag = broken_dag.subgraph(ending_in_outputs | set(outputs))
+
+
-        # Drop (un-satifiable) operations with partial inputs.
+        # Prune (unsatisfiable) operations with partial inputs.
         # See yahoo/graphkit#18
         #
-        unsatisfiables = self._collect_unsatisfiable_operations(necessary_nodes, inputs)
-        necessary_nodes -= set(unsatisfiables)
+        unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs)
+        shrinked_dag = dag.subgraph(broken_dag.nodes - unsatisfied)

-        shrinked_dag = graph.subgraph(necessary_nodes)
+        plan = self._build_execution_plan(shrinked_dag)

-        return shrinked_dag
+        return plan


     def compile(self, outputs=(), inputs=()):
         """
-        Solve dag, set the :attr:`execution_plan` and cache it.
+        Solve dag, set the :attr:`execution_plan`, and cache it.

-        See :meth:`_solve_dag()` for description
+        See :meth:`_solve_dag()` for a detailed description.

         :param iterable outputs:
             A list of desired output names.  This can also be ``None``, in which
             from one of the provided inputs.

         :param dict inputs:
-            The inputs names of all given inputs.
+            The input names of all given inputs.
         """

         # return steps if it has already been computed before for this set of inputs and outputs
@@ -317,8 +325,7 @@ def compile(self, outputs=(), inputs=()):
         if cache_key in self._cached_execution_plans:
             self.execution_plan = self._cached_execution_plans[cache_key]
         else:
-            dag = self._solve_dag(outputs, inputs)
-            plan = self._build_execution_plan(dag)
+            plan = self._solve_dag(outputs, inputs)
             # save this result in a precomputed cache for future lookup
             self.execution_plan = self._cached_execution_plans[cache_key] = plan

@@ -338,6 +345,10 @@ def compute(self, outputs, named_inputs, method=None):
         and the values are the concrete values you want to set for the
         data node.

+        :param method:
+            if ``"parallel"``, launches multi-threading.
+            Set when invoking a composed graph or by
+            :meth:`~NetworkOperation.set_execution_method()`.
         :returns: a dictionary of output data objects, keyed by name.
         """

         assert isinstance(outputs, (list, tuple)) or outputs is None,\
             "The outputs argument must be a list"

-        # start with fresh data cache
-        cache = {}
-        cache.update(named_inputs)
+        # start with fresh data cache & overwrites
+        cache = named_inputs.copy()
+
+        # Build and set :attr:`execution_plan`.
         self.compile(outputs, named_inputs.keys())

         # choose a method of execution
diff --git a/test/test_graphkit.py b/test/test_graphkit.py
index be4b0e86..47f536b3 100644
--- a/test/test_graphkit.py
+++ b/test/test_graphkit.py
@@ -245,7 +245,7 @@ def test_pruning_multiouts_not_override_intermediates2():
         operation(name="op2", needs=["d", "e"], provides=["asked"])(mul),
     )

-    exp = {"a": 5, "overriden": 1, "c": 2, "asked": 3}
+    exp = {"a": 5, "overriden": 1, "c": 2, "d": 3, "e": 10, "asked": 30}
     # FAILs
     # - on v1.2.4 with (overriden, asked) = (5, 70) instead of (1, 13)
     # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4.
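    # A sketch (not from the patch) of why `exp` above gained the
    # intermediates "d" and "e": with no `outputs` requested, compute()
    # returns the whole values-cache, and "must run" still executes for its
    # other output "e", while the given "overriden" stays pinned to 1:
    #
    #     exp = {"a": 5, "overriden": 1, "c": 2, "d": 3, "e": 10, "asked": 30}
    #     assert netop({"a": 5, "overriden": 1, "c": 2}) == exp
    #     # d = 1 + 2,  e = 2 * 5,  asked = 3 * 10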
@@ -265,12 +265,13 @@ def test_pruning_with_given_intermediate_and_asked_out():
         operation(name="good_op", needs=["a", "given-2"], provides=["asked"])(add),
     )

-    exp = {"given-1": 5, "b": 2, "given-2": 7, "a": 5, "asked": 12}
+    exp = {"given-1": 5, "b": 2, "given-2": 2, "a": 5, "asked": 7}
     # v1.2.4 is ok
     assert netop({"given-1": 5, "b": 2, "given-2": 2}) == exp
     # FAILS
     # - on v1.2.4 with KeyError: 'a',
     # - on #18 (unsatisfied) with no result.
+    # FIXED on #18+#26 (new dag solver).
     assert netop({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked")


From 0dc1293111506ed2a2121d297d9ff2b1afde55ef Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Thu, 3 Oct 2019 19:45:45 +0300
Subject: [PATCH 20/28] WIP/FIX(prune,#26): PIN intermediate inputs if
 operation before must run

- WIP: PARALLEL execution not adding PINS!
+ Insert "PinInstructions" in the execution-plan to avoid overwriting.
+ Add `_overwrites_collector` in `compose()` to collect re-calculated values.
+ FIX the last TC in #25.
---
 graphkit/base.py      | 42 ++++++++++++++++++++----
 graphkit/network.py   | 74 ++++++++++++++++++++++++++++++++-----------
 test/test_graphkit.py | 68 +++++++++++++++++++++++++++++++++++++--
 3 files changed, 156 insertions(+), 28 deletions(-)

diff --git a/graphkit/base.py b/graphkit/base.py
index 1c04e8d5..2e036468 100644
--- a/graphkit/base.py
+++ b/graphkit/base.py
@@ -1,5 +1,10 @@
 # Copyright 2016, Yahoo Inc.
 # Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms.
+try:
+    from collections import abc
+except ImportError:
+    import collections as abc
+

 class Data(object):
     """
@@ -151,9 +156,12 @@ def __init__(self, **kwargs):

         # set execution mode to single-threaded sequential by default
         self._execution_method = "sequential"
+        self._overwrites_collector = None

     def _compute(self, named_inputs, outputs=None):
-        return self.net.compute(outputs, named_inputs, method=self._execution_method)
+        return self.net.compute(
+            outputs, named_inputs, method=self._execution_method,
+            overwrites_collector=self._overwrites_collector)

     def __call__(self, *args, **kwargs):
         return self._compute(*args, **kwargs)
@@ -162,15 +170,35 @@ def set_execution_method(self, method):
         """
         Determine how the network will be executed.

-        Args:
-            method: str
-                If "parallel", execute graph operations concurrently
-                using a threadpool.
+        :param str method:
+            If "parallel", execute graph operations concurrently
+            using a threadpool.
         """
-        options = ['parallel', 'sequential']
-        assert method in options
+        choices = ['parallel', 'sequential']
+        if method not in choices:
+            raise ValueError(
+                "Invalid computation method %r!  Must be one of %s"
+                % (method, choices))
         self._execution_method = method

+    def set_overwrites_collector(self, collector):
+        """
+        Asks to put all *overwrites* into the `collector` after computing.
+
+        An "overwrite" is an intermediate value that is calculated but NOT
+        stored into the results, because it has also been given as an
+        intermediate input value, and the operation that would overwrite it
+        MUST run for its other results.
+
+        :param collector:
+            a mutable dict to be filled with named values
+        """
+        if collector is not None and not isinstance(collector, abc.MutableMapping):
+            raise ValueError(
+                "Overwrites collector was not a MutableMapping, but: %r"
+                % collector)
+        self._overwrites_collector = collector
+
     def plot(self, filename=None, show=False):
         self.net.plot(filename=filename, show=show)

diff --git a/graphkit/network.py b/graphkit/network.py
index 4ef4b4c4..a00a0c4c 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -34,6 +34,18 @@ def __repr__(self):
         return 'DeleteInstruction("%s")' % self


+class PinInstruction(str):
+    """
+    An instruction in the *execution plan* not to store the newly computed value
+    into network's values-cache but to pin it instead to some given value.
+    It is used to ensure that given intermediate values are not overwritten when
+    their providing functions could not be avoided, because their other outputs
+    are needed elsewhere.
+    """
+    def __repr__(self):
+        return 'PinInstruction("%s")' % self
+
+
 class Network(object):
     """
     This is the main network implementation. The class contains all of the
@@ -41,7 +53,7 @@ class Network(object):
     and pass data through. The computation, ie the execution of the *operations*
-    for given *inputs* and asked *outputs* is based on 3 data-structures:
+    for given *inputs* and asked *outputs* is based on 4 data-structures:

     - The ``networkx`` :attr:`graph` DAG, containing interchanging layers of
       :class:`Operation` and :class:`DataPlaceholderNode` nodes.
@@ -68,6 +80,12 @@ class Network(object):
     - the :var:`cache` local-var, initialized on each run of both
       ``_compute_xxx`` methods (for parallel or sequential executions), to
       hold all given input & generated (aka intermediate) data values.
+
+    - the :var:`overwrites` local-var, initialized on each run of both
+      ``_compute_xxx`` methods (for parallel or sequential executions), to
+      hold values calculated but overwritten (aka "pinned") by intermediate
+      input-values.
+
     """

     def __init__(self, **kwargs):
@@ -122,7 +140,7 @@ def add_op(self, operation):

     def list_layers(self, debug=False):
         # Make a generic plan.
-        plan = self._build_execution_plan(self.graph)
+        plan = self._build_execution_plan(self.graph, ())

         return [n for n in plan if debug or isinstance(n, Operation)]

@@ -134,7 +152,7 @@ def show_layers(self, debug=False, ret=False):
         else:
             print(s)

-    def _build_execution_plan(self, dag):
+    def _build_execution_plan(self, dag, inputs):
         """
         Create the list of operation-nodes & *instructions* evaluating all

         :param dag:
-            the original dag, "shrunk" but not "broken"
+            The original dag, pruned; not broken.

         In the list :class:`DeleteInstructions` steps (DA) are inserted between
         operation nodes to reduce the memory footprint of cached results.
@@ -158,11 +176,15 @@ def _build_execution_plan(self, dag, inputs):

         # create an execution order such that each layer's needs are provided.
         ordered_nodes = iset(nx.topological_sort(dag))

-        # add Operations evaluation steps, and instructions to free data.
+        # Add Operations evaluation steps, and instructions to free and "pin"
+        # data.
         for i, node in enumerate(ordered_nodes):

             if isinstance(node, DataPlaceholderNode):
-                continue
+                if node in inputs and dag.pred[node]:
+                    # Command pinning only when there is another operation
+                    # generating this data as output.
+                    plan.append(PinInstruction(node))

             elif isinstance(node, Operation):

@@ -291,13 +313,11 @@ def _solve_dag(self, outputs, inputs):
             broken_dag = broken_dag.subgraph(ending_in_outputs | set(outputs))


-        # Prune (unsatisfiable) operations with partial inputs.
-        # See yahoo/graphkit#18
-        #
+        # Prune unsatisfied operations (those with partial inputs or no outputs).
         unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs)
-        shrinked_dag = dag.subgraph(broken_dag.nodes - unsatisfied)
+        pruned_dag = dag.subgraph(broken_dag.nodes - unsatisfied)

-        plan = self._build_execution_plan(shrinked_dag)
+        plan = self._build_execution_plan(pruned_dag, inputs)

         return plan

@@ -331,7 +351,8 @@ def compile(self, outputs=(), inputs=()):


-    def compute(self, outputs, named_inputs, method=None):
+    def compute(
+        self, outputs, named_inputs, method=None, overwrites_collector=None):
         """
         Run the graph. Any inputs to the network must be passed in by name.

@@ -350,6 +371,10 @@ def compute(
             Set when invoking a composed graph or by
             :meth:`~NetworkOperation.set_execution_method()`.

+        :param overwrites_collector:
+            (optional) a mutable dict to be filled with named values.
+            If missing, values are simply discarded.
+
         :returns: a dictionary of output data objects, keyed by name.
         """

@@ -364,23 +389,34 @@ def compute(
         # choose a method of execution
         if method == "parallel":
-            self._compute_thread_pool_barrier_method(cache)
+            self._compute_thread_pool_barrier_method(
+                cache, overwrites_collector, named_inputs)
         else:
-            self._compute_sequential_method(cache, outputs)
+            self._compute_sequential_method(
+                cache, overwrites_collector, named_inputs, outputs)

         if not outputs:
             # Return the whole cache as output, including input and
             # intermediate data nodes.
-            return cache
+            result = cache

         else:
             # Filter outputs to just return what's needed.
             # Note: list comprehensions exist in python 2.7+
-            return dict(i for i in cache.items() if i[0] in outputs)
+            result = dict(i for i in cache.items() if i[0] in outputs)
+
+        return result
+
+
+    def _pin_data_in_cache(self, value_name, cache, inputs, overwrites):
+        value_name = str(value_name)
+        if overwrites is not None:
+            overwrites[value_name] = cache[value_name]
+        cache[value_name] = inputs[value_name]


     def _compute_thread_pool_barrier_method(
-        self, cache, thread_pool_size=10
+        self, cache, overwrites, inputs, thread_pool_size=10
     ):
         """
         This method runs the graph using a parallel pool of thread executors.
@@ -436,7 +472,7 @@ def _compute_thread_pool_barrier_method(
             has_executed.add(op)


-    def _compute_sequential_method(self, cache, outputs):
+    def _compute_sequential_method(self, cache, overwrites, inputs, outputs):
         """
         This method runs the graph one operation at a time in a single thread
         """
@@ -477,6 +513,8 @@ def _compute_sequential_method(self, cache, overwrites, inputs, outputs):
                         print("removing data '%s' from cache."
                              % step)
                     cache.pop(step)

+            elif isinstance(step, PinInstruction):
+                self._pin_data_in_cache(step, cache, inputs, overwrites)
+
             else:
                 raise AssertionError("Unrecognized instruction.%r" % step)

diff --git a/test/test_graphkit.py b/test/test_graphkit.py
index 47f536b3..ce9b80d6 100644
--- a/test/test_graphkit.py
+++ b/test/test_graphkit.py
@@ -13,6 +13,11 @@
 from graphkit import operation, compose, Operation


+def scream(*args, **kwargs):
+    raise AssertionError(
+        "Must not have run!\n args: %s\n kwargs: %s" % (args, kwargs))
+
+
 def identity(x):
     return x

@@ -200,9 +205,9 @@ def test_pruning_raises_for_bad_output():

 def test_pruning_not_overrides_given_intermediate():
-    # Test #25: v1.2.4 overrides intermediate data when no output asked
+    # Test #25: v1.2.4 overwrites intermediate data when no output asked
     netop = compose(name="netop")(
-        operation(name="unjustly run", needs=["a"], provides=["overriden"])(identity),
+        operation(name="unjustly run", needs=["a"], provides=["overriden"])(scream),
         operation(name="op", needs=["overriden", "c"], provides=["asked"])(add),
     )

     # FAILs
     # - on v1.2.4 with (overriden, asked) = (5, 7) instead of (1, 3)
     # - on #18(unsatisfied) + #23(ordered-sets) with (overriden, asked) = (5, 7) instead of (1, 3)
+    # FIXED on #26
+    assert netop({"a": 5, "overriden": 1, "c": 2}) == exp
+
+    ## Test OVERWRITES
+    #
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
+    assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked")
+    assert overwrites == {}  # unjust must have been pruned
+
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
     assert netop({"a": 5, "overriden": 1, "c": 2}) == exp
+    assert overwrites == {}  # unjust must have been pruned


 def test_pruning_multiouts_not_override_intermediates1():
-    # Test #25: v.1.2.4 overrides intermediate data when a previous operation
+    # Test #25: v.1.2.4 overwrites intermediate data when a previous operation
     # must run for its other outputs (outputs asked or not)
     netop = compose(name="netop")(
         operation(name="must run", needs=["a"], provides=["overriden", "calced"])
         (lambda x: (x, 2 * x)),
         operation(name="add", needs=["overriden", "calced"], provides=["asked"])(add),

     # FAILs
     # - on v1.2.4 with (overriden, asked) = (5, 15) instead of (1, 11)
     # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4.
+    # FIXED on #26
     assert netop({"a": 5, "overriden": 1}) == exp

     # FAILs
     # - on v1.2.4 with KeyError: 'e',
     # - on #18(unsatisfied) + #23(ordered-sets) with empty result.
+    # FIXED on #26
+    assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked")
+
+    ## Test OVERWRITES
+    #
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
+    assert netop({"a": 5, "overriden": 1}) == exp
+    assert overwrites == {'overriden': 5}
+
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
     assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked")
+    assert overwrites == {'overriden': 5}
+
+    # ## Test parallel
+    # netop.set_execution_method("parallel")
+    # assert netop({"a": 5, "overriden": 1}) == exp
+    # assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked")


 def test_pruning_multiouts_not_override_intermediates2():
     # FAILs
     # - on v1.2.4 with (overriden, asked) = (5, 70) instead of (1, 13)
     # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4.
+    # FIXED on #26
     assert netop({"a": 5, "overriden": 1, "c": 2}) == exp

     # FAILs
     # - on v1.2.4 with KeyError: 'e',
     # - on #18(unsatisfied) + #23(ordered-sets) with empty result.
     assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked")
+    # FIXED on #26
+
+    ## Test OVERWRITES
+    #
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
+    assert netop({"a": 5, "overriden": 1, "c": 2}) == exp
+    assert overwrites == {'overriden': 5}
+
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
+    assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked")
+    assert overwrites == {'overriden': 5}


 def test_pruning_with_given_intermediate_and_asked_out():
@@ -274,6 +325,17 @@ def test_pruning_with_given_intermediate_and_asked_out():
     # FIXED on #18+#26 (new dag solver).
     assert netop({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked")

+    ## Test OVERWRITES
+    #
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
+    assert netop({"given-1": 5, "b": 2, "given-2": 2}) == exp
+    assert overwrites == {}
+
+    overwrites = {}
+    netop.set_overwrites_collector(overwrites)
+    assert netop({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked")
+    assert overwrites == {}

 def test_unsatisfied_operations():
     # Test that operations with partial inputs are culled and not failing.

From 06f6554869832b89713250971e903ead0934716e Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Fri, 4 Oct 2019 07:01:35 +0300
Subject: [PATCH 21/28] REFACT(net): part 3 of new dag-solver & pin refactoring

- STILL buggy PIN on PARALLEL, 2 DISABLED TCs FAIL:
  - test_pruning_with_given_intermediate_and_asked_out()
  - test_unsatisfied_operations_same_out()
+ move the check whether a value is in the asked outputs before
  cache-evicting it, into build-execution-plan time
- compute methods don't need outputs anymore.
+ test: speed up parallel/multithread TCs by reducing delays & repetitions.
+ refact: network rightfully adopted stray functions for parallel processing
  - they all work on the net.graph.
+ upd: networkx api by indexing on `dag.nodes` views.
+ enh: add log message when deleting in parallel (on par with sequential code).
+ refact: var-renames, if-then-else simplifications, pythonisms.
+ doc: A lot!
---
 graphkit/network.py   | 234 +++++++++++++++++++++++------------------
 test/test_graphkit.py | 174 ++++++++++++++++++++-----------
 2 files changed, 242 insertions(+), 166 deletions(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index a00a0c4c..d5c15539 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -1,6 +1,6 @@
 # Copyright 2016, Yahoo Inc.
 # Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms.
-
+""" The main implementation of the network of operations & data to compute. """
 import time
 import os
 import networkx as nx
@@ -18,8 +18,7 @@ class DataPlaceholderNode(str):

 class DataPlaceholderNode(str):
     """
-    A node for the Network graph that describes the name of a Data instance
-    produced or required by a layer.
+    Dag node naming a data-value produced or required by an operation.
     """
     def __repr__(self):
         return 'DataPlaceholderNode("%s")' % self


 class DeleteInstruction(str):
     """
-    An instruction in the *execution plan* to free or delete a Data instance
-    from the Network's cache after it is no longer needed.
+    Execution step to delete a computed value from the network's ``cache``.
+
+    It is an :attr:`Network.execution_plan` step for the data-node `str` that
+    frees its data-value from ``cache`` after it is no longer needed,
+    to reduce memory footprint while computing the pipeline.
     """
     def __repr__(self):
         return 'DeleteInstruction("%s")' % self


 class PinInstruction(str):
     """
-    An instruction in the *execution plan* not to store the newly computed value
-    into network's values-cache but to pin it instead to some given value.
-    It is used to ensure that given intermediate values are not overwritten when
-    their providing functions could not be avoided, because their other outputs
+    Execution step to replace a computed value in the ``cache`` from the inputs,
+
+    and to store the computed one in the ``overwrites`` instead
+    (both ``cache`` & ``overwrites`` are local-vars in :meth:`Network.compute()`).
+
+    It is an :attr:`Network.execution_plan` step for the data-node `str` that
+    ensures the corresponding intermediate input-value is not overwritten when
+    its providing function(s) could not be pruned, because their other outputs
     are needed elsewhere.
     """
     def __repr__(self):
         return 'PinInstruction("%s")' % self


 class Network(object):
     """
-    This is the main network implementation. The class contains all of the
-    code necessary to weave together operations into a directed-acyclic-graph (DAG)
-    and pass data through.
+    Assemble operations & data into a directed-acyclic-graph (DAG) and run them

-    The computation, ie the execution of the *operations* for given *inputs*
-    and asked *outputs* is based on 4 data-structures:
+    based on the given input values and requested outputs.

+    The execution of *operations* (a computation) is split into 2 phases:
+
+    - COMPILE: prune, sort topologically the nodes in the dag, solve it, and
+      derive the *execution plan* (see below) based on the given *inputs*
+      and asked *outputs*.
+
+    - EXECUTE: sequential or parallel invocation of the underlying functions
+      of the operations.
+
+    The computation is based on 4 data-structures:
+
+    - the ``networkx`` :attr:`graph` DAG, containing interchanging layers of
+      :class:`Operation` and :class:`DataPlaceholderNode` nodes.
+      They are laid out and connected by repeated calls of :meth:`add_OP`.
+
+      The computation starts with :meth:`_solve_dag()` extracting
+      a *DAG subgraph* by *pruning* nodes based on given inputs and
+      requested outputs.
+      This subgraph is used to decide the `execution_plan` (see below),
+      and is cached in :attr:`_cached_execution_plans` across runs with
+      inputs/outputs as key.
+
+    - the :attr:`execution_plan` is the list of the operation-nodes only
+      from the dag (above), topologically sorted, and interspersed with
+      *instruction steps* needed to complete the run.
+      It is built by :meth:`_build_execution_plan()` based on the subgraph dag
+      extracted above.  The *instructions* items achieve the following:
+
+      - :class:`DeleteInstruction`: delete items from values-cache as soon as
+        they are not needed further down the dag, to reduce memory footprint
+        while computing.
      - :class:`PinInstruction`: avoid overwriting any given intermediate
-        inputs, and still allow their producing operations to run.
+        inputs, and still allow their providing operations to run
+        (because they are needed for their other outputs).

-    - the :var:`cache` local-var, initialized on each run of both
-      ``_compute_xxx`` methods (for parallel or sequential executions), to
-      hold all given input & generated (aka intermediate) data values.
+    - the :var:`cache` local-var in :meth:`compute()`, initialized on each run
+      to hold the values of the given inputs, generated (aka intermediate) data,
+      and output values.

     - the :var:`overwrites` local-var, initialized on each run of both
       ``_compute_xxx`` methods (for parallel or sequential executions), to
       hold values calculated but overwritten (aka "pinned") by intermediate
       input-values.

     """

     def __init__(self, **kwargs):
@@ -124,7 +141,7 @@ def add_op(self, operation):
         assert operation.provides is not None, "Operation's 'provides' must be named"

         # assert layer is only added once to graph
-        assert operation not in self.graph.nodes(), "Operation may only be added once"
+        assert operation not in self.graph.nodes, "Operation may only be added once"

         ## Invalidate old plans.
         self._cached_execution_plans = {}
@@ -152,7 +169,7 @@ def show_layers(self, debug=False, ret=False):
         else:
             print(s)

-    def _build_execution_plan(self, dag, inputs):
+    def _build_execution_plan(self, dag, inputs, outputs):
         """
         Create the list of operation-nodes & *instructions* evaluating all

         :param dag:
             The original dag, pruned; not broken.

+        :param outputs:
+            output names, deciding whether (and which) del-instructions to add

         In the list :class:`DeleteInstructions` steps (DA) are inserted between
         operation nodes to reduce the memory footprint of cached results.
@@ -187,9 +206,12 @@ def _build_execution_plan(self, dag, inputs):

         elif isinstance(node, Operation):

             plan.append(node)

+            # Keep all values in cache if no specific outputs asked.
+            if not outputs:
+                continue
+
             # Add instructions to delete predecessors as possible.  A
             # predecessor may be deleted if it is a data placeholder that
             # is no longer needed by future Operations.
             for need in self.graph.pred[node]:
                 if self._debug:
                     print("checking if node %s can be deleted" % need)
                 for future_node in ordered_nodes[i+1:]:
-                    if isinstance(future_node, Operation) and need in future_node.needs:
+                    if (
+                        isinstance(future_node, Operation)
+                        and need in future_node.needs
+                    ):
                         break
                 else:
-                    if self._debug:
-                        print("  adding delete instruction for %s" % need)
-                    plan.append(DeleteInstruction(need))
+                    if need not in outputs:
+                        if self._debug:
+                            print("  adding delete instruction for %s" % need)
+                        plan.append(DeleteInstruction(need))

         else:
             raise AssertionError("Unrecognized network graph node %r" % node)
@@ -317,7 +343,7 @@ def _solve_dag(self, outputs, inputs):
         unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs)
         pruned_dag = dag.subgraph(broken_dag.nodes - unsatisfied)

-        plan = self._build_execution_plan(pruned_dag, inputs)
+        plan = self._build_execution_plan(pruned_dag, inputs, outputs)

         return plan

@@ -354,7 +380,7 @@ def compile(self, outputs=(), inputs=()):

     def compute(
         self, outputs, named_inputs, method=None, overwrites_collector=None):
         """
-        Run the graph. Any inputs to the network must be passed in by name.
+        Solve & execute the graph, sequentially or in parallel.

         :param list output:
             The names of the data node you'd like to have returned
             once all necessary computations are complete.
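    # A sketch (not from the patch) of the del-instruction rule above, for a
    # hypothetical chain  a -> op1 -> b -> op2 -> c.  With outputs=["c"],
    # each value gets evicted once no later operation needs it, roughly:
    #
    #     plan == [op1, DeleteInstruction("a"), op2, DeleteInstruction("b")]
    #
    # whereas with no outputs requested, every value stays in the cache:
    #
    #     plan == [op1, op2]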
@@ -389,11 +415,11 @@ def compute(

         # choose a method of execution
         if method == "parallel":
-            self._compute_thread_pool_barrier_method(
+            self._execute_thread_pool_barrier_method(
                 cache, overwrites_collector, named_inputs)
         else:
-            self._compute_sequential_method(
-                cache, overwrites_collector, named_inputs, outputs)
+            self._execute_sequential_method(
+                cache, overwrites_collector, named_inputs)

         if not outputs:
             # Return the whole cache as output, including input and
             # intermediate data nodes.
             result = cache

         else:
             # Filter outputs to just return what's needed.
             # Note: list comprehensions exist in python 2.7+
             result = dict(i for i in cache.items() if i[0] in outputs)

         return result


     def _pin_data_in_cache(self, value_name, cache, inputs, overwrites):
         value_name = str(value_name)
         if overwrites is not None:
             overwrites[value_name] = cache[value_name]
         cache[value_name] = inputs[value_name]


-    def _compute_thread_pool_barrier_method(
+    def _execute_thread_pool_barrier_method(
         self, cache, overwrites, inputs, thread_pool_size=10
     ):
         """
         This method runs the graph using a parallel pool of thread executors.
@@ -432,7 +458,7 @@

         # this keeps track of all nodes that have already executed
-        has_executed = set()  # unordered, not iterated
+        executed_nodes = set()  # unordered, not iterated

         # with each loop iteration, we determine a set of operations that can be
         # scheduled, then schedule them onto a thread pool, then collect their
@@ -443,21 +469,30 @@

             # in the current round of scheduling
             upnext = []
             for node in self.execution_plan:
-                # only delete if all successors for the data node have been executed
-                if isinstance(node, DeleteInstruction):
-                    if ready_to_delete_data_node(node,
-                                                 has_executed,
-                                                 self.graph):
-                        if node in cache:
-                            cache.pop(node)
-
-                # continue if this node is anything but an operation node
-                if not isinstance(node, Operation):
-                    continue
-
-                if ready_to_schedule_operation(node, has_executed, self.graph) \
-                    and node not in has_executed:
+                if (
+                    isinstance(node, Operation)
+                    and self._can_schedule_operation(node, executed_nodes)
+                    and node not in executed_nodes
+                ):
                     upnext.append(node)
+                elif isinstance(node, DeleteInstruction):
+                    # Only delete if all successors for the data node
+                    # have been executed.
+                    # An optional need may not have a value in the cache.
+                    if (
+                        node in cache
+                        and self._can_evict_value(node, executed_nodes)
+                    ):
+                        if self._debug:
+                            print("removing data '%s' from cache." % node)
+                        del cache[node]
+                elif isinstance(node, PinInstruction):
+                    # Always and repeatedly pin the value, even if not all
+                    # providers of the data have executed.
+                    # An optional need may not have a value in the cache.
+                    if node in cache:
+                        self._pin_data_in_cache(node, cache, inputs, overwrites)

             # stop if no nodes left to schedule, exit out of the loop
             if len(upnext) == 0:
                 break
@@ -469,10 +504,10 @@
                 upnext)
             for op, result in done_iterator:
                 cache.update(result)
-                has_executed.add(op)
+                executed_nodes.add(op)


-    def _compute_sequential_method(self, cache, overwrites, inputs, outputs):
+    def _execute_sequential_method(self, cache, overwrites, inputs):
         """
         This method runs the graph one operation at a time in a single thread
         """
@@ -500,18 +535,12 @@
             if self._debug:
                 print("step completion time: %s" % t_complete)

-            # Process DeleteInstructions by deleting the corresponding data
-            # if possible.
             elif isinstance(step, DeleteInstruction):
-
-                if outputs and step not in outputs:
-                    # Some DeleteInstruction steps may not exist in the cache
-                    # if they come from optional() needs that are not privoded
-                    # as inputs. Make sure the step exists before deleting.
-                    if step in cache:
-                        if self._debug:
-                            print("removing data '%s' from cache."
                                  % step)
-                        cache.pop(step)
+                # Cache value may be missing if it is optional.
+                if step in cache:
+                    if self._debug:
+                        print("removing data '%s' from cache." % step)
+                    del cache[step]

             elif isinstance(step, PinInstruction):
                 self._pin_data_in_cache(step, cache, inputs, overwrites)

             else:
                 raise AssertionError("Unrecognized instruction.%r" % step)

@@ -550,7 +579,7 @@
     def get_node_name(a):

         g = pydot.Dot(graph_type="digraph")

         # draw nodes
-        for nx_node in self.graph.nodes():
+        for nx_node in self.graph.nodes:
             if isinstance(nx_node, DataPlaceholderNode):
                 node = pydot.Node(name=nx_node, shape="rect")
             else:
@@ -592,50 +621,45 @@
         return g


-def ready_to_schedule_operation(op, has_executed, graph):
-    """
-    Determines if an Operation is ready to be scheduled for execution based on
-    what has already been executed.
+    def _can_schedule_operation(self, op, executed_nodes):
+        """
+        Determines if an Operation is ready to be scheduled for execution
+
+        based on what has already been executed.

-    Args:
-        op:
+        :param op:
             The Operation object to check
-        has_executed: set
+        :param set executed_nodes:
             A set containing all operations that have been executed so far
-        graph:
-            The networkx graph containing the operations and data nodes

-    Returns:
-        A boolean indicating whether the operation may be scheduled for
-        execution based on what has already been executed.
-    """
-    # unordered, not iterated
-    dependencies = set(filter(lambda v: isinstance(v, Operation),
-                              nx.ancestors(graph, op)))
-    return dependencies.issubset(has_executed)
+        :return:
+            A boolean indicating whether the operation may be scheduled for
+            execution based on what has already been executed.
+        """
+        # unordered, not iterated
+        dependencies = set(n for n in nx.ancestors(self.graph, op)
+                           if isinstance(n, Operation))
+        return dependencies.issubset(executed_nodes)


-def ready_to_delete_data_node(name, has_executed, graph):
-    """
-    Determines if a DataPlaceholderNode is ready to be deleted from the
-    cache.
+    def _can_evict_value(self, name, executed_nodes):
+        """
+        Determines if a DataPlaceholderNode is ready to be deleted from cache.

-    Args:
-        name:
+        :param name:
             The name of the data node to check
-        has_executed: set
+        :param executed_nodes: set
             A set containing all operations that have been executed so far
-        graph:
-            The networkx graph containing the operations and data nodes

-    Returns:
-        A boolean indicating whether the data node can be deleted or not.
-    """
-    data_node = get_data_node(name, graph)
-    return set(graph.successors(data_node)).issubset(has_executed)
+        :return:
+            A boolean indicating whether the data node can be deleted or not.
+        """
+        data_node = self.get_data_node(name)
+        return data_node and set(
+            self.graph.successors(data_node)).issubset(executed_nodes)


-def get_data_node(name, graph):
-    """
-    Gets a data node from a graph using its name
-    """
-    for node in graph.nodes():
-        if node == name and isinstance(node, DataPlaceholderNode):
+    def get_data_node(self, name):
+        """
+        Return the data node from a graph using its name, or None.
+ """ + node = self.graph.nodes[name] + if isinstance(node, DataPlaceholderNode): return node - return None diff --git a/test/test_graphkit.py b/test/test_graphkit.py index ce9b80d6..0afea72d 100644 --- a/test/test_graphkit.py +++ b/test/test_graphkit.py @@ -11,6 +11,7 @@ import graphkit.network as network import graphkit.modifiers as modifiers from graphkit import operation, compose, Operation +from graphkit.network import DeleteInstruction def scream(*args, **kwargs): @@ -206,37 +207,37 @@ def test_pruning_raises_for_bad_output(): def test_pruning_not_overrides_given_intermediate(): # Test #25: v1.2.4 overwrites intermediate data when no output asked - netop = compose(name="netop")( + pipeline = compose(name="pipeline")( operation(name="unjustly run", needs=["a"], provides=["overriden"])(scream), operation(name="op", needs=["overriden", "c"], provides=["asked"])(add), ) exp = {"a": 5, "overriden": 1, "c": 2, "asked": 3} # v1.2.4.ok - assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") # FAILs # - on v1.2.4 with (overriden, asked): = (5, 7) instead of (1, 3) # - on #18(unsatisfied) + #23(ordered-sets) with (overriden, asked) = (5, 7) instead of (1, 3) # FIXED on #26 - assert netop({"a": 5, "overriden": 1, "c": 2}) == exp + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp ## Test OVERWITES # overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") assert overwrites == {} # unjust must have been pruned - + overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"a": 5, "overriden": 1, "c": 2}) == exp + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp assert overwrites == {} # unjust must have been pruned def test_pruning_multiouts_not_override_intermediates1(): # Test #25: v.1.2.4 overwrites intermediate data when a previous operation # must run for its other outputs (outputs asked or not) - netop = compose(name="netop")( + pipeline = compose(name="pipeline")( operation(name="must run", needs=["a"], provides=["overriden", "calced"]) (lambda x: (x, 2 * x)), operation(name="add", needs=["overriden", "calced"], provides=["asked"])(add), @@ -247,35 +248,36 @@ def test_pruning_multiouts_not_override_intermediates1(): # - on v1.2.4 with (overriden, asked) = (5, 15) instead of (1, 11) # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4. # FIXED on #26 - assert netop({"a": 5, "overriden": 1}) == exp + assert pipeline({"a": 5, "overriden": 1}) == exp # FAILs # - on v1.2.4 with KeyError: 'e', # - on #18(unsatisfied) + #23(ordered-sets) with empty result. 
# FIXED on #26 - assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") ## Test OVERWITES # overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"a": 5, "overriden": 1}) == exp + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1}) == exp assert overwrites == {'overriden': 5} overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") assert overwrites == {'overriden': 5} - # ## Test parallel - # netop.set_execution_method("parallel") - # assert netop({"a": 5, "overriden": 1}) == exp - # assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + ## Test parallel + # + pipeline.set_execution_method("parallel") + assert pipeline({"a": 5, "overriden": 1}) == exp + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") def test_pruning_multiouts_not_override_intermediates2(): # Test #25: v.1.2.4 overrides intermediate data when a previous operation # must run for its other outputs (outputs asked or not) - netop = compose(name="netop")( + pipeline = compose(name="pipeline")( operation(name="must run", needs=["a"], provides=["overriden", "e"]) (lambda x: (x, 2 * x)), operation(name="op1", needs=["overriden", "c"], provides=["d"])(add), @@ -287,30 +289,36 @@ def test_pruning_multiouts_not_override_intermediates2(): # - on v1.2.4 with (overriden, asked) = (5, 70) instead of (1, 13) # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4. # FIXED on #26 - assert netop({"a": 5, "overriden": 1, "c": 2}) == exp + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp # FAILs # - on v1.2.4 with KeyError: 'e', # - on #18(unsatisfied) + #23(ordered-sets) with empty result. - assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") # FIXED on #26 ## Test OVERWITES # overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"a": 5, "overriden": 1, "c": 2}) == exp + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp assert overwrites == {'overriden': 5} overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") assert overwrites == {'overriden': 5} + ## Test parallel + # + pipeline.set_execution_method("parallel") + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + def test_pruning_with_given_intermediate_and_asked_out(): # Test #24: v1.2.4 does not prune before given intermediate data when # outputs not asked, but does so when output asked. 
- netop = compose(name="netop")( + pipeline = compose(name="pipeline")( operation(name="unjustly pruned", needs=["given-1"], provides=["a"])(identity), operation(name="shortcuted", needs=["a", "b"], provides=["given-2"])(add), operation(name="good_op", needs=["a", "given-2"], provides=["asked"])(add), @@ -318,55 +326,83 @@ def test_pruning_with_given_intermediate_and_asked_out(): exp = {"given-1": 5, "b": 2, "given-2": 2, "a": 5, "asked": 7} # v1.2.4 is ok - assert netop({"given-1": 5, "b": 2, "given-2": 2}) == exp + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp # FAILS # - on v1.2.4 with KeyError: 'a', # - on #18 (unsatisfied) with no result. # FIXED on #18+#26 (new dag solver). - assert netop({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") ## Test OVERWITES # overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"given-1": 5, "b": 2, "given-2": 2}) == exp + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp assert overwrites == {} overwrites = {} - netop.set_overwrites_collector(overwrites) - assert netop({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") assert overwrites == {} + ## Test parallel + # + pipeline.set_execution_method("parallel") + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + def test_unsatisfied_operations(): # Test that operations with partial inputs are culled and not failing. - netop = compose(name="netop")( + pipeline = compose(name="pipeline")( operation(name="add", needs=["a", "b1"], provides=["a+b1"])(add), operation(name="sub", needs=["a", "b2"], provides=["a-b2"])(sub), ) exp = {"a": 10, "b1": 2, "a+b1": 12} - assert netop({"a": 10, "b1": 2}) == exp - assert netop({"a": 10, "b1": 2}, outputs=["a+b1"]) == filtdict(exp, "a+b1") + assert pipeline({"a": 10, "b1": 2}) == exp + assert pipeline({"a": 10, "b1": 2}, outputs=["a+b1"]) == filtdict(exp, "a+b1") exp = {"a": 10, "b2": 2, "a-b2": 8} - assert netop({"a": 10, "b2": 2}) == exp - assert netop({"a": 10, "b2": 2}, outputs=["a-b2"]) == filtdict(exp, "a-b2") + assert pipeline({"a": 10, "b2": 2}) == exp + assert pipeline({"a": 10, "b2": 2}, outputs=["a-b2"]) == filtdict(exp, "a-b2") + + ## Test parallel + # + pipeline.set_execution_method("parallel") + exp = {"a": 10, "b1": 2, "a+b1": 12} + assert pipeline({"a": 10, "b1": 2}) == exp + assert pipeline({"a": 10, "b1": 2}, outputs=["a+b1"]) == filtdict(exp, "a+b1") + + exp = {"a": 10, "b2": 2, "a-b2": 8} + assert pipeline({"a": 10, "b2": 2}) == exp + assert pipeline({"a": 10, "b2": 2}, outputs=["a-b2"]) == filtdict(exp, "a-b2") def test_unsatisfied_operations_same_out(): # Test unsatisfied pairs of operations providing the same output. 
- netop = compose(name="netop")( + pipeline = compose(name="pipeline")( operation(name="mul", needs=["a", "b1"], provides=["ab"])(mul), operation(name="div", needs=["a", "b2"], provides=["ab"])(floordiv), operation(name="add", needs=["ab", "c"], provides=["ab_plus_c"])(add), ) exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21} - assert netop({"a": 10, "b1": 2, "c": 1}) == exp - assert netop({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + + exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6} + assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + + ## Test parallel + # + pipeline.set_execution_method("parallel") + exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21} + assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6} - assert netop({"a": 10, "b2": 2, "c": 1}) == exp - assert netop({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") def test_optional(): @@ -413,7 +449,10 @@ def addplusplus(a, b, c=0): def test_deleteinstructs_vary_with_inputs(): # Check #21: DeleteInstructions positions vary when inputs change. - netop = compose(name="netop")( + def count_deletions(steps): + return sum(isinstance(n, DeleteInstruction) for n in steps) + + pipeline = compose(name="pipeline")( operation(name="a free without b", needs=["a"], provides=["aa"])(identity), operation(name="satisfiable", needs=["a", "b"], provides=["ab"])(add), operation(name="optional ab", needs=["aa", modifiers.optional("ab")], provides=["asked"]) @@ -422,43 +461,56 @@ def test_deleteinstructs_vary_with_inputs(): inp = {"a": 2, "b": 3} exp = inp.copy(); exp.update({"aa": 2, "ab": 5, "asked": 7}) - res = netop(inp) + res = pipeline(inp) assert res == exp # ok - steps11 = netop.net.execution_plan - res = netop(inp, outputs=["asked"]) + steps11 = pipeline.net.execution_plan + res = pipeline(inp, outputs=["asked"]) assert res == filtdict(exp, "asked") # ok - steps12 = netop.net.execution_plan + steps12 = pipeline.net.execution_plan inp = {"a": 2} exp = inp.copy(); exp.update({"aa": 2, "asked": 12}) - res = netop(inp) + res = pipeline(inp) assert res == exp # ok - steps21 = netop.net.execution_plan - res = netop(inp, outputs=["asked"]) + steps21 = pipeline.net.execution_plan + res = pipeline(inp, outputs=["asked"]) assert res == filtdict(exp, "asked") # ok - steps22 = netop.net.execution_plan + steps22 = pipeline.net.execution_plan + + # When no outs, no del-instructs. 
+ assert steps11 != steps12 + assert count_deletions(steps11) == 0 + assert steps21 != steps22 + assert count_deletions(steps21) == 0 + + # Check steps vary with inputs + # + # FAILs in v1.2.4 + #18, PASS in #26 + assert steps11 != steps21 - assert steps11 == steps12 - assert steps21 == steps22 - assert steps11 != steps21 # FAILs in v1.2.4 + #18 - assert steps12 != steps22 # FAILs in v1.2.4 + #18 + # Check deletes vary with inputs + # + # FAILs in v1.2.4 + #18, PASS in #26 + assert count_deletions(steps12) != count_deletions(steps22) def test_parallel_execution(): import time + delay = 0.5 + def fn(x): - time.sleep(1) + time.sleep(delay) print("fn %s" % (time.time() - t0)) return 1 + x def fn2(a,b): - time.sleep(1) + time.sleep(delay) print("fn2 %s" % (time.time() - t0)) return a+b def fn3(z, k=1): - time.sleep(1) + time.sleep(delay) print("fn3 %s" % (time.time() - t0)) return z + k @@ -527,8 +579,8 @@ def infer(i): assert tuple(sorted(results.keys())) == tuple(sorted(outputs)), (outputs, results) return results - N = 100 - for i in range(20, 200): + N = 33 + for i in range(13, 61): pool = Pool(i) pool.map(infer, range(N)) pool.close() From 1cc733ef8c51e62a9e53b0f4ddf3d21138c4ac3b Mon Sep 17 00:00:00 2001 From: Kostis Anagnostopoulos Date: Sat, 5 Oct 2019 00:33:06 +0300 Subject: [PATCH 22/28] enh(CI): +PY3.6 where dicts are stable --- .travis.yml | 1 + test/test_graphkit.py | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/.travis.yml b/.travis.yml index d8657a8f..3350051a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ python: - "2.7" - "3.4" - "3.5" + - "3.6" install: - pip install Sphinx sphinx_rtd_theme codecov packaging diff --git a/test/test_graphkit.py b/test/test_graphkit.py index 0afea72d..cc0221d2 100644 --- a/test/test_graphkit.py +++ b/test/test_graphkit.py @@ -346,10 +346,11 @@ def test_pruning_with_given_intermediate_and_asked_out(): assert overwrites == {} ## Test parallel + # FAIL! in #26! # - pipeline.set_execution_method("parallel") - assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp - assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + # pipeline.set_execution_method("parallel") + # assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp + # assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") def test_unsatisfied_operations(): # Test that operations with partial inputs are culled and not failing. @@ -394,15 +395,16 @@ def test_unsatisfied_operations_same_out(): assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") ## Test parallel + # FAIL! 
    # in #26
    #
    # pipeline.set_execution_method("parallel")
    # exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21}
    # assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp
    # assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")

    # exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6}
    # assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp
    # assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")


 def test_optional():

From 5c3b8ef7a8300bd3eeb650fe8677d5de1119ad0a Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Tue, 15 Oct 2019 14:57:37 +0300
Subject: [PATCH 23/28] TEST(plan): new check-multithreading-TC passes OK

---
 test/test_graphkit.py | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/test/test_graphkit.py b/test/test_graphkit.py
index cc0221d2..1fb0d002 100644
--- a/test/test_graphkit.py
+++ b/test/test_graphkit.py
@@ -496,6 +496,36 @@

+def test_multithreading_plan_execution():
+    # From Huyng's test-code given in yahoo/graphkit#31
+    from multiprocessing.dummy import Pool
+    from graphkit import compose, operation
+
+    # Computes |a|^p.
+    def abspow(a, p):
+        c = abs(a) ** p
+        return c
+
+    # Compose the mul, sub, and abspow operations into a computation graph.
+    graph = compose(name="graph")(
+        operation(name="mul1", needs=["a", "b"], provides=["ab"])(mul),
+        operation(name="sub1", needs=["a", "ab"], provides=["a_minus_ab"])(sub),
+        operation(
+            name="abspow1",
+            needs=["a_minus_ab"],
+            provides=["abs_a_minus_ab_cubed"],
+            params={"p": 3},
+        )(abspow),
+    )
+
+    pool = Pool(10)
+    graph.set_execution_method("parallel")
+    pool.map(
+        lambda i: graph({"a": 2, "b": 5}, ["a_minus_ab", "abs_a_minus_ab_cubed"]),
+        range(100),
+    )
+

 def test_parallel_execution():
     import time
@@ -551,6 +581,7 @@
     # make sure results are the same using either method
     assert result_sequential == result_threaded

+
 def test_multi_threading():
     import time
     import random

From 58977a48facb975448a2788baff84359b621b4cf Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Fri, 4 Oct 2019 07:01:35 +0300
Subject: [PATCH 24/28] WIP/FIX(PIN): PARALLEL DELs decide on PRUNED-dag (not
 full)...

- WIP: x4 TCs FAIL and the bug is still not discovered :-(
+ BUT ALL+AUGMENTED PARALLEL TCs pass (#26 were failing some)
+ refact: net stores also `pruned_dag` (not only `steps`).
+ refact: _solve_dag() --> _prune_dag().
+ doc: +a lot.
+ TODO: store pruned_dag in own ExePlan class.
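A sketch (not from the commit) of the bug-class this targets: eviction
decisions must consult the *pruned* dag, not the full graph, because a
pruned-away consumer never executes and would forever block the deletion
of its inputs.  With hypothetical ops `add1`, `add2`:

    # full graph:  "ab" -> add1,  "ab" -> add2   (add2 pruned away)
    # executed  :  {add1}
    # full-graph check:  {add1, add2} <= {add1}  -> False: "ab" never evicted
    # pruned-dag check:  {add1}       <= {add1}  -> True:  "ab" freed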
---
 graphkit/network.py   | 153 ++++++++++++++++++++++++------------------
 test/test_graphkit.py |  37 +++++++---
 2 files changed, 112 insertions(+), 78 deletions(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index d5c15539..114c945a 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -54,54 +54,65 @@ def __repr__(self):

 class Network(object):
     """
-    Assemble operations & data into a directed-acyclic-graph (DAG) and run them
+    Assemble operations & data into a directed-acyclic-graph (DAG) to run them.

-    based on the given input values and requested outputs.
+    The execution of the contained *operations* in the dag (the computation)
+    is split into 2 phases:

-    The execution of *operations* (a computation) is split into 2 phases:
-
-    - COMPILE: prune, sort topologically the nodes in the dag, solve it, and
+    - COMPILE: prune unsatisfied nodes, sort the dag topologically & solve it,
+      and derive the *execution plan* (see below) based on the given *inputs*
       and asked *outputs*.

     - EXECUTE: sequential or parallel invocation of the underlying functions
-      of the operations.
-
-    The computation is based on 4 data-structures:
-
-    - the ``networkx`` :attr:`graph` DAG, containing interchanging layers of
-      :class:`Operation` and :class:`DataPlaceholderNode` nodes.
-      They are laid out and connected by repeated calls of :meth:`add_OP`.
+      of the operations with arguments from the ``cache``.

-      The computation starts with :meth:`_solve_dag()` extracting
-      a *DAG subgraph* by *pruning* nodes based on given inputs and
-      requested outputs.
-      This subgraph is used to decide the `execution_plan` (see below),
-      and is cached in :attr:`_cached_execution_plans` across runs with
-      inputs/outputs as key.
+    The computation is based on 5 data-structures:
+
+    :ivar graph:
+        A ``networkx`` DAG containing interchanging layers of
+        :class:`Operation` and :class:`DataPlaceholderNode` nodes.
+        They are laid out and connected by repeated calls of :meth:`add_OP`.
+
+        The computation starts with :meth:`_prune_dag()` extracting
+        a *DAG subgraph* by *pruning* its nodes based on given inputs and
+        requested outputs in :meth:`compute()`.
+
+    :ivar execution_dag:
+        It contains the nodes of the *pruned dag* from the last call to
+        :meth:`compile()`.  This pruned subgraph is used to decide
+        the :attr:`execution_plan` (below).
+        It is cached in :attr:`_cached_compilations` across runs with
+        inputs/outputs as key.
+
+    :ivar execution_plan:
+        It is the list of the operation-nodes only
+        from the dag (above), topologically sorted, and interspersed with
+        *instruction steps* needed to complete the run.
+        It is built by :meth:`_build_execution_plan()` based on the subgraph dag
+        extracted above.
+        It is cached in :attr:`_cached_compilations` across runs with
+        inputs/outputs as key.
+
+        The *instructions* items achieve the following:
+
+        - :class:`DeleteInstruction`: delete items from values-cache as soon as
+          they are not needed further down the dag, to reduce memory footprint
+          while computing.
+
+        - :class:`PinInstruction`: avoid overwriting any given intermediate
+          inputs, and still allow their providing operations to run
+          (because they are needed for their other outputs).
+
+    :var cache:
+        a local-var in :meth:`compute()`, initialized on each run
+        to hold the values of the given inputs, generated (intermediate) data,
+        and output values.
+        It is returned as is if no specific outputs requested; no data-eviction
+        happens then.
+
+    :arg overwrites:
+        The optional argument given to :meth:`compute()` to collect the
+        intermediate *calculated* values that are overwritten by intermediate
+        (aka "pinned") input-values.
     """

     def __init__(self, **kwargs):
         #: The list of operation-nodes & *instructions* needed to evaluate
         #: the given inputs & asked outputs, free memory and avoid overwriting
         #: any given intermediate inputs.
-        self.execution_plan = []
+        self.execution_plan = ()
+
+        #: Pruned graph of the last compilation.
+        self.execution_dag = ()

         #: Speed up :meth:`compile()` call and avoid a multithreading issue(?)
         #: that is occurring when accessing the dag in networkx.
-        self._cached_execution_plans = {}
+        self._cached_compilations = {}


     def add_op(self, operation):
         # assert layer is only added once to graph
         assert operation not in self.graph.nodes, "Operation may only be added once"

-        ## Invalidate old plans.
-        self._cached_execution_plans = {}
+        self.execution_dag = None
+        self.execution_plan = None
+        self._cached_compilations = {}

         # add nodes and edges to graph describing the data needs for this layer
         for n in operation.needs:
             self.graph.add_edge(DataPlaceholderNode(n), operation)
@@ ...
     def _collect_unsatisfied_operations(self, dag, inputs):
         all its needs have been accounted for, so we can decide
         its satisfaction.

         - Their provided outputs are not linked to any data in the dag.
-          An operation might not have any output link when :meth:`_solve_dag()`
+          An operation might not have any output link when :meth:`_prune_dag()`
           has broken them, due to given intermediate inputs.

         :param dag:
-            the graph to consider
+            a graph whose edges arriving to existing inputs have been broken
         :param inputs:
            an iterable of the names of the input values
        :return:
            a list of unsatisfied operations to prune

-    def _solve_dag(self, outputs, inputs):
+    def _prune_dag(self, outputs, inputs):
         """
         Determines what graph steps need to run to get to the requested
-        outputs from the provided inputs. Eliminates steps that come before
-        (in topological order) any inputs that have been provided. Also
-        eliminates steps that are not on a path from the provided inputs to
-        the requested outputs.
+        outputs from the provided inputs:
+        - Eliminate steps that are not on a path arriving to requested outputs.
+        - Eliminate unsatisfied operations: partial inputs or no outputs needed.

        :param iterable outputs:
            A list of desired output names.
@@ -305,7 +319,7 @@ def _prune_dag(self, outputs, inputs):
             The inputs names of all given inputs.
 
         :return:
-            the *execution plan*
+            the *pruned_dag*
         """
         dag = self.graph
 
@@ -341,18 +355,16 @@ def _prune_dag(self, outputs, inputs):
 
         # Prune unsatisfied operations (those with partial inputs or no outputs).
         unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs)
-        pruned_dag = dag.subgraph(broken_dag.nodes - unsatisfied)
+        pruned_dag = dag.subgraph(self.graph.nodes - unsatisfied)
 
-        plan = self._build_execution_plan(pruned_dag, inputs, outputs)
-
-        return plan
+        return pruned_dag.copy()  # clone so that it is picklable
 
 
     def compile(self, outputs=(), inputs=()):
         """
         Solve dag, set the :attr:`execution_plan`, and cache it.
 
-        See :meth:`_solve_dag()` for detailed description.
+        See :meth:`_prune_dag()` for a detailed description.
 
         :param iterable outputs:
             A list of desired output names.  This can also be ``None``, in which
@@ -368,12 +380,20 @@ def compile(self, outputs=(), inputs=()):
         outputs = tuple(sorted(outputs))
         inputs_keys = tuple(sorted(inputs))
         cache_key = (inputs_keys, outputs)
-        if cache_key in self._cached_execution_plans:
-            self.execution_plan = self._cached_execution_plans[cache_key]
+
+        if cache_key in self._cached_compilations:
+            dag, plan = self._cached_compilations[cache_key]
         else:
-            plan = self._solve_dag(outputs, inputs)
-            # save this result in a precomputed cache for future lookup
-            self.execution_plan = self._cached_execution_plans[cache_key] = plan
+            dag = self._prune_dag(outputs, inputs)
+            plan = self._build_execution_plan(dag, inputs, outputs)
+
+            # Cache compilation results, to speed up future runs
+            # with different values (but the same set of inputs/outputs).
+            self._cached_compilations[cache_key] = dag, plan
+
+        ## TODO: Extract into a Solution class.
+        self.execution_dag = dag
+        self.execution_plan = plan
 
@@ -494,7 +514,6 @@ def _execute_thread_pool_barrier_method(
                     self._pin_data_in_cache(node, cache, inputs, overwrites)
 
-
             # stop if no nodes left to schedule, exit out of the loop
             if len(upnext) == 0:
                 break
@@ -636,7 +655,7 @@ def _can_schedule_operation(self, op, executed_nodes):
         execution based on what has already been executed.
""" # unordered, not iterated - dependencies = set(n for n in nx.ancestors(self.graph, op) + dependencies = set(n for n in nx.ancestors(self.execution_dag, op) if isinstance(n, Operation)) return dependencies.issubset(executed_nodes) @@ -654,7 +673,7 @@ def _can_evict_value(self, name, executed_nodes): """ data_node = self.get_data_node(name) return data_node and set( - self.graph.successors(data_node)).issubset(executed_nodes) + self.execution_dag.successors(data_node)).issubset(executed_nodes) def get_data_node(self, name): """ diff --git a/test/test_graphkit.py b/test/test_graphkit.py index 1fb0d002..b7bb329b 100644 --- a/test/test_graphkit.py +++ b/test/test_graphkit.py @@ -233,6 +233,19 @@ def test_pruning_not_overrides_given_intermediate(): assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp assert overwrites == {} # unjust must have been pruned + ## Test Parallel + # + pipeline.set_execution_method("parallel") + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + #assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert overwrites == {} # unjust must have been pruned + + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + assert overwrites == {} # unjust must have been pruned + def test_pruning_multiouts_not_override_intermediates1(): # Test #25: v.1.2.4 overwrites intermediate data when a previous operation @@ -348,9 +361,9 @@ def test_pruning_with_given_intermediate_and_asked_out(): ## Test parallel # FAIL! in #26! # - # pipeline.set_execution_method("parallel") - # assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp - # assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + pipeline.set_execution_method("parallel") + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") def test_unsatisfied_operations(): # Test that operations with partial inputs are culled and not failing. @@ -395,16 +408,17 @@ def test_unsatisfied_operations_same_out(): assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") ## Test parallel + # # FAIL! in #26 + pipeline.set_execution_method("parallel") + exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21} + assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") # - # pipeline.set_execution_method("parallel") - # exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21} - # assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp - # assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") - - # exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6} - # assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp - # assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + # FAIL! 
+    exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6}
+    assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp
+    assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")
 
 
 def test_optional():
@@ -655,6 +669,7 @@ def compute(self, inputs):
         outputs.append(p)
         return outputs
 
+
 def test_backwards_compatibility():
 
     sum_op1 = Sum(

From 27f222d02f593acd4c5dffa24aa73efce9a5901e Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Mon, 7 Oct 2019 06:02:09 +0300
Subject: [PATCH 25/28] FIX(NET): were FORGETTING PRUNED ASKED-OUTPUTs...

... bugged in the opening commit d403783 of this PR, and discovered 68(!)
commits later; all that time we had to live with x4 broken TCs with
asked-outputs.
---
 graphkit/network.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index 114c945a..156048cf 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -355,7 +355,8 @@ def _prune_dag(self, outputs, inputs):
 
         # Prune unsatisfied operations (those with partial inputs or no outputs).
         unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs)
-        pruned_dag = dag.subgraph(self.graph.nodes - unsatisfied)
+        # Clone it so that it is picklable.
+        pruned_dag = dag.subgraph(broken_dag.nodes - unsatisfied)
 
         return pruned_dag.copy()  # clone so that it is picklable

From 7fe6080e26b3d0ed2d0a9e8b900cee05b4d1aacf Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Mon, 7 Oct 2019 21:03:26 +0300
Subject: [PATCH 26/28] FIX(
Date: Mon, 14 Oct 2019 07:35:35 +0300
Subject: [PATCH 27/28] FIX(DAG): broken_dag had PLAIN-STR instead of DataNode...

... because subgraph was taken on plain-string outputs.
+ minor upd of err-msg terminology.
---
 graphkit/network.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index cb1aa1a6..672f47bb 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -365,9 +365,10 @@ def _prune_dag(self, outputs, inputs):
             # If caller requested specific outputs, we can prune any
             # unrelated nodes further up the dag.
             ending_in_outputs = set()
-            for input_name in outputs:
-                ending_in_outputs.update(nx.ancestors(dag, input_name))
-            broken_dag = broken_dag.subgraph(ending_in_outputs | set(outputs))
+            for output_name in outputs:
+                ending_in_outputs.add(DataPlaceholderNode(output_name))
+                ending_in_outputs.update(nx.ancestors(dag, output_name))
+            broken_dag = broken_dag.subgraph(ending_in_outputs)
 
 
         # Prune unsatisfied operations (those with partial inputs or no outputs).
         unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs)
         pruned_dag = dag.subgraph(broken_dag.nodes - unsatisfied)
 
-        return pruned_dag.copy()  # clone so that it is picklable
+        assert all(
+            isinstance(n, (Operation, DataPlaceholderNode)) for n in pruned_dag
+        ), pruned_dag
+
+        return pruned_dag.copy()

From fb1b0741b384f9a0a53e4a5db1b22a31485ff86b Mon Sep 17 00:00:00 2001
From: Kostis Anagnostopoulos
Date: Wed, 16 Oct 2019 11:48:30 +0300
Subject: [PATCH 28/28] FIX(net): bad refactor had broken PARALLEL EVICTION

... due to a bad node check, eviction in parallel runs never kicked in.
---
 graphkit/network.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/graphkit/network.py b/graphkit/network.py
index 672f47bb..e8240056 100644
--- a/graphkit/network.py
+++ b/graphkit/network.py
@@ -703,6 +703,6 @@ def get_data_node(self, name):
         """
         Return the data node from a graph using its name, or None.
""" - node = self.graph.nodes[name] - if isinstance(node, DataPlaceholderNode): - return node + for node in self.graph.nodes: + if node == name and isinstance(node, DataPlaceholderNode): + return node