"""
Functions in the HasConstraints, HasEqConstraints, and HasIneqConstraints
interfaces.
"""
# pylint: disable-msg=E0611,F0401
import operator
import ordereddict
from openmdao.main.expreval import ExprEvaluator
from openmdao.main.pseudocomp import PseudoComponent, _remove_spaces
_ops = {
'>': operator.gt,
'<': operator.lt,
'>=': operator.ge,
'<=': operator.le,
'=': operator.eq,
}
def _check_expr(expr):
""" force checking for existence of vars referenced in expression """
if not expr.check_resolve():
raise ValueError("Invalid expression '%s'" % str(expr))
def _parse_constraint(expr_string):
""" Parses the constraint expression string and returns the lhs string,
the rhs string, and comparator
"""
for comparator in ['==', '>=', '<=', '>', '<', '=']:
parts = expr_string.split(comparator)
if len(parts) > 1:
if comparator == '==': # check for == because otherwise they get a cryptic error msg
break
return (parts[0].strip(), comparator, parts[1].strip())
msg = "Constraints require an explicit comparator (=, <, >, <=, or >=)"
raise ValueError(msg)
def _get_scope(obj, scope=None):
if scope is None:
try:
return obj._parent.get_expr_scope()
except AttributeError:
pass
return scope
[docs]class Constraint(object):
""" Object that stores info for a single constraint. """
def __init__(self, lhs, comparator, rhs, scope):
self.lhs = ExprEvaluator(lhs, scope=scope)
if not self.lhs.check_resolve():
raise ValueError("Constraint '%s' has an invalid left-hand-side."
% ' '.join([lhs, comparator, rhs]))
self.comparator = comparator
self.rhs = ExprEvaluator(rhs, scope=scope)
if not self.rhs.check_resolve():
raise ValueError("Constraint '%s' has an invalid right-hand-side."
% ' '.join([lhs, comparator, rhs]))
self.pcomp_name = None
[docs] def activate(self):
"""Make this constraint active by creating the appropriate
connections in the dependency graph.
"""
if self.pcomp_name is None:
pseudo = PseudoComponent(self.lhs.scope, self._combined_expr(),
pseudo_type='constraint')
self.pcomp_name = pseudo.name
self.lhs.scope.add(pseudo.name, pseudo)
getattr(self.lhs.scope, pseudo.name).make_connections(self.lhs.scope)
[docs] def deactivate(self):
"""Remove this constraint from the dependency graph and remove
its pseudocomp from the scoping object.
"""
if self.pcomp_name:
scope = self.lhs.scope
try:
pcomp = getattr(scope, self.pcomp_name)
except AttributeError:
pass
else:
# pcomp.remove_connections(scope)
# if hasattr(scope, pcomp.name):
scope.remove(pcomp.name)
finally:
self.pcomp_name = None
def _combined_expr(self):
"""Given a constraint object, take the lhs, operator, and
rhs and combine them into a single expression by moving rhs
terms over to the lhs. For example,
for the constraint 'C1.x < C2.y + 7', return the expression
'C1.x - C2.y - 7'. Depending on the direction of the operator,
the sign of the expression may be flipped. The final form of
the constraint, when evaluated, will be considered to be satisfied
if it evaluates to a value <= 0.
"""
scope = self.lhs.scope
if self.comparator.startswith('>'):
first = self.rhs.text
second = self.lhs.text
else:
first = self.lhs.text
second = self.rhs.text
first_zero = False
try:
f = float(first)
except:
pass
else:
if f == 0:
first_zero = True
second_zero = False
try:
f = float(second)
except:
pass
else:
if f == 0:
second_zero = True
if first_zero:
newexpr = "-(%s)" % second
elif second_zero:
newexpr = "%s" % first
else:
newexpr = '%s-(%s)' % (first, second)
return ExprEvaluator(newexpr, scope)
[docs] def copy(self):
cnst = Constraint(str(self.lhs), self.comparator, str(self.rhs),
scope=self.lhs.scope)
return cnst
[docs] def evaluate(self, scope):
"""Returns the value of the constraint."""
pcomp = getattr(scope, self.pcomp_name)
if not pcomp.is_valid():
pcomp.update_outputs(['out0'])
return pcomp.out0
[docs] def evaluate_gradient(self, scope, stepsize=1.0e-6, wrt=None):
"""Returns the gradient of the constraint eq/ineq as a tuple of the
form (lhs, rhs, comparator, is_violated)."""
lhs = self.lhs.evaluate_gradient(scope=scope, stepsize=stepsize, wrt=wrt)
if isinstance(self.rhs, float):
rhs = 0.
else:
rhs = self.rhs.evaluate_gradient(scope=scope, stepsize=stepsize, wrt=wrt)
return (lhs, rhs, self.comparator, not _ops[self.comparator](lhs, rhs))
[docs] def get_referenced_compnames(self):
if isinstance(self.rhs, float):
return self.lhs.get_referenced_compnames()
else:
return self.lhs.get_referenced_compnames().union(self.rhs.get_referenced_compnames())
[docs] def get_referenced_varpaths(self, copy=True):
if isinstance(self.rhs, float):
return self.lhs.get_referenced_varpaths(copy=copy)
else:
return self.lhs.get_referenced_varpaths(copy=copy).union(
self.rhs.get_referenced_varpaths(copy=copy))
def __str__(self):
return ' '.join([str(self.lhs), self.comparator, str(self.rhs)])
def __eq__(self, other):
if not isinstance(other, Constraint):
return False
return (self.lhs, self.comparator, self.rhs) == \
(other.lhs, other.comparator, other.rhs)
class _HasConstraintsBase(object):
_do_not_promote = ['get_expr_depends', 'get_referenced_compnames',
'get_referenced_varpaths']
def __init__(self, parent, allowed_types=None):
self._parent = parent
self._constraints = ordereddict.OrderedDict()
def remove_constraint(self, key):
"""Removes the constraint with the given string."""
key = _remove_spaces(key)
cnst = self._constraints.get(key)
if cnst:
cnst.deactivate()
del self._constraints[key]
else:
msg = "Constraint '%s' was not found. Remove failed." % key
self._parent.raise_exception(msg, AttributeError)
self._parent.config_changed()
def get_references(self, name):
"""Return references to component `name` in
preparation for subsequent :meth:`restore_references`
call.
name: string
Name of component being referenced.
"""
refs = ordereddict.OrderedDict()
for cname, constraint in self._constraints.items():
if name in constraint.get_referenced_compnames():
refs[cname] = constraint
return refs
def remove_references(self, name):
"""Remove references to component `name`.
name: string
Name of component being removed.
"""
to_remove = []
for cname, constraint in self._constraints.items():
if name in constraint.get_referenced_compnames():
to_remove.append(cname)
for cname in to_remove:
self.remove_constraint(cname)
def restore_references(self, refs):
"""Restore references to component `name` from `refs`.
refs: object
Value returned by :meth:`get_references`.
Note: this is called from the replace() method, where
the replacing object may be missing variables that were
found in the target object, so no restore_references
call should raise an exception when restoring a reference
fails.
"""
for name, constraint in refs.items():
if name in self._constraints:
self.remove_constraint(name)
try:
self.add_constraint(str(constraint), name,
constraint.lhs.scope)
except Exception as err:
self._parent._logger.warning("Couldn't restore constraint '%s': %s"
% (name, str(err)))
def clear_constraints(self):
"""Removes all constraints."""
for name, cnst in self._constraints.items():
self.remove_constraint(name)
def list_constraints(self):
"""Return a list of strings containing constraint expressions."""
return self._constraints.keys()
def copy_constraints(self):
"""Returns a copy of our constraints dict."""
dct = ordereddict.OrderedDict()
for key, val in self._constraints.items():
dct[key] = val.copy()
return dct
def list_pseudocomps(self):
"""Returns a list of pseudocomponent names associated with our
parameters.
"""
return [c.pcomp_name for c in self._constraints.values()
if c.pcomp_name]
def get_expr_depends(self):
"""Returns a list of tuples of the form (comp_name, self_name)
for each component name referenced by a constraint.
"""
conn_list = []
pname = self._parent.name
for name, constraint in self._constraints.items():
conn_list.extend([(c, pname)
for c in constraint.get_referenced_compnames()])
return conn_list
def get_referenced_compnames(self):
"""Returns a set of names of each component referenced by a
constraint.
"""
names = set()
for constraint in self._constraints.values():
names.update(constraint.get_referenced_compnames())
return names
def get_referenced_varpaths(self):
"""Returns a set of variable names referenced by a
constraint.
"""
names = set()
for constraint in self._constraints.values():
names.update(constraint.get_referenced_varpaths(copy=False))
return names
def mimic(self, target):
"""Tries to mimic the target object's constraints. Target constraints that
are incompatible with this object are ignored.
"""
old = self._constraints
self._constraints = ordereddict.OrderedDict()
scope = _get_scope(target)
for name, cnst in target.copy_constraints().items():
try:
self.add_existing_constraint(scope, cnst, name)
except Exception:
self._constraints = old
raise
def _item_count(self):
"""This is used by the replace function to determine if a delegate from the
target object is 'empty' or not. If it's empty then it's not an error if the
replacing object doesn't have this delegate.
"""
return len(self._constraints)
[docs]class HasEqConstraints(_HasConstraintsBase):
"""Add this class as a delegate if your Driver supports equality
constraints but does not support inequality constraints.
"""
[docs] def add_constraint(self, expr_string, name=None, scope=None):
"""Adds a constraint in the form of a boolean expression string
to the driver.
*Parameters:*
expr_string: str
Expression string containing the constraint.
name: str (optional)
Name to be used to refer to the constraint rather than its
expression string.
scope: object (optional)
The object to be used as the scope when evaluating the expression.
"""
try:
lhs, rel, rhs = _parse_constraint(expr_string)
except Exception as err:
self._parent.raise_exception(str(err), type(err))
if rel == '=':
self._add_eq_constraint(lhs, rhs, name, scope)
else:
msg = "Inequality constraints are not supported on this driver"
self._parent.raise_exception(msg, ValueError)
def _add_eq_constraint(self, lhs, rhs, name=None, scope=None):
"""Adds an equality constraint as two strings, a left-hand side and
a right-hand side.
"""
if not isinstance(lhs, basestring):
msg = "Constraint left-hand side (%s) is not a string" % lhs
raise ValueError(msg)
if not isinstance(rhs, basestring):
msg = "Constraint right-hand-side (%s) is not a string" % rhs
raise ValueError(msg)
ident = _remove_spaces('='.join([lhs, rhs]))
if ident in self._constraints:
self._parent.raise_exception('A constraint of the form "%s" already exists '
'in the driver. Add failed.' % ident, ValueError)
elif name is not None and name in self._constraints:
self._parent.raise_exception('A constraint named "%s" already exists '
'in the driver. Add failed.' % name, ValueError)
constraint = Constraint(lhs, '=', rhs, scope=_get_scope(self, scope))
constraint.activate()
name = ident if name is None else name
self._constraints[name] = constraint
self._parent.config_changed()
[docs] def add_existing_constraint(self, scope, constraint, name=None):
"""Adds an existing Constraint object to the driver.
scope: container object where constraint expression will
be evaluated.
constraint: Constraint object
name: str (optional)
Name to be used to refer to the constraint rather than its
expression string.
"""
if constraint.comparator == '=':
constraint.activate()
self._constraints[name] = constraint
else:
self._parent.raise_exception("Inequality constraint '%s' is not supported on this driver" %
str(constraint), ValueError)
self._parent.config_changed()
[docs] def get_eq_constraints(self):
"""Returns an ordered dict of constraint objects."""
return self._constraints
[docs] def get_constraints(self):
"""Returns an ordered dict of constraint objects"""
return self._constraints
[docs] def eval_eq_constraints(self, scope=None):
"""Returns a list of constraint values.
"""
return [c.evaluate(_get_scope(self, scope)) for c in self._constraints.values()]
[docs] def list_eq_constraint_targets(self):
"""Returns a list of outputs suitable for calc_gradient()."""
return ["%s.out0" % c.pcomp_name for c in self._constraints.values()]
[docs]class HasIneqConstraints(_HasConstraintsBase):
"""Add this class as a delegate if your Driver supports inequality
constraints but does not support equality constraints.
"""
[docs] def add_constraint(self, expr_string, name=None, scope=None):
"""Adds a constraint in the form of a boolean expression string
to the driver.
expr_string: str
Expression string containing the constraint.
name: str (optional)
Name to be used to refer to the constraint rather than its
expression string.
scope: object (optional)
The object to be used as the scope when evaluating the expression.
"""
try:
lhs, rel, rhs = _parse_constraint(expr_string)
except Exception as err:
self._parent.raise_exception(str(err), type(err))
self._add_ineq_constraint(lhs, rel, rhs, name, scope)
def _add_ineq_constraint(self, lhs, rel, rhs, name=None, scope=None):
"""Adds an inequality constraint as three strings; a left-hand side,
a comparator ('<','>','<=', or '>='), and a right-hand side.
"""
if rel == '=':
msg = "Equality constraints are not supported on this driver"
self._parent.raise_exception(msg, ValueError)
if not isinstance(lhs, basestring):
msg = "Constraint left-hand-side (%s) is not a string" % lhs
raise ValueError(msg)
if not isinstance(rhs, basestring):
msg = "Constraint right-hand-side (%s) is not a string" % rhs
raise ValueError(msg)
ident = _remove_spaces(rel.join([lhs, rhs]))
if ident in self._constraints:
self._parent.raise_exception('A constraint of the form "%s" already exists in '
'the driver. Add failed.' % ident, ValueError)
elif name is not None and name in self._constraints:
self._parent.raise_exception('A constraint named "%s" already exists '
'in the driver. Add failed.' % name, ValueError)
constraint = Constraint(lhs, rel, rhs, scope=_get_scope(self, scope))
constraint.activate()
if name is None:
self._constraints[ident] = constraint
else:
self._constraints[name] = constraint
self._parent.config_changed()
[docs] def add_existing_constraint(self, scope, constraint, name=None):
"""Adds an existing Constraint object to the driver.
scope: container object where constraint expression will
be evaluated.
constraint: Constraint object
name: str (optional)
Name to be used to refer to the constraint rather than its
expression string.
"""
if constraint.comparator != '=':
self._constraints[name] = constraint
constraint.activate()
else:
self._parent.raise_exception("Equality constraint '%s' is not supported on this driver" %
str(constraint), ValueError)
self._parent.config_changed()
[docs] def get_ineq_constraints(self):
"""Returns an ordered dict of inequality constraint objects."""
return self._constraints
[docs] def get_constraints(self):
"""Returns an ordered dict of constraint objects"""
return self._constraints
[docs] def eval_ineq_constraints(self, scope=None):
"""Returns a list of constraint values"""
return [c.evaluate(_get_scope(self, scope)) for c in self._constraints.values()]
[docs] def list_ineq_constraint_targets(self):
"""Returns a list of outputs suitable for calc_gradient()."""
return ["%s.out0" % c.pcomp_name for c in self._constraints.values()]
[docs]class HasConstraints(object):
"""Add this class as a delegate if your Driver supports both equality
and inequality constraints.
"""
_do_not_promote = ['get_expr_depends', 'get_referenced_compnames',
'get_referenced_varpaths']
def __init__(self, parent):
self._parent = parent
self._eq = HasEqConstraints(parent)
self._ineq = HasIneqConstraints(parent)
def _item_count(self):
"""This is used by the replace function to determine if a delegate from the
target object is 'empty' or not. If it's empty then it's not an error if the
replacing object doesn't have this delegate.
"""
return self._eq._item_count() + self._ineq._item_count()
[docs] def add_constraint(self, expr_string, name=None, scope=None):
"""Adds a constraint in the form of a boolean expression string
to the driver.
expr_string: str
Expression string containing the constraint.
name: str (optional)
Name to be used to refer to the constraint rather than its
expression string.
scope: object (optional)
The object to be used as the scope when evaluating the expression.
"""
try:
lhs, rel, rhs = _parse_constraint(expr_string)
except Exception as err:
self._parent.raise_exception(str(err), type(err))
if rel == '=':
self._eq._add_eq_constraint(lhs, rhs, name, scope)
else:
self._ineq._add_ineq_constraint(lhs, rel, rhs, name, scope)
[docs] def add_existing_constraint(self, scope, constraint, name=None):
"""Adds an existing Constraint object to the driver.
scope: container object where constraint expression will
be evaluated.
constraint: Constraint object
name: str (optional)
Name to be used to refer to the constraint rather than its
expression string.
"""
if constraint.comparator == '=':
self._eq.add_existing_constraint(scope, constraint, name)
else:
self._ineq.add_existing_constraint(scope, constraint, name)
[docs] def remove_constraint(self, expr_string):
"""Removes the constraint with the given string."""
ident = _remove_spaces(expr_string)
if ident in self._eq._constraints:
self._eq.remove_constraint(expr_string)
else:
self._ineq.remove_constraint(expr_string)
[docs] def get_references(self, name):
"""Return references to component `name` in preparation for subsequent
:meth:`restore_references` call.
name: string
Name of component being removed.
"""
return (self._eq.get_references(name),
self._ineq.get_references(name))
[docs] def remove_references(self, name):
"""Remove references to component `name`.
name: string
Name of component being removed.
"""
self._eq.remove_references(name)
self._ineq.remove_references(name)
[docs] def restore_references(self, refs):
"""Restore references to component `name` from `refs`.
refs: dict
References returned by :meth:`get_references`.
"""
if isinstance(refs, tuple) and len(refs) == 2:
self._eq.restore_references(refs[0])
self._ineq.restore_references(refs[1])
else:
raise TypeError('refs should be tuple of ordereddict.OrderedDict, got %r'
% refs)
[docs] def clear_constraints(self):
"""Removes all constraints."""
self._eq.clear_constraints()
self._ineq.clear_constraints()
[docs] def copy_constraints(self):
dct = self._eq.copy_constraints()
dct.update(self._ineq.copy_constraints())
return dct
def _add_ineq_constraint(self, lhs, comparator, rhs, scaler, adder, name=None,
scope=None):
"""Adds an inequality constraint as three strings; a left-hand side,
a comparator ('<','>','<=', or '>='), and a right hand side.
"""
self._ineq._add_ineq_constraint(lhs, comparator, rhs, scaler, adder, name,
scope)
def _add_eq_constraint(self, lhs, rhs, scaler, adder, name=None, scope=None):
"""Adds an equality constraint as two strings, a left-hand side and
a right-hand side.
"""
self._eq._add_eq_constraint(lhs, rhs, scaler, adder, name, scope)
[docs] def get_eq_constraints(self):
"""Returns an ordered dict of equality constraint objects."""
return self._eq.get_eq_constraints()
[docs] def get_ineq_constraints(self):
"""Returns an ordered dict of inequality constraint objects."""
return self._ineq.get_ineq_constraints()
[docs] def get_constraints(self):
"""Returns an ordered dict of constraint objects"""
return ordereddict.OrderedDict(self._eq.get_eq_constraints().items() +
self._ineq.get_ineq_constraints().items())
[docs] def eval_eq_constraints(self, scope=None):
"""Returns a list of constraint values.
"""
return self._eq.eval_eq_constraints(scope)
[docs] def eval_ineq_constraints(self, scope=None):
"""Returns a list of constraint values.
"""
return self._ineq.eval_ineq_constraints(scope)
[docs] def eval_constraints(self, scope=None):
"""Returns a list of constraint values.
"""
return self._eq.eval_eq_constraints(scope) + \
self._ineq.eval_ineq_constraints(scope)
[docs] def list_constraints(self):
"""Return a list of strings containing constraint expressions."""
lst = self._ineq.list_constraints()
lst.extend(self._eq.list_constraints())
return lst
[docs] def list_pseudocomps(self):
"""Returns a list of pseudocomponent names associated with our
parameters.
"""
return self._eq.list_pseudocomps() + self._ineq.list_pseudocomps()
[docs] def list_eq_constraint_targets(self):
"""Returns a list of outputs suitable for calc_gradient()."""
return self._eq.list_eq_constraint_targets()
[docs] def list_ineq_constraint_targets(self):
"""Returns a list of outputs suitable for calc_gradient()."""
return self._ineq.list_ineq_constraint_targets()
[docs] def list_constraint_targets(self):
"""Returns a list of outputs suitable for calc_gradient()."""
return self._eq.list_eq_constraint_targets() + \
self._ineq.list_ineq_constraint_targets()
[docs] def get_expr_depends(self):
"""Returns a list of tuples of the form (src_comp_name, dest_comp_name)
for each dependency introduced by a constraint.
"""
conn_list = self._eq.get_expr_depends()
conn_list.extend(self._ineq.get_expr_depends())
return conn_list
[docs] def get_referenced_compnames(self):
"""Returns a set of names of each component referenced by a
constraint.
"""
names = set(self._eq.get_referenced_compnames())
names.update(self._ineq.get_referenced_compnames())
return names
[docs] def get_referenced_varpaths(self):
"""Returns a set of names of each component referenced by a
constraint.
"""
names = set(self._eq.get_referenced_varpaths())
names.update(self._ineq.get_referenced_varpaths())
return names
[docs] def mimic(self, target):
"""Tries to mimic the target object's constraints. Target constraints that
are incompatible with raise an exception.
"""
self.clear_constraints()
scope = _get_scope(self)
for name, cnst in target.copy_constraints().items():
self.add_existing_constraint(scope, cnst, name)