Skip to content

Commit

Permalink
Merge pull request #65 from geoneric/gh56
Browse files Browse the repository at this point in the history
Fix horizontal spread when multiple actions continue from a single tipping point
  • Loading branch information
kordejong authored Jan 31, 2025
2 parents 69357ca + d5de2f6 commit 1e94c36
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 414 deletions.
5 changes: 4 additions & 1 deletion source/package/adaptation_pathways/cli/plot_pathway_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def main() -> int:
Usage:
{command} [--title=<title>] [--x_label=<label>] [--show_legend]
[--overshoot] [--spread=<spread>] <basename> <plot>
[--overshoot] [--spread=<spread>] [--spread_root] <basename> <plot>
Arguments:
basename Either, the name without postfix and extension of text
Expand All @@ -99,6 +99,7 @@ def main() -> int:
about the separation of vertical lines (transitions).
Vertical spread is about horizontal lines (actions).
[default: 0]
--spread_root Spread the root (current) action as well
--title=<title> Title
--x_label=<label> Label of x-axis
Expand All @@ -119,12 +120,14 @@ def main() -> int:
show_legend = arguments["--show_legend"]
overshoot = arguments["--overshoot"]
overlapping_lines_spread: tuple[float, float] = parse_spread(arguments["--spread"])
spread_root_action = arguments["--spread_root"]

plot_arguments: dict[str, typing.Any] = {
"title": title,
"x_label": x_label,
"show_legend": show_legend,
"overlapping_lines_spread": overlapping_lines_spread,
"spread_root_action": spread_root_action,
}

if overshoot:
Expand Down
48 changes: 20 additions & 28 deletions source/package/adaptation_pathways/graph/convert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .node import ActionBegin, ActionEnd, ActionPeriod
from .node.action import Action as ActionNode
from .pathway_graph import PathwayGraph
from .pathway_graph import ActionConversion, PathwayGraph
from .pathway_map import PathwayMap
from .sequence_graph import SequenceGraph

Expand Down Expand Up @@ -37,36 +37,28 @@ def pathway_graph_to_pathway_map(pathway_graph: PathwayGraph) -> PathwayMap:
"""
Convert a pathway graph to a pathway map
"""

def visit_graph(
pathway_graph: PathwayGraph,
pathway_map: PathwayMap,
action_period: ActionPeriod,
action_ends: dict[ActionNode, ActionEnd],
) -> ActionBegin:
begin = ActionBegin(action_period.action)
end = ActionEnd(action_period.action)

pathway_map.add_period(begin, end)

for conversion in pathway_graph.to_conversions(action_period):
begin_new = visit_graph(
pathway_graph,
pathway_map,
pathway_graph.to_action_period(conversion),
action_ends,
)
pathway_map.add_conversion(end, begin_new)

return begin

pathway_map = PathwayMap()

if pathway_graph.nr_nodes() > 0:
action_period = pathway_graph.root_node
action_ends: dict[ActionNode, ActionEnd] = {}
visit_graph(pathway_graph, pathway_map, action_period, action_ends)

# For each individual path, add a graph to the pathway map
for path in pathway_graph.all_paths():
action_period = path[0]
begin = ActionBegin(action_period.action)
end = ActionEnd(action_period.action)
pathway_map.add_period(begin, end)

for action_conversion_idx in range(1, len(path), 2):
action_conversion = path[action_conversion_idx]
action_period = path[action_conversion_idx + 1]
assert isinstance(action_conversion, ActionConversion)
assert isinstance(action_period, ActionPeriod)

begin = ActionBegin(action_period.action)
pathway_map.add_conversion(end, begin)
end = ActionEnd(action_period.action)
pathway_map.add_period(begin, end)

# print(pathway_map)
return pathway_map


Expand Down
103 changes: 103 additions & 0 deletions source/package/adaptation_pathways/graph/directed_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import typing

import networkx as nx


class DirectedGraph:
"""
Base class for specialized directed graphs.
"""

_graph: nx.DiGraph

def __init__(self) -> None:
self._graph = nx.DiGraph()

def __str__(self) -> str:
return "\n".join(nx.generate_network_text(self._graph))

@property
def graph(self) -> nx.DiGraph:
"""
:return: The layered directed graph instance
Try not to use it -- it should be an implementation detail as much as possible.
"""
return self._graph

# def is_empty(self) -> bool:
# """
# :return: Whether or not the tree is empty
# """
# return nx.is_empty(self._graph)

def nr_nodes(self) -> int:
"""
:return: Number of nodes
"""
return len(self._graph.nodes)

def nr_edges(self) -> int:
"""
:return: Number of edges
"""
return len(self._graph.edges)

def all_to_nodes(self, from_node):
# Use shortest_path to find all nodes reachable from the node passed in
graph = self._graph.subgraph(nx.shortest_path(self._graph, from_node))

# Remove the from_node itself before returning the result
result = list(graph.nodes)
result.remove(from_node)

