"""
This module defines modified versions of some classes and functions defined in
:mod:`multiprocessing.managers` to support OpenMDAO distributed simulations.
Channel communication can be secured by setting `authkey` to 'PublicKey'.
When `authkey` is 'PublicKey', an additional session establishment protocol
is used between the Proxy and Server:
1. Proxy sends session request containing the proxy's public key, encrypted with
the server's public key (obtained when told what server to connect to).
2. Server responds with random session key, encrypted with proxy's public key.
3. Subsequent communication is encrypted with the session key (which presumably
is quicker than public/private key encryption).
If `authkey` is not 'PublicKey', then the above session protocol is not used,
and channel data is in the clear.
Public methods of an object are determined by a role-based access control
attribute associated with the method. The server will verify that the current
role is allowed access. The current role is determined by an
:class:`AccessController` based on a :class:`Credentials` object received from
the proxy.
Assuming the credentials check passes, the server will set its credentials
to those specified by the :class:`AccessController` during the execution of the
method.
"""
# Unfortunately, there's a lot of multiprocessing package code duplication here.
# And a lot of this should be done at a lower layer. But then we'd be
# duplicating connection.py code as well as (more) managers.py code. Another
# alternative is to just use our own connection.py and managers.py (and whatever
# else might bleed in) as our private multiprocessing package.
# No obvious 'best' alternative.
import errno
import glob
import hashlib
import inspect
import logging
import os
import signal
import socket
import sys
import threading
import time
import traceback
from Crypto import Random
from multiprocessing import Process, current_process, connection, util
from multiprocessing.forking import Popen
from multiprocessing.managers import BaseManager, BaseProxy, RebuildProxy, \
Server, State, Token, convert_to_error, \
dispatch
if sys.platform == 'win32': #pragma no cover
from _multiprocessing import win32
from traits.trait_handlers import TraitDictObject
from openmdao.main.interfaces import obj_has_interface
from openmdao.main.mp_util import decrypt, encrypt, is_legal_connection, \
keytype, make_typeid, public_methods, \
tunnel_address, SPECIALS
from openmdao.main.rbac import AccessController, RoleError, check_role, \
need_proxy, Credentials, \
get_credentials, set_credentials
from openmdao.util.log import install_remote_handler, remove_remote_handlers, \
logging_port, LOG_DEBUG2, LOG_DEBUG3
from openmdao.util.publickey import decode_public_key, encode_public_key, \
get_key_pair, HAVE_PYWIN32, \
pk_encrypt, pk_decrypt
# Classes which require proxying (used by default AccessController).
# Populated externally rather than here, to break the import loop between
# this module and openmdao.main.container.
CLASSES_TO_PROXY = []

# Cache of proxies created by _make_proxy_type().
_PROXY_CACHE = {}
def is_instance(obj, type_info):
    """
    :func:`isinstance` replacement for when `obj` might be a proxy.

    obj: object
        Object to be tested.

    type_info: class or tuple of classes
        Class(es) to be tested against.

    Returns True if `obj` is an instance of `type_info` or the object `obj`
    refers to is an instance of `type_info`.
    """
    if not isinstance(obj, OpenMDAO_Proxy):
        # Local object: plain isinstance() works.
        return isinstance(obj, type_info)

    # Normalize a bare class into a one-element tuple.
    try:
        type_info[0]
    except TypeError:
        type_info = (type_info,)

    # Ask the remote object to do the check by dotted class name.
    for klass in type_info:
        dotted_name = '%s.%s' % (klass.__module__, klass.__name__)
        if obj.__is_instance__(dotted_name):
            return True
    return False
def has_interface(obj, *ifaces):
    """
    :func:`obj_has_interface` replacement for when `obj` might be a proxy.

    obj: object
        Object to be tested.

    ifaces: list[Interface]
        Interfaces to be tested against.

    Returns True if `obj` or the object `obj` refers to supports at least one
    :class:`Interface` in `ifaces`.
    """
    if not isinstance(obj, OpenMDAO_Proxy):
        # Local object: delegate directly.
        return obj_has_interface(obj, *ifaces)
    # Remote object: test each interface by its dotted name.
    return any(obj.__has_interface__('%s.%s' % (iface.__module__,
                                                iface.__name__))
               for iface in ifaces)
