# Source code for larch.model.tree

import networkx as nx
from collections import OrderedDict
import numpy
import heapq
from ..util.touch_notifier import TouchNotify
from ..util.lazy import lazy

class NestingTree(nx.DiGraph):
    """
    A directed graph describing a nesting structure.

    Nodes are identified by integer codes.  Nodes without successors are
    the "elemental" alternatives; one distinguished node (`root_id`) is
    the root.  A node that has no explicit parent is linked to the root
    by an *implied* edge, which is dropped automatically as soon as the
    node receives an explicit parent.

    The class also implements the descriptor protocol (`__get__`,
    `__set__`, `__delete__`, `__set_name__`), so it can be declared as a
    class attribute of an owner.  The owner's tree is then stored on the
    owner instance under ``"_private_" + name``, and the owner's
    ``mangle()`` method (if it has one) is called whenever the tree is
    replaced, deleted or changed.

    Parameters
    ----------
    *arg
        Passed through to `networkx.DiGraph`.  If the first positional
        argument is itself a `NestingTree`, its root id is adopted, and
        so is its suggested elemental order unless a non-empty
        `suggested_elemental_order` is given here.
    root_id : int, default 0
        The code of the root node; ignored when copying from another
        `NestingTree` (the source's root id is used instead).
    suggested_elemental_order : tuple, optional
        Preferred ordering of the elemental alternatives.
    **kwarg
        Passed through to `networkx.DiGraph`.
    """

    node_dict_factory = OrderedDict
    adjlist_dict_factory = OrderedDict

    # ------------------------------------------------------------------
    # Descriptor protocol
    # ------------------------------------------------------------------

    def __get__(self, instance, owner):
        """
        Return the tree stored on `instance`, building it if needed.

        Parameters
        ----------
        instance : object or None
            The instance of the owner class, or None for class access.
        owner : type
            The owner class.

        Returns
        -------
        NestingTree or None
            The descriptor itself for class-level access, otherwise the
            tree stored on `instance` (None if there still is none after
            asking the instance to initialize one).
        """
        if instance is None:
            return self
        newself = getattr(instance, self.private_name, None)
        if newself is None:
            # No tree yet: ask the owner to build one, tolerating a
            # ValueError from the initializer.
            try:
                instance.initialize_graph()
            except ValueError:
                pass
            newself = getattr(instance, self.private_name, None)
        if newself is not None:
            # Keep the back-reference current so `touch` can notify.
            newself._instance = instance
        return newself

    def __set__(self, instance, value):
        """
        Store a copy of `value` on `instance` and notify the instance.

        Parameters
        ----------
        instance : object
            The instance of the owner class.
        value : NestingTree
            The tree being assigned; it is copied, not stored directly.
        """
        assert isinstance(value, NestingTree)
        t = value.copy()
        t._instance = instance
        setattr(instance, self.private_name, t)
        try:
            t._instance.mangle()
        except AttributeError:
            # The owner has no usable `mangle`; nothing to notify.
            pass

    def __delete__(self, instance):
        """Clear the tree stored on `instance` and notify the instance."""
        setattr(instance, self.private_name, None)
        try:
            instance.mangle()
        except AttributeError:
            pass

    def __set_name__(self, owner, name):
        """Record the attribute name and derive the private storage name."""
        self.name = name
        self.private_name = "_private_" + name

    def touch(self):
        """Notify the owning instance (if any) that this tree changed."""
        try:
            self._instance.mangle()
        except AttributeError:
            # Either there is no owner yet or it has no `mangle`.
            pass

    # ------------------------------------------------------------------
    # Construction and identity
    # ------------------------------------------------------------------

    def __init__(self, *arg, root_id=0, suggested_elemental_order=(), **kwarg):
        if len(arg) and isinstance(arg[0], NestingTree):
            super().__init__(*arg, **kwarg)
            self._root_id = arg[0]._root_id
            if suggested_elemental_order != ():
                self._suggested_elemental_order = suggested_elemental_order
            else:
                self._suggested_elemental_order = arg[0]._suggested_elemental_order
        else:
            super().__init__(*arg, **kwarg)
            self._root_id = root_id
            self._suggested_elemental_order = suggested_elemental_order
        if self._root_id not in self.nodes:
            # NOTE(review): this adds `root_id` (the argument), which can
            # differ from `self._root_id` when copying from another tree;
            # behavior kept as-is -- confirm intent.
            self.add_node(root_id, name='_root_', root=True)
        self._clear_caches()

    def __eq__(self, other):
        """
        Compare adjacency, node data, root id and suggested order.

        Returns `NotImplemented` for objects that are not `NestingTree`
        instances, so Python falls back to its default comparison.
        """
        if not isinstance(other, NestingTree):
            return NotImplemented
        return (
            self._adj == other._adj
            and self._node == other._node
            and self._root_id == other._root_id
            and self._suggested_elemental_order == other._suggested_elemental_order
        )

    def suggest_elemental_order(self, order):
        """
        Set the preferred ordering of elemental alternatives.

        Parameters
        ----------
        order : Iterable
            Node codes in the preferred order; codes that are not
            currently nodes of this graph are dropped.
        """
        self._suggested_elemental_order = tuple(j for j in order if j in self.nodes)

    @property
    def root_id(self):
        """int : The code for the root node."""
        return self._root_id

    @root_id.setter
    def root_id(self, x):
        # Remember the edges leaving the old root so they can be
        # re-attached to the new root.
        top_nests = list(self.successors(self._root_id))
        top_attrs = [self.edges[self._root_id, t] for t in top_nests]
        if self._root_id in self.nodes:
            self.remove_node(self._root_id)
        self._root_id = x
        if self._root_id not in self.nodes:
            self.add_node(self._root_id, name='_root_', root=True)
        for t, a in zip(top_nests, top_attrs):
            self.add_edge(self._root_id, t, **a, _clear_caches=False)
        self._clear_caches()

    def _clear_caches(self):
        """Invalidate every cached (lazy) attribute and notify the owner."""
        NestingTree.topological_sorted.invalidate(self, 'topological_sorted')
        NestingTree.topological_sorted_no_elementals.invalidate(self, 'topological_sorted_no_elementals')
        NestingTree.standard_sort.invalidate(self, 'standard_sort')
        NestingTree.standard_slot_map.invalidate(self, 'standard_slot_map')
        NestingTree.elementals.invalidate(self, 'elementals')
        NestingTree.standard_competitive_edge_list.invalidate(self, 'standard_competitive_edge_list')
        NestingTree.standard_competitive_edge_list_2.invalidate(self, 'standard_competitive_edge_list_2')
        self._predecessor_slots = {}
        self._successor_slots = {}
        self.touch()

    # ------------------------------------------------------------------
    # Graph mutation
    # ------------------------------------------------------------------

    def add_edge(self, u, v, implied=False, _clear_caches=True, **kwarg):
        """
        Add an edge between u and v.

        The nodes u and v will be automatically added if they are
        not already in the graph.  Edge attributes can be specified
        with keywords.

        Parameters
        ----------
        u, v : int
            Nodes should be integer codes.  The upstream node `u` is a
            nest or the root node.  Downstream node `v` can be a nest or
            elemental alternative.
        implied : bool, default False
            Implied edges are for connection of otherwise unconnected
            nests to the root node.
        _clear_caches : bool, default True
            Whether to invalidate cached attributes after the change.
        kwarg : keyword arguments, optional
            Edge data (or labels or objects) can be assigned using
            keyword arguments.
        """
        # Coerce once, so the implied-edge lookup and the insertion
        # refer to the same node keys.
        u = int(u)
        v = int(v)
        if not implied:
            # An explicit parent supersedes any implied link into `v`.
            drops = [
                (u_, v_)
                for u_, v_, imp_ in self.in_edges(nbunch=[v], data='implied')
                if imp_
            ]
            for d in drops:
                super().remove_edge(*d)
        try:
            return super().add_edge(u, v, implied=implied, **kwarg)
        finally:
            # Invalidate only after mutating, so the owner is notified
            # about the final state of the graph.
            if _clear_caches:
                self._clear_caches()

    def _remove_edge_no_implied(self, u, v, *arg, **kwarg):
        """Remove the edge u->v without adding an implied link to the root."""
        result = super().remove_edge(u, v)
        self._clear_caches()
        return result

    def remove_edge(self, u, v, *arg, **kwarg):
        """
        Remove the edge between u and v.

        If `v` is left without any predecessor (and is not the root), an
        implied edge from the root to `v` is added.

        Parameters
        ----------
        u, v : int
            Remove the edge between nodes u and v.

        Raises
        ------
        NetworkXError
            If there is not an edge between u and v.
        """
        result = super().remove_edge(u, v)
        if self.in_degree(v) == 0 and v != self._root_id:
            self.add_edge(self._root_id, v, implied=True, _clear_caches=False)
        self._clear_caches()
        return result

    def add_node(self, code, *, children=(), parent=None, parents=None, phi_parameters=None, **kwarg):
        """
        Add a single node `code` and update node attributes.

        Parameters
        ----------
        code : int
            Although the generic networkx.DiGraph allows a node to be
            any hashable Python object except None, Larch assumes that
            node codes are integers.
        children : Collection
            A collection of other node codes that are the children of
            this new node.  Links will be created from this node to each
            child.
        parent : int, optional
            The parent of this new node.  If not given, the root node is
            assumed to be the parent of this node, and an implied link
            is created.  This implied link is removed if the node is
            later made the child of some other node.  If the parent is
            set explicitly, the link is *not* removed later.
        parents : Collection, optional
            Set multiple parent up-stream nodes.
        phi_parameters : Mapping
            Set phi parameters on graph links connecting to this node,
            used in network GEV models.  The keys of this mapping
            indicate the node at the other end of the link, and the
            values are parameter names.
        kwarg : other keyword arguments, optional
            Set or change node attributes using key=value.

        Raises
        ------
        TypeError
            If both `parent` and `parents` are given.
        ValueError
            If a key of `phi_parameters` is not connected to `code`.
        """
        if parents is not None and parent is not None:
            raise TypeError("cannot give both parent and parents arguments")
        super().add_node(code, **kwarg)
        for child in children:
            self.add_edge(code, child, _clear_caches=False)
        if parent is not None:
            self.add_edge(parent, code, _clear_caches=False)
        elif parents is not None:
            for p in parents:
                self.add_edge(p, code, _clear_caches=False)
        elif self.in_degree(code) == 0 and code != self._root_id:
            self.add_edge(self._root_id, code, implied=True, _clear_caches=False)
        if phi_parameters is not None:
            for k, parametername in phi_parameters.items():
                # The link may run in either direction.
                if (code, k) in self.edges:
                    self.edges[code, k]['parameter'] = str(parametername)
                elif (k, code) in self.edges:
                    self.edges[k, code]['parameter'] = str(parametername)
                else:
                    raise ValueError(f"connected node {k} from phi_parameters not found")
        self._clear_caches()

    def new_node(self, *, code=None, **kwarg):
        """
        Add a new nesting node to this NestingTree.

        A new unique code is automatically created and returned by this
        method for creating new nests.  All arguments must be given as
        keyword parameters.

        Parameters
        ----------
        parameter : str
            The name of the parameter to associate with this nest.
        children : Collection[int], optional
            The code numbers for the children of this nest.  These can
            be elemental alternatives or other nests.  If not given, no
            children will be defined initially, but they can be added
            later.
        parent : int, optional
            The code number for the parent of this nest.  If not given,
            the parent is implied as the root node, unless and until set
            to some other node.
        name : str, optional
            A human-readable name to associate with this nest.
        code : int, optional
            Use this code for the new nest.  If this code already
            exists, a ValueError is raised.

        Returns
        -------
        int
            The new code for this nest.

        Raises
        ------
        ValueError
            If a new code is given but it already exists in this tree.
        """
        if code is None:
            # Start at the node count and walk up to the first free code.
            proposed_code = len(self)
            while proposed_code in self:
                proposed_code += 1
        else:
            if code in self:
                raise ValueError(f'code {code} already exists in this tree')
            proposed_code = code
        self.add_node(proposed_code, **kwarg)
        return proposed_code

    def add_nodes(self, codes, *arg, parent=None, **kwarg):
        """Call `add_node` for each code in `codes` with the same arguments."""
        for code in codes:
            self.add_node(code, *arg, parent=parent, **kwarg)

    def remove_node(self, n):
        """
        Remove node n.

        Removes the node n, reconnecting all outedges to the head node
        of all inedges.  Attempting to remove a non-existent node will
        raise an exception.

        Parameters
        ----------
        n : int
            A node in the graph
        """
        replace_edges = {k: self.edges[k].copy() for k in self.edges(n)}
        replace_heads = [k for k, _ in self.in_edges(n)]
        super().remove_node(n)
        for k, attrs in replace_edges.items():
            for h in replace_heads:
                super().add_edge(h, k[1], **attrs)
        self._clear_caches()

    # ------------------------------------------------------------------
    # Orderings and slots (cached)
    # ------------------------------------------------------------------

    @lazy
    def topological_sorted(self):
        """list : All nodes from `reverse_lexicographical_topological_sort`."""
        return list(reverse_lexicographical_topological_sort(self))

    @lazy
    def topological_sorted_no_elementals(self):
        """list : `topological_sorted` without the zero out-degree nodes."""
        try:
            result = self.topological_sorted.copy()
        except nx.NetworkXUnfeasible:
            # Best effort: show the offending cycle, then re-raise.
            from networkx.algorithms.cycles import find_cycle
            try:
                cycle = find_cycle(self)
            except Exception:
                pass
            else:
                print("Found graph cycle:")
                print(list(cycle))
            raise
        # Collect zero out-degree (elemental) codes in a set to remove
        # them in a batch, which is much faster than removing them from
        # the list one at a time.
        to_remove = {code for code, out_degree in self.out_degree if not out_degree}
        return [i for i in result if i not in to_remove]

    @lazy
    def standard_sort(self):
        """tuple : Elemental codes followed by the non-elemental codes."""
        return self.elementals + tuple(self.topological_sorted_no_elementals)

    def node_name(self, code):
        """Return the 'name' attribute of node `code`, or `str(code)`."""
        return self.nodes[code].get('name', str(code))

    @property
    def standard_sort_names(self):
        """list : Node names in `standard_sort` order."""
        return [self.node_name(s) for s in self.standard_sort]

    def node_names(self):
        """Return a dict mapping each code in `standard_sort` to its name."""
        return {s: (self.node_name(s) or s) for s in self.standard_sort}

    def elemental_names(self):
        """Return a dict mapping each elemental code to its name."""
        return {s: (self.node_name(s) or s) for s in self.elementals}

    @lazy
    def standard_slot_map(self):
        """dict : Maps each node code to its position in `standard_sort`."""
        return {i: n for n, i in enumerate(self.standard_sort)}

    def predecessor_slots(self, code):
        """Return (and memoize) an int32 array of the slots of `code`'s predecessors."""
        if code in self._predecessor_slots:
            return self._predecessor_slots[code]
        s = numpy.empty(self.in_degree(code), dtype=numpy.int32)
        for n, i in enumerate(self.predecessors(code)):
            s[n] = self.standard_slot_map[i]
        self._predecessor_slots[code] = s
        return s

    def successor_slots(self, code):
        """Return (and memoize) an int32 array of the slots of `code`'s successors."""
        if code in self._successor_slots:
            return self._successor_slots[code]
        s = numpy.empty(self.out_degree(code), dtype=numpy.int32)
        for n, i in enumerate(self.successors(code)):
            s[n] = self.standard_slot_map[i]
        self._successor_slots[code] = s
        return s

    def __elementals_iter(self):
        """Yield every node code that has no successors."""
        for code, out_degree in self.out_degree:
            if not out_degree:
                yield code

    @lazy
    def elementals(self):
        """
        tuple : Codes of the elemental (zero out-degree) nodes.

        Codes from the suggested elemental order come first, in that
        order; any remaining elemental codes follow in sorted order.
        Suggested codes that are not (or no longer) nodes of the graph,
        that have successors, or that are repeated are skipped.
        """
        result = []
        found = set()
        for e in self._suggested_elemental_order:
            if e in found or e not in self:
                continue
            if self.out_degree(e) == 0:
                result.append(e)
                found.add(e)
        for e in sorted(self.__elementals_iter()):
            if e not in found:
                result.append(e)
                found.add(e)
        return tuple(result)

    def n_elementals(self):
        """Return the number of elemental nodes."""
        return len(self.elementals)

    def n_intermediate_nests(self):
        """Return the node count minus the elementals and minus one (the root)."""
        return len(self.nodes) - self.n_elementals() - 1

    def elemental_descendants_iter(self, code):
        """Yield the elemental nodes below `code` (or `code` itself if elemental)."""
        if not self.out_degree(code):
            yield code
            return
        all_d = nx.descendants(self, code)
        for dcode, dout_degree in self.out_degree(all_d):
            if not dout_degree:
                yield dcode

    def elemental_descendants(self, code):
        """Return `elemental_descendants_iter(code)` as a list."""
        return list(self.elemental_descendants_iter(code))

    @property
    def n_edges(self):
        """int : The number of edges in the graph."""
        return self.number_of_edges()

    def edge_slot_arrays(self, alpha_locator=None):
        """
        Build per-edge slot arrays.

        Parameters
        ----------
        alpha_locator : Mapping, optional
            Maps ``(up_code, down_code)`` pairs to an integer; used to
            fill `alloc_slot`.  Edges not in the mapping get -1.

        Returns
        -------
        up, dn : numpy.ndarray of int32
            Slot of the upstream and downstream node of each edge.
        first_visit : numpy.ndarray of int32
            1 where an edge is the first one listed for its downstream
            slot, else 0.
        alloc_slot : numpy.ndarray of int32
            Value from `alpha_locator` for each edge, or -1.
        """
        s = self.n_edges
        up = numpy.zeros(s, dtype=numpy.int32)
        dn = numpy.zeros(s, dtype=numpy.int32)
        first_visit = numpy.zeros(s, dtype=numpy.int32)
        alloc_slot = numpy.full_like(first_visit, -1)
        # Fill from the back, walking nodes and their successors in reverse.
        n = s
        for upcode in reversed(self.standard_sort):
            upslot = self.standard_slot_map[upcode]
            for dnslot in reversed(self.successor_slots(upcode)):
                n -= 1
                up[n] = upslot
                dn[n] = dnslot
        first_visit_found = set()
        for n in range(s):
            if dn[n] not in first_visit_found:
                first_visit[n] = 1
                first_visit_found.add(dn[n])
        if alpha_locator is not None:
            for n in range(s):
                alloc_slot[n] = alpha_locator.get(
                    (self.standard_sort[up[n]], self.standard_sort[dn[n]]),
                    -1
                )
        return up, dn, first_visit, alloc_slot

    def nodes_with_successors_iter(self):
        """Yield every node code that has at least one successor."""
        for code, out_degree in self.out_degree:
            if out_degree:
                yield code

    def nodes_with_multiple_predecessors_iter(self):
        """Yield every node code that has more than one predecessor."""
        for code, in_degree in self.in_degree:
            if in_degree > 1:
                yield code

    @lazy
    def standard_competitive_edge_list(self):
        """list : ``(predecessor, node)`` pairs for nodes with several predecessors."""
        alphas = []
        for n in self.nodes_with_multiple_predecessors_iter():
            for k in sorted(self.predecessors(n)):
                alphas.append((k, n))
        return alphas

    @lazy
    def standard_competitive_edge_list_2(self):
        """list : ``(sorted predecessors, node)`` for nodes with several predecessors."""
        alphas = []
        for n in self.nodes_with_multiple_predecessors_iter():
            predecessors = sorted(self.predecessors(n))
            alphas.append((predecessors, n))
        return alphas

    # ------------------------------------------------------------------
    # Pickling
    # ------------------------------------------------------------------

    def __getstate__(self):
        """Return the instance dict without the listed cache/owner entries."""
        no_pickle = (
            'topological_sorted',
            'topological_sorted_no_elementals',
            'standard_sort',
            'standard_slot_map',
            'elementals',
            '_predecessor_slots',
            '_successor_slots',
            '_touch',
            'node_dict_factory',
            '_instance',
        )
        return {k: v for k, v in self.__dict__.items() if k not in no_pickle}

    def __setstate__(self, state):
        """Restore the instance dict and start with empty slot caches."""
        self.__dict__ = state.copy()
        self._predecessor_slots = {}
        self._successor_slots = {}

    # ------------------------------------------------------------------
    # Drawing
    # ------------------------------------------------------------------

    def __xml__(self, use_viz=True, use_dot=True, output='svg', figsize=None, **format):
        """
        Render the tree with pygraphviz or pydot.

        Parameters
        ----------
        use_viz : bool, default True
            Try pygraphviz first.
        use_dot : bool, default True
            Allow pydot (as the fallback when `use_viz` is set and
            pygraphviz is not importable, or as the only backend).
        output : str, default 'svg'
            Output format.  The pygraphviz backend supports only 'svg'.
        figsize : 2-tuple, optional
            Figure size; used only by the pydot backend for 'png'.
        **format
            Extra options.  For pygraphviz the keys GRAPHWIDTH,
            GRAPHHEIGHT and SUPPRESSGRAPHSIZE (any letter case) control
            the graph size; for pydot they are passed to `create`.

        Raises
        ------
        NotImplementedError
            If no drawing module is available, if the pygraphviz
            backend cannot find its program in the path, or if the
            pygraphviz backend is asked for a non-svg output.
        """
        viz = None
        dot = None
        if use_viz:
            try:
                import pygraphviz as viz
            except ImportError:
                if use_dot:
                    try:
                        import pydot as dot
                    except ImportError:
                        pass
        elif use_dot:
            try:
                import pydot as dot
            except ImportError:
                pass

        if viz is None and dot is None:
            import warnings
            if use_viz and use_dot:
                msg = "neither pydot nor pygraphviz modules are installed, unable to draw nesting tree"
            elif use_viz:
                msg = "pygraphviz module not installed, unable to draw nesting tree"
            elif use_dot:
                msg = "pydot module not installed, unable to draw nesting tree"
            else:
                msg = "no drawing module used, unable to draw nesting tree"
            warnings.warn(msg)
            raise NotImplementedError(msg)

        if viz is not None:
            return self._xml_with_pygraphviz(viz, output, format)
        return self._xml_with_pydot(dot, output, figsize, format)

    def _xml_with_pygraphviz(self, viz, output, format):
        """Draw with the pygraphviz module `viz`; return the parsed svg element."""
        from io import BytesIO

        # Make every option available under its upper-case key too.
        for key in list(format.keys()):
            if key.upper() != key:
                format[key.upper()] = format[key]
        if 'SUPPRESSGRAPHSIZE' not in format:
            if 'GRAPHWIDTH' not in format:
                format['GRAPHWIDTH'] = 6.5
            if 'GRAPHHEIGHT' not in format:
                format['GRAPHHEIGHT'] = 4
        if 'UNAVAILABLE' not in format:
            format['UNAVAILABLE'] = True
        if 'SUPPRESSGRAPHSIZE' not in format:
            G = viz.AGraph(name='Tree', directed=True, size="{GRAPHWIDTH},{GRAPHHEIGHT}".format(**format))
        else:
            G = viz.AGraph(name='Tree', directed=True)
        for n in self.nodes:
            nname = self.nodes[n].get('name', n)
            if nname == n:
                G.add_node(n, label='<{1}>'.format(n, nname), style='rounded,solid', shape='box')
            else:
                G.add_node(n, label='<{1} <FONT COLOR="#999999">({0})</FONT>>'.format(n, nname), style='rounded,solid', shape='box')
        G.add_subgraph(name='cluster_elemental', nbunch=self.elementals, color='#cccccc', bgcolor='#eeeeee',
                       label='Elemental Alternatives', labelloc='b', style='rounded,solid')
        G.add_node(self.root_id, label="Root")
        for i, j in self.edges:
            G.add_edge(i, j)
        pyg_imgdata = BytesIO()
        try:
            G.draw(pyg_imgdata, format=output, prog='dot')
        except ValueError as err:
            # Only the "program not found in path" failure is reported.
            if 'in path' in str(err):
                import warnings
                warnings.warn(str(err) + "; unable to draw nesting tree in report")
                raise NotImplementedError()
        if output == 'svg':
            import xml.etree.ElementTree as ET
            ET.register_namespace("", "http://www.w3.org/2000/svg")
            ET.register_namespace("xlink", "http://www.w3.org/1999/xlink")
            return ET.fromstring(pyg_imgdata.getvalue().decode())
        raise NotImplementedError(f"output {output} with use_viz")

    def _xml_with_pydot(self, pydot, output, figsize, format):
        """Draw with the pydot module `pydot`; return an `xmle.Elem`."""
        # set Graphviz graph type
        if self.is_directed():
            graph_type = 'digraph'
        else:
            graph_type = 'graph'
        strict = nx.number_of_selfloops(self) == 0 and not self.is_multigraph()

        name = self.name
        graph_defaults = self.graph.get('graph', {})
        if name == '':
            P = pydot.Dot('', graph_type=graph_type, strict=strict, **graph_defaults)
        else:
            P = pydot.Dot('"%s"' % name, graph_type=graph_type, strict=strict, **graph_defaults)
        try:
            P.set_node_defaults(**self.graph['node'])
        except KeyError:
            pass
        try:
            P.set_edge_defaults(**self.graph['edge'])
        except KeyError:
            pass

        cluster_elemental = pydot.Cluster(
            'elemental',
            style='rounded',
            bgcolor='lightgrey',
            color='white',
            rank='same',
            rankdir="LR",
        )

        # Hoisted: membership is tested for every node and edge.
        elemental_codes = set(self.elementals)

        for n, nodedata in self.nodes(data=True):
            # 'name' is renamed so it does not clash with the node's own name.
            str_nodedata = dict(
                (k if k != 'name' else 'name_', '"' + str(v) + '"')
                for k, v in nodedata.items()
            )
            if 'parameter' in nodedata:
                param_label = '<BR ALIGN="CENTER" /><FONT COLOR="#999999" POINT-SIZE="9"><I>{0}</I></FONT>'.format(nodedata['parameter'])
            else:
                param_label = ''
            if 'name' in nodedata and n != self.root_id:
                str_nodedata['label'] = (
                    '<'
                    '<FONT COLOR="#999999" POINT-SIZE="9">({1}) </FONT>'
                    '{0}'
                    '{2}>'
                ).format(nodedata['name'], n, param_label)
            # Default styling for nodes can have been overridden
            if n in elemental_codes:
                str_nodedata['style'] = str_nodedata.get('style', 'filled')
                str_nodedata['fillcolor'] = str_nodedata.get('fillcolor', 'white')
            elif n == self.root_id:
                str_nodedata['shape'] = str_nodedata.get('shape', 'invhouse')
            else:
                str_nodedata['style'] = str_nodedata.get('style', 'rounded')
                str_nodedata['shape'] = str_nodedata.get('shape', 'rectangle')
            p = pydot.Node(str(n), **str_nodedata)
            P.add_node(p)
            if n in elemental_codes:
                cluster_elemental.add_node(p)
        P.add_subgraph(cluster_elemental)

        if self.is_multigraph():
            for u, v, key, edgedata in self.edges(data=True, keys=True):
                str_edgedata = dict((k, str(v_)) for k, v_ in edgedata.items() if k != 'key')
                if v in elemental_codes:
                    str_edgedata['constraint'] = 'false'
                edge = pydot.Edge(str(u), str(v), key=str(key), **str_edgedata)
                P.add_edge(edge)
        else:
            for u, v, edgedata in self.edges(data=True):
                str_edgedata = dict((k, '"' + str(val) + '"') for k, val in edgedata.items())
                edge = pydot.Edge(str(u), str(v), **str_edgedata)
                P.add_edge(edge)

        from xmle import Elem
        prog = None
        if output == 'svg':
            import xml.etree.ElementTree as ET
            ET.register_namespace("", "http://www.w3.org/2000/svg")
            ET.register_namespace("xlink", "http://www.w3.org/1999/xlink")
        elif output == 'png':
            prog = [P.prog, '-Gdpi=300']
            if figsize is not None:
                # The literal backslash-bang suffix is part of the option.
                prog.append(f"-Gsize={figsize[0]},{figsize[1]}\\!")
            e = Elem.from_any(P.create(prog=prog, format=output, **format))
            e.attrib['dpi'] = (300, 300)
            return e
        return Elem.from_any(P.create(prog=prog, format=output, **format))

    def _repr_html_(self):
        """Return the drawing wrapped in a ``div`` as an HTML string."""
        from xmle import Elem
        x = Elem('div') << (self.__xml__())
        return x.tostring()

    def to_png(self, figsize=None, filename=None):
        """
        Output the graph visualization as a png.

        Parameters
        ----------
        figsize : 2-tuple, optional
            The (width, height) in inches.
        filename : str, optional
            If given, the decoded png data is also written to this file.

        Returns
        -------
        xmle.Elem

        Raises
        ------
        ValueError
            If `filename` is given and the result's 'src' attribute does
            not start with the base64 png data-URI prefix.
        """
        result = self.__xml__(output='png', use_viz=False, figsize=figsize)
        if filename is not None:
            import base64
            if result.attrib['src'][:22] != "data:image/png;base64,":
                raise ValueError("problem decoding png:{}".format(result.attrib['src'][:22]))
            with open(filename, "wb") as fh:
                fh.write(base64.decodebytes(result.attrib['src'][22:].encode()))
        return result

    def partial_figure(self, including_nodes=None, source=None, *, n=None, n_at_level=3, n_expand=1):
        """
        Generate a partial figure of the graph.

        Parameters
        ----------
        including_nodes : iterable or None
            An iterable containing node codes or names (or a mix).
        source : nodecode, optional
            All paths from this node to everything in `including_nodes`
            will be represented.  Defaults to `root_id`.
        n : int
            If including_nodes is None, select this number of nodes
            randomly
        n_at_level : int, default 3
            If neither `including_nodes` nor `n` is given, walk down
            from the root taking up to this many successors of each
            visited node.
        n_expand : int, default 1
            In that walk, the number of taken successors that are
            themselves visited.

        Returns
        -------
        Elem
        """
        if source is None:
            source = self.root_id
        from networkx.algorithms.simple_paths import all_simple_paths
        shows = set()
        if including_nodes is None and n is not None:
            # `choice` needs a plain 1-d sequence, not the NodeView mapping.
            including_nodes = sorted(numpy.random.choice(list(self.nodes), n, replace=False))
        if including_nodes is None and n_at_level is not None:
            from collections import deque
            import itertools
            q = deque([self.root_id])
            including_nodes = []
            while q:
                i = q.popleft()
                take = tuple(itertools.islice(self.successors(i), n_at_level))
                q.extend(take[:n_expand])
                including_nodes.extend(take)
        # Add every node in every path from the root to each `including_nodes`
        for each_node in including_nodes:
            if each_node in self.nodes:
                targets = [each_node]
            else:
                # Not a code: treat it as a node name.
                targets = self.get_nodes_by_name(each_node)
            for target in targets:
                for path in all_simple_paths(self, source, target):
                    shows.update(path)
        s = self.subgraph(shows)
        return graph_to_figure(s)

    def get_nodes_by_name(self, name):
        """Return the codes of all nodes whose 'name' attribute equals `name`."""
        return [k for k, v in self.nodes(data=True) if v.get('name') == name]

    def subgraph_from(self, node):
        """Return a NestingTree built from `node` and everything reachable from it."""
        from collections import deque
        Q = deque([node])
        found = set()
        while len(Q):
            i = Q.popleft()
            if i not in found:
                found.add(i)
                Q.extend(self.successors(i))
        return NestingTree(self.subgraph(found), root_id=node)

    def stats_summarize(self):
        """Print the node count overall and for each tier below the root."""
        print("Graph Stats")
        print(f" Overall: {len(self)} nodes")
        next_tier = list(self.successors(self.root_id))
        n = 0
        while len(next_tier):
            tier = next_tier
            n += 1
            print(f" Tier {n}: {len(tier)} nodes")
            next_tier = list()
            for i in tier:
                next_tier.extend(self.successors(i))

    # ------------------------------------------------------------------
    # Arrays for model computation
    # ------------------------------------------------------------------

    def node_slot_arrays(self, model):
        """
        Build per-node slot arrays.

        Parameters
        ----------
        model
            Object providing ``get_slot_x(parameter_name)``.

        Returns
        -------
        muslots : numpy.ndarray of int32
            ``model.get_slot_x`` of each node's 'parameter' attribute
            (None when the node has none), in `standard_sort` order.
        start : numpy.ndarray of int32
            Index of each node's first outgoing edge in the edge
            ordering, or -1 if it has none.
        num : numpy.ndarray of int32
            Number of outgoing edges of each node.
        """
        muslots = numpy.full([len(self), ], -1, dtype=numpy.int32)
        for child, childcode in enumerate(self.standard_sort):
            pname = self.nodes[childcode].get('parameter', None)
            muslots[child] = model.get_slot_x(pname)

        num = numpy.zeros(len(self.nodes), dtype=numpy.int32)
        start = numpy.full(len(self.nodes), -1, dtype=numpy.int32)
        n = self.n_edges
        for upcode in reversed(self.standard_sort):
            upslot = self.standard_slot_map[upcode]
            for dnslot in reversed(self.successor_slots(upcode)):
                n -= 1
                num[upslot] += 1
                start[upslot] = n
        return (muslots, start, num, )

    def _get_simple_mu_and_alpha(self, model, holdfast_invalidates=True):
        """
        Build node and edge arrays, with equal allocation across parents.

        Parameters
        ----------
        model
            Object providing ``get_value(name, default=...)`` and
            ``get_slot_x(name, holdfast_invalidates)``.
        holdfast_invalidates : bool, default True
            Passed through to ``model.get_slot_x``.

        Returns
        -------
        mu, muslots, up, dn, num, start, val
            `mu` and `muslots` are per node (in `standard_sort` order);
            `up`, `dn` and `val` are per edge, `val` being one over the
            number of predecessors of the edge's downstream node; `num`
            and `start` are per node as in `node_slot_arrays`.
        """
        mu = numpy.ones([len(self), ], dtype=numpy.float64)
        muslots = numpy.full([len(self), ], -1, dtype=numpy.int32)
        for child, childcode in enumerate(self.standard_sort):
            pname = self.nodes[childcode].get('parameter', None)
            mu[child] = model.get_value(pname, default=1.0)
            muslots[child] = model.get_slot_x(pname, holdfast_invalidates)

        s = self.n_edges
        up = numpy.zeros(s, dtype=numpy.int32)
        dn = numpy.zeros(s, dtype=numpy.int32)
        val = numpy.zeros(s, dtype=numpy.float64)
        num = numpy.zeros(len(self.nodes), dtype=numpy.int32)
        start = numpy.full(len(self.nodes), -1, dtype=numpy.int32)
        n = s
        for upcode in reversed(self.standard_sort):
            upslot = self.standard_slot_map[upcode]
            for dnslot in reversed(self.successor_slots(upcode)):
                n -= 1
                up[n] = upslot
                dn[n] = dnslot
                num[upslot] += 1
                start[upslot] = n
                # TODO make not always constant fraction
                val[n] = 1 / len(self.predecessor_slots(self.standard_sort[dnslot]))
        return mu, muslots, up, dn, num, start, val