return result

def to_nodes(self, from_node) -> list[typing.Any]:
"""
:return: Collection of nodes that start at the node passed in
"""
return list(self._graph.adj[from_node])

def from_nodes(self, to_node):
"""
:return: Collection of nodes that end at the node passed in
"""
# TODO Can this be done more efficiently?
return [node for node in self._graph.nodes() if to_node in self.to_nodes(node)]

def leaf_nodes(self) -> typing.Iterable[typing.Any]:
"""
:Return: Iterable for iterating over all leaf nodes
"""
return [
node
for node in self._graph.nodes()
if self._graph.in_degree(node) != 0 and self._graph.out_degree(node) == 0
]

def all_paths(self) -> list[list[typing.Any]]:
result = []

if self.nr_nodes() > 0:
graph = self._graph

root_nodes = [
node for node, degree in self._graph.in_degree() if degree == 0
]
leaf_nodes = self.leaf_nodes()

for root_node in root_nodes:
cutoff = None
result += nx.all_simple_paths(graph, root_node, leaf_nodes, cutoff)

return result

def set_attribute(self, name: str, value: typing.Any) -> None:
"""
Add / update attribute value to / of the graph
:param name: Name of attribute to set
:param value: Value of the attribute to set
"""
self.graph.graph[name] = value
22 changes: 22 additions & 0 deletions source/package/adaptation_pathways/graph/multi_rooted_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from .directed_graph import DirectedGraph


class MultiRootedGraph(DirectedGraph):
"""
Class for directed out-graphs in which there can be multiple graphs rooted at independent nodes.
"""

@property
def root_nodes(self):
"""
:return: The root nodes
"""

root_nodes = [node for node, degree in self._graph.in_degree() if degree == 0]

nr_root_nodes = len(root_nodes)

if nr_root_nodes == 0:
raise LookupError("Graph is empty")

return root_nodes
51 changes: 33 additions & 18 deletions source/package/adaptation_pathways/graph/pathway_map.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from ..action import Action
from ..action_combination import ActionCombination
from .multi_rooted_graph import MultiRootedGraph
from .node import ActionBegin, ActionEnd, TippingPoint
from .rooted_graph import RootedGraph


class PathwayMap(RootedGraph):
class PathwayMap(MultiRootedGraph):
"""
A PathwayMap represents a collection of adaptation pathways. These pathways are encoded
in a directed rooted graph in which the nodes represent...
Expand Down Expand Up @@ -37,24 +37,28 @@ def all_action_begins_and_ends(
return self.all_to_nodes(begin)

def all_action_begins(self) -> list[ActionBegin]:
assert isinstance(self.root_node, ActionBegin)
result = []

result = [self.root_node]
for root_node in self.root_nodes:
assert isinstance(root_node, ActionBegin)

for node in self.all_to_nodes(self.root_node):
if isinstance(node, ActionBegin):
result.append(node)
result.append(root_node)

for node in self.all_to_nodes(root_node):
if isinstance(node, ActionBegin):
result.append(node)

return result

def all_action_ends(self) -> list[ActionEnd]:
assert isinstance(self.root_node, ActionBegin)

result = []

for node in self.all_to_nodes(self.root_node):
if isinstance(node, ActionEnd):
result.append(node)
for root_node in self.root_nodes:
assert isinstance(root_node, ActionBegin)

for node in self.all_to_nodes(root_node):
if isinstance(node, ActionEnd):
result.append(node)

return result

Expand Down Expand Up @@ -135,20 +139,31 @@ def tipping_points(self) -> list[TippingPoint]:
return list(dict.fromkeys(result))

def tipping_point_range(self) -> tuple[TippingPoint, TippingPoint]:
result: tuple[TippingPoint, TippingPoint] = (0, 0)
min_tipping_point = 0.0
max_tipping_point = 0.0

if self.nr_nodes() > 0:
min_tipping_point = self.action_end(self.root_node).tipping_point
root_nodes = self.root_nodes

# Initialize min tipping point to a value in the actual range of the tipping points
min_tipping_point = self.action_end(root_nodes[0]).tipping_point

for root_node in root_nodes[1:]:
min_tipping_point = min(
min_tipping_point, self.action_end(root_node).tipping_point
)

# Initialize max tipping point to a value in the actual range of the tipping points
max_tipping_point = min_tipping_point

for end_node in self.leaf_nodes():
max_tipping_point = max(max_tipping_point, end_node.tipping_point)

result = (min_tipping_point, max_tipping_point)

assert result[0] <= result[1], result
assert (
min_tipping_point <= max_tipping_point
), f"{min_tipping_point} < {max_tipping_point}"

return result
return min_tipping_point, max_tipping_point

# def set_node_attribute(self, name: str, value: dict[Action, typing.Any]) -> None:
# """
Expand Down
Loading

0 comments on commit 1e94c36

Please sign in to comment.