[docs]class OpenMDAO_Server(Server):
"""
A :class:`Server` that supports dynamic proxy generation and credential
checking.
registry: dict
Manager's proxy registry.
address: tuple or string
A :mod:`multiprocessing` address specifying an Internet address or
a pipe.
authkey: string
Authorization key. Inherited from the current :class:`Process`
object if not specified.
serializer: string
Which serialization method to use.
name: string
Name for server, used in log files, etc.
allowed_hosts: list(string)
Host address patterns to check against.
Optional if `allowed_users` is specified.
allowed_users: dict
Dictionary of users and corresponding public keys allowed access.
If None, any user may access. If empty, no user may access.
The host portions of user strings are used for address patterns.
allow_tunneling: bool
If True, allow connections from the local host, even if not listed
otherwise.
"""
    def __init__(self, registry, address, authkey, serializer, name=None,
                 allowed_hosts=None, allowed_users=None, allow_tunneling=False):
        """
        Initialize the server: configure logging, compute the effective set
        of allowed host addresses, and (for 'PublicKey' authkey) obtain the
        server's key pair.  See the class docstring for parameter details.
        """
        super(OpenMDAO_Server, self).__init__(registry, address, authkey,
                                              serializer)
        self.name = name or ('OMS_%d' % os.getpid())
        self._logger = logging.getLogger(self.name)
        self._logger.info('OpenMDAO_Server process %d started, %r',
                          os.getpid(), keytype(authkey))
        Random.atfork()  # Get our own PRNG (don't share parent's state).
        self.host = socket.gethostname()
        self._allowed_users = allowed_users
        if self._allowed_users is not None:
            # Derive allowed host addresses from the 'user@host' keys, so a
            # listed user's host is automatically an acceptable peer.
            hosts = set()
            for user_host in self._allowed_users.keys():
                user, host = user_host.split('@')
                try:
                    ip_addr = socket.gethostbyname(host)
                except socket.gaierror:
                    # Unresolvable host: log and skip, don't fail startup.
                    self._logger.warning('No address for %r', host)
                else:
                    hosts.add(ip_addr)
                # If the user's host is this machine, loopback is legal too.
                if host == socket.getfqdn() or host == socket.gethostname():
                    hosts.add('127.0.0.1')  # localhost
            if allowed_hosts:
                hosts |= set(allowed_hosts)
            self._allowed_hosts = list(hosts)
        else:
            self._allowed_hosts = allowed_hosts or []
        self._allow_tunneling = allow_tunneling
        if allow_tunneling:
            # Tunneled connections arrive via the loopback interface and via
            # the address we were told to serve on.
            self._allowed_hosts.append('127.0.0.1')  # localhost
            self._allowed_hosts.append(socket.gethostbyname(address[0]))
        if self._allowed_users is None:
            self._logger.warning(' allowed_users: ANY')
        else:
            self._logger.info(' allowed_users: %s',
                              sorted(self._allowed_users.keys()))
        if self._allowed_hosts:
            self._logger.info(' allowed_hosts: %s',
                              sorted(self._allowed_hosts))
        else:
            self._logger.warning(' allowed_hosts: ANY')
        self._authkey = authkey
        if authkey == 'PublicKey':
            # While we may be 'owned' by some remote user, use default
            # credentials for getting our key pair. This avoids generation
            # overhead and also issues with propagating our public key
            # back through a proxy.
            self._key_pair = get_key_pair(Credentials.user_host, self._logger)
        else:
            self._key_pair = None
        # Per-object access controllers, keyed by object ident.
        self._id_to_controller = {}
        self._access_controller = AccessController()
        for cls in CLASSES_TO_PROXY:
            self._access_controller.class_proxy_required(cls)
        self._address_type = connection.address_type(self.address)
    @property
    def public_key(self):
        """ Public key for session establishment.  Only valid when `authkey`
        is 'PublicKey'; raises RuntimeError otherwise. """
        if self._authkey == 'PublicKey':
            return self._key_pair.publickey()
        # Just being defensive.
        else: #pragma no cover
            raise RuntimeError('No public key available')
    # This happens on the remote server side and we'll check when connecting.
    @property
    def public_key_text(self): #pragma no cover
        """ Text representation of public key (empty string when the server
        is not using 'PublicKey' authentication). """
        if self._authkey == 'PublicKey':
            return encode_public_key(self.public_key)
        else:
            return ''
    def serve_forever(self):
        """
        Run the server forever, accepting connections and spawning one
        handler thread per connection.

        This version supports host connection filtering.
        Connection filtering allows for PublicKey servers which aren't
        accessible by just any host.
        """
        current_process()._manager_server = self
        try:
            try:
                while not self.stop:
                    try:
                        conn = self.listener.accept()
                        # Comment-out the line above and use this equivalent
                        # to debug connectivity issues.
                        #conn = self.listener._listener.accept()
                        #self._logger.critical('connection attempt from %r',
                        #                      self.listener.last_accepted)
                        #if self.listener._authkey:
                        #    connection.deliver_challenge(conn, self.listener._authkey)
                        #    connection.answer_challenge(conn, self.listener._authkey)
                    # Hard to cause this to happen.
                    except (OSError, IOError): #pragma no cover
                        if self.stop:
                            break
                        else:
                            continue
                    # Reject connections from hosts not in the allowed list.
                    address = self.listener.last_accepted
                    if address:
                        if not is_legal_connection(address, self._allowed_hosts,
                                                   self._logger):
                            conn.close()
                            continue
                    t = threading.Thread(target=self.handle_request,
                                         args=(conn,))
                    t.daemon = True
                    try:
                        t.start()
                    # Don't want to cause this to happen.
                    except Exception as exc: #pragma no cover
                        self._logger.error("Can't start server thread: %r", exc)
                        conn.close()
                        continue
            # Don't want to cause this to happen.
            except (KeyboardInterrupt, SystemExit): #pragma no cover
                pass
        finally:
            self.stop = 999  # Signal other threads that we're shutting down.
            try:
                self.listener.close()
            except Exception as exc:
                self._logger.error('Exception closing listener: %r', exc)
    def handle_request(self, conn):
        """
        Handle a new connection.

        conn: socket or pipe
            Connection to process.

        This version filters host connections and avoids getting upset if it
        can't deliver a challenge. This is to deal with immediately closed
        connections caused by :meth:`manager_is_alive` which are used to avoid
        getting hung trying to connect to a manager which is no longer there.
        """
        funcname = result = request = None
        try:
            connection.deliver_challenge(conn, self.authkey)
        except (EOFError, IOError) as exc:
            # Peer closed immediately (e.g. an is-alive probe); just drop it.
            conn.close()
            return
        # Hard to cause this to happen. It rarely happens, and then at shutdown.
        except Exception as exc: #pragma no cover
            msg = ('#TRACEBACK', 'Exception delivering challenge: %r' % exc)
            try:
                conn.send(msg)
            except Exception as exc:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', exc)
            conn.close()
            return
        try:
            connection.answer_challenge(conn, self.authkey)
            request = conn.recv()
            ignore, funcname, args, kwds = request
            # Only methods listed in self.public may be dispatched here.
            assert funcname in self.public, '%r unrecognized' % funcname
            func = getattr(self, funcname)
        # Hard to cause this to happen. It rarely happens, and then at shutdown.
        except Exception as exc: #pragma no cover
            msg = ('#TRACEBACK', 'Exception answering challenge: %r' % exc)
        else:
            try:
                result = func(conn, *args, **kwds)
            # Hard to cause this to happen. It rarely happens, and then at shutdown.
            except Exception as exc: #pragma no cover
                try: # Sometimes at shutdown 'traceback' is None!?
                    msg = ('#TRACEBACK', traceback.format_exc())
                except Exception:
                    msg = ('#TRACEBACK',
                           'Exception from %r: %r' % (funcname, exc))
            else:
                msg = ('#RETURN', result)
        try:
            conn.send(msg)
        # Hard to cause this to happen. It rarely happens, and then at shutdown.
        except Exception as exc: #pragma no cover
            try:
                conn.send(('#TRACEBACK', 'Exception sending reply: %r' % exc))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', exc)
        conn.close()
    def serve_client(self, conn):
        """
        Handle requests from the proxies in a particular process/thread.

        conn: socket or pipe
            Connection to process.

        This version supports dynamic proxy generation and credential checking.
        Request/reply traffic is encrypted with the per-connection session key
        when `authkey` is 'PublicKey' (see :meth:`_init_session`).
        """
        self._logger.log(LOG_DEBUG2, 'starting server thread to service %r, %s',
                         threading.current_thread().name, keytype(self._authkey))
        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj
        id_to_controller = self._id_to_controller
        if self._authkey == 'PublicKey':
            client_key, session_key = self._init_session(conn)
        else:
            # Empty key => decrypt()/encrypt() pass data through unchanged.
            client_key = ''
            session_key = ''
        while not self.stop:
            try:
                ident = methodname = args = kwds = credentials = None
                obj = exposed = gettypeid = None
                data = recv()
                try:
                    request = decrypt(data, session_key)
                except Exception as exc:
                    trace = traceback.format_exc()
                    msg = "Can't decrypt/unpack request. This could be the" \
                          " result of referring to a dead server."
                    self._logger.error(msg)
                    self._logger.error(trace)
                    raise RuntimeError(msg)
                ident, methodname, args, kwds, credentials = request
                self._logger.log(LOG_DEBUG3, 'request %s %s', ident, methodname)
                # self._logger.log(LOG_DEBUG3, 'credentials %s', credentials)
                # self._logger.log(LOG_DEBUG3, 'id_to_obj:\n%s',
                #                  self.debug_info(conn))
                # Decode and verify valid credentials.
                try:
                    credentials = Credentials.verify(credentials,
                                                     self._allowed_users)
                except Exception as exc:
                    self._logger.error('%r' % exc)
                    raise
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                # Hard to cause this to happen.
                except KeyError: #pragma no cover
                    msg = 'No object for ident %s' % ident
                    self._logger.error(msg)
                    raise KeyError('%s %r: %s' % (self.host, self.name, msg))
                if methodname not in exposed:
                    # Try to raise with a useful error message.
                    if methodname == '__getattr__':
                        try:
                            val = getattr(obj, args[0])
                        except AttributeError:
                            raise AttributeError(
                                'attribute %r of %r object does not exist'
                                % (args[0], type(obj)))
                        if inspect.ismethod(val):
                            # Report the real method name in the error below.
                            methodname = args[0]
                        else:
                            raise AttributeError(
                                'attribute %r of %r is not accessible'
                                % (args[0], type(obj)))
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r'
                        % (methodname, type(obj), exposed))
                # Set correct credentials for function lookup.
                set_credentials(credentials)
                function = getattr(obj, methodname)
                # Proxy pass-through only happens remotely.
                if isinstance(obj, BaseProxy): #pragma no cover
                    role = None
                    access_controller = None
                else:
                    # Check for allowed access.
                    role, credentials, access_controller = \
                        self._check_access(ident, methodname, function, args,
                                           credentials)
                if methodname != 'echo':
                    # 'echo' is used for performance tests, keepalives, etc.
                    self._logger.log(LOG_DEBUG2, "Invoke %s %s '%s'",
                                     methodname, role, credentials)
                    self._logger.log(LOG_DEBUG3, ' %s %s', args, kwds)
                # Invoke function.
                try:
                    try:
                        res = function(*args, **kwds)
                        self._logger.log(LOG_DEBUG3, ' res %r', res)
                    except AttributeError as exc:
                        if isinstance(obj, BaseProxy) and \
                           methodname == '__getattribute__':
                            # Avoid an extra round-trip.
                            res = obj.__getattr__(*args, **kwds)
                        else:
                            raise
                except Exception as exc:
                    self._logger.exception('%s %s %s failed:',
                                           methodname, role, credentials)
                    msg = ('#TRACEBACK', traceback.format_exc())
                else:
                    msg = self._form_reply(res, ident, methodname, function,
                                           args, access_controller, conn)
            except AttributeError:
                # Just being defensive, this should never happen.
                if methodname is None: #pragma no cover
                    msg = ('#TRACEBACK', traceback.format_exc())
                else:
                    # Try a registered fallback handler for this method name.
                    orig_traceback = traceback.format_exc()
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        self._logger.log(LOG_DEBUG2, 'Fallback %s', methodname)
                        result = fallback_func(self, conn, ident, obj,
                                               *args, **kwds)
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', orig_traceback)
            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)
            # Just being defensive, this should never happen.
            except Exception: #pragma no cover
                trace = traceback.format_exc()
                self._logger.error('serve_client exception, method %s',
                                   methodname)
                self._logger.error(trace)
                msg = ('#TRACEBACK', trace)
            try:
                try:
                    send(encrypt(msg, session_key))
                except Exception:
                    send(encrypt(('#UNSERIALIZABLE', repr(msg)), session_key))
            # Just being defensive, this should never happen.
            except Exception as exc: #pragma no cover
                self._logger.error('exception in thread serving %r',
                                   threading.current_thread().name)
                self._logger.error(' ... message was %r', msg)
                self._logger.error(' ... exception was %r', exc)
                conn.close()
                sys.exit(1)
    def _init_session(self, conn):
        """
        Receive client public key, send session key.

        Protocol (version 1): the client sends ``(1, n, e, encrypted)`` where
        `encrypted` is the client's public key encrypted with this server's
        public key (n, e).  The reply is ``(1, data)`` where `data` is the
        session key encrypted with the client's public key.
        Returns ``(client_key, session_key)``.
        """
        # Hard to cause exceptions to happen where we'll see them.
        try:
            client_data = conn.recv()
        except Exception as exc: #pragma no cover
            self._logger.error("Can't receive client data: %r", exc)
            raise
        client_version = client_data[0]
        if client_version != 1: #pragma no cover
            msg = 'Expected client protocol version 1, got %r' % client_version
            self._logger.error(msg)
            raise RuntimeError(msg)
        n, e, encrypted = client_data[1:]
        # The client must have encrypted with *our* key pair.
        if e != self._key_pair.e or n != self._key_pair.n: #pragma no cover
            msg = 'Server key mismatch'
            self._logger.error(msg)
            raise RuntimeError(msg)
        try:
            text = pk_decrypt(encrypted, self._key_pair)
            client_key = decode_public_key(text)
        except Exception as exc: #pragma no cover
            self._logger.error("Can't recreate client key: %r", exc)
            raise
        server_version = 1
        try:
            # NOTE(review): session key is sha1(id(conn)) -- derived from an
            # object address, not a CSPRNG; looks weak/predictable. Confirm
            # whether this is acceptable for the threat model.
            session_key = hashlib.sha1(str(id(conn))).hexdigest()
            data = client_key.encrypt(session_key, '')
            conn.send((server_version, data))
        except Exception as exc: #pragma no cover
            self._logger.error("Can't send session key: %r", exc)
            raise
        return (client_key, session_key)
    def _check_access(self, ident, methodname, function, args, credentials):
        """
        Check for valid access, return (role, credentials, controller).

        The access controller for `ident` is looked up (and cached); the
        caller's role is derived from `credentials`; SPECIALS methods are
        checked via the controller, other methods via :func:`check_role`.
        """
        obj, exposed, gettypeid = self.id_to_obj[ident]
        # Get access controller for obj.
        access_controller = self._id_to_controller.get(ident)
        if access_controller is None:
            try:
                get_access_controller = getattr(obj, 'get_access_controller')
            except AttributeError:
                access_controller = self._access_controller
            # Only happens on remote protected object.
            else: #pragma no cover
                if get_access_controller is None:
                    access_controller = self._access_controller
                else:
                    access_controller = get_access_controller()
            self._id_to_controller[ident] = access_controller
        # Get role based on credentials.
        role = access_controller.get_role(credentials)
        if methodname in SPECIALS:
            # Check for valid access based on role.
            access_controller.check_access(role, methodname, obj, args[0])
        else:
            # Check for valid role.
            try:
                check_role(role, function)
            except RoleError as exc:
                raise RoleError('%s(): %s' % (methodname, exc))
            # Set credentials for execution of function. Typically
            # these are just the credentials of the caller, but
            # sometimes a function needs to execute with different
            # credentials.
            credentials = \
                access_controller.get_proxy_credentials(function, credentials)
        set_credentials(credentials)
        return (role, credentials, access_controller)
    def _form_reply(self, res, ident, methodname, function, args,
                    access_controller, conn):
        """
        Return reply message for `res`: either ``('#RETURN', res)`` or
        ``('#PROXY', (exposed, token, pubkey))`` when the result must be
        accessed via a proxy.
        """
        obj, exposed, gettypeid = self.id_to_obj[ident]
        msg = None
        typeid = None
        if inspect.ismethod(res) and \
           (methodname == '__getattribute__' or methodname == '__getattr__'):
            # More informative for common case of missing RBAC.
            raise AttributeError('method %r of %r object is not in exposed=%r'
                                 % (args[0], type(obj), exposed))
        # Proxy pass-through only happens remotely.
        if isinstance(res, OpenMDAO_Proxy): #pragma no cover
            # Decide whether the client can reach the proxied server directly
            # or needs to go through us (a proxy for the proxy).
            res_address = res._token.address
            res_type = connection.address_type(res_address)
            proxy_proxy = False
            if res_type != self._address_type:
                proxy_proxy = True  # Different type of connection.
            elif res_type == 'AF_INET':
                if res_address[0] != self.address[0]:
                    proxy_proxy = True  # Different network.
                elif res_address[0] == '127.0.0.1' and self._allow_tunneling:
                    proxy_proxy = True  # Access through tunnel.
            if proxy_proxy:
                # Create proxy for proxy.
                typeid = res._token.typeid
                proxyid = make_typeid(res)
                self._logger.log(LOG_DEBUG2, 'Creating proxy for proxy %s',
                                 proxyid)
                if proxyid not in self.registry:
                    self.registry[proxyid] = (None, None, None, _auto_proxy)
            else:
                # Propagate the proxy info.
                res._close.cancel()  # Don't decref when reaped.
                msg = ('#PROXY', (res._exposed_, res._token, res._pubkey))
        elif access_controller is not None:
            if methodname in SPECIALS:
                if access_controller.need_proxy(obj, args[0], res):
                    # Create proxy if in declared proxy types.
                    typeid = make_typeid(res)
                    proxyid = typeid
                    if typeid not in self.registry:
                        self.registry[typeid] = (None, None, None, None)
            elif need_proxy(function, res, access_controller):
                # Create proxy if in declared proxy types.
                typeid = make_typeid(res)
                proxyid = typeid
                if typeid not in self.registry:
                    self.registry[typeid] = (None, None, None, None)
        # Proxy pass-through only happens remotely.
        else: #pragma no cover
            # Create proxy if registered.
            typeid = gettypeid and gettypeid.get(methodname, None)
            proxyid = typeid
        if msg is None:
            if typeid:
                # Register the result with ourselves and hand back a token.
                rident, rexposed = self.create(conn, proxyid, res)
                token = Token(typeid, self.address, rident)
                self._logger.log(LOG_DEBUG2, 'Returning proxy for %s at %s',
                                 typeid, self.address)
                if self._key_pair is None:
                    pubkey = None
                else:
                    pubkey = self._key_pair.publickey()
                msg = ('#PROXY', (rexposed, token, pubkey))
            else:
                msg = ('#RETURN', res)
        return msg
