"""
.. _`grid_engine.py`:
GridEngine resource allocator and object server.
By adding the allocator to the resource allocation manager, resource requests
will interrogate the allocator to see if it could be used. This would typically
be done by :class:`ExternalCode` to execute a compute-intensive or parallel
application.
"""
import fnmatch
import os.path
import string
from openmdao.main.mp_support import OpenMDAO_Manager, register
from openmdao.main.objserverfactory import ObjServer
from openmdao.main.rbac import rbac
from openmdao.main.resource import FactoryAllocator, JOB_CATEGORIES, \
HOME_DIRECTORY, WORKING_DIRECTORY
from openmdao.util.shellproc import ShellProc, STDOUT, PIPE, DEV_NULL
# Translate illegal job name characters.
_XLATE = string.maketrans(' \n\t\r/:@\\*?', '__________')
[docs]class GridEngineAllocator(FactoryAllocator):
"""
Knows about GridEngine cluster resources (via `qhost`).
Uses :class:`GridEngineServer` instead of :class:`ObjServer` when deploying.
name: string
Name of allocator; used in log messages, etc.
pattern: string
:mod:`fnmatch`-style pattern used to select hosts from `qhost` output.
authkey: string
Authorization key for this allocator and any deployed servers.
allow_shell: bool
If True, :meth:`execute_command` and :meth:`load_model` are allowed
in created servers. Since :meth:`execute_command` is required, this
is defaulted to be True.
.. warning::
There is a security risk with `allow_shell` True. Be careful to limit
factory servers to the intended set of users!
Resource configuration file entry equivalent to defaults::
[GridEngine]
classname: openmdao.main.grid_engine.GridEngineAllocator
pattern: *
authkey: PublicKey
allow_shell: True
MPICH2: mpich
OpenMPI: ompi
The last two entries provide a mapping between DRMAA job category names
and the configured GridEngine parallel environment names. Additional
categories may be configured, and the above configuration is site-specific.
"""
_QHOST = ['qhost'] # Replaced with path to fake for testing.
def __init__(self, name='GridEngine', pattern='*', authkey=None,
allow_shell=True):
super(GridEngineAllocator, self).__init__(name, authkey, allow_shell)
self.factory.manager_class = _ServerManager
self.factory.server_classname = \
'grid_engine_grid_engine_GridEngineServer'
self.pattern = pattern
self.category_map = {}
@rbac('*')
[docs] def max_servers(self, resource_desc):
"""
Return the maximum number of servers which could be deployed for
`resource_desc`. The value needn't be exact, but performance may
suffer if it overestimates. The value is used to limit the number
of concurrent evaluations.
resource_desc: dict
Description of required resources.
"""
retcode, info = self.check_compatibility(resource_desc)
if retcode != 0:
return (0, info)
avail_cpus = len(self._get_hosts())
if 'min_cpus' in resource_desc:
req_cpus = resource_desc['min_cpus']
if req_cpus > avail_cpus:
return (0, {'min_cpus': 'want %s, available %s'
% (req_cpus, avail_cpus)})
else:
return (avail_cpus / req_cpus, {})
else:
return (avail_cpus, {})
@rbac('*')
[docs] def time_estimate(self, resource_desc):
"""
Return ``(estimate, criteria)`` indicating how well this resource
allocator can satisfy the `resource_desc` request. The estimate will
be:
- >0 for an estimate of walltime (seconds).
- 0 for no estimate.
- -1 for no resource at this time.
- -2 for no support for `resource_desc`.
The returned criteria is a dictionary containing information related
to the estimate, such as hostnames, load averages, unsupported
resources, etc.
resource_desc: dict
Description of required resources.
"""
retcode, info = self.check_compatibility(resource_desc)
if retcode != 0:
return (retcode, info)
hostnames = self._get_hosts()
if not hostnames:
return (-2, {'hostnames': 'no hosts available'})
if 'min_cpus' in resource_desc:
value = resource_desc['min_cpus']
if len(hostnames) < value:
return (-2, {'min_cpus': 'want %s, have %s'
% (value, len(hostnames))})
criteria = {
'hostnames' : hostnames,
'total_cpus' : len(hostnames),
}
return (0, criteria)
[docs] def check_compatibility(self, resource_desc):
"""
Check compatibility with resource attributes.
resource_desc: dict
Description of required resources.
Returns ``(retcode, info)``. If Compatible, then `retcode` is zero
and `info` is empty. Otherwise, `retcode` will be -2 and `info` will
be a single-entry dictionary whose key is the incompatible key in
`resource_desc` and whose value provides data regarding the incompatibility.
"""
retcode, info = \
super(GridEngineAllocator, self).check_compatibility(resource_desc)
if retcode != 0:
return (retcode, info)
for key in info:
value = resource_desc[key]
if key == 'localhost':
if value:
return (-2, {key: 'requested local host'})
elif key == 'min_cpus' or key == 'max_cpus':
pass # Handle in upper layer.
else:
return (-2, {key: 'unrecognized key'})
return (0, {})
def _get_hosts(self):
""" Return list of hostnames sorted by load. """
# Get host load information.
try:
proc = ShellProc(self._QHOST, stdout=PIPE)
except Exception as exc:
self._logger.error('%r failed: %s' % (self._QHOST, exc))
return []
lines = proc.stdout.readlines()
# Reduce to hosts we're interested in and sort by CPU-adjusted load.
loads = []
for line in lines:
if line.startswith(('HOSTNAME', '-')):
continue
hostname, arch, ncpu, load, \
memtot, memuse, swapto, swapus = line.split()
if self.pattern:
if not fnmatch.fnmatchcase(hostname, self.pattern):
continue
try:
load = float(load)
ncpu = int(ncpu)
except ValueError:
continue
loads.append((hostname, load / ncpu, ncpu))
loads = sorted(loads, key=lambda item: item[1])
# Return list of hostnames.
hosts = []
for hostname, load, ncpu in loads:
for i in range(ncpu):
hosts.append(hostname)
return hosts
@rbac('*')
[docs] def deploy(self, name, resource_desc, criteria):
"""
Deploy a server suitable for `resource_desc`.
Returns a proxy to the deployed server.
Overrides superclass to pass `category_map` to server.
name: string
Name for server.
resource_desc: dict
Description of required resources.
criteria: dict
The dictionary returned by :meth:`time_estimate`.
"""
server = super(GridEngineAllocator, self).deploy(name, resource_desc,
criteria)
if server is not None:
server.configure(self.category_map)
return server
[docs]class GridEngineServer(ObjServer):
""" Knows about executing a command via `qsub`. """
_QSUB = ['qsub'] # Replaced with path to fake for testing.
@rbac('owner')
@rbac('owner')
[docs] def execute_command(self, resource_desc):
"""
Submit command based on `resource_desc`.
resource_desc: dict
Description of command and required resources.
The '-V' `qsub` option is always used to export the current environment
to the job. This environment is first updated with any 'job_environment'
data. The '-sync yes' `qsub` option is used to wait for job completion.
Other job resource keys are processed as follows:
========================= =========================
Resource Key Translation
========================= =========================
``submit_as_hold`` -h
------------------------- -------------------------
rerunnable -r yes|no
------------------------- -------------------------
``working_directory`` -wd `value`
------------------------- -------------------------
``job_category`` Sets parallel environment
------------------------- -------------------------
``min_cpus`` Sets parallel environment
------------------------- -------------------------
``max_cpus`` Sets parallel environment
------------------------- -------------------------
``min_phys_memory`` Ignored
------------------------- -------------------------
email -M `value`
------------------------- -------------------------
``email_on_started`` -m b
------------------------- -------------------------
``email_on_terminated`` -m e
------------------------- -------------------------
``job_name`` -N `value`
------------------------- -------------------------
``input_path`` -i `value`
------------------------- -------------------------
``output_path`` -o `value`
------------------------- -------------------------
``error_path`` -e `value`
------------------------- -------------------------
``join_files`` -j yes|no
------------------------- -------------------------
``reservation_id`` -ar `value`
------------------------- -------------------------
``queue_name`` -q `value`
------------------------- -------------------------
priority -p `value`
------------------------- -------------------------
``start_time`` -a `value`
------------------------- -------------------------
``deadline_time`` Ignored
------------------------- -------------------------
``accounting_id`` -A `value`
========================= =========================
Where `value` is the corresponding resource value.
If 'working_directory' is not specified, add ``-cwd``.
If 'input_path' is not specified, add ``-i /dev/null``.
If 'output_path' is not specified, add ``-o <remote_command>.stdout``.
If 'error_path' is not specified, add ``-j yes``.
If 'native_specification' is specified, it is added to the `qsub`
command just before 'remote_command' and 'args'.
If specified, 'job_category' is used to index into the category
map set up during allocator configuration. The mapped category
name as well as the 'min_cpus' and 'max_cpus' values are used
with the ``-pe`` qsub option.
Some resource limits are also handled:
==================== =========================
Resource Key Translation
==================== =========================
``core_file_size`` Ignored
-------------------- -------------------------
``data_seg_size`` Ignored
-------------------- -------------------------
``file_size`` Ignored
-------------------- -------------------------
``open_files`` Ignored
-------------------- -------------------------
``stack_size`` Ignored
-------------------- -------------------------
``virtual_memory`` Ignored
-------------------- -------------------------
``cpu_time`` -l h_cpu= `value`
-------------------- -------------------------
``wallclock_time`` -l h_rt= `value`
==================== =========================
Output from `qsub` itself is routed to ``qsub.out``.
"""
self.home_dir = os.path.expanduser('~')
self.work_dir = ''
cmd = list(self._QSUB)
cmd.extend(('-V', '-sync', 'yes', '-b', 'yes'))
env = None
inp, out, err = None, None, None
# Set working directory now, for possible path fixing.
try:
value = resource_desc['working_directory']
except KeyError:
pass
else:
self.work_dir = self._fix_path(value)
cmd.extend(('-wd', value))
# Process description in fixed, repeatable order.
keys = ('submit_as_hold',
'rerunnable',
'job_environment',
'email',
'email_on_started',
'email_on_terminated',
'job_name',
'input_path',
'output_path',
'error_path',
'join_files',
'reservation_id',
'queue_name',
'priority',
'start_time',
'accounting_id')
email_events = ''
for key in keys:
try:
value = resource_desc[key]
except KeyError:
continue
if key == 'submit_as_hold':
if value:
cmd.append('-h')
elif key == 'rerunnable':
cmd.extend(('-r', 'yes' if value else 'no'))
elif key == 'job_environment':
env = value
elif key == 'email':
cmd.extend(('-M', ','.join(value)))
elif key == 'email_on_started':
email_events += 'b'
elif key == 'email_on_terminated':
email_events += 'e'
elif key == 'job_name':
if value:
cmd.extend(('-N', self._jobname(value)))
elif key == 'input_path':
cmd.extend(('-i', self._fix_path(value)))
inp = value
elif key == 'output_path':
cmd.extend(('-o', self._fix_path(value)))
out = value
elif key == 'error_path':
cmd.extend(('-e', self._fix_path(value)))
err = value
elif key == 'join_files':
cmd.extend(('-j', 'yes' if value else 'no'))
if value:
err = 'yes'
elif key == 'reservation_id':
cmd.extend(('-ar', value))
elif key == 'queue_name':
cmd.extend(('-q', value))
elif key == 'priority':
cmd.extend(('-p', str(value)))
elif key == 'start_time':
cmd.extend(('-a', value.strftime('%Y%m%d%H%M.%S')))
elif key == 'accounting_id':
cmd.extend(('-A', value))
if email_events:
cmd.extend(('-m', email_events))
# Setup parallel environment.
if 'job_category' in resource_desc:
job_category = resource_desc['job_category']
try:
parallel_environment = self.category_map[job_category]
except KeyError:
msg = 'No mapping for job_category %r' % job_category
self._logger.error(msg)
raise ValueError(msg)
min_cpus = resource_desc.get('min_cpus', 1)
max_cpus = resource_desc.get('max_cpus', min_cpus)
cmd.extend(('-pe', parallel_environment,
'%d-%d' % (min_cpus, max_cpus)))
# Set resource limits.
if 'resource_limits' in resource_desc:
limits = resource_desc['resource_limits']
if 'cpu_time' in limits:
cpu_time = limits['cpu_time']
cmd.extend(('-l', 'h_cpu=%s' % self._timelimit(cpu_time)))
if 'wallclock_time' in limits:
wall_time = limits['wallclock_time']
cmd.extend(('-l', 'h_rt=%s' % self._timelimit(wall_time)))
# Set default command configuration.
if not self.work_dir:
cmd.append('-cwd')
if inp is None:
cmd.extend(('-i', DEV_NULL))
if out is None:
base = os.path.basename(resource_desc['remote_command'])
cmd.extend(('-o', '%s.stdout' % base))
if err is None:
cmd.extend(('-j', 'yes'))
# Add 'escape' clause.
if 'native_specification' in resource_desc:
cmd.extend(resource_desc['native_specification'])
cmd.append(self._fix_path(resource_desc['remote_command']))
if 'args' in resource_desc:
for arg in resource_desc['args']:
cmd.append(self._fix_path(arg))
self._logger.info('%r', ' '.join(cmd))
try:
process = ShellProc(cmd, DEV_NULL, 'qsub.out', STDOUT, env)
except Exception as exc:
self._logger.error('exception creating process: %s', exc)
raise
self._logger.debug(' PID = %d', process.pid)
return_code, error_msg = process.wait(1)
self._logger.debug(' returning %s', (return_code, error_msg))
return (return_code, error_msg)
def _fix_path(self, path):
""" Translates special prefixes. """
if path.startswith(HOME_DIRECTORY):
path = os.path.join(self.home_dir, path[len(HOME_DIRECTORY):])
elif path.startswith(WORKING_DIRECTORY):
path = os.path.join(self.work_dir, path[len(WORKING_DIRECTORY):])
return path
@staticmethod
def _jobname(name):
""" Create legal job name from `name`. """
return name.translate(_XLATE)
@staticmethod
def _timelimit(seconds):
""" Make time string from `seconds`. """
seconds = float(seconds)
hours = int(seconds / (60*60))
seconds -= hours * 60*60
minutes = int(seconds / 60)
seconds -= minutes * 60
seconds = int(seconds)
return '%d:%d:%d' % (hours, minutes, seconds)
class _ServerManager(OpenMDAO_Manager):
"""
A :class:`multiprocessing.Manager` which manages :class:`GridEngineServer`.
"""
pass
register(GridEngineServer, _ServerManager, 'grid_engine.grid_engine')