Source code for openmdao.main.publisher

import traceback

from threading import RLock

try:
    import simplejson as json
except ImportError:
    import json

try:
    import zmq
    from zmq.eventloop import zmqstream
except ImportError:
    zmq = None

from pkg_resources import working_set
from openmdao.util.log import logger
from openmdao.main.variable import json_default


_lock = RLock()
_binpub_types = None  # classes for sending complex binary reps of objects
_binpubs = {}  # [count, sender] corresponding to specific topics


try:
    from pyV3D import WV_Wrapper
except ImportError:
    WV_Wrapper = None
else:
[docs] class Pub_WV_Wrapper(WV_Wrapper): """A wrapper for the wv library that is used by a Sender to send updates to the Publisher. """ def __init__(self, name): super(Pub_WV_Wrapper, self).__init__() self.objname = name
[docs] def send(self, first=False): self.prepare_for_sends() if first: self.send_GPrim(self, 1, self.send_binary_data) # send init packet self.send_GPrim(self, -1, self.send_binary_data) # send initial suite of GPrims else: self.send_GPrim(self, -1, self.send_binary_data) # send initial suite of GPrims self.finish_sends()
[docs] def send_binary_data(self, wsi, buf, ibuf): """This is called multiple times during the sending of a set of graphics primitives. """ try: publish(self.objname, buf, binary=True) except Exception: logger.error(traceback.format_exc()) return -1 return 0
if zmq is None: class Publisher(object): def __init__(self, *args, **kwargs): pass def publish(self, topic, value, lock=True, binary=False): pass def publish_list(self, items): pass @staticmethod def get_instance(): return Publisher() @staticmethod def init(context, url, use_stream=True): pass @staticmethod def enable(): pass @staticmethod def disable(): pass @staticmethod def register(topic, obj): pass @staticmethod def unregister(topic): pass else:
[docs] class Publisher(object): __publisher = None __enabled = True silent = False def __init__(self, context, url, use_stream=True): # Socket to talk to pub socket sock = context.socket(zmq.PUB) sock.bind(url) if use_stream: self._sender = zmqstream.ZMQStream(sock) else: self._sender = sock
[docs] def publish(self, topic, value, lock=True, binary=False): global _binpubs if Publisher.__enabled: try: if lock: _lock.acquire() if binary: if not isinstance(value, bytes): raise TypeError("published binary value must be of type 'bytes'") logger.debug("sending binary value for topic %s" % topic) self._sender.send_multipart([topic.encode('utf-8'), value]) elif topic in _binpubs: # if a binary publisher exists for this topic, use that to # publish the value. It will call publish again (possibly multiple times) # with binary=True logger.debug("sending value via binpub for topic %s" % topic) try: _binpubs[topic][1].send(value) except Exception: logger.error("ERROR: %s" % traceback.format_exc()) else: msg = json.dumps([topic.encode('utf-8'), value], default=json_default) self._sender.send_multipart([msg]) if hasattr(self._sender, 'flush'): self._sender.flush() except Exception: print 'Publisher - Error publishing message %s: %s, %s' % \ (topic, value, traceback.format_exc()) finally: if lock: _lock.release()
[docs] def publish_list(self, items): if Publisher.__enabled: with _lock: for topic, value in items: self.publish(topic, value, lock=False)
@staticmethod
[docs] def get_instance(): return Publisher.__publisher
@staticmethod
[docs] def init(context, url, use_stream=True): if Publisher.__publisher is not None: raise RuntimeError("publisher already exists") Publisher.__publisher = Publisher(context, url, use_stream) return Publisher.__publisher
@staticmethod
[docs] def enable(): Publisher.__enabled = True
@staticmethod
[docs] def disable(): Publisher.__enabled = False
@staticmethod
[docs] def register(topic, obj): """Associates a given topic with a binary publisher based on the corresponding object type. If no binpub type exists for that object type, nothing happens. """ global _binpubs, _binpub_types sender = None if _binpub_types is None: load_binpubs() with _lock: if topic in _binpubs: logger.debug("found topic %s in _binpubs" % topic) _binpubs[topic][0] += 1 else: # see if a sender is registered for this object type for sender_type in _binpub_types: if sender_type.supports(obj): logger.debug("creating a sender for topic: %s" % topic) try: sender = sender_type(Pub_WV_Wrapper(topic)) except Exception: logger.error(traceback.format_exc()) _binpubs[topic] = [1, sender] break if sender is not None: sender.send(obj, first=True)
@staticmethod
[docs] def unregister(topic): """Removes an association between a 'sender' and a topic.""" global _binpubs with _lock: if topic in _binpubs: logger.debug("Publisher unregistering topic %s" % topic) if _binpubs[topic][0] <= 1: del _binpubs[topic] else: _binpubs[topic][0] -= 1
[docs]def publish(topic, msg, binary=False): try: Publisher.get_instance().publish(topic, msg, binary=binary) except AttributeError: if not Publisher.silent: raise RuntimeError("Publisher has not been initialized")
if WV_Wrapper is None: def load_binpubs(): pass else:
[docs] def load_binpubs(): """Loads all binpubs entry points.""" global _binpub_types logger.debug("loading binpubs") if _binpub_types is None: _binpub_types = [] # find all of the installed binpubs for ep in working_set.iter_entry_points('openmdao.binpub'): try: klass = ep.load() except Exception as err: logger.error("Entry point %s failed to load: %s" % (str(ep).split()[0], err)) else: logger.debug("adding binpub entry point: %s" % str(ep).split()[0]) with _lock: _binpub_types.append(klass)
OpenMDAO Home