def _fallback_isinstance(self, conn, ident, obj, typename):
""" Check if `obj` is an instance of `typename`. """
# It's very difficult to get an arbitrary `typename` here.
dot = typename.rfind('.')
module_name = typename[:dot]
class_name = typename[dot+1:]
try:
module = __import__(module_name)
except ImportError: #pragma no cover
return False
module = sys.modules[module_name]
try:
cls = getattr(module, class_name)
except AttributeError: #pragma no cover
return False
return isinstance(obj, cls)
Server.fallback_mapping['__is_instance__'] = _fallback_isinstance
def _fallback_hasinterface(self, conn, ident, obj, typename):
""" Check if `obj` supports `typename`. """
# It's very difficult to get an arbitrary `typename` here.
dot = typename.rfind('.')
module_name = typename[:dot]
class_name = typename[dot+1:]
try:
module = __import__(module_name)
except ImportError: #pragma no cover
return False
module = sys.modules[module_name]
try:
cls = getattr(module, class_name)
except AttributeError: #pragma no cover
return False
return obj_has_interface(obj, cls)
Server.fallback_mapping['__has_interface__'] = _fallback_hasinterface
    # This is for low-level debugging of servers.
    def debug_info(self, conn): #pragma no cover
        """
        Return string representing state of id_to_obj mapping.

        conn: socket or pipe
            Unused.

        This version handles proxies in a special manner (it reports the
        proxy's key type, id, and token typeid rather than str() of the
        proxy, which would cause a network round-trip).
        """
        self.mutex.acquire()
        try:
            result = []
            keys = self.id_to_obj.keys()
            keys.sort()
            for ident in keys:
                if ident != '0':
                    obj = self.id_to_obj[ident][0]
                    if isinstance(obj, BaseProxy):
                        obj_str = '%s proxy for %s %s' \
                                  % (keytype(obj._authkey), obj._id,
                                     obj._token.typeid)
                    else:
                        obj_str = str(obj)[:75]
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident], obj_str))
            return '\n'.join(result)
        finally:
            self.mutex.release()
    def create(self, conn, typeid, *args, **kwds):
        """
        Create a new shared object and return its id.

        conn: socket or pipe
            Connection to process.

        typeid: string
            Identifier string for type of object to be created.

        This version uses :func:`public_methods` to compute the exposed
        method set when the registry entry doesn't declare one.
        """
        self.mutex.acquire()
        try:
            try:
                callable, exposed, method_to_typeid, proxytype = \
                    self.registry[typeid]
            # Just being defensive.
            except KeyError: #pragma no cover
                logging.error('mp_support.create: %r registry', typeid)
                for key, value in self.registry.items():
                    logging.error(' %s: %s', key, value)
                raise
            if callable is None:
                # Pre-built object passed directly (see _form_reply()).
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = callable(*args, **kwds)
            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)
            ident = '%x' % id(obj) # convert to string because xmlrpclib
                                   # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)
            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created. The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(conn, ident)
            return ident, tuple(exposed)
        finally:
            self.mutex.release()
    # Will only be seen on remote.
    def shutdown(self, conn): #pragma no cover
        """ Shutdown this process: run exit functions, then delegate to the
        base class shutdown. """
        self.stop = 888  # Tell serve_forever()/serve_client() loops to stop.
        msg = 'received shutdown request, running exit functions'
        print msg
        sys.stdout.flush()
        self._logger.info(msg)
        remove_remote_handlers()
        # Deprecated, but marginally better than atexit._run_exitfuncs()
        # Don't try to log here, logging shuts-down via atexit.
        if hasattr(sys, 'exitfunc'):
            try:
                sys.exitfunc()
            except Exception as exc:
                print 'sys.exitfunc(): %s' % exc
                sys.stdout.flush()
        print ' exit functions complete'
        sys.stdout.flush()
        super(OpenMDAO_Server, self).shutdown(conn)
