""" A workflow that contains cyclic graphs. Note that a special solver is
required to converge this workflow in order to execute it. """
import networkx as nx
from ordereddict import OrderedDict
from networkx.algorithms.components import strongly_connected_components
try:
from numpy import ndarray
except ImportError as err:
import logging
logging.warn("In %s: %r", __file__, err)
from openmdao.main.numpy_fallback import ndarray
from openmdao.main.array_helpers import flattened_value
from openmdao.main.interfaces import IDriver
from openmdao.main.mp_support import has_interface
from openmdao.main.depgraph import edge_dict_to_comp_list
from openmdao.main.pseudoassembly import from_PA_var, to_PA_var
from openmdao.main.sequentialflow import SequentialWorkflow
from openmdao.main.vartree import VariableTree
__all__ = ['CyclicWorkflow']
# SequentialWorkflow gives us the add and remove methods.
[docs]class CyclicWorkflow(SequentialWorkflow):
"""A CyclicWorkflow consists of a collection of Components that contains
loops in the graph.
"""
def __init__(self, parent=None, scope=None, members=None):
""" Create an empty flow. """
super(CyclicWorkflow, self).__init__(parent, scope, members)
self.config_changed()
[docs] def config_changed(self):
"""Notifies the Workflow that its configuration (dependencies, etc.)
has changed.
"""
super(CyclicWorkflow, self).config_changed()
self._workflow_graph = None
self._topsort = None
self._severed_edges = []
self._mapped_severed_edges = []
[docs] def check_config(self):
"""Any checks we need. For now, drivers are not allowed. You can get
around this by placing them in an assembly."""
super(CyclicWorkflow, self).check_config()
for comp in self.get_components():
if has_interface(comp, IDriver):
msg = 'Subdriver not supported in a cyclicflow. Please ' \
'place it in a subassembly.'
self.scope.raise_exception(msg, RuntimeError)
[docs] def add(self, compnames, index=None, check=False):
""" Add new component(s) to the workflow by name. """
super(CyclicWorkflow, self).add(compnames, index, check)
self.config_changed()
[docs] def remove(self, compname):
"""Remove a component from this Workflow by name."""
super(CyclicWorkflow, self).remove(compname)
self.config_changed()
def __iter__(self):
"""Iterate through the nodes in some proper order."""
# resolve all of the components up front so if there's a problem it'll
# fail early and not waste time running components
scope = self.scope
return [getattr(scope, n) for n in self._get_topsort()].__iter__()
def _get_topsort(self):
""" Return a sorted list of components in the workflow.
"""
if self._topsort is None:
graph = nx.DiGraph(self._get_collapsed_graph())
cyclic = True
self._severed_edges = []
while cyclic:
try:
self._topsort = nx.topological_sort(graph)
cyclic = False
except nx.NetworkXUnfeasible:
strong = strongly_connected_components(graph)
# We may have multiple loops. We only deal with one at
# a time because multiple loops create some non-unique
# paths.
strong = strong[0]
# Break one edge of the loop.
# For now, just break the first edge.
# TODO: smarter ways to choose edge to break.
graph.remove_edge(strong[-1], strong[0])
# Keep a list of the edges we break, so that a solver
# can use them as its independents/dependents.
depgraph = self.scope._depgraph
edge_set = set(depgraph.get_directional_interior_edges(strong[-1],
strong[0]))
self._severed_edges += list(edge_set)
return self._topsort
def _get_collapsed_graph(self):
"""Get a dependency graph with only our workflow components
in it. This graph can be cyclic."""
# Cached
if self._workflow_graph is None:
contents = self.get_components()
# get the parent assembly's component graph
scope = self.scope
compgraph = scope._depgraph.component_graph()
graph = compgraph.subgraph([c.name for c in contents])
# add any dependencies due to ExprEvaluators
for comp in contents:
graph.add_edges_from([tup for tup in comp.get_expr_depends()])
self._workflow_graph = graph
return self._workflow_graph
[docs] def initialize_residual(self):
"""Creates the array that stores the residual. Also returns the
number of edges.
"""
dgraph = self.derivative_graph()
# We need to map any of our edges if they are in a
# pseudo-assy
comps = edge_dict_to_comp_list(dgraph, self.edge_list())
pa_keys = [name for name in comps if '~' in name]
if len(pa_keys) == 0:
self._mapped_severed_edges = self._severed_edges
else:
self._mapped_severed_edges = []
for src, target in self._severed_edges:
compname, _, varname = src.partition('.')
for pa_key in pa_keys:
pseudo = dgraph.node[pa_key]['pa_object']
if src in pseudo.outputs:
src = to_PA_var(src, pseudo.name)
break
compname, _, varname = target.partition('.')
for pa_key in pa_keys:
pseudo = dgraph.node[pa_key]['pa_object']
flat_inputs = set()
for item in pseudo.inputs:
subset = set(item)
flat_inputs = flat_inputs.union(subset)
if target in flat_inputs:
target = to_PA_var(target, pseudo.name)
break
self._mapped_severed_edges.append((src, target))
return super(CyclicWorkflow, self).initialize_residual()
[docs] def derivative_graph(self, inputs=None, outputs=None, fd=False):
"""Returns the local graph that we use for derivatives. For cyclic flows,
we need to sever edges and use them as inputs/outputs.
"""
if self._derivative_graph is None:
# Cyclic flows need to be severed before derivatives are calculated.
self._get_topsort()
inputs = []
outputs = []
for src, target in self._severed_edges:
inputs.append(target)
outputs.append(src)
super(CyclicWorkflow, self).derivative_graph(inputs, outputs, fd,
self._severed_edges)
return self._derivative_graph
[docs] def edge_list(self):
""" Return the list of edges for the derivatives of this workflow. """
self._edges = super(CyclicWorkflow, self).edge_list()
# TODO: Shouldn't have to do this everytime.
if len(self._mapped_severed_edges) > 0:
cyclic_edges = OrderedDict()
for edge in self._mapped_severed_edges:
cyclic_edges[edge[0]] = edge[1]
# Finally, modify our edge list to include the severed edges, and exclude
# the boundary edges.
for src, targets in self._edges.iteritems():
if '@in' not in src:
if isinstance(targets, str):
targets = [targets]
newtargets = []
for target in targets:
if '@out' not in target:
newtargets.append(target)
if len(newtargets) > 0:
cyclic_edges[src] = newtargets
self._edges = cyclic_edges
return self._edges
[docs] def set_new_state(self, dv):
"""Adds a vector of new values to the current model state at the
input edges.
dv: ndarray (nEdge, 1)
Array of values to add to the model inputs.
"""
for src, targets in self._mapped_severed_edges:
i1, i2 = self.get_bounds(src)
if isinstance(targets, str):
targets = [targets]
for target in targets:
target = from_PA_var(target)
old_val = self.scope.get(target)
if isinstance(old_val, float):
new_val = old_val + float(dv[i1:i2])
elif isinstance(old_val, ndarray):
shape = old_val.shape
if len(shape) > 1:
new_val = old_val.flatten() + dv[i1:i2]
new_val = new_val.reshape(shape)
else:
new_val = old_val + dv[i1:i2]
elif isinstance(old_val, VariableTree):
new_val = old_val.copy()
self._update(target, new_val, dv[i1:i2])
else:
msg = "Variable %s is of type %s." % (target, type(old_val)) + \
" This type is not supported by the MDA Solver."
self.scope.raise_exception(msg, RuntimeError)
# Poke new value into the input end of the edge.
self.scope.set(target, new_val, force=True)
# Prevent OpenMDAO from stomping on our poked input.
self.scope.set_valid([target.split('[',1)[0]], True)
#(An alternative way to prevent the stomping. This is more
#concise, but setting an output and allowing OpenMDAO to pull it
#felt hackish.)
#self.scope.set(src, new_val, force=True)
[docs] def calculate_residuals(self):
"""Calculate and return the vector of residuals based on the current
state of the system in our workflow."""
for src, targets in self._edges.iteritems():
i1, i2 = self.get_bounds(src)
src_val = self.scope.get(from_PA_var(src))
src_val = flattened_value(src, src_val).reshape(-1, 1)
if isinstance(targets, str):
targets = [targets]
for target in targets:
target_val = self.scope.get(from_PA_var(target))
target_val = flattened_value(target, target_val).reshape(-1, 1)
self.res[i1:i2] = src_val - target_val
return self.res