"""
Routines analyzing dependencies (class and module) in Python source.
"""
import os
import sys
import ast
import time
import __builtin__
import cPickle
import networkx as nx
from openmdao.util.fileutil import find_files, get_module_path, find_module
from openmdao.util.log import logger
# This is a dict containing all of the entry point groups that OpenMDAO uses to
# identify plugins, and their corresponding Interfaces.
plugin_groups = { 'openmdao.container': ['IContainer'],
'openmdao.component': ['IComponent','IContainer'],
'openmdao.driver': ['IDriver','IComponent','IContainer'],
'openmdao.variable': ['IVariable'],
'openmdao.surrogatemodel': ['ISurrogate'],
'openmdao.doegenerator': ['IDOEgenerator'],
'openmdao.casefilter': ['ICaseFilter'],
'openmdao.caseiterator': ['ICaseIterator'],
'openmdao.caserecorder': ['ICaseRecorder'],
'openmdao.architecture': ['IArchitecture'],
'openmdao.optproblem': ['IOptProblem','IAssembly',
'IComponent','IContainer'],
'openmdao.parametric_geometry': ['IParametricGeometry'],
}
iface_set = set()
for ifaces in plugin_groups.values():
iface_set.update(ifaces)
def _to_str(node):
"""Take groups of Name nodes or a Str node and convert to a string."""
if isinstance(node, ast.Name):
return node.id
val = node.value
parts = [node.attr]
while True:
if isinstance(val, ast.Attribute):
parts.append(val.attr)
val = val.value
elif isinstance(val, ast.Name):
parts.append(val.id)
break
else: # it's more than just a simple dotted name
return None
return '.'.join(parts[::-1])
# the following are interfaces that get added via the add_delegate decorator
_delegate_ifaces = {
'HasParameters': 'IHasParameters',
'HasConstraints': 'IHasConstraints',
'HasEqConstraints': 'IHasEqConstraints',
'HasInEqConstraints': 'IHasInEqConstraints',
'HasObjective': 'IHasObjective',
'HasObjectives': 'IHasObjectives',
'HasStopConditions': 'IHasStopConditions',
'HasEvents': 'IHasEvents',
'HasCouplingVars': 'IHasCouplingVars',
}
[docs]class ClassInfo(object):
def __init__(self, name, fname, bases, meta, decorators):
self.name = name
self.fname = fname
self.bases = bases
self.meta = meta
ifaces = meta.setdefault('ifaces', [])
for dec in decorators:
if dec.func.id == 'add_delegate':
for arg in [_to_str(a) for a in dec.args]:
if arg in _delegate_ifaces:
ifaces.append(_delegate_ifaces[arg])
class _ClassBodyVisitor(ast.NodeVisitor):
def __init__(self):
self.metadata = {}
ast.NodeVisitor.__init__(self)
def visit_ClassDef(self, node):
for bnode in node.body:
self.visit(bnode)
def visit_Call(self, node):
if isinstance(node.func, ast.Name) and node.func.id == 'implements':
for arg in node.args:
if isinstance(arg, ast.Name):
self.metadata.setdefault('ifaces', []).append(arg.id)
def visit_Assign(self, node):
if len(self.metadata)==0 and len(node.targets) == 1:
lhs = node.targets[0]
if isinstance(lhs, ast.Name) and lhs.id == '__openmdao_meta__':
dct = ast.literal_eval(node.value)
dct.setdefault('ifaces', [])
dct['ifaces'].extend(self.metadata.setdefault('ifaces', []))
self.metadata.update(dct)
[docs]class PythonSourceFileAnalyser(ast.NodeVisitor):
"""Collects info about imports and class inheritance from a
Python file.
"""
def __init__(self, fname, tree_analyser):
ast.NodeVisitor.__init__(self)
self.fname = os.path.abspath(os.path.expanduser(fname))
self.modpath = get_module_path(fname)
self.classes = {}
self.localnames = {} # map of local names to package names
self.starimports = []
self.unresolved_classes = set()
self.tree_analyser = tree_analyser
# in order to get this to work with the 'ast' lib, I have
# to read using universal newlines and append a newline
# to the string I read for some files. The 'compiler' lib
# didn't have this problem. :(
f = open(self.fname, 'Ur')
try:
contents = f.read()
if len(contents)>0 and contents[-1] != '\n':
contents += '\n'
for node in ast.walk(ast.parse(contents, self.fname)):
self.visit(node)
finally:
f.close()
self.update_graph(self.tree_analyser.graph)
self.update_ifaces(self.tree_analyser.graph)
self.tree_analyser = None
[docs] def visit_ClassDef(self, node):
"""This executes every time a class definition is parsed."""
fullname = '.'.join([self.modpath, node.name])
self.localnames[node.name] = fullname
bases = [_to_str(b) for b in node.bases]
bvisitor = _ClassBodyVisitor()
bvisitor.visit(node)
bases = [self.localnames.get(b, b) for b in bases]
self.classes[fullname] = ClassInfo(fullname, self.fname, bases,
bvisitor.metadata,
node.decorator_list)
self.tree_analyser.class_map[fullname] = self.classes[fullname]
undef_bases = [b for b in bases if b not in self.classes and not hasattr(__builtin__, b)]
while undef_bases:
base = undef_bases.pop()
cinfo = self.tree_analyser.find_classinfo(base)
if cinfo is None:
parts = base.rsplit('.', 1)
if len(parts) == 1: # no dot, so maybe it came in with a '*' import
trymods = self.starimports[::-1]
basename = base
else:
trymods = [parts[0]]
basename = parts[1]
for modname in trymods:
excluded = False
for m in self.tree_analyser.mod_excludes:
if m == modname or modname.startswith(m+'.'):
excluded = True
break
if excluded:
continue
fpath = find_module(modname)
if fpath is not None:
fanalyzer = self.tree_analyser.analyze_file(fpath)
if '.' not in base:
trybase = '.'.join([modname, base])
else:
trybase = base
if trybase in fanalyzer.classes:
break
elif basename in fanalyzer.localnames:
newname = fanalyzer.localnames[basename]
self.tree_analyser.class_map[trybase] = newname
if newname not in self.tree_analyser.class_map and \
newname not in self.unresolved_classes:
undef_bases.append(newname)
break
else:
self.unresolved_classes.add(base)
[docs] def visit_Import(self, node):
"""This executes every time an "import foo" style import statement
is parsed.
"""
for al in node.names:
if al.asname is None:
self.localnames[al.name] = al.name
else:
self.localnames[al.asname] = al.name
[docs] def visit_ImportFrom(self, node):
"""This executes every time a "from foo import bar" style import
statement is parsed.
"""
# need the following to handle relative imports
if node.level == 0:
module = node.module
else:
parts = self.modpath.split('.')
module = '.'.join(parts[0:len(parts)-node.level])
if node.module is not None:
module += '.'.join([module, node.module])
for al in node.names:
if al.name == '*':
self.starimports.append(module)
continue
if al.asname is None:
self.localnames[al.name] = '.'.join([module, al.name])
else:
self.localnames[al.asname] = '.'.join([module, al.name])
[docs] def update_graph(self, graph):
"""Update the inheritance/implements graph."""
for classname, classinfo in self.classes.items():
graph.add_node(classname, classinfo=classinfo)
for base in classinfo.bases:
cinfo = self.tree_analyser.find_classinfo(base)
if cinfo:
base = cinfo.name
graph.add_edge(base, classname)
for iface in classinfo.meta.setdefault('ifaces', []):
graph.add_edge(iface, classname)
[docs] def update_ifaces(self, graph):
"""Update our ifaces metadata based on the contents of the
inheritance/implements graph.
"""
for iface in iface_set:
try:
paths = nx.shortest_path(graph, source=iface)
except KeyError:
continue
for cname, cinfo in self.classes.items():
if cname in paths:
cinfo.meta.setdefault('ifaces', []).append(iface)
cinfo.meta['ifaces'] = list(set(cinfo.meta['ifaces']))
[docs]class PythonSourceTreeAnalyser(object):
def __init__(self, startdir=None, exclude=None, mod_excludes=None,
direxclude=None):
self.files_count = 0 # number of files analyzed
# inheritance graph. It's a directed graph with base classes pointing
# to the classes that inherit from them. Also includes interfaces
# pointing to classes that implement them.
self.graph = nx.DiGraph()
if isinstance(startdir, basestring):
self.startdirs = [startdir]
elif startdir is None:
self.startdirs = []
else:
self.startdirs = startdir
self.startdirs = [os.path.expandvars(os.path.expanduser(d)) for d in self.startdirs]
if mod_excludes is None:
self.mod_excludes = set(['traits', 'zope', 'ast'])
else:
self.mod_excludes = mod_excludes
self.modinfo = {} # maps module pathnames to PythonSourceFileAnalyzers
self.fileinfo = {} # maps filenames to (PythonSourceFileAnalyzer, modtime)
self.class_map = {} # map of classname to ClassInfo for the class
for pyfile in find_files(self.startdirs, "*.py", exclude=exclude,
direxclude=direxclude):
self.analyze_file(pyfile)
[docs] def dump(self, out, options):
"""Dumps the contents of this object for debugging purposes."""
for f, tup in self.fileinfo.items():
out.write("%s\n" % os.path.relpath(f))
if options.showclasses and tup[0].classes:
out.write(" classes:\n")
for item, cinfo in tup[0].classes.items():
out.write(" %s\n" % item)
if options.showbases and cinfo.bases and cinfo.bases != ['object']:
out.write(" bases:\n")
for base in cinfo.bases:
if base in tup[0].unresolved_classes:
out.write(" ???(%s)\n" % base)
else:
out.write(" %s\n" % base)
if options.showifaces and cinfo.meta.get('ifaces'):
out.write(" interfaces:\n")
for iface in cinfo.meta['ifaces']:
out.write(" %s\n" % iface)
out.write("\n\nFiles examined: %d\n\n" % self.files_count)
[docs] def find_classinfo(self, cname):
cinfo = cname
prev = None
while cinfo != prev:
prev = cinfo
try:
cinfo = self.class_map[cinfo]
except KeyError:
return None
if isinstance(cinfo, ClassInfo):
return cinfo
return None
[docs] def analyze_file(self, pyfile, use_cache=False):
# don't analyze the file again if we've already done it and it hasn't
# changed. If `use_cache` is True then lookup/record in global cache.
mtime = os.path.getmtime(pyfile)
if pyfile in self.fileinfo:
if mtime <= self.fileinfo[pyfile][1]:
return self.fileinfo[pyfile][0]
if use_cache:
info = _FileInfoCache.lookup(pyfile)
if info is not None and mtime <= info[1]:
self.fileinfo[pyfile] = info
return info[0]
logger.info("analyzing %s", pyfile)
myvisitor = PythonSourceFileAnalyser(pyfile, self)
self.modinfo[get_module_path(pyfile)] = myvisitor
self.fileinfo[myvisitor.fname] = (myvisitor,
os.path.getmtime(myvisitor.fname))
self.files_count += 1
if use_cache:
_FileInfoCache.record(pyfile, (myvisitor,
os.path.getmtime(myvisitor.fname)))
return myvisitor
[docs] def flush_cache(self):
_FileInfoCache.save()
[docs] def remove_file(self, fname):
fvisitor = self.fileinfo[fname][0]
del self.fileinfo[fname]
del self.modinfo[fvisitor.modpath]
nodes = []
for klass, cinfo in self.class_map.items():
if isinstance(cinfo, ClassInfo):
if cinfo.fname == fname:
nodes.append(klass)
else:
modname = get_module_path(fname)+'.'
if klass.startswith(modname) or cinfo.startswith(modname):
nodes.append(klass)
self.graph.remove_nodes_from(nodes)
for klass in nodes:
del self.class_map[klass]
[docs] def find_inheritors(self, base):
"""Returns a list of names of classes that inherit from the given base
class."""
try:
paths = nx.shortest_path(self.graph, source=base, target=None)
except KeyError:
return []
del paths[base] # don't want the base itself in the list
return paths.keys()
[docs] def get_interfaces(self, classname):
''' Returns a set of interfaces for a given class.'''
ifaces = set()
klass = self.find_classinfo(classname)
if klass:
ifaces.update(klass.meta['ifaces'])
for base in klass.bases:
ifaces.update(self.get_interfaces(base))
return ifaces
class _FileInfoCache(object):
""" Retains file analysis information. """
_cache = None
_dirty = False
@staticmethod
def lookup(path):
""" Return analysis info for `path`. """
if _FileInfoCache._cache is None:
_FileInfoCache._load()
return _FileInfoCache._cache.get(path)
@staticmethod
def record(path, info):
""" Record analysis info for `path`. """
_FileInfoCache._cache[path] = info
_FileInfoCache._dirty = True
@staticmethod
def save():
""" Save analysis info to file. """
if _FileInfoCache._dirty:
out = _FileInfoCache._open('wb')
cPickle.dump(_FileInfoCache._cache, out, cPickle.HIGHEST_PROTOCOL)
out.close()
_FileInfoCache._dirty = False
@staticmethod
def _load():
""" Load analysis info from file. """
_FileInfoCache._cache = {}
try:
inp = _FileInfoCache._open('rb')
except Exception:
return
# Full test with coverage removes cache at start, so we won't get here.
try: #pragma no cover
_FileInfoCache._cache = cPickle.load(inp)
except Exception: #pragma no cover
return
finally: #pragma no cover
inp.close()
@staticmethod
def _open(mode):
""" Return opened file for '~/.openmdao/fileanalyzer.dat'. """
filename = \
os.path.expanduser(os.path.join('~', '.openmdao', 'fileanalyzer.dat'))
dirname = os.path.dirname(filename)
# Full test with coverage leaves directory intact.
if not os.path.exists(dirname): #pragma no cover
os.mkdir(dirname)
return open(filename, mode)
[docs]def main():
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-c", "--classes", action='store_true', dest='showclasses',
help="show classes found")
parser.add_argument("-b", "--bases", action="store_true", dest="showbases",
help="show base classes (only works if --classes is active)")
parser.add_argument("-i", "--interfaces", action="store_true", dest="showifaces",
help="show interfaces of classes (only works if --classes is active)")
parser.add_argument("-u", "--use-cache", action='store_true', dest='use_cache',
help="use analysis cache")
parser.add_argument('files', metavar='fname', type=str, nargs='+',
help='a file or directory to be scanned')
options = parser.parse_args()
print options.use_cache
stime = time.time()
psta = PythonSourceTreeAnalyser()
for f in options.files:
f = os.path.abspath(os.path.expanduser(f))
if os.path.isdir(f):
for pyfile in find_files(f, "*.py", exclude=lambda n: 'test' in n.split(os.sep)):
psta.analyze_file(pyfile, use_cache=options.use_cache)
else:
psta.analyze_file(f, use_cache=options.use_cache)
psta.dump(sys.stdout, options)
sys.stdout.write("elapsed time: %s seconds\n\n" % (time.time() - stime))
if options.use_cache:
_FileInfoCache.save()
if __name__ == '__main__':
main()