[docs]class OpenMDAO_Manager(BaseManager):
"""
Uses :class:`OpenMDAO_Server`, retains the public key, and puts some slack
in shutdown timing.
address: tuple or string
A :mod:`multiprocessing` address specifying an Internet address or
a pipe.
authkey: string
Authorization key. Inherited from the current :class:`Process`
object if not specified.
serializer: string
Which serialization method to use.
pubkey: public key
Public portion of server's public/private key pair.
Needed for client/proxy side.
name: string
Name for server, used in log files, etc.
allowed_hosts: list(string)
Host address patterns to check against.
allowed_users: dict
Dictionary of users and corresponding public keys allowed access.
If None, any user may access. If empty, no user may access.
allow_tunneling: bool
If True, allow connections from 127.0.0.1 (localhost), even if not
listed otherwise.
"""
_Server = OpenMDAO_Server
    def __init__(self, address=None, authkey=None, serializer='pickle',
                 pubkey=None, name=None, allowed_hosts=None, allowed_users=None,
                 allow_tunneling=False):
        """ Initialize the manager; see the class docstring for parameters. """
        super(OpenMDAO_Manager, self).__init__(address, authkey, serializer)
        self._pubkey = pubkey
        self._name = name
        self._allowed_hosts = allowed_hosts
        self._allowed_users = allowed_users
        self._allow_tunneling = allow_tunneling
    def get_server(self):
        """
        Return a server object with :meth:`serve_forever` and address
        attribute, configured from this manager's state.  May only be called
        before the manager has been started.
        """
        assert self._state.value == State.INITIAL
        return OpenMDAO_Server(self._registry, self._address, self._authkey,
                               self._serializer, self._name,
                               self._allowed_hosts, self._allowed_users,
                               self._allow_tunneling)
    def start(self, cwd=None, log_level=logging.DEBUG):
        """
        Spawn a server process for this manager object.

        cwd: string
            Directory to start in.

        log_level: int
            Initial root logging level for the server process.

        This version retrieves the server's public key.  Raises RuntimeError
        (after logging the server's stdout/stderr/log files when `cwd` is
        known) if the server fails to start or times out.
        """
        assert self._state.value == State.INITIAL
        # Pipe over which we will retrieve address of server.
        reader, writer = connection.Pipe(duplex=False)
        if sys.platform == 'win32': #pragma no cover
            # Make registry pickleable.
            registry = {}
            for typeid, info in self._registry.items():
                callable, exposed, method_to_typeid, proxytype = info
                if proxytype and proxytype != _auto_proxy:
                    # Marker; _run_server() rebuilds the real proxy type.
                    registry[typeid] = \
                        (callable, exposed, method_to_typeid, 'rebuild')
        else:
            registry = self._registry
        # Flush logs before cloning.
        for handler in logging._handlerList:
            try:
                handler.flush()
            except AttributeError:
                h = handler() # WeakRef
                if h:
                    h.flush()
        # Spawn process which runs a server.
        credentials = get_credentials()
        log_port = logging_port('localhost', 'localhost')
        self._process = Process(
            target=type(self)._run_server,
            args=(registry, self._address, self._authkey,
                  self._serializer, self._name, self._allowed_hosts,
                  self._allowed_users, self._allow_tunneling,
                  writer, credentials, cwd, log_port, log_level),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()
        pid = self._process.pid
        # Get address of server.
        # Key-pair generation without pywin32 is very slow; allow extra time.
        if self._authkey == 'PublicKey':
            if sys.platform == 'win32' and not HAVE_PYWIN32: #pragma no cover
                timeout = 120
            else:
                timeout = 10
        else:
            timeout = 10
        writer.close()
        start = time.time()
        error_msg = None
        for retry in range(timeout):
            if reader.poll(1):
                break
            if not self._process.is_alive():
                error_msg = 'Server process %d exited: %s' \
                            % (pid, self._process.exitcode)
                break
        # Hard to cause a timeout.  (for/else: loop ran out without a break)
        else: #pragma no cover
            if error_msg is None:
                et = time.time() - start
                self._process.terminate()
                error_msg = 'Server process %d startup timed-out in %.2f' \
                            % (pid, et)
        if error_msg is None:
            try:
                reply = reader.recv()
            except Exception as exc: # str(Exception()) is null, repr() isn't.
                error_msg = 'Server process %d read failed: %s' \
                            % (pid, str(exc) or repr(exc))
            else:
                # _run_server() sends the exception object on failure.
                if isinstance(reply, Exception):
                    error_msg = 'Server process %d startup failed: %s' \
                                % (pid, str(reply) or repr(reply))
        if error_msg:
            logging.error(error_msg)
            if cwd:
                logging.error(' in dir %r', cwd)
                # Dump the server's output files to aid debugging.
                for name in ('stdout', 'stderr', 'openmdao_log*.txt'):
                    for path in glob.glob(os.path.join(cwd, name)):
                        name = os.path.basename(path)
                        with open(path, 'r') as inp:
                            logging.error(' %s:\n%s', name, inp.read())
            raise RuntimeError(error_msg)
        self._address = reply
        if self._authkey == 'PublicKey':
            self._pubkey = reader.recv()
        reader.close()
        et = time.time() - start
        logging.debug('Server process %d startup in %.2f', pid, et)
        # Register a finalizer.
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )
    # This happens on the remote server side and we'll check when using it.
    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, name,
                    allowed_hosts, allowed_users, allow_tunneling,
                    writer, credentials, cwd, log_port, log_level): #pragma no cover
        """
        Create a server, report its address and public key, and run it.

        Runs in the child process.  On any setup failure the exception object
        itself is sent back over `writer` so start() can report it.
        """
        try:
            if sys.platform == 'win32':
                set_credentials(credentials)
                # Recreate registry proxytypes ('rebuild' markers were
                # substituted by start() to keep the registry pickleable).
                for typeid, info in registry.items():
                    callable, exposed, method_to_typeid, proxytype = info
                    if proxytype == 'rebuild':
                        registry[typeid] = (callable, exposed, method_to_typeid,
                                            _make_proxy_type(typeid, exposed))
            # If specified, move to new directory.
            if cwd is not None:
                os.chdir(cwd)
            # Cleanup cloned logging environment.
            del logging.root.handlers[:]
            del logging._handlerList[:]
            logging._handlers.clear()
            # Reset stdout & stderr.
            sys.stdout.flush()
            sys.stderr.flush()
            sys.stdout = open('stdout', 'w')
            sys.stderr = open('stderr', 'w')
            # Reset logging.
            logging.basicConfig(level=log_level,
                datefmt='%b %d %H:%M:%S',
                format='%(asctime)s %(levelname)s %(name)s: %(message)s',
                filename='openmdao_log.txt', filemode='w')
            # Connect to remote logging server.
            if log_port:
                install_remote_handler('localhost', log_port, name)
            # Create server.
            server = cls._Server(registry, address, authkey, serializer, name,
                                 allowed_hosts, allowed_users, allow_tunneling)
        except Exception as exc:
            traceback.print_exc()
            writer.send(exc)
            return
        else:
            # Inform parent process of the server's address.
            writer.send(server.address)
            if authkey == 'PublicKey':
                writer.send(server.public_key)
        finally:
            writer.close()
        # Run the manager.
        util.info('manager serving at %r', server.address)
        server.serve_forever()
