From bd9b180eb959490efe9a3fc1771b0b1cbfbf79b3 Mon Sep 17 00:00:00 2001 From: Kor de Jong Date: Fri, 24 Jan 2025 14:59:58 +0100 Subject: [PATCH 1/2] Initial work on refactoring the graph hierarchy --- .../graph/directed_graph.py | 103 ++++++++++++++++++ .../graph/multi_rooted_graph.py | 22 ++++ .../adaptation_pathways/graph/pathway_map.py | 51 ++++++--- .../adaptation_pathways/graph/rooted_graph.py | 98 +---------------- .../plot/pathway_map/classic.py | 61 +++++++---- .../plot/pathway_map/default.py | 12 +- 6 files changed, 205 insertions(+), 142 deletions(-) create mode 100644 source/package/adaptation_pathways/graph/directed_graph.py create mode 100644 source/package/adaptation_pathways/graph/multi_rooted_graph.py diff --git a/source/package/adaptation_pathways/graph/directed_graph.py b/source/package/adaptation_pathways/graph/directed_graph.py new file mode 100644 index 0000000..21a17cd --- /dev/null +++ b/source/package/adaptation_pathways/graph/directed_graph.py @@ -0,0 +1,103 @@ +import typing + +import networkx as nx + + +class DirectedGraph: + """ + Base class for specialized directed graphs. + """ + + _graph: nx.DiGraph + + def __init__(self) -> None: + self._graph = nx.DiGraph() + + def __str__(self) -> str: + return "\n".join(nx.generate_network_text(self._graph)) + + @property + def graph(self) -> nx.DiGraph: + """ + :return: The layered directed graph instance + + Try not to use it -- it should be an implementation detail as much as possible. + """ + return self._graph + + # def is_empty(self) -> bool: + # """ + # :return: Whether or not the tree is empty + # """ + # return nx.is_empty(self._graph) + + def nr_nodes(self) -> int: + """ + :return: Number of nodes + """ + return len(self._graph.nodes) + + def nr_edges(self) -> int: + """ + :return: Number of edges + """ + return len(self._graph.edges) + + def all_to_nodes(self, from_node): + # Use shortest_path to find all nodes reachable from the node passed in + graph = self._graph.subgraph(nx.shortest_path(self._graph, from_node)) + + # Remove the from_node itself before returning the result + result = list(graph.nodes) + result.remove(from_node) + + return result + + def to_nodes(self, from_node) -> list[typing.Any]: + """ + :return: Collection of nodes that start at the node passed in + """ + return list(self._graph.adj[from_node]) + + def from_nodes(self, to_node): + """ + :return: Collection of nodes that end at the node passed in + """ + # TODO Can this be done more efficiently? + return [node for node in self._graph.nodes() if to_node in self.to_nodes(node)] + + def leaf_nodes(self) -> typing.Iterable[typing.Any]: + """ + :Return: Iterable for iterating over all leaf nodes + """ + return [ + node + for node in self._graph.nodes() + if self._graph.in_degree(node) != 0 and self._graph.out_degree(node) == 0 + ] + + def all_paths(self) -> list[list[typing.Any]]: + result = [] + + if self.nr_nodes() > 0: + graph = self._graph + + root_nodes = [ + node for node, degree in self._graph.in_degree() if degree == 0 + ] + leaf_nodes = self.leaf_nodes() + + for root_node in root_nodes: + cutoff = None + result += nx.all_simple_paths(graph, root_node, leaf_nodes, cutoff) + + return result + + def set_attribute(self, name: str, value: typing.Any) -> None: + """ + Add / update attribute value to / of the graph + + :param name: Name of attribute to set + :param value: Value of the attribute to set + """ + self.graph.graph[name] = value diff --git a/source/package/adaptation_pathways/graph/multi_rooted_graph.py b/source/package/adaptation_pathways/graph/multi_rooted_graph.py new file mode 100644 index 0000000..3b6df1c --- /dev/null +++ b/source/package/adaptation_pathways/graph/multi_rooted_graph.py @@ -0,0 +1,22 @@ +from .directed_graph import DirectedGraph + + +class MultiRootedGraph(DirectedGraph): + """ + Class for directed out-graphs in which there can be multiple graphs rooted at independent nodes. + """ + + @property + def root_nodes(self): + """ + :return: The root nodes + """ + + root_nodes = [node for node, degree in self._graph.in_degree() if degree == 0] + + nr_root_nodes = len(root_nodes) + + if nr_root_nodes == 0: + raise LookupError("Graph is empty") + + return root_nodes diff --git a/source/package/adaptation_pathways/graph/pathway_map.py b/source/package/adaptation_pathways/graph/pathway_map.py index d900535..beb9daf 100644 --- a/source/package/adaptation_pathways/graph/pathway_map.py +++ b/source/package/adaptation_pathways/graph/pathway_map.py @@ -1,10 +1,10 @@ from ..action import Action from ..action_combination import ActionCombination +from .multi_rooted_graph import MultiRootedGraph from .node import ActionBegin, ActionEnd, TippingPoint -from .rooted_graph import RootedGraph -class PathwayMap(RootedGraph): +class PathwayMap(MultiRootedGraph): """ A PathwayMap represents a collection of adaptation pathways. These pathways are encoded in a directed rooted graph in which the nodes represent... @@ -37,24 +37,28 @@ def all_action_begins_and_ends( return self.all_to_nodes(begin) def all_action_begins(self) -> list[ActionBegin]: - assert isinstance(self.root_node, ActionBegin) + result = [] - result = [self.root_node] + for root_node in self.root_nodes: + assert isinstance(root_node, ActionBegin) - for node in self.all_to_nodes(self.root_node): - if isinstance(node, ActionBegin): - result.append(node) + result.append(root_node) + + for node in self.all_to_nodes(root_node): + if isinstance(node, ActionBegin): + result.append(node) return result def all_action_ends(self) -> list[ActionEnd]: - assert isinstance(self.root_node, ActionBegin) - result = [] - for node in self.all_to_nodes(self.root_node): - if isinstance(node, ActionEnd): - result.append(node) + for root_node in self.root_nodes: + assert isinstance(root_node, ActionBegin) + + for node in self.all_to_nodes(root_node): + if isinstance(node, ActionEnd): + result.append(node) return result @@ -135,20 +139,31 @@ def tipping_points(self) -> list[TippingPoint]: return list(dict.fromkeys(result)) def tipping_point_range(self) -> tuple[TippingPoint, TippingPoint]: - result: tuple[TippingPoint, TippingPoint] = (0, 0) + min_tipping_point = 0.0 + max_tipping_point = 0.0 if self.nr_nodes() > 0: - min_tipping_point = self.action_end(self.root_node).tipping_point + root_nodes = self.root_nodes + + # Initialize min tipping point to a value in the actual range of the tipping points + min_tipping_point = self.action_end(root_nodes[0]).tipping_point + + for root_node in root_nodes[1:]: + min_tipping_point = min( + min_tipping_point, self.action_end(root_node).tipping_point + ) + + # Initialize max tipping point to a value in the actual range of the tipping points max_tipping_point = min_tipping_point for end_node in self.leaf_nodes(): max_tipping_point = max(max_tipping_point, end_node.tipping_point) - result = (min_tipping_point, max_tipping_point) - - assert result[0] <= result[1], result + assert ( + min_tipping_point <= max_tipping_point + ), f"{min_tipping_point} < {max_tipping_point}" - return result + return min_tipping_point, max_tipping_point # def set_node_attribute(self, name: str, value: dict[Action, typing.Any]) -> None: # """ diff --git a/source/package/adaptation_pathways/graph/rooted_graph.py b/source/package/adaptation_pathways/graph/rooted_graph.py index 283ff93..6b80a84 100644 --- a/source/package/adaptation_pathways/graph/rooted_graph.py +++ b/source/package/adaptation_pathways/graph/rooted_graph.py @@ -1,49 +1,11 @@ -import typing +from .directed_graph import DirectedGraph -import networkx as nx - -class RootedGraph: +class RootedGraph(DirectedGraph): """ - A rooted graph represents a directed out-graph in which each node can be reached from the - root node. + Class for directed out-graphs in which each node can be reached from the one root node. """ - _graph: nx.DiGraph - - def __init__(self) -> None: - self._graph = nx.DiGraph() - - def __str__(self) -> str: - return "\n".join(nx.generate_network_text(self._graph)) - - @property - def graph(self) -> nx.DiGraph: - """ - :return: The layered directed graph instance - - Try not to use it -- it should be an implementation detail as much as possible. - """ - return self._graph - - # def is_empty(self) -> bool: - # """ - # :return: Whether or not the tree is empty - # """ - # return nx.is_empty(self._graph) - - def nr_nodes(self) -> int: - """ - :return: Number of nodes - """ - return len(self._graph.nodes) - - def nr_edges(self) -> int: - """ - :return: Number of edges - """ - return len(self._graph.edges) - @property def root_node(self): """ @@ -64,57 +26,3 @@ def root_node(self): ) return root_nodes[0] - - def all_to_nodes(self, from_node): - # Use shortest_path to find all nodes reachable from the node passed in - graph = self._graph.subgraph(nx.shortest_path(self._graph, from_node)) - - # Remove the from_node itself before returning the result - result = list(graph.nodes) - result.remove(from_node) - - return result - - def to_nodes(self, from_node) -> list[typing.Any]: - """ - :return: Collection of nodes that start at the node passed in - """ - return list(self._graph.adj[from_node]) - - def from_nodes(self, to_node): - """ - :return: Collection of nodes that end at the node passed in - """ - # TODO Can this be done more efficiently? - return [node for node in self._graph.nodes() if to_node in self.to_nodes(node)] - - def leaf_nodes(self) -> typing.Iterable[typing.Any]: - """ - :Return: Iterable for iterating over all leaf nodes - """ - return [ - node - for node in self._graph.nodes() - if self._graph.in_degree(node) != 0 and self._graph.out_degree(node) == 0 - ] - - def all_paths(self): - result = [] - - if self.nr_nodes() > 0: - graph = self._graph - source_node = self.root_node - target_nodes = self.leaf_nodes() - cutoff = None - result = nx.all_simple_paths(graph, source_node, target_nodes, cutoff) - - return result - - def set_attribute(self, name: str, value: typing.Any) -> None: - """ - Add / update attribute value to / of the graph - - :param name: Name of attribute to set - :param value: Value of the attribute to set - """ - self.graph.graph[name] = value diff --git a/source/package/adaptation_pathways/plot/pathway_map/classic.py b/source/package/adaptation_pathways/plot/pathway_map/classic.py index 883722e..8269816 100644 --- a/source/package/adaptation_pathways/plot/pathway_map/classic.py +++ b/source/package/adaptation_pathways/plot/pathway_map/classic.py @@ -434,16 +434,15 @@ def _spread_horizontally( # pylint: disable-next=too-many-locals, too-many-branches def _distribute_vertically( pathway_map: PathwayMap, - root_action_begin: ActionBegin, + root_actions_begins: list[ActionBegin], position_by_node: dict[ActionBegin | ActionEnd, np.ndarray], ) -> dict[str, float]: - action_end = pathway_map.action_end(root_action_begin) - position_by_node[action_end][1] = position_by_node[root_action_begin][1] + for root_action_begin in root_actions_begins: + action_end = pathway_map.action_end(root_action_begin) + position_by_node[action_end][1] = position_by_node[root_action_begin][1] - # min_distance = 1.0 - - # All unique action instances in the graph + # All action instances in the graph actions = pathway_map.actions() # Sieve out combined actions that combine a single *existing* action with a *new* one. These @@ -484,16 +483,17 @@ def _distribute_vertically( ) ) - # Nodes related to the root action are already positioned - # Delete the y-coordinate of the root action - assert y_coordinates[math.floor(len(names_of_actions_to_distribute) / 2)] == 0.0 - del y_coordinates[math.floor(len(names_of_actions_to_distribute) / 2)] - - # Delete the name of the root action - assert ( - names_of_actions_to_distribute[0] == root_action_begin.action.name - ), names_of_actions_to_distribute[0] - del names_of_actions_to_distribute[0] + # Nodes related to the root action are already positioned, at y == 0.0. Delete those coordinates and the + # root action names. + y_coordinates = [coordinate for coordinate in y_coordinates if coordinate != 0.0] + root_actions = [ + root_action_begin.action for root_action_begin in root_actions_begins + ] + root_action_names = [action.name for action in root_actions] + names_of_actions_to_distribute = [ + name for name in names_of_actions_to_distribute if name not in root_action_names + ] + assert len(y_coordinates) == len(names_of_actions_to_distribute) # Now it is time to re-order the actions to distribute, based on their level, if any was set level_by_action = ( @@ -559,7 +559,8 @@ def _distribute_vertically( assert np.isnan(position_by_node[action_end][1]) position_by_node[action_end][1] = y_coordinate - y_coordinate_by_action_name[root_action_begin.action.name] = 0 + for root_action_begin in root_actions_begins: + y_coordinate_by_action_name[root_action_begin.action.name] = 0 return y_coordinate_by_action_name @@ -594,20 +595,32 @@ def _layout( y_coordinate_by_action_name: dict[str, float] = {} if pathway_map.nr_edges() > 0: - root_action_begin = pathway_map.root_node - root_action_end = pathway_map.action_end(root_action_begin) - tipping_point = root_action_end.tipping_point min_tipping_point, max_tipping_point = pathway_map.tipping_point_range() tipping_point_range = max_tipping_point - min_tipping_point assert tipping_point_range >= 0 - x_coordinate = tipping_point - 0.1 * tipping_point_range - add_position(position_by_node, root_action_begin, (x_coordinate, 0)) + root_actions_begins = pathway_map.root_nodes + root_actions_ends = [ + pathway_map.action_end(root_action_begin) + for root_action_begin in root_actions_begins + ] + root_actions_tipping_points = [ + root_action_end.tipping_point for root_action_end in root_actions_ends + ] + x_coordinates = [ + tipping_point - 0.1 * tipping_point_range + for tipping_point in root_actions_tipping_points + ] + + for root_action_begin, x_coordinate in zip(root_actions_begins, x_coordinates): + add_position(position_by_node, root_action_begin, (x_coordinate, 0)) + + for root_action_begin, x_coordinate in zip(root_actions_begins, x_coordinates): + _distribute_horizontally(pathway_map, root_action_begin, position_by_node) - _distribute_horizontally(pathway_map, root_action_begin, position_by_node) y_coordinate_by_action_name = _distribute_vertically( - pathway_map, root_action_begin, position_by_node + pathway_map, root_actions_begins, position_by_node ) if not isinstance(overlapping_lines_spread, tuple): diff --git a/source/package/adaptation_pathways/plot/pathway_map/default.py b/source/package/adaptation_pathways/plot/pathway_map/default.py index 1e19149..64f6543 100644 --- a/source/package/adaptation_pathways/plot/pathway_map/default.py +++ b/source/package/adaptation_pathways/plot/pathway_map/default.py @@ -54,7 +54,7 @@ def _distribute_vertically( # incoming actions ends into account # - Take some minimum distance into account. Move nodes that are too close to each other. - assert isinstance(action_begin, ActionBegin) + assert isinstance(action_begin, ActionBegin), action_begin min_distance = 1.0 nodes = pathway_map.all_action_begins_and_ends(action_begin) @@ -115,11 +115,13 @@ def _layout( position_by_node: dict[Node, np.ndarray] = {} if pathway_map.nr_edges() > 0: - action_begin = pathway_map.root_node - add_position(position_by_node, action_begin, (0, 0)) - _distribute_horizontally(pathway_map, action_begin, position_by_node) - _distribute_vertically(pathway_map, action_begin, position_by_node) + action_begins = pathway_map.root_nodes + + for action_begin in action_begins: + add_position(position_by_node, action_begin, (0, 0)) + _distribute_horizontally(pathway_map, action_begin, position_by_node) + _distribute_vertically(pathway_map, action_begin, position_by_node) return position_by_node From d5de2f6e9c8361437a4fbd19a43a3a66ab30229d Mon Sep 17 00:00:00 2001 From: Kor de Jong Date: Fri, 31 Jan 2025 16:07:44 +0100 Subject: [PATCH 2/2] Separate pathways in pathways map --- .../cli/plot_pathway_map.py | 5 +- .../adaptation_pathways/graph/convert.py | 48 +-- .../plot/pathway_map/classic.py | 81 ++-- .../plot/pathway_map/default.py | 111 ++---- .../ap_test/plot/pathway_map/layout_test.py | 351 +++++++++++------- 5 files changed, 315 insertions(+), 281 deletions(-) diff --git a/source/package/adaptation_pathways/cli/plot_pathway_map.py b/source/package/adaptation_pathways/cli/plot_pathway_map.py index 3a7bdff..cddeef7 100644 --- a/source/package/adaptation_pathways/cli/plot_pathway_map.py +++ b/source/package/adaptation_pathways/cli/plot_pathway_map.py @@ -75,7 +75,7 @@ def main() -> int: Usage: {command} [--title=] [--x_label=<label>] [--show_legend] - [--overshoot] [--spread=<spread>] <basename> <plot> + [--overshoot] [--spread=<spread>] [--spread_root] <basename> <plot> Arguments: basename Either, the name without postfix and extension of text @@ -99,6 +99,7 @@ def main() -> int: about the separation of vertical lines (transitions). Vertical spread is about horizontal lines (actions). [default: 0] + --spread_root Spread the root (current) action as well --title=<title> Title --x_label=<label> Label of x-axis @@ -119,12 +120,14 @@ def main() -> int: show_legend = arguments["--show_legend"] overshoot = arguments["--overshoot"] overlapping_lines_spread: tuple[float, float] = parse_spread(arguments["--spread"]) + spread_root_action = arguments["--spread_root"] plot_arguments: dict[str, typing.Any] = { "title": title, "x_label": x_label, "show_legend": show_legend, "overlapping_lines_spread": overlapping_lines_spread, + "spread_root_action": spread_root_action, } if overshoot: diff --git a/source/package/adaptation_pathways/graph/convert.py b/source/package/adaptation_pathways/graph/convert.py index a1d412c..faccb7d 100644 --- a/source/package/adaptation_pathways/graph/convert.py +++ b/source/package/adaptation_pathways/graph/convert.py @@ -1,6 +1,6 @@ from .node import ActionBegin, ActionEnd, ActionPeriod from .node.action import Action as ActionNode -from .pathway_graph import PathwayGraph +from .pathway_graph import ActionConversion, PathwayGraph from .pathway_map import PathwayMap from .sequence_graph import SequenceGraph @@ -37,36 +37,28 @@ def pathway_graph_to_pathway_map(pathway_graph: PathwayGraph) -> PathwayMap: """ Convert a pathway graph to a pathway map """ - - def visit_graph( - pathway_graph: PathwayGraph, - pathway_map: PathwayMap, - action_period: ActionPeriod, - action_ends: dict[ActionNode, ActionEnd], - ) -> ActionBegin: - begin = ActionBegin(action_period.action) - end = ActionEnd(action_period.action) - - pathway_map.add_period(begin, end) - - for conversion in pathway_graph.to_conversions(action_period): - begin_new = visit_graph( - pathway_graph, - pathway_map, - pathway_graph.to_action_period(conversion), - action_ends, - ) - pathway_map.add_conversion(end, begin_new) - - return begin - pathway_map = PathwayMap() if pathway_graph.nr_nodes() > 0: - action_period = pathway_graph.root_node - action_ends: dict[ActionNode, ActionEnd] = {} - visit_graph(pathway_graph, pathway_map, action_period, action_ends) - + # For each individual path, add a graph to the pathway map + for path in pathway_graph.all_paths(): + action_period = path[0] + begin = ActionBegin(action_period.action) + end = ActionEnd(action_period.action) + pathway_map.add_period(begin, end) + + for action_conversion_idx in range(1, len(path), 2): + action_conversion = path[action_conversion_idx] + action_period = path[action_conversion_idx + 1] + assert isinstance(action_conversion, ActionConversion) + assert isinstance(action_period, ActionPeriod) + + begin = ActionBegin(action_period.action) + pathway_map.add_conversion(end, begin) + end = ActionEnd(action_period.action) + pathway_map.add_period(begin, end) + + # print(pathway_map) return pathway_map diff --git a/source/package/adaptation_pathways/plot/pathway_map/classic.py b/source/package/adaptation_pathways/plot/pathway_map/classic.py index 8269816..47cf819 100644 --- a/source/package/adaptation_pathways/plot/pathway_map/classic.py +++ b/source/package/adaptation_pathways/plot/pathway_map/classic.py @@ -308,6 +308,7 @@ def _spread_vertically( pathway_map: PathwayMap, position_by_node: dict[ActionBegin | ActionEnd, np.ndarray], overlapping_lines_spread: float, + spread_root_action: bool, ) -> None: # - Assign all action_begin / action_end combinations to bins, by y-coordinate @@ -319,19 +320,22 @@ def _spread_vertically( float, list[tuple[alias.Region, tuple[ActionBegin, ActionEnd]]] ] = {} + action_begins_to_skip = pathway_map.root_nodes if not spread_root_action else [] + for action_begin in pathway_map.all_action_begins(): - action_end = pathway_map.action_end(action_begin) + if action_begin not in action_begins_to_skip: + action_end = pathway_map.action_end(action_begin) - x_begin, y_begin = position_by_node[action_begin] - x_end, y_end = position_by_node[action_end] - assert x_end >= x_begin - assert y_end == y_begin - region = x_begin, x_end + x_begin, y_begin = position_by_node[action_begin] + x_end, y_end = position_by_node[action_end] + assert x_end >= x_begin + assert y_end == y_begin + region = x_begin, x_end - if y_begin not in nodes_by_y: - nodes_by_y[y_begin] = [] + if y_begin not in nodes_by_y: + nodes_by_y[y_begin] = [] - nodes_by_y[y_begin].append((region, (action_begin, action_end))) + nodes_by_y[y_begin].append((region, (action_begin, action_end))) min_y = min(nodes_by_y.keys()) max_y = max(nodes_by_y.keys()) @@ -380,6 +384,7 @@ def _spread_horizontally( pathway_map: PathwayMap, position_by_node: dict[ActionBegin | ActionEnd, np.ndarray], overlapping_lines_spread: float, + spread_root_action: bool, ) -> None: # - Assign all action_end / action_begin combinations to bins, by x-coordinate @@ -408,11 +413,13 @@ def _spread_horizontally( nodes_by_x[x_end].append((region, (action_end, action_begin))) min_x = min(nodes_by_x.keys()) - max_x = max(nodes_by_x.keys()) + max_x = max( + position_by_node[action_end][0] for action_end in pathway_map.leaf_nodes() + ) range_x = max_x - min_x - # Root action end. This x coordinate needs no tweaking. - del nodes_by_x[min_x] + if not spread_root_action: + del nodes_by_x[min_x] for x_coordinate, regions in nodes_by_x.items(): grouped_regions = _group_overlapping_regions(regions) @@ -539,28 +546,29 @@ def _distribute_vertically( zip(names_of_actions_to_distribute, y_coordinates) ) + for root_action_begin in root_actions_begins: + y_coordinate_by_action_name[root_action_begin.action.name] = 0 + for action_begin in pathway_map.all_action_begins()[1:]: # Skip root node - action = action_begin.action + if action_begin not in root_actions_begins: + action = action_begin.action - if ( - isinstance(action, ActionCombination) - and action in action_combinations_sieved - ): - # In this case we want the combination to end up at the same y-coordinate as the - # one action that is being continued - action = action_combinations_sieved[action] + if ( + isinstance(action, ActionCombination) + and action in action_combinations_sieved + ): + # In this case we want the combination to end up at the same y-coordinate as the + # one action that is being continued + action = action_combinations_sieved[action] - y_coordinate = y_coordinate_by_action_name[action.name] + y_coordinate = y_coordinate_by_action_name[action.name] - assert np.isnan(position_by_node[action_begin][1]) - position_by_node[action_begin][1] = y_coordinate - action_end = pathway_map.action_end(action_begin) + assert np.isnan(position_by_node[action_begin][1]) + position_by_node[action_begin][1] = y_coordinate + action_end = pathway_map.action_end(action_begin) - assert np.isnan(position_by_node[action_end][1]) - position_by_node[action_end][1] = y_coordinate - - for root_action_begin in root_actions_begins: - y_coordinate_by_action_name[root_action_begin.action.name] = 0 + assert np.isnan(position_by_node[action_end][1]) + position_by_node[action_end][1] = y_coordinate return y_coordinate_by_action_name @@ -569,6 +577,7 @@ def _layout( pathway_map: PathwayMap, *, overlapping_lines_spread: float | tuple[float, float], + spread_root_action: bool = False, ) -> tuple[dict[ActionBegin | ActionEnd, np.ndarray], dict[str, float]]: """ Layout that replicates the pathway map layout of the original (pre-2024) pathway generator @@ -632,9 +641,13 @@ def _layout( horizontal_spread, vertical_spread = overlapping_lines_spread if horizontal_spread > 0: - _spread_horizontally(pathway_map, position_by_node, horizontal_spread) + _spread_horizontally( + pathway_map, position_by_node, horizontal_spread, spread_root_action + ) if vertical_spread > 0: - _spread_vertically(pathway_map, position_by_node, vertical_spread) + _spread_vertically( + pathway_map, position_by_node, vertical_spread, spread_root_action + ) return position_by_node, y_coordinate_by_action_name @@ -657,8 +670,12 @@ def plot( "overlapping_lines_spread", (0, 0) ) + spread_root_action: bool = arguments.get("spread_root_action", False) + layout, y_coordinate_by_action_name = _layout( - pathway_map, overlapping_lines_spread=overlapping_lines_spread + pathway_map, + overlapping_lines_spread=overlapping_lines_spread, + spread_root_action=spread_root_action, ) classic_pathway_map_plotter( diff --git a/source/package/adaptation_pathways/plot/pathway_map/default.py b/source/package/adaptation_pathways/plot/pathway_map/default.py index 64f6543..35cf16a 100644 --- a/source/package/adaptation_pathways/plot/pathway_map/default.py +++ b/source/package/adaptation_pathways/plot/pathway_map/default.py @@ -1,104 +1,54 @@ -import itertools - import matplotlib as mpl import numpy as np from ...graph import PathwayMap -from ...graph.node import ActionBegin, Node +from ...graph.node import Node from ..colour import PlotColours -from ..util import add_position, distribute, plot_graph, sort_horizontally +from ..util import add_position, distribute, plot_graph from .colour import default_colours def _distribute_horizontally( pathway_map: PathwayMap, - action_begin: ActionBegin, position_by_node: dict[Node, np.ndarray], ) -> None: - assert isinstance(action_begin, ActionBegin) - - min_distance = 1.0 - begin_x = position_by_node[action_begin][0] - - action_end = pathway_map.action_end(action_begin) - end_x = begin_x + min_distance - - # Push to the right if necessary - if action_end in position_by_node: - end_x = max(end_x, position_by_node[action_end][0]) + """ + Assign x-coordinates to all action_{begin,end} nodes in the pathway map + """ - add_position(position_by_node, action_end, (end_x, np.nan)) + paths = pathway_map.all_paths() - for action_begin_new in pathway_map.action_begins( - pathway_map.action_end(action_begin) - ): - begin_x = end_x + min_distance + if len(paths) > 0: + min_distance = 1.0 - # Push to the right if necessary - if action_begin_new in position_by_node: - begin_x = max(begin_x, position_by_node[action_begin_new][0]) + for path in paths: + begin_x = 0.0 + end_x = begin_x + min_distance - add_position(position_by_node, action_begin_new, (begin_x, np.nan)) - _distribute_horizontally(pathway_map, action_begin_new, position_by_node) + for action_begin_idx in range(0, len(path), 2): + action_begin, action_end = ( + path[action_begin_idx], + path[action_begin_idx + 1], + ) + add_position(position_by_node, action_begin, (begin_x, np.nan)) + add_position(position_by_node, action_end, (end_x, np.nan)) + begin_x = end_x + min_distance + end_x = begin_x + min_distance def _distribute_vertically( pathway_map: PathwayMap, - action_begin: ActionBegin, position_by_node: dict[Node, np.ndarray], ) -> None: - # Visit *all* action begins / ends in one go, in order of increasing x-coordinate - # - Group action begins / ends by x-coordinate. Within each group: - # - Put each action end to position at the same height as its begin - # - For each action begin to position, take the mean y-coordinate of all - # incoming actions ends into account - # - Take some minimum distance into account. Move nodes that are too close to each other. - - assert isinstance(action_begin, ActionBegin), action_begin - + paths = pathway_map.all_paths() min_distance = 1.0 - nodes = pathway_map.all_action_begins_and_ends(action_begin) - sorted_nodes, _ = sort_horizontally(nodes, position_by_node) + y_coordinates = distribute([0.0] * len(paths), min_distance) - # Iterate over sorted_nodes, grouped by their x-coordinate - for _, grouped_sorted_nodes in itertools.groupby( # type: ignore - sorted_nodes, - lambda action_begin_or_end: position_by_node[action_begin_or_end][0], - ): - nodes = list(grouped_sorted_nodes) + for path_idx, path in enumerate(paths): + y_coordinate = y_coordinates[path_idx] - # Initialize the y-coordinate with the mean of the y-coordinates of the nodes - # that end in each node - for node in nodes: - # Each node is only visited once - assert np.isnan(position_by_node[node][1]) - - # Calculate the mean y-coordinate of actions that end in the to_action - from_nodes = pathway_map.from_nodes(node) - - # Only the root node does not have from_nodes - assert len(from_nodes) > 0 - - y_coordinates = [position_by_node[node][1] for node in from_nodes] - - # All from_nodes must already be positioned - assert all( - not np.isnan(coordinate) for coordinate in y_coordinates - ), from_nodes - - mean_y = sum(position_by_node[node][1] for node in from_nodes) / len( - from_nodes - ) - position_by_node[node][1] = mean_y - - # If the previous loop resulted in same / similar y-coordinates, we spread them - # out some more - y_coordinates = [position_by_node[node][1] for node in nodes] - y_coordinates = distribute(list(y_coordinates), min_distance) - - for idx, node in enumerate(nodes): - assert not np.isnan(position_by_node[node][1]) - position_by_node[node][1] = y_coordinates[idx] + for node in path: + position_by_node[node][1] = y_coordinate def _layout( @@ -115,13 +65,8 @@ def _layout( position_by_node: dict[Node, np.ndarray] = {} if pathway_map.nr_edges() > 0: - - action_begins = pathway_map.root_nodes - - for action_begin in action_begins: - add_position(position_by_node, action_begin, (0, 0)) - _distribute_horizontally(pathway_map, action_begin, position_by_node) - _distribute_vertically(pathway_map, action_begin, position_by_node) + _distribute_horizontally(pathway_map, position_by_node) + _distribute_vertically(pathway_map, position_by_node) return position_by_node diff --git a/source/test/ap_test/plot/pathway_map/layout_test.py b/source/test/ap_test/plot/pathway_map/layout_test.py index 437c19a..ced81b9 100644 --- a/source/test/ap_test/plot/pathway_map/layout_test.py +++ b/source/test/ap_test/plot/pathway_map/layout_test.py @@ -114,14 +114,14 @@ def test_diverging_pathways(self): self.assertEqual(len(paths), 3) positions = default_layout(pathway_map) - self.assertEqual(len(positions), 8) + self.assertEqual(len(positions), 12) self.assert_equal_positions( positions, paths[0], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, 1)), + ("current]", (1, 1)), ("[a", (2, 1)), ("a]", (3, 1)), ], @@ -140,8 +140,8 @@ def test_diverging_pathways(self): positions, paths[2], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, -1)), + ("current]", (1, -1)), ("[c", (2, -1)), ("c]", (3, -1)), ], @@ -173,14 +173,14 @@ def test_converging_pathways(self): self.assertEqual(len(paths), 3) positions = default_layout(pathway_map) - self.assertEqual(len(positions), 14) + self.assertEqual(len(positions), 18) self.assert_equal_positions( positions, paths[0], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, 1)), + ("current]", (1, 1)), ("[a", (2, 1)), ("a]", (3, 1)), ("[d", (4, 1)), @@ -203,8 +203,8 @@ def test_converging_pathways(self): positions, paths[2], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, -1)), + ("current]", (1, -1)), ("[c", (2, -1)), ("c]", (3, -1)), ("[d", (4, -1)), @@ -249,14 +249,14 @@ def test_use_case_01(self): self.assertEqual(len(paths), 4) positions = default_layout(pathway_map) - self.assertEqual(len(positions), 24) + self.assertEqual(len(positions), 30) self.assert_equal_positions( positions, paths[0], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, 1.5)), + ("current]", (1, 1.5)), ("[a", (2, 1.5)), ("a]", (3, 1.5)), ("[e", (4, 1.5)), @@ -267,8 +267,8 @@ def test_use_case_01(self): positions, paths[1], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, 0.5)), + ("current]", (1, 0.5)), ("[b", (2, 0.5)), ("b]", (3, 0.5)), ("[f", (4, 0.5)), @@ -281,8 +281,8 @@ def test_use_case_01(self): positions, paths[2], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, -0.5)), + ("current]", (1, -0.5)), ("[c", (2, -0.5)), ("c]", (3, -0.5)), ("[f", (4, -0.5)), @@ -295,8 +295,8 @@ def test_use_case_01(self): positions, paths[3], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, -1.5)), + ("current]", (1, -1.5)), ("[d", (2, -1.5)), ("d]", (3, -1.5)), ("[f", (4, -1.5)), @@ -310,31 +310,44 @@ def test_use_case_02(self): sequence_graph = SequenceGraph() current = Action("current") a = Action("a") - b1 = Action("b1") - b2 = Action("b2") + b = Action("b") c = Action("c") d = Action("d") current_node = ActionNode(current) - a_node = ActionNode(a) - b1_node = ActionNode(b1) - b2_node = ActionNode(b2) - c_node = ActionNode(c) - d_node = ActionNode(d) + a1_node = ActionNode(a) + a2_node = ActionNode(a) + a3_node = ActionNode(a) + a4_node = ActionNode(a) + a5_node = ActionNode(a) + a6_node = ActionNode(a) + b1_node = ActionNode(b) + b2_node = ActionNode(b) + b3_node = ActionNode(b) + c1_node = ActionNode(c) + c2_node = ActionNode(c) + d1_node = ActionNode(d) + d2_node = ActionNode(d) + d3_node = ActionNode(d) + d4_node = ActionNode(d) sequence_graph.add_sequences( [ - (current_node, a_node), + (current_node, a1_node), (current_node, b1_node), - (current_node, c_node), - (current_node, d_node), - (b1_node, a_node), - (b1_node, c_node), - (b1_node, d_node), - (c_node, b2_node), - (b2_node, a_node), - (c_node, a_node), - (c_node, d_node), + (b1_node, a2_node), + (b1_node, c2_node), + (c2_node, b2_node), + (b2_node, a3_node), + (c2_node, a4_node), + (c2_node, d3_node), + (b1_node, d2_node), + (current_node, c1_node), + (c1_node, b3_node), + (b3_node, a5_node), + (c1_node, a6_node), + (c1_node, d4_node), + (current_node, d1_node), ] ) @@ -342,144 +355,133 @@ def test_use_case_02(self): paths = list(pathway_map.all_paths()) self.assertEqual(len(paths), 10) - pathway_map.assign_tipping_points( - { - current: 2030, - a: 2100, - b1: 2040, - c: 2050, - d: 2100, - b2: 2070, - } - ) - positions = default_layout(pathway_map) - self.assertEqual(len(positions), 32) + self.assertEqual(len(positions), 66) self.assert_equal_positions( positions, paths[0], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[a", (2, 1.5)), - ("a]", (3, 1.5)), + ("[current", (0, 4.5)), + ("current]", (1, 4.5)), + ("[a", (2, 4.5)), + ("a]", (3, 4.5)), ], ) self.assert_equal_positions( positions, paths[1], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[b1", (2, 0.5)), - ("b1]", (3, 0.5)), - ("[a", (4, 2.5)), - ("a]", (5, 2.5)), + ("[current", (0, 3.5)), + ("current]", (1, 3.5)), + ("[b", (2, 3.5)), + ("b]", (3, 3.5)), + ("[a", (4, 3.5)), + ("a]", (5, 3.5)), ], ) self.assert_equal_positions( positions, paths[2], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[b1", (2, 0.5)), - ("b1]", (3, 0.5)), - ("[c", (4, 1.5)), - ("c]", (5, 1.5)), - ("[b2", (6, 2)), - ("b2]", (7, 2)), - ("[a", (8, 2)), - ("a]", (9, 2)), + ("[current", (0, 2.5)), + ("current]", (1, 2.5)), + ("[b", (2, 2.5)), + ("b]", (3, 2.5)), + ("[c", (4, 2.5)), + ("c]", (5, 2.5)), + ("[b", (6, 2.5)), + ("b]", (7, 2.5)), + ("[a", (8, 2.5)), + ("a]", (9, 2.5)), ], ) self.assert_equal_positions( positions, paths[3], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[b1", (2, 0.5)), - ("b1]", (3, 0.5)), + ("[current", (0, 1.5)), + ("current]", (1, 1.5)), + ("[b", (2, 1.5)), + ("b]", (3, 1.5)), ("[c", (4, 1.5)), ("c]", (5, 1.5)), - ("[a", (6, 1)), - ("a]", (7, 1)), + ("[a", (6, 1.5)), + ("a]", (7, 1.5)), ], ) self.assert_equal_positions( positions, paths[4], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[b1", (2, 0.5)), - ("b1]", (3, 0.5)), - ("[c", (4, 1.5)), - ("c]", (5, 1.5)), - ("[d", (6, 0)), - ("d]", (7, 0)), + ("[current", (0, 0.5)), + ("current]", (1, 0.5)), + ("[b", (2, 0.5)), + ("b]", (3, 0.5)), + ("[c", (4, 0.5)), + ("c]", (5, 0.5)), + ("[d", (6, 0.5)), + ("d]", (7, 0.5)), ], ) self.assert_equal_positions( positions, paths[5], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[b1", (2, 0.5)), - ("b1]", (3, 0.5)), - ("[d", (4, 0.5)), - ("d]", (5, 0.5)), + ("[current", (0, -0.5)), + ("current]", (1, -0.5)), + ("[b", (2, -0.5)), + ("b]", (3, -0.5)), + ("[d", (4, -0.5)), + ("d]", (5, -0.5)), ], ) self.assert_equal_positions( positions, paths[6], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[c", (2, -0.5)), - ("c]", (3, -0.5)), - ("[b2", (4, -0.5)), - ("b2]", (5, -0.5)), - ("[a", (6, -1)), - ("a]", (7, -1)), + ("[current", (0, -1.5)), + ("current]", (1, -1.5)), + ("[c", (2, -1.5)), + ("c]", (3, -1.5)), + ("[b", (4, -1.5)), + ("b]", (5, -1.5)), + ("[a", (6, -1.5)), + ("a]", (7, -1.5)), ], ) self.assert_equal_positions( positions, paths[7], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[c", (2, -0.5)), - ("c]", (3, -0.5)), - ("[a", (4, -1.5)), - ("a]", (5, -1.5)), + ("[current", (0, -2.5)), + ("current]", (1, -2.5)), + ("[c", (2, -2.5)), + ("c]", (3, -2.5)), + ("[a", (4, -2.5)), + ("a]", (5, -2.5)), ], ) self.assert_equal_positions( positions, paths[8], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[c", (2, -0.5)), - ("c]", (3, -0.5)), - ("[d", (4, -2.5)), - ("d]", (5, -2.5)), + ("[current", (0, -3.5)), + ("current]", (1, -3.5)), + ("[c", (2, -3.5)), + ("c]", (3, -3.5)), + ("[d", (4, -3.5)), + ("d]", (5, -3.5)), ], ) self.assert_equal_positions( positions, paths[9], [ - ("[current", (0, 0)), - ("current]", (1, 0)), - ("[d", (2, -1.5)), - ("d]", (3, -1.5)), + ("[current", (0, -4.5)), + ("current]", (1, -4.5)), + ("[d", (2, -4.5)), + ("d]", (3, -4.5)), ], ) @@ -511,14 +513,14 @@ def test_action_combination01(self): self.assertEqual(len(paths), 3) positions = default_layout(pathway_map) - self.assertEqual(len(positions), 10) + self.assertEqual(len(positions), 14) self.assert_equal_positions( positions, paths[0], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, 1)), + ("current]", (1, 1)), ("[a", (2, 1)), ("a]", (3, 1)), ("[d", (4, 1)), @@ -539,8 +541,8 @@ def test_action_combination01(self): positions, paths[2], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, -1)), + ("current]", (1, -1)), ("[c", (2, -1)), ("c]", (3, -1)), ], @@ -575,14 +577,14 @@ def test_action_combination02(self): self.assertEqual(len(paths), 3) positions = default_layout(pathway_map) - self.assertEqual(len(positions), 12) + self.assertEqual(len(positions), 16) self.assert_equal_positions( positions, paths[0], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, 1)), + ("current]", (1, 1)), ("[a", (2, 1)), ("a]", (3, 1)), ("[d", (4, 1)), @@ -605,8 +607,8 @@ def test_action_combination02(self): positions, paths[2], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, -1)), + ("current]", (1, -1)), ("[c", (2, -1)), ("c]", (3, -1)), ], @@ -637,14 +639,14 @@ def test_action_edition_01(self): self.assertEqual(len(paths), 2) positions = default_layout(pathway_map) - self.assertEqual(len(positions), 8) + self.assertEqual(len(positions), 10) self.assert_equal_positions( positions, paths[0], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, 0.5)), + ("current]", (1, 0.5)), ("[a", (2, 0.5)), ("a]", (3, 0.5)), ], @@ -653,8 +655,8 @@ def test_action_edition_01(self): positions, paths[1], [ - ("[current", (0, 0)), - ("current]", (1, 0)), + ("[current", (0, -0.5)), + ("current]", (1, -0.5)), ("[b", (2, -0.5)), ("b]", (3, -0.5)), ("[a", (4, -0.5)), @@ -784,7 +786,7 @@ def test_diverging_pathways(self): ) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 8) + self.assertEqual(len(positions), 12) self.assert_equal_positions( positions, @@ -817,6 +819,81 @@ def test_diverging_pathways(self): ], ) + def test_converging_pathways(self): + sequence_graph = SequenceGraph() + current = Action("current") + a = Action("a") + b = Action("b") + c = Action("c") + d = Action("d") + + current_node = ActionNode(current) + a_node = ActionNode(a) + b_node = ActionNode(b) + c_node = ActionNode(c) + d_node = ActionNode(d) + + sequence_graph.add_sequence(current_node, a_node) + sequence_graph.add_sequence(current_node, b_node) + sequence_graph.add_sequence(current_node, c_node) + sequence_graph.add_sequence(a_node, d_node) + sequence_graph.add_sequence(b_node, d_node) + sequence_graph.add_sequence(c_node, d_node) + + pathway_map = sequence_graph_to_pathway_map(sequence_graph) + paths = list(pathway_map.all_paths()) + self.assertEqual(len(paths), 3) + + pathway_map.assign_tipping_points( + { + current: 2030, + a: 2040, + b: 2050, + c: 2060, + d: 2070, + } + ) + + positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) + self.assertEqual(len(positions), 18) + + self.assert_equal_positions( + positions, + paths[0], + [ + ("[current", (2026, 0)), + ("current]", (2030, 0)), + ("[a", (2030, 2)), + ("a]", (2040, 2)), + ("[d", (2040, -2)), + ("d]", (2070, -2)), + ], + ) + self.assert_equal_positions( + positions, + paths[1], + [ + ("[current", (2026, 0)), + ("current]", (2030, 0)), + ("[b", (2030, 1)), + ("b]", (2050, 1)), + ("[d", (2050, -2)), + ("d]", (2070, -2)), + ], + ) + self.assert_equal_positions( + positions, + paths[2], + [ + ("[current", (2026, 0)), + ("current]", (2030, 0)), + ("[c", (2030, -1)), + ("c]", (2060, -1)), + ("[d", (2060, -2)), + ("d]", (2070, -2)), + ], + ) + def test_use_case_01(self): actions = """ current #ff4c566a @@ -846,7 +923,7 @@ def test_use_case_01(self): self.assertEqual(len(paths), 4) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 24) + self.assertEqual(len(positions), 30) self.assert_equal_positions( positions, @@ -938,7 +1015,7 @@ def test_use_case_02(self): self.assertEqual(len(paths), 10) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 32) + self.assertEqual(len(positions), 66) self.assert_equal_positions( positions, @@ -1104,7 +1181,7 @@ def test_action_combination01(self): } ) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 10) + self.assertEqual(len(positions), 14) self.assert_equal_positions( positions, @@ -1160,7 +1237,7 @@ def test_action_combination02(self): self.assertEqual(len(paths), 3) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 12) + self.assertEqual(len(positions), 16) self.assert_equal_positions( positions, @@ -1235,7 +1312,7 @@ def test_action_combination03(self): } ) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 12) + self.assertEqual(len(positions), 16) self.assert_equal_positions( positions, @@ -1310,7 +1387,7 @@ def test_action_combination04(self): } ) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 10) + self.assertEqual(len(positions), 14) self.assert_equal_positions( positions, @@ -1366,7 +1443,7 @@ def test_action_combination05(self): self.assertEqual(len(paths), 3) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 12) + self.assertEqual(len(positions), 16) self.assert_equal_positions( positions, @@ -1423,7 +1500,7 @@ def test_action_combination06(self): self.assertEqual(len(paths), 3) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 10) + self.assertEqual(len(positions), 14) self.assert_equal_positions( positions, @@ -1491,7 +1568,7 @@ def test_action_edition_01(self): } ) positions, _ = classic_layout(pathway_map, overlapping_lines_spread=0) - self.assertEqual(len(positions), 8) + self.assertEqual(len(positions), 10) self.assert_equal_positions( positions,