Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: refactor Network and DAG SOLVER to fix bad pruning #26

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
617e577
FIX(build): py2 needs pinning networkx-2.2
ankostis Sep 28, 2019
f58d148
FIX(#13): BUG in plot-diagram writtin from PY2 era,
ankostis Sep 29, 2019
52c0d77
enh(test): + x2 TC breaking UNSATISFIED operations...
ankostis Sep 30, 2019
bc4c221
ENH(net,#18): ignore UN-SATISFIABLE operations with partial inputs
ankostis Sep 29, 2019
b8daa07
refact(net): drop old `dag` nx-package
ankostis Oct 1, 2019
12bdfe4
ENH(core): ORDERED SETs for DETERMINISTIC results
ankostis Oct 1, 2019
b8377ca
merge UNASTIFIABLE + ORDERED_SETs
ankostis Oct 1, 2019
489b32c
refact(net): simpilify del-instruction loop
ankostis Oct 1, 2019
b102d44
REFACT(unsatisfied): doubly-recursive func --> loop on topo-sorted
ankostis Oct 1, 2019
de02885
test(dag,#25): FAILing TC for overriding intermediate data
ankostis Oct 2, 2019
e1454fd
test(dag,#24): FAILing TC for over-pruning inetermediates when outs a…
ankostis Oct 2, 2019
3736738
MERGE (prune_unsatified, ordered_sets) into fix-pruning ...
ankostis Oct 2, 2019
c273068
DOC(net): explain DAG solution & compilation...
ankostis Oct 2, 2019
16d42f1
TEST(prune): +Failing x2 TCs multi-out must run but not...
ankostis Oct 2, 2019
b92f103
refact(dag): call compile() before compute.compute...
ankostis Oct 2, 2019
6d1884e
test(dag): +TC checking DeleteInst vary when inputs change
ankostis Oct 2, 2019
619cae7
ENH(net): move compile() after SOLVE DAG ...
ankostis Oct 2, 2019
eff351d
REFACT(NET) COMPILE+COMPUTE...
ankostis Oct 2, 2019
d959485
doc(net): explain new DAG SOLUTION
ankostis Oct 2, 2019
17eb2fd
FIX(net): new Ops invalidate execution-plan cache...
ankostis Oct 3, 2019
0830b7c
ENH(DAG): NEW SOLVER
ankostis Oct 3, 2019
0dc1293
WIP/FIX(prune,#26): PIN intermediate inputs if operation before must run
ankostis Oct 3, 2019
06f6554
REFACT(net): part 3 of new dag-solver & pin refactoring
ankostis Oct 4, 2019
1cc733e
enh(CI): +PY3.6 where dicts are stable
ankostis Oct 4, 2019
5c3b8ef
TEST(plan): new check-multithreading-TC passes OK
ankostis Oct 15, 2019
58977a4
WIP/FIX(PIN): PARALLEL DELs decide on PRUNED-dag (not full)...
ankostis Oct 4, 2019
27f222d
FIX(NET): were FORGETTING PRUNED ASKED-OUTPUTs...
ankostis Oct 7, 2019
7fe6080
FIX(<PY3.5): ORDERED DiGRAPH for old Python to fix TCs
ankostis Oct 7, 2019
67aaadc
FIX(DAG): broken_dag had PLAIN-STR instead of DataNode...
ankostis Oct 14, 2019
fb1b074
FIX(net): bad refactor had broken PARALLEL EVICTION ...
ankostis Oct 16, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"

install:
- pip install Sphinx sphinx_rtd_theme codecov packaging
Expand Down
42 changes: 35 additions & 7 deletions graphkit/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Copyright 2016, Yahoo Inc.
# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms.
try:
from collections import abc
except ImportError:
import collections as abc


class Data(object):
"""
Expand Down Expand Up @@ -151,9 +156,12 @@ def __init__(self, **kwargs):

# set execution mode to single-threaded sequential by default
self._execution_method = "sequential"
self._overwrites_collector = None

def _compute(self, named_inputs, outputs=None):
return self.net.compute(outputs, named_inputs, method=self._execution_method)
return self.net.compute(
outputs, named_inputs, method=self._execution_method,
overwrites_collector=self._overwrites_collector)

def __call__(self, *args, **kwargs):
return self._compute(*args, **kwargs)
Expand All @@ -162,15 +170,35 @@ def set_execution_method(self, method):
"""
Determine how the network will be executed.

Args:
method: str
If "parallel", execute graph operations concurrently
using a threadpool.
:param str method:
If "parallel", execute graph operations concurrently
using a threadpool.
"""
options = ['parallel', 'sequential']
assert method in options
choices = ['parallel', 'sequential']
if method not in choices:
raise ValueError(
"Invalid computation method %r! Must be one of %s"
(method, choices))
self._execution_method = method

def set_overwrites_collector(self, collector):
"""
Asks to put all *overwrites* into the `collector` after computing

An "overwrites" is intermediate value calculated but NOT stored
into the results, becaues it has been given also as an intemediate
input value, and the operation that would overwrite it MUST run for
its other results.

:param collector:
a mutable dict to be fillwed with named values
"""
if collector is not None and not isinstance(collector, abc.MutableMapping):
raise ValueError(
"Overwrites collector was not a MutableMapping, but: %r"
% collector)
self._overwrites_collector = collector

def plot(self, filename=None, show=False):
self.net.plot(filename=filename, show=show)

Expand Down
20 changes: 12 additions & 8 deletions graphkit/functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from itertools import chain

from boltons.setutils import IndexedSet as iset

from .base import Operation, NetworkOperation
from .network import Network
from .modifiers import optional
Expand All @@ -28,7 +30,7 @@ def _compute(self, named_inputs, outputs=None):

result = zip(self.provides, result)
if outputs:
outputs = set(outputs)
outputs = sorted(set(outputs))
result = filter(lambda x: x[0] in outputs, result)

return dict(result)
Expand Down Expand Up @@ -185,27 +187,29 @@ def __call__(self, *operations):

# If merge is desired, deduplicate operations before building network
if self.merge:
merge_set = set()
merge_set = iset() # Preseve given node order.
for op in operations:
if isinstance(op, NetworkOperation):
net_ops = filter(lambda x: isinstance(x, Operation), op.net.steps)
op.net.compile()
net_ops = filter(lambda x: isinstance(x, Operation),
op.net.execution_plan)
merge_set.update(net_ops)
else:
merge_set.add(op)
operations = list(merge_set)
operations = merge_set

def order_preserving_uniquifier(seq, seen=None):
seen = seen if seen else set()
seen = seen if seen else set() # unordered, not iterated
seen_add = seen.add
return [x for x in seq if not (x in seen or seen_add(x))]

provides = order_preserving_uniquifier(chain(*[op.provides for op in operations]))
needs = order_preserving_uniquifier(chain(*[op.needs for op in operations]), set(provides))
needs = order_preserving_uniquifier(chain(*[op.needs for op in operations]),
set(provides)) # unordered, not iterated

# compile network
# Build network
net = Network()
for op in operations:
net.add_op(op)
net.compile()

return NetworkOperation(name=self.name, needs=needs, provides=provides, params={}, net=net)
Loading