@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
"""
Shutdown the manager process; will be registered as a finalizer.
This version uses relaxed timing.
"""
if process.is_alive():
logging.debug('sending shutdown message to manager')
try:
conn = _get_connection(_Client, address, authkey)
try:
dispatch(conn, None, 'shutdown')
finally:
conn.close()
# Just being defensive here.
except Exception: #pragma no cover
pass
process.join(timeout=2)
# No good way to cause process to not shut down.
if process.is_alive(): #pragma no cover
logging.warning('manager still alive after shutdown request')
if hasattr(process, 'terminate'):
logging.warning('trying to `terminate()` manager process')
process.terminate()
process.join(timeout=1)
if process.is_alive():
logging.warning('manager still alive after terminate')
if process.is_alive():
if sys.platform != 'win32':
os.kill(process.pid. signal.SIGKILL)
process.join(timeout=1)
if process.is_alive():
logging.warning('manager still alive after kill')
state.value = State.SHUTDOWN
try:
del BaseProxy._address_to_local[address]
# Just being defensive here.
except KeyError: #pragma no cover
pass
class ObjectManager(object):
    """
    Wraps an existing object so that it can be accessed remotely via a
    multiprocessing interface.

    obj: object
        The object to serve remotely.

    address: tuple or string
        A :mod:`multiprocessing` address specifying an Internet address or
        a pipe.

    serializer: string
        Which serialization method to use.

    authkey: string
        Authorization key. Inherited from the current :class:`Process`
        object if not specified.

    name: string
        Name for server, used in log files, etc.

    allowed_hosts: list(string)
        Host address patterns to check against.

    allowed_users: dict
        Dictionary of users and corresponding public keys allowed access.
        If None, any user may access. If empty, no user may access.
    """

    def __init__(self, obj, address=None, serializer='pickle', authkey=None,
                 name=None, allowed_hosts=None, allowed_users=None):
        self._ident = '%x' % id(obj)
        self._typeid = make_typeid(obj)
        logging.debug('ObjectManager address %s, %r, name %r, ident %r',
                      address, keytype(authkey), name, self._ident)

        self._manager = OpenMDAO_Manager(address=address,
                                         serializer=serializer,
                                         authkey=authkey, name=name,
                                         allowed_hosts=allowed_hosts,
                                         allowed_users=allowed_users)
        self._server = self._manager.get_server()
        self._exposed = public_methods(obj)

        # Seed the server's object table with the wrapped object so it is
        # immediately addressable by ident.
        with self._server.mutex:
            self._server.id_to_obj[self._ident] = (obj, self._exposed, None)
            self._server.id_to_refcount[self._ident] = 1

        self._proxy = None
        server_thread = threading.Thread(target=self._run_server)
        server_thread.daemon = True
        server_thread.start()
        self._server_thread = server_thread

    @property
    def proxy(self):
        """ Proxy for our object server. """
        if self._proxy is not None:
            return self._proxy

        # Lazily build the proxy on first access and bump the server's
        # reference count for it.
        token = Token(self._typeid, self._server.address, self._ident)
        key = self._server._authkey
        server_pubkey = self._server.public_key if key == 'PublicKey' else None
        self._proxy = _auto_proxy(token, self._manager._serializer,
                                  authkey=key, exposed=self._exposed,
                                  pubkey=server_pubkey)
        self._server.incref(None, self._ident)
        return self._proxy

    def _run_server(self):
        """ Serve requests forever (runs in a separate daemon thread!). """
        self._server.serve_forever()