def graph_to_figure(graph, output_format='svg', **format):
    """
    Draw `graph` with pygraphviz and return it wrapped in a ``div``.

    Parameters
    ----------
    graph : networkx graph
        The graph to draw.  If it has `elementals` and `root_id`
        attributes (as a NestingTree does), the elementals are grouped
        in a labelled cluster and the root node is labelled "Root".
    output_format : str, default 'svg'
        Format passed to pygraphviz.  Only 'svg' is turned into a
        figure; for any other value the result holds a red message
        saying the format cannot be rendered.
    **format
        Options; GRAPHWIDTH, GRAPHHEIGHT and SUPPRESSGRAPHSIZE (in any
        letter case) control the graph size.

    Returns
    -------
    xmle.Elem

    Raises
    ------
    NotImplementedError
        If pygraphviz is not installed, or if drawing fails because the
        layout program is not found in the path.
    """
    try:
        import pygraphviz as viz
    except ImportError:
        import warnings
        warnings.warn("pygraphviz module not installed, unable to draw nesting tree")
        raise NotImplementedError("pygraphviz module not installed, unable to draw nesting tree")
    from io import BytesIO

    # Make every option available under its upper-case key too.
    for key in list(format.keys()):
        if key.upper() != key:
            format[key.upper()] = format[key]
    if 'SUPPRESSGRAPHSIZE' not in format:
        if 'GRAPHWIDTH' not in format:
            format['GRAPHWIDTH'] = 6.5
        if 'GRAPHHEIGHT' not in format:
            format['GRAPHHEIGHT'] = 4
    if 'UNAVAILABLE' not in format:
        format['UNAVAILABLE'] = True
    if 'SUPPRESSGRAPHSIZE' not in format:
        G = viz.AGraph(name='Tree', directed=True, size="{GRAPHWIDTH},{GRAPHHEIGHT}".format(**format))
    else:
        G = viz.AGraph(name='Tree', directed=True)

    for n in graph.nodes:
        nname = graph.nodes[n].get('name', n)
        if nname == n:
            G.add_node(n, label='<{1}>'.format(n, nname), style='rounded,solid', shape='box')
        else:
            G.add_node(n, label='<{1} <FONT COLOR="#999999">({0})</FONT>>'.format(n, nname), style='rounded,solid', shape='box')

    # Plain graphs have no `elementals`; only then is the cluster skipped.
    try:
        elementals = graph.elementals
    except AttributeError:
        pass
    else:
        G.add_subgraph(name='cluster_elemental', nbunch=elementals, color='#cccccc', bgcolor='#eeeeee',
                       label='Elemental Alternatives', labelloc='b', style='rounded,solid')

    try:
        G.add_node(graph.root_id, label="Root")
    except AttributeError:
        pass

    for i, j in graph.edges:
        G.add_edge(i, j)

    pyg_imgdata = BytesIO()
    try:
        G.draw(pyg_imgdata, format=output_format, prog='dot')
    except ValueError as err:
        # Only the "program not found in path" failure is reported.
        if 'in path' in str(err):
            import warnings
            warnings.warn(str(err) + "; unable to draw nesting tree in report")
            raise NotImplementedError()

    from xmle import Elem
    if output_format == 'svg':
        import xml.etree.ElementTree as ET
        ET.register_namespace("", "http://www.w3.org/2000/svg")
        ET.register_namespace("xlink", "http://www.w3.org/1999/xlink")
        result = ET.fromstring(pyg_imgdata.getvalue().decode())
    else:
        result = Elem('span', attrib={'style': 'color:red'}, text=f"Unable to render output_format '{output_format}'")
    x = Elem('div') << result
    return x


