# Source code for openmdao.main.workflow

""" Base class for all workflows. """

# pylint: disable-msg=E0611,F0401
from openmdao.main.exceptions import RunStopped
from openmdao.main.pseudocomp import PseudoComponent

# Public API of this module: only the Workflow base class is exported by
# ``from ... import *``.
__all__ = ['Workflow']


class Workflow(object):
    """
    A Workflow consists of a collection of Components which are to be executed
    in some order.

    This base class keeps the execution bookkeeping (execution count,
    component index, stop flag, active iterator) and implements `run` and
    `step` on top of ``__iter__``.  The methods `add`, `remove`,
    `get_names`, ``__iter__`` and ``__len__`` raise NotImplementedError here
    and are expected to be provided by specific workflows.
    """

    def __init__(self, parent=None, scope=None, members=None):
        """Create a Workflow.

        parent: Driver (optional)
            The Driver that contains this Workflow.  This option is normally
            passed instead of scope because scope usually isn't known at
            initialization time.  If scope is not provided, it will be
            set to parent.parent, which should be the Assembly that contains
            the parent Driver.

        scope: Component (optional)
            The scope can be explicitly specified here, but this is not
            typically known at initialization time.

        members: list of str (optional)
            A list of names of Components to add to this workflow.

        Raises TypeError if an entry of `members` is not a string.
        """
        self._iterator = None    # Active component iterator, None when idle.
        self._stop = False       # Set by stop(), checked by run().
        self._parent = parent
        self._scope = scope
        self._exec_count = 0     # Workflow executions since reset.
        self._initial_count = 0  # Value to reset to (typically zero).
        self._comp_count = 0     # Component index in workflow.

        if members:
            # 'basestring' only exists on Python 2; fall back to 'str' so the
            # name check also works where it is not defined.
            try:
                string_types = basestring
            except NameError:
                string_types = str
            for member in members:
                if not isinstance(member, string_types):
                    raise TypeError("Components must be added to a workflow by name.")
                self.add(member)

    @property
    def scope(self):
        """The scoping Component that is used to resolve the Component names in
        this Workflow.

        If no scope was set explicitly, it is obtained (and cached) from the
        parent's ``get_expr_scope()``.  Raises RuntimeError if no scope can
        be determined.
        """
        if self._scope is None and self._parent is not None:
            self._scope = self._parent.get_expr_scope()
        if self._scope is None:
            raise RuntimeError("workflow has no scope!")
        return self._scope

    @scope.setter
    def scope(self, scope):
        # Changing the scope changes what the component names resolve to, so
        # let the workflow react via config_changed().
        self._scope = scope
        self.config_changed()

    @property
    def itername(self):
        """Base 'iteration coordinates' of this workflow for an empty case id."""
        return self._iterbase('')

    def check_config(self):
        """Perform any checks that we need prior to run. Specific workflows
        should override this."""
        pass

    def set_initial_count(self, count):
        """
        Set initial value for execution count.  Only needed if the iteration
        coordinates must be initialized, such as for CaseIterDriverBase.

        count: int
            Initial value for workflow execution count.
        """
        self._initial_count = count - 1  # run() and step() will increment.

    def reset(self):
        """ Reset execution count. """
        self._exec_count = self._initial_count

    def run(self, ffd_order=0, case_id=''):
        """ Run the Components in this Workflow.

        ffd_order: int
            Passed through unchanged to each component's ``run()``.

        case_id: str
            Passed through to each component's ``run()`` and used as the
            iteration-coordinate prefix when the parent supplies none.

        Raises RunStopped if `stop` was requested while running.
        """
        self._stop = False
        self._iterator = self.__iter__()
        self._exec_count += 1
        self._comp_count = 0
        iterbase = self._iterbase(case_id)
        for comp in self._iterator:
            if isinstance(comp, PseudoComponent):
                # PseudoComponents are run but get no iteration coordinate
                # and do not advance the component index.
                comp.run(ffd_order=ffd_order, case_id=case_id)
            else:
                self._comp_count += 1
                comp.set_itername('%s-%d' % (iterbase, self._comp_count))
                comp.run(ffd_order=ffd_order, case_id=case_id)
            if self._stop:
                # The iterator is deliberately left in place on this path.
                raise RunStopped('Stop requested')
        self._iterator = None

    def _iterbase(self, case_id):
        """ Return base for 'iteration coordinates'.

        Without a parent this is just the execution count.  Otherwise the
        parent's itername (or `case_id` when that is empty) is used as a
        dotted prefix in front of the execution count.
        """
        if self._parent is None:
            return str(self._exec_count)  # An unusual case.
        else:
            prefix = self._parent.get_itername()
            if not prefix:
                prefix = case_id
            if prefix:
                prefix += '.'
            return '%s%d' % (prefix, self._exec_count)

    def step(self, ffd_order=0, case_id=''):
        """Run a single component in this Workflow.

        Raises RunStopped('Step complete') after the component has run, and
        StopIteration once there are no more components.  After
        StopIteration the iterator is cleared, so the next call starts a new
        pass over the workflow.
        """
        if self._iterator is None:
            self._iterator = self.__iter__()
            self._exec_count += 1
            self._comp_count = 0

        try:
            comp = next(self._iterator)
        except StopIteration:
            # Workflow exhausted: drop the dead iterator so a later step()
            # begins a fresh pass instead of re-raising forever.
            self._iterator = None
            raise

        # NOTE(review): unlike run(), PseudoComponents are not special-cased
        # here -- confirm that is intended.
        self._comp_count += 1
        iterbase = self._iterbase(case_id)
        comp.set_itername('%s-%d' % (iterbase, self._comp_count))
        try:
            comp.run(ffd_order=ffd_order, case_id=case_id)
        except StopIteration:
            self._iterator = None
            raise  # Bare raise keeps the original traceback.
        raise RunStopped('Step complete')

    def stop(self):
        """
        Stop all Components in this Workflow.
        We assume it's OK to call stop() on something that isn't running.
        """
        for comp in self.get_components(full=True):
            comp.stop()
        self._stop = True

    def add(self, compnames, index=None, check=False):
        """ Add new component(s) to the workflow by name."""
        raise NotImplementedError("This Workflow has no 'add' function")

    def config_changed(self):
        """Notifies the Workflow that workflow configuration
        (dependencies, etc.) has changed.
        """
        pass

    def remove(self, comp):
        """Remove a component from this Workflow by name."""
        raise NotImplementedError("This Workflow has no 'remove' function")

    def get_names(self, full=False):
        """Return a list of component names in this workflow."""
        raise NotImplementedError("This Workflow has no 'get_names' function")

    def get_components(self, full=False):
        """Returns a list of all component objects in the workflow. No ordering
        is assumed.

        Names from ``get_names(full)`` are looked up as attributes of the
        scope.
        """
        scope = self.scope
        return [getattr(scope, name) for name in self.get_names(full)]

    def __iter__(self):
        """Returns an iterator over the components in the workflow in
        some order.
        """
        raise NotImplementedError("This Workflow has no '__iter__' function")

    def __len__(self):
        raise NotImplementedError("This Workflow has no '__len__' function")
# OpenMDAO Home