Skip to content

Commit

Permalink
Subgraphs (#108)
Browse files Browse the repository at this point in the history
subgraphs working
- Graph input and output plugs
- Cycle Errors are caught
- Serialization
  • Loading branch information
PaulSchweizer authored Nov 27, 2019
1 parent 41ea1ee commit 6ea0913
Show file tree
Hide file tree
Showing 12 changed files with 688 additions and 177 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
[![Codacy_Badge_Grade](https://api.codacy.com/project/badge/Grade/6ac650d8580d43dbaf7de96a3171e76f)](https://www.codacy.com/app/paulschweizer/flowpipe?utm_source=github.com&utm_medium=referral&utm_content=PaulSchweizer/flowpipe&utm_campaign=Badge_Grade)
[![Codacy_Badge_Coverage](https://api.codacy.com/project/badge/Coverage/6ac650d8580d43dbaf7de96a3171e76f)](https://www.codacy.com/app/paulschweizer/flowpipe?utm_source=github.com&utm_medium=referral&utm_content=PaulSchweizer/flowpipe&utm_campaign=Badge_Coverage) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) [![python 2.7](https://img.shields.io/badge/python-2.7%2B-blue.svg)](https://www.python.org/downloads/) [![python 3.6+](https://img.shields.io/badge/python-3.6%2B-blue.svg)](https://www.python.org/downloads/)

![Flowpipe Logo](logo.png)

# Flow-based Programming
A lightweight framework for flow-based programming in python.
Expand Down Expand Up @@ -222,6 +221,9 @@ The code for these examples:
Another simple example:
[world_clock.py](examples/world_clock.py)!

How to make use of nested subgraphs:
[nested_graphs.py](examples/nested_graphs.py)!

Using the command pattern with flowpipe successfully:
[workflow_design_pattern.py](examples/workflow_design_pattern.py)!

Expand Down
65 changes: 65 additions & 0 deletions examples/nested_graphs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Nested graphs are supported in flowpipe."""
from flowpipe import Graph, Node


@Node(outputs=['file'])
def MyNode(file):
# Something is done in here ...
return {'file': file}


# A graph that fixes an incoming file, cleaning up messy names etc.
#
# +-----------------------+ +-------------------------+
# | Cleanup Filename | | Change Lineendings |
# |-----------------------| |-------------------------|
# o file<> | +--->o file<> |
# | file o-----+ | file o
# +-----------------------+ +-------------------------+
fix_file = Graph(name="fix_file")
cleanup_filename = MyNode(name="Cleanup Filename", graph=fix_file)
change_lineendings = MyNode(name="Change Lineendings", graph=fix_file)
cleanup_filename.outputs["file"].connect(change_lineendings.inputs["file"])


# A second graph reads finds files, and extracts their contents into a database
# +----------------+ +----------------------------+ +----------------+
# | Find File | | Read Values from File | | Update DB |
# |----------------| |----------------------------| |----------------|
# o file<> | +--->o file<> | +--->o file<> |
# | file o-----+ | file o-----+ | file o
# +----------------+ +----------------------------+ +----------------+
udpate_db_from_file = Graph(name="udpate_db_from_file")
find_file = MyNode(name="Find File", graph=udpate_db_from_file)
values_from_file = MyNode(name="Read Values from File", graph=udpate_db_from_file)
update_db = MyNode(name="Update DB", graph=udpate_db_from_file)
find_file.outputs["file"].connect(values_from_file.inputs["file"])
values_from_file.outputs["file"].connect(update_db.inputs["file"])


# The second graph however relies on clean input files so the first graph can
# be used within the second "udpate db" graph.
# For this purpose, graphs can promote input and output plugs from their nodes
# to the graph level, making other graphs aware of them:
fix_file["Cleanup Filename"].inputs["file"].promote_to_graph(name="file_to_clean")
fix_file["Change Lineendings"].outputs["file"].promote_to_graph(name="clean_file")

# Now the update_db graph can connect nodes to the fix_file graph
find_file.outputs["file"].connect(fix_file.inputs["file_to_clean"])
fix_file.outputs["clean_file"].connect(udpate_db_from_file["Read Values from File"].inputs["file"])


# The result now looks like this:
#
# +---udpate_db_from_file----+ +-------fix_file--------+ +--------fix_file---------+ +----udpate_db_from_file-----+ +---udpate_db_from_file----+
# | Find File | | Cleanup Filename | | Change Lineendings | | Read Values from File | | Update DB |
# |--------------------------| |-----------------------| |-------------------------| |----------------------------| |--------------------------|
# o file<> | +--->o file<> | +--->o file<> | +--->o file<> | +--->o file<> |
# | file o-----+ | file o-----+ | file o-----+ | file o-----+ | file o
# +--------------------------+ +-----------------------+ +-------------------------+ +----------------------------+ +--------------------------+
print(fix_file)


# Subgraphs can be accessed by their name from any participating graph
assert udpate_db_from_file.subgraphs["fix_file"] is fix_file
assert fix_file.subgraphs["udpate_db_from_file"] is udpate_db_from_file
12 changes: 6 additions & 6 deletions examples/world_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
| converted_time o-----+
+---------------------+
"""
import datetime
from datetime import datetime
from time import time

from flowpipe import Graph, INode, Node, InputPlug, OutputPlug

Expand All @@ -36,8 +37,7 @@ def CurrentTime():
Any arguments to the function are used as input plugs to the Node.
The outputs are defined in the decorator explicitely.
"""
utc_now = datetime.datetime.utcnow()
return {'time': utc_now}
return {'time': time()}


class ConvertTime(INode):
Expand All @@ -55,7 +55,7 @@ def __init__(self, time=None, timezone=0, **kwargs):

def compute(self, time, timezone):
return {
'converted_time': time + datetime.timedelta(hours=timezone)
'converted_time': time + timezone * 60 * 60
}


Expand All @@ -64,8 +64,8 @@ def ShowTimes(times):
"""Nodes do not necessarily have to define output and input plugs."""
print('-- World Clock -------------------')
for location, t in times.items():
print('It is now: {time} in {location}'.format(
time=t.strftime("%Y-%m-%d %H:%M:%S"), location=location))
print('It is now: {time:%H:%M} in {location}'.format(
time=datetime.fromtimestamp(t), location=location))
print('----------------------------------')


Expand Down
5 changes: 5 additions & 0 deletions flowpipe/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Exceptions raised by flowpipe."""


class CycleError(Exception):
"""Raised when an action would result in a cycle in a graph."""
148 changes: 127 additions & 21 deletions flowpipe/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from ascii_canvas import canvas
from ascii_canvas import item

from .errors import CycleError
from .plug import InputPlug, OutputPlug
from .utilities import deserialize_graph


Expand All @@ -25,9 +27,11 @@ class Graph(object):
"""A graph of Nodes."""

def __init__(self, name=None, nodes=None):
"""Initialize the list of Nodes."""
"""Initialize the list of Nodes, inputs and outpus."""
self.name = name or self.__class__.__name__
self._nodes = nodes or []
self.nodes = nodes or []
self.inputs = {}
self.outputs = {}

def __unicode__(self):
"""Display the Graph."""
Expand All @@ -46,15 +50,36 @@ def __getitem__(self, key):
"Graph does not contain a Node named '{0}'".format(key))

@property
def nodes(self):
"""Aggregate the Nodes of this Graph and all it's sub graphs."""
nodes = []
for node in self._nodes:
if isinstance(node, Graph):
nodes += node.nodes
else:
nodes.append(node)
return nodes
def all_nodes(self):
"""Expand the graph with all its subgraphs into a flat list of nodes.
Please note that in this expanded list, the node names are no longer
guaranteed to be unique!
Returns:
(list of INode): All nodes, including the nodes from subgraphs
"""
nodes = [n for n in self.nodes]
for subgraph in self.subgraphs.values():
nodes += subgraph.nodes
return list(set(nodes))

@property
def subgraphs(self):
"""All other graphs that the nodes of this graph are connected to.
Returns:
A dict in the form of {graph.name: graph}
"""
subgraphs = {}
for node in self.nodes:
for downstream in node.downstream_nodes:
if downstream.graph is not self:
subgraphs[downstream.graph.name] = downstream.graph
for upstream in node.upstream_nodes:
if upstream.graph is not self:
subgraphs[upstream.graph.name] = upstream.graph
return subgraphs

@property
def evaluation_matrix(self):
Expand All @@ -70,7 +95,7 @@ def evaluation_matrix(self):
"""
levels = {}

for node in self.nodes:
for node in self.all_nodes:
self._sort_node(node, levels, level=0)

matrix = []
Expand Down Expand Up @@ -105,11 +130,80 @@ def add_node(self, node):
"Can not add Node of name '{0}', a Node with this "
"name already exists on this Graph. Node names on "
"a Graph have to be unique.".format(node.name))
self._nodes.append(node)
self.nodes.append(node)
node.graph = self
else:
log.warning(
'Node "{0}" is already part of this Graph'.format(node.name))

def add_plug(self, plug, name=None):
"""Promote the given plug this graph.
Args:
plug (flowpipe.plug.IPlug): The plug to promote to this graph
name (str): Optionally use the given name instead of the name of
the given plug
"""
if isinstance(plug, InputPlug):
if plug not in self.inputs.values():
self.inputs[name or plug.name] = plug
else:
key = list(self.inputs.keys())[
list(self.inputs.values()).index(plug)]
raise ValueError(
"The given plug '{0}' has already been promoted to this "
"Graph und the key '{1}'".format(plug.name, key))
elif isinstance(plug, OutputPlug):
if plug not in self.outputs.values():
self.outputs[name or plug.name] = plug
else:
key = list(self.outputs.keys())[
list(self.outputs.values()).index(plug)]
raise ValueError(
"The given plug {0} has already been promoted to this "
"Graph und the key '{1}'".format(plug.name, key))
else:
raise TypeError(
"Plugs of type '{0}' can not be promoted directly to a Graph. "
"Only plugs of type '{1}' or '{2}' can be promoted.".format(
type(plug), InputPlug, OutputPlug))

def accepts_connection(self, output_plug, input_plug):
"""Raise exception if new connection would violate integrity of graph.
Args:
output_plug (flowpipe.plug.OutputPlug): The output plug
input_plug (flowpipe.plug.InputPlug): The input plug
Raises:
CycleError and ValueError
Returns:
True if the connection is accepted
"""
out_node = output_plug.node
in_node = input_plug.node

# Plugs can't be connected to other plugs on their own node
if in_node is out_node:
raise CycleError(
'Can\'t connect plugs that are part of the same node.')

# If that is downstream of this
if out_node in in_node.downstream_nodes:
raise CycleError(
'Can\'t connect OutputPlugs to plugs of an upstream node.')

# Names of subgraphs have to be unique
if (
in_node.graph.name in self.subgraphs and
in_node.graph not in self.subgraphs.values()):
raise ValueError(
"This node is part of graph '{0}', but a different "
"graph with the same name is already part of this "
"graph. Subgraph names on a Graph have to "
"be unique".format(in_node.graph.name))

return True

def evaluate(self, mode="linear", skip_clean=False,
submission_delay=0.1, raise_after=None):
"""Evaluate all Nodes in the graph.
Expand Down Expand Up @@ -268,7 +362,7 @@ def to_json(self):
return self._serialize()

def serialize(self): # pragma: no cover
"""Serialize the graph in it's grid form.
"""Serialize the graph in its grid form.
Deprecated.
"""
Expand All @@ -278,13 +372,22 @@ def serialize(self): # pragma: no cover

return self._serialize()

def _serialize(self):
"""Serialize the graph in it's grid form."""
def _serialize(self, with_subgraphs=True):
"""Serialize the graph in its grid form.
Args:
with_subgraphs (bool): Set to false to avoid infinite recursion
"""
data = OrderedDict(
module=self.__module__,
cls=self.__class__.__name__,
name=self.name)
data['nodes'] = [node.to_json() for node in self.nodes]
if with_subgraphs:
data['subgraphs'] = [
graph._serialize(with_subgraphs=False)
for graph in sorted(
self.subgraphs.values(), key=lambda g: g.name)]
return data

@staticmethod
Expand Down Expand Up @@ -320,10 +423,13 @@ def node_repr(self):
"""Format to visualize the Graph."""
canvas_ = canvas.Canvas()
x = 0
for row in self.evaluation_matrix:

evaluation_matrix = self.evaluation_matrix

for row in evaluation_matrix:
y = 0
x_diff = 0
for j, node in enumerate(row):
for node in row:
item_ = item.Item(str(node), [x, y])
node.item = item_
x_diff = (item_.bbox[2] - item_.bbox[0] + 4 if
Expand All @@ -332,13 +438,13 @@ def node_repr(self):
canvas_.add_item(item_)
x += x_diff

for node in self.nodes:
for j, plug in enumerate(node._sort_plugs(node.all_outputs())):
for node in self.all_nodes:
for i, plug in enumerate(node._sort_plugs(node.all_outputs())):
for connection in node._sort_plugs(
node.all_outputs())[plug].connections:
dnode = connection.node
start = [node.item.position[0] + node.item.bbox[2],
node.item.position[1] + 3 + len(node.all_inputs()) + j]
node.item.position[1] + 3 + len(node.all_inputs()) + i]
end = [dnode.item.position[0],
dnode.item.position[1] + 3 +
list(dnode._sort_plugs(
Expand Down
Loading

0 comments on commit 6ea0913

Please sign in to comment.