def reverse_lexicographical_topological_sort(G, key=None):
    """
    Generator of nodes in reverse lexicographically topologically sorted order.

    A reverse topological sort is a nonunique permutation of the nodes
    such that an edge from u to v implies that v appears before u in
    the sort order (every node is yielded only after all of its
    successors).  Ties among nodes that are ready at the same time are
    broken by `key`, smallest first, so that the sorting becomes unique.

    Parameters
    ----------
    G : NetworkX digraph
        A directed acyclic graph (DAG)
    key : function, optional
        This function maps nodes to keys with which to resolve
        ambiguities in the sort order.  Defaults to the identity
        function.

    Returns
    -------
    iterable
        An iterable of nodes in reverse lexicographical topological
        sort order.

    Raises
    ------
    NetworkXError
        Topological sort is defined for directed graphs only.  If the
        graph `G` is undirected, a :exc:`NetworkXError` is raised.
    NetworkXUnfeasible
        If `G` is not a directed acyclic graph (DAG) no topological sort
        exists and a :exc:`NetworkXUnfeasible` exception is raised,
        after the nodes that are not upstream of any cycle have been
        yielded.  This can also be raised if `G` is changed while the
        returned iterator is being processed.
    RuntimeError
        If `G` is changed while the returned iterator is being processed.
    """
    if not G.is_directed():
        msg = "Topological sort not defined on undirected graphs."
        raise nx.NetworkXError(msg)

    if key is None:
        def key(node):
            return node

    # Insertion position is the secondary sort key, so heap entries
    # never have to compare the nodes themselves on a key tie.
    nodeid_map = {n: i for i, n in enumerate(G)}

    def create_tuple(node):
        return key(node), nodeid_map[node], node

    # Remaining (unprocessed) successor count for each non-leaf node.
    outdegree_map = {v: d for v, d in G.out_degree() if d > 0}
    # These nodes have zero outdegree and ready to be returned.
    zero_outdegree = [create_tuple(v) for v, d in G.out_degree() if d == 0]
    heapq.heapify(zero_outdegree)

    while zero_outdegree:
        _, _, node = heapq.heappop(zero_outdegree)

        if node not in G:
            raise RuntimeError("Graph changed during iteration")
        for parent, _child in G.in_edges(node):
            try:
                outdegree_map[parent] -= 1
            except KeyError as e:
                raise RuntimeError("Graph changed during iteration") from e
            if outdegree_map[parent] == 0:
                heapq.heappush(zero_outdegree, create_tuple(parent))
                del outdegree_map[parent]

        yield node

    # Any node still waiting on a successor was never released: it is
    # on, or upstream of, a cycle.  (The heap itself is always empty
    # here, so it cannot be used for this check.)
    if outdegree_map:
        msg = "Graph contains a cycle or graph changed during iteration"
        raise nx.NetworkXUnfeasible(msg)