Source code for openmdao.main.dataflow
""" A workflow where the execution order is automatically inferred from the
data connections."""
import networkx as nx
from networkx.algorithms.components import strongly_connected_components
from networkx.algorithms.dag import is_directed_acyclic_graph
from openmdao.main.sequentialflow import SequentialWorkflow
from openmdao.main.interfaces import IDriver
from openmdao.main.mp_support import has_interface
__all__ = ['Dataflow']
[docs]class Dataflow(SequentialWorkflow):
"""
A Dataflow consists of a collection of Components which are executed in
data flow order.
"""
def __init__(self, parent=None, scope=None, members=None):
""" Create an empty flow. """
super(Dataflow, self).__init__(parent, scope, members)
self.config_changed()
def __iter__(self):
"""Iterate through the nodes in dataflow order."""
# resolve all of the components up front so if there's a problem
# it will fail early and not waste time running components
scope = self.scope
return [getattr(scope, n) for n in self._get_topsort()].__iter__()
[docs] def check_config(self):
"""Check for cyclic graph."""
super(Dataflow, self).check_config()
graph = self._get_collapsed_graph()
if not is_directed_acyclic_graph(graph):
# do a little extra work here to give more info to the user
# in the error message
strcon = strongly_connected_components(graph)
self.scope.raise_exception('circular dependency found between'
' the following: %s'
% str(strcon[0]), RuntimeError)
[docs] def config_changed(self):
"""Notifies the Workflow that its configuration (dependencies, etc.)
has changed.
"""
super(Dataflow, self).config_changed()
self._collapsed_graph = None
self._topsort = None
self._duplicates = None
def _get_topsort(self):
if self._topsort is None:
graph = self._get_collapsed_graph()
try:
self._topsort = nx.topological_sort(graph)
except nx.NetworkXUnfeasible:
# do a little extra work here to give more info to the user
# in the error message
strcon = strongly_connected_components(graph)
self.scope.raise_exception('circular dependency found between'
' the following: %s'
% str(strcon[0]), RuntimeError)
if self._duplicates:
self._insert_duplicates()
return self._topsort
def _get_collapsed_graph(self):
"""Get a dependency graph with only our workflow components
in it, with additional edges added to it from sub-workflows
of any Driver components in our workflow, and from any ExprEvaluators
in any components in our workflow.
"""
if self._collapsed_graph:
return self._collapsed_graph
to_add = []
scope = self.scope
graph = scope._depgraph
# find all of the incoming and outgoing edges to/from all of the
# components in each driver's iteration set so we can add edges to/from
# the driver in our collapsed graph
comps = self.get_components(full=True)
cnames = set([c.name for c in comps])
removes = set()
itersets = {}
graph_with_subs = graph.component_graph()
collapsed_graph = graph_with_subs.subgraph(cnames)
# TODO - Is this recursive?
for comp in comps:
cname = comp.name
if has_interface(comp, IDriver):
iterset = [c.name for c in comp.iteration_set()]
itersets[cname] = iterset
removes.update(iterset)
for u,v in graph_with_subs.edges_iter(nbunch=iterset): # outgoing edges
if v != cname and v not in iterset and not v.startswith('_pseudo_'):
collapsed_graph.add_edge(cname, v)
for u,v in graph_with_subs.in_edges_iter(nbunch=iterset): # incoming edges
if u != cname and u not in iterset and not u.startswith('_pseudo_'):
collapsed_graph.add_edge(u, cname)
# connect all of the edges from each driver's iterset members to itself
# For this, we need the graph with the subdriver itersets all still in it.
to_add = []
for drv, iterset in itersets.items():
for cname in iterset:
for u, v in graph_with_subs.edges_iter(cname):
if v != drv:
to_add.append((drv, v))
for u, v in graph_with_subs.in_edges_iter(cname):
if u != drv:
to_add.append((u, drv))
collapsed_graph.add_edges_from(to_add)
collapsed_graph = collapsed_graph.subgraph(cnames-removes)
# now add some fake dependencies for degree 0 nodes in an attempt to
# mimic a SequentialWorkflow in cases where nodes aren't connected.
# Edges are added from each degree 0 node to all nodes after it in
# sequence order.
self._duplicates = set()
last = len(self._names)-1
if last > 0:
to_add = []
for i, cname in enumerate(self._names):
if collapsed_graph.degree(cname) == 0:
if self._names.count(cname) > 1:
# Don't introduce circular dependencies.
self._duplicates.add(cname)
else:
if i < last:
for n in self._names[i+1:]:
to_add.append((cname, n))
else:
for n in self._names[0:i]:
to_add.append((n, cname))
collapsed_graph.add_edges_from([(u,v) for u,v in to_add
if u in collapsed_graph and v in collapsed_graph])
self._collapsed_graph = collapsed_graph
return self._collapsed_graph
def _insert_duplicates(self):
"""We have some duplicate unconnected components. Adjust order
to include duplicates in 'sequential' order.
"""
# Remove (single instance of) duplicates from topo sort.
topsort = self._topsort
for cname in self._duplicates:
topsort.remove(cname)
# For each name in sequential order, if it's a duplicate we need
# to insert it after all of its (possibly duplicated) predecessors.
for i, cname in enumerate(self._names):
if cname in self._duplicates:
predecessors = self._names[0:i]
max_index = -1
for pname in predecessors:
start = 0
index = -1
for j in range(predecessors.count(pname)):
index = topsort.index(pname, start)
start = index + 1
max_index = max(index, max_index)
topsort.insert(max_index+1, cname)