# Source code for openmdao.main.workflow
""" Base class for all workflows. """
# pylint: disable-msg=E0611,F0401
from openmdao.main.exceptions import RunStopped
from openmdao.main.pseudocomp import PseudoComponent
# Names exported by ``from <this module> import *``: only the Workflow class.
__all__ = ['Workflow']
class Workflow(object):
    """
    A Workflow consists of a collection of Components which are to be executed
    in some order.

    This is a base class. The implementations of add(), remove(),
    get_names(), __iter__() and __len__() found here all raise
    NotImplementedError, so a usable workflow must override them.
    """

    def __init__(self, parent=None, scope=None, members=None):
        """Create a Workflow.

        parent: Driver (optional)
            The Driver that contains this Workflow.  This option is normally
            passed instead of scope because scope usually isn't known at
            initialization time.  If scope is not provided, it is obtained
            lazily from ``parent.get_expr_scope()`` the first time the
            `scope` property is read.

        scope: Component (optional)
            The scope can be explicitly specified here, but this is not
            typically known at initialization time.

        members: list of str (optional)
            A list of names of Components to add to this workflow.  Each
            entry is passed to add(); a TypeError is raised for any entry
            that is not a string.
        """
        self._iterator = None
        self._stop = False
        self._parent = parent
        self._scope = scope
        self._exec_count = 0     # Workflow executions since reset.
        self._initial_count = 0  # Value to reset to (typically zero).
        self._comp_count = 0     # Component index in workflow.

        if members:
            for member in members:
                if not isinstance(member, basestring):
                    raise TypeError("Components must be added to a workflow by name.")
                self.add(member)

    @property
    def scope(self):
        """The scoping Component that is used to resolve the Component names in
        this Workflow.

        If no scope has been set and a parent exists, the scope is fetched
        from ``parent.get_expr_scope()`` and remembered.  Raises RuntimeError
        if no scope can be determined.
        """
        if self._scope is None and self._parent is not None:
            self._scope = self._parent.get_expr_scope()
        if self._scope is None:
            raise RuntimeError("workflow has no scope!")
        return self._scope

    @scope.setter
    def scope(self, scope):
        """Set the scope and signal that the configuration has changed."""
        self._scope = scope
        self.config_changed()

    @property
    def itername(self):
        """Iteration-coordinate base for this workflow, computed with an
        empty case id."""
        return self._iterbase('')

    def check_config(self):
        """Perform any checks that we need prior to run. Specific workflows
        should override this."""
        pass

    def set_initial_count(self, count):
        """
        Set initial value for execution count.  Only needed if the iteration
        coordinates must be initialized, such as for CaseIterDriverBase.

        count: int
            Initial value for workflow execution count.
        """
        # Stored one lower than requested: run() and step() increment the
        # execution count before it is used.
        self._initial_count = count - 1

    def reset(self):
        """ Reset execution count to the configured initial value. """
        self._exec_count = self._initial_count

    def run(self, ffd_order=0, case_id=''):
        """ Run the Components in this Workflow.

        ffd_order: int
            Passed through unchanged to each component's run().

        case_id: str
            Passed through unchanged to each component's run(); also used
            as the iteration-coordinate prefix when the parent supplies none.

        Raises RunStopped if a stop was requested (see stop()); the check is
        made after each component finishes running.
        """
        self._stop = False
        self._iterator = self.__iter__()
        self._exec_count += 1
        self._comp_count = 0
        iterbase = self._iterbase(case_id)

        for comp in self._iterator:
            if isinstance(comp, PseudoComponent):
                # PseudoComponents are run but are neither counted nor
                # given an iteration name.
                comp.run(ffd_order=ffd_order, case_id=case_id)
            else:
                self._comp_count += 1
                comp.set_itername('%s-%d' % (iterbase, self._comp_count))
                comp.run(ffd_order=ffd_order, case_id=case_id)
            if self._stop:
                raise RunStopped('Stop requested')
        self._iterator = None

    def _iterbase(self, case_id):
        """ Return base for 'iteration coordinates'.

        With no parent this is just the execution count.  Otherwise the
        parent's itername is used as a prefix, falling back to `case_id`
        when the parent's itername is empty; a non-empty prefix is joined
        to the execution count with a '.'.
        """
        if self._parent is None:
            return str(self._exec_count)  # An unusual case.
        else:
            prefix = self._parent.get_itername()
            if not prefix:
                prefix = case_id
            if prefix:
                prefix += '.'
            return '%s%d' % (prefix, self._exec_count)

    def step(self, ffd_order=0, case_id=''):
        """Run a single component in this Workflow.

        A new pass over the workflow is started whenever there is no active
        iterator.  After the component has run, RunStopped('Step complete')
        is raised.  StopIteration is raised when the workflow has no more
        components to run (or when the component's own run() raises it); in
        both cases the iterator is discarded so that the next call to step()
        starts a new pass.
        """
        if self._iterator is None:
            self._iterator = self.__iter__()
            self._exec_count += 1
            self._comp_count = 0

        try:
            comp = next(self._iterator)
        except StopIteration:
            # Our own iterator is exhausted: drop it so that a later step()
            # begins a fresh pass instead of re-raising from a dead iterator.
            self._iterator = None
            raise

        self._comp_count += 1
        iterbase = self._iterbase(case_id)
        comp.set_itername('%s-%d' % (iterbase, self._comp_count))
        try:
            comp.run(ffd_order=ffd_order, case_id=case_id)
        except StopIteration:
            self._iterator = None
            raise  # Bare raise keeps the original traceback.
        raise RunStopped('Step complete')

    def stop(self):
        """
        Stop all Components in this Workflow.
        We assume it's OK to call stop() on something that isn't running.
        """
        for comp in self.get_components(full=True):
            comp.stop()
        self._stop = True

    def add(self, compnames, index=None, check=False):
        """ Add new component(s) to the workflow by name.

        This base implementation always raises NotImplementedError and does
        not use its arguments.
        """
        raise NotImplementedError("This Workflow has no 'add' function")

    def config_changed(self):
        """Notifies the Workflow that workflow configuration
        (dependencies, etc.) has changed.  Does nothing in this base class.
        """
        pass

    def remove(self, comp):
        """Remove a component from this Workflow by name.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError("This Workflow has no 'remove' function")

    def get_names(self, full=False):
        """Return a list of component names in this workflow.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError("This Workflow has no 'get_names' function")

    def get_components(self, full=False):
        """Returns a list of all component objects in the workflow. No ordering
        is assumed.

        Each name from ``get_names(full)`` is looked up as an attribute of
        the workflow's scope.
        """
        scope = self.scope
        return [getattr(scope, name) for name in self.get_names(full)]

    def __iter__(self):
        """Returns an iterator over the components in the workflow in
        some order.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError("This Workflow has no '__iter__' function")

    def __len__(self):
        """Always raises NotImplementedError in this base class."""
        raise NotImplementedError("This Workflow has no '__len__' function")