class OpenMDAO_Proxy(BaseProxy):
    """
    Proxy for a remote object.

    args: tuple
        Passed to :class:`BaseProxy`.

    kwds: dict
        If it contains `pubkey`, then that value is used for the remote public
        key, and the entry is removed before being passed to :class:`BaseProxy`.

    This version sends credentials and provides dynamic result proxy generation.

    .. note::
        :meth:`BaseProxy.__str__` (used by :meth:`str` and the ``%s`` format)
        will return :meth:`__repr__` of the remote object. To avoid the
        network round-trip delay, use :meth:`repr` or the ``%r`` format.
    """

    def __init__(self, *args, **kwds):
        # Extract our extra 'pubkey' keyword before delegating to BaseProxy,
        # which does not accept it.
        try:
            pubkey = kwds['pubkey']
        except KeyError:
            pubkey = None
        else:
            del kwds['pubkey']
        super(OpenMDAO_Proxy, self).__init__(*args, **kwds)
        # Prefer the manager's public key when a manager is attached;
        # otherwise use the one passed in (e.g. during unpickling).
        if self._manager is None:
            self._pubkey = pubkey
        else:
            self._pubkey = self._manager._pubkey

    def _connect(self):
        """ This version translates tunneled addresses. """
        util.debug('making connection to manager')
        # Identify this process (and non-main thread) to the server.
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        address = tunnel_address(self._token.address)
        conn = self._Client(address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        # Connections are per-thread (thread-local storage).
        self._tls.connection = conn

    def _callmethod(self, methodname, args=None, kwds=None):
        """
        Try to call a method of the referrent and return a copy of the result.
        This version optionally encrypts the channel and sends the current
        thread's credentials with method arguments.
        """
        args = args or ()
        kwds = kwds or {}
        try:
            conn = self._tls.connection
        except AttributeError:
            # First call from this thread: establish a connection and,
            # when using 'PublicKey', negotiate a session key.
            curr_thread = threading.current_thread()
            util.debug('thread %r does not own a connection', curr_thread.name)
            try:
                self._connect()
            except Exception as exc:
                msg = "Can't connect to server at %r for %r: %r" \
                      % (self._token.address, methodname, exc)
                logging.error(msg)
                raise RuntimeError(msg)
            conn = self._tls.connection
            if self._authkey == 'PublicKey':
                self._init_session(conn)
            else:
                # Empty session key => channel data is in the clear.
                self._tls.session_key = ''
        session_key = self._tls.session_key

        # FIXME: Bizarre problem evidenced by test_extcode.py (Python 2.6.1)
        # For some reason pickling the env_vars dictionary causes:
        # PicklingError: Can't pickle <class 'openmdao.main.mp_support.ObjServer'>:
        #     attribute lookup openmdao.main.mp_support.ObjServer failed
        # The reported type is not in the (current) Dict items.
        # Apparently this is some Traits 'feature'.
        new_args = []
        for arg in args:
            if isinstance(arg, TraitDictObject):
                new_args.append(dict(arg))
            else:
                new_args.append(arg)

        try:
            # Request carries the caller's credentials for access control.
            conn.send(encrypt((self._id, methodname, new_args, kwds,
                               get_credentials().encode()), session_key))
        except IOError as exc:
            msg = "Can't send to server at %r for %r: %r" \
                  % (self._token.address, methodname, exc)
            logging.error(msg)
            raise RuntimeError(msg)

        kind, result = decrypt(conn.recv(), session_key)

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            # The server returned a reference; build a local proxy for it.
            exposed, token, pubkey = result
            # Proxy passthru only happens remotely.
            if self._manager is None: #pragma no cover
                self._manager = OpenMDAO_Manager(token.address, self._authkey,
                                                 pubkey=pubkey)
            try:
                proxytype = self._manager._registry[token.typeid][-1]
            except KeyError:
                proxytype = None
            if proxytype is None:
                # Dynamically register an auto-generated proxy type.
                self._manager.register(token.typeid, None, _auto_proxy)
                proxytype = self._manager._registry[token.typeid][-1]
            if token.address != self._manager.address:
                # Proxy to different server than request was sent to.
                manager = OpenMDAO_Manager(token.address, self._authkey,
                                           pubkey=pubkey)
            else:
                manager = self._manager
            proxy = proxytype(token, self._serializer, manager=manager,
                              authkey=self._authkey, exposed=exposed,
                              pubkey=pubkey)
            # Drop the server-side reference taken on our behalf; the proxy
            # holds its own reference now.
            conn = _get_connection(self._Client, token.address, self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy
        raise convert_to_error(kind, result)

    def _init_session(self, conn):
        """ Send client public key, receive session key. """
        key_pair = get_key_pair(Credentials.user_host)
        public_key = key_pair.publickey()
        text = encode_public_key(public_key)
        # Encrypt our public key with the server's public key so only the
        # server can read it.
        server_key = self._pubkey
        encrypted = pk_encrypt(text, server_key)
        client_version = 1
        conn.send((client_version, server_key.n, server_key.e, encrypted))
        server_data = conn.recv()
        server_version = server_data[0]
        # Just being defensive, this should never happen.
        if server_version != 1: #pragma no cover
            msg = 'Expecting server protocol version 1, got %r' % server_version
            logging.error(msg)
            if server_version == '#TRACEBACK':
                try:
                    logging.error(''.join(server_data[1]))
                except Exception:
                    pass
            raise RuntimeError(msg)
        # Session key comes back encrypted with our public key.
        self._tls.session_key = key_pair.decrypt(server_data[1])

    def _incref(self):
        """
        Tell server to increment its reference count.
        This version avoids a hang in _Client if the server no longer exists.
        """
        # Hard to cause this to happen.
        if not OpenMDAO_Proxy.manager_is_alive(self._token.address): #pragma no cover
            raise RuntimeError('Cannot connect to manager at %r'
                               % (self._token.address,))
        conn = _get_connection(self._Client, self._token.address, self._authkey)
        dispatch(conn, None, 'incref', (self._id,))
        # Enable this with care. While testing CaseIteratorDriver it can cause a
        # deadlock in logging (called via BaseProxy._after_fork()).
        #util.debug('INCREF %r', self._token.id)
        self._idset.add(self._id)
        state = self._manager and self._manager._state
        # Register a finalizer so the server-side refcount is decremented
        # when this proxy is garbage-collected.
        self._close = util.Finalize(
            self, OpenMDAO_Proxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        """
        Tell server to decrement its reference count.
        This version avoids a hang in _Client if the server no longer exists.
        """
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # Avoid a hang in _Client() if the server isn't there anymore.
            if OpenMDAO_Proxy.manager_is_alive(token.address):
                # tell manager this process no longer cares about referent
                try:
                    util.debug('DECREF %r', token.id)
                    conn = _get_connection(_Client, token.address, authkey)
                    dispatch(conn, None, 'decref', (token.id,))
                # Hard to cause this to happen.
                except Exception as exc: #pragma no cover
                    util.debug('... decref failed %s', exc)
            else:
                util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more %r proxies so closing conn',
                       threading.current_thread().name, token.typeid)
            tls.connection.close()
            del tls.connection

    @staticmethod
    def manager_is_alive(address):
        """
        Check whether manager is still alive.

        address: tuple or string
            A :mod:`multiprocessing` address specifying an Internet address or
            a pipe.
        """
        addr_type = connection.address_type(address)
        if addr_type == 'AF_PIPE': #pragma no cover
            # Windows named pipe: probe with a short wait.
            try:
                win32.WaitNamedPipe(address, 10)  # 0.01 sec
            except WindowsError:
                alive = False
            else:
                alive = True
        else:
            # Socket address: attempt a connect and interpret the error.
            if addr_type == 'AF_INET':
                sock = socket.socket(socket.AF_INET)
            elif addr_type == 'AF_UNIX':
                sock = socket.socket(socket.AF_UNIX)
            address = tunnel_address(address)
            try:
                sock.connect(address)
            except socket.error as exc:
                # Connection refused / no such file => server is gone.
                if exc.args[0] == errno.ECONNREFUSED or \
                   exc.args[0] == errno.ENOENT:
                    alive = False
                # Just being defensive.
                else: #pragma no cover
                    raise
            else:
                sock.close()
                alive = True
        return alive

    def __reduce__(self):
        """ For unpickling. This version uses :func:`_auto_proxy`. """
        kwds = {}
        kwds['pubkey'] = self._pubkey
        # Happens on other side of fork().
        if Popen.thread_is_spawning(): #pragma no cover
            kwds['authkey'] = self._authkey
        elif self._pubkey:
            # Can't pickle an AuthenticationString.
            kwds['authkey'] = 'PublicKey'
        kwds['exposed'] = self._exposed_
        return (RebuildProxy,
                (_auto_proxy, self._token, self._serializer, kwds))
def register(cls, manager, module=None):
    """
    Register class `cls` proxy info with `manager`. The class will be
    registered under it's full path, with '.' replaced by '_'.
    Not typically called by user code.

    cls: class
        Class to be registered.

    manager: :class:`OpenMDAO_Manager`
        Manager to register with.

    module: string
        Module name for registration. Necessary if `cls` might be defined
        by module '__main__'.
    """
    if module is None:
        module = cls.__module__
    # Dotted path becomes a flat identifier, e.g. 'pkg.mod.Cls' -> 'pkg_mod_Cls'.
    typeid = ('%s.%s' % (module, cls.__name__)).replace('.', '_')
    exposed = public_methods(cls)
    manager.register(typeid, callable=cls, exposed=exposed,
                     proxytype=_make_proxy_type(typeid, exposed))
def _make_proxy_type(name, exposed):
    """
    Return a proxy type whose methods are given by `exposed`.
    This version supports special attribute access methods.
    """
    exposed = tuple(exposed)
    # Proxy types are cached by (name, exposed) so repeated requests for
    # the same interface reuse one class object.
    try:
        return _PROXY_CACHE[(name, exposed)]
    except KeyError:
        pass

    # 'dic' becomes the class dictionary of the generated proxy type;
    # the exec'd strings below define its methods.
    dic = {'_instance_names': {}, '_interface_names': {}}
    for meth in exposed:
        if meth == '__is_instance__':
            # Call remote if we don't have the answer in cache.
            exec """
def __is_instance__(self, *args, **kwds):
    try:
        return self._instance_names[args[0]]
    except KeyError:
        yes_no = self._callmethod('__is_instance__', args, kwds)
        self._instance_names[args[0]] = yes_no
        return yes_no
""" in dic
        elif meth == '__has_interface__':
            # Call remote if we don't have the answer in cache.
            exec """
def __has_interface__(self, *args, **kwds):
    try:
        return self._interface_names[args[0]]
    except KeyError:
        yes_no = self._callmethod('__has_interface__', args, kwds)
        self._interface_names[args[0]] = yes_no
        return yes_no
""" in dic
        elif meth == '__getattr__':
            pass  # Avoid recursion loop, but must be in 'exposed'.
        elif meth == '__getattribute__':
            # Call remote if not private or defined locally (proxied methods).
            exec """
def __getattr__(self, *args, **kwds):
    if args[0][0] == '_':
        return object.__getattribute__(self, *args, **kwds)
    try:
        return object.__getattribute__(self, *args, **kwds)
    except AttributeError:
        try:
            return self._callmethod('__getattribute__', args, kwds)
        except AttributeError:
            return self._callmethod('__getattr__', args, kwds)
""" in dic
        elif meth == '__setattr__' or meth == '__delattr__':
            # Call remote if not private.
            exec """
def %s(self, *args, **kwds):
    if args[0][0] == '_':
        return object.%s(self, *args, **kwds)
    return self._callmethod(%r, args, kwds)
""" % (meth, meth, meth) in dic
        elif meth == '__exit__':
            # Can't pickle traceback argument.
            exec """
def __exit__(self, exc_type, exc_value, traceback):
    return self._callmethod('__exit__', (exc_type, exc_value, None))
""" in dic
        else:
            # Normal method always calls remote.
            exec """
def %s(self, *args, **kwds):
    return self._callmethod(%r, args, kwds)
""" % (meth, meth) in dic

    ProxyType = type(name, (OpenMDAO_Proxy,), dic)
    ProxyType._exposed_ = exposed
    _PROXY_CACHE[(name, exposed)] = ProxyType
    return ProxyType
def _auto_proxy(token, serializer, manager=None, authkey=None,
                exposed=None, incref=True, pubkey=None):
    """
    Return an auto-proxy for `token`, built via :func:`_make_proxy_type`
    from the token's exposed method names.
    """
    proxy_class = _make_proxy_type('OpenMDAO_AutoProxy[%s]' % token.typeid,
                                   exposed)
    try:
        proxy = proxy_class(token, serializer, manager=manager,
                            authkey=authkey, incref=incref, pubkey=pubkey)
    except Exception:
        # Log with context before propagating; construction may contact
        # the server (incref) and fail.
        logging.exception('Auto proxy creation failed for %s at %s:',
                          token.typeid, token.address)
        raise
    proxy._isauto = True
    return proxy
def _get_connection(_client, address, authkey):
    """
    Return a client connection to `address` using `authkey`.

    _client: callable
        Connection factory, called as ``_client(address, authkey=authkey)``.
    address: tuple or string
        A :mod:`multiprocessing` address (translated via `tunnel_address`).
    authkey: string
        Authorization key.

    Retries up to three times when the connect is interrupted by a signal
    rather than dying on 'Interrupted system call'.
    (Should be in lower layer)
    """
    address = tunnel_address(address)
    for retry in range(3):
        try:
            return _client(address, authkey=authkey)
        except IOError as exc:
            # Retry only on EINTR (was a magic '4'); any other error,
            # or EINTR on the final attempt, propagates.
            if exc.errno != errno.EINTR or retry >= 2:
                raise
    # Unreachable in practice: the loop either returns or raises.
    raise RuntimeError('Too many connection attempts')