Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix horizontal spread when multiple actions continue from a single tipping point #65

Merged
merged 2 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion source/package/adaptation_pathways/cli/plot_pathway_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def main() -> int:

Usage:
{command} [--title=<title>] [--x_label=<label>] [--show_legend]
[--overshoot] [--spread=<spread>] <basename> <plot>
[--overshoot] [--spread=<spread>] [--spread_root] <basename> <plot>

Arguments:
basename Either, the name without postfix and extension of text
Expand All @@ -99,6 +99,7 @@ def main() -> int:
about the separation of vertical lines (transitions).
Vertical spread is about horizontal lines (actions).
[default: 0]
--spread_root Spread the root (current) action as well
--title=<title> Title
--x_label=<label> Label of x-axis

Expand All @@ -119,12 +120,14 @@ def main() -> int:
show_legend = arguments["--show_legend"]
overshoot = arguments["--overshoot"]
overlapping_lines_spread: tuple[float, float] = parse_spread(arguments["--spread"])
spread_root_action = arguments["--spread_root"]

plot_arguments: dict[str, typing.Any] = {
"title": title,
"x_label": x_label,
"show_legend": show_legend,
"overlapping_lines_spread": overlapping_lines_spread,
"spread_root_action": spread_root_action,
}

if overshoot:
Expand Down
48 changes: 20 additions & 28 deletions source/package/adaptation_pathways/graph/convert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .node import ActionBegin, ActionEnd, ActionPeriod
from .node.action import Action as ActionNode
from .pathway_graph import PathwayGraph
from .pathway_graph import ActionConversion, PathwayGraph
from .pathway_map import PathwayMap
from .sequence_graph import SequenceGraph

Expand Down Expand Up @@ -37,36 +37,28 @@ def pathway_graph_to_pathway_map(pathway_graph: PathwayGraph) -> PathwayMap:
"""
Convert a pathway graph to a pathway map
"""

def visit_graph(
pathway_graph: PathwayGraph,
pathway_map: PathwayMap,
action_period: ActionPeriod,
action_ends: dict[ActionNode, ActionEnd],
) -> ActionBegin:
begin = ActionBegin(action_period.action)
end = ActionEnd(action_period.action)

pathway_map.add_period(begin, end)

for conversion in pathway_graph.to_conversions(action_period):
begin_new = visit_graph(
pathway_graph,
pathway_map,
pathway_graph.to_action_period(conversion),
action_ends,
)
pathway_map.add_conversion(end, begin_new)

return begin

pathway_map = PathwayMap()

if pathway_graph.nr_nodes() > 0:
action_period = pathway_graph.root_node
action_ends: dict[ActionNode, ActionEnd] = {}
visit_graph(pathway_graph, pathway_map, action_period, action_ends)

# For each individual path, add a graph to the pathway map
for path in pathway_graph.all_paths():
action_period = path[0]
begin = ActionBegin(action_period.action)
end = ActionEnd(action_period.action)
pathway_map.add_period(begin, end)

for action_conversion_idx in range(1, len(path), 2):
action_conversion = path[action_conversion_idx]
action_period = path[action_conversion_idx + 1]
assert isinstance(action_conversion, ActionConversion)
assert isinstance(action_period, ActionPeriod)

begin = ActionBegin(action_period.action)
pathway_map.add_conversion(end, begin)
end = ActionEnd(action_period.action)
pathway_map.add_period(begin, end)

# print(pathway_map)
return pathway_map


Expand Down
103 changes: 103 additions & 0 deletions source/package/adaptation_pathways/graph/directed_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import typing

import networkx as nx


class DirectedGraph:
"""
Base class for specialized directed graphs.
"""

_graph: nx.DiGraph

def __init__(self) -> None:
self._graph = nx.DiGraph()

def __str__(self) -> str:
return "\n".join(nx.generate_network_text(self._graph))

@property
def graph(self) -> nx.DiGraph:
"""
:return: The layered directed graph instance

Try not to use it -- it should be an implementation detail as much as possible.
"""
return self._graph

# def is_empty(self) -> bool:
# """
# :return: Whether or not the tree is empty
# """
# return nx.is_empty(self._graph)

def nr_nodes(self) -> int:
"""
:return: Number of nodes
"""
return len(self._graph.nodes)

def nr_edges(self) -> int:
"""
:return: Number of edges
"""
return len(self._graph.edges)

def all_to_nodes(self, from_node):
# Use shortest_path to find all nodes reachable from the node passed in
graph = self._graph.subgraph(nx.shortest_path(self._graph, from_node))

# Remove the from_node itself before returning the result
result = list(graph.nodes)
result.remove(from_node)

return result

def to_nodes(self, from_node) -> list[typing.Any]:
"""
:return: Collection of nodes that start at the node passed in
"""
return list(self._graph.adj[from_node])

def from_nodes(self, to_node):
"""
:return: Collection of nodes that end at the node passed in
"""
# TODO Can this be done more efficiently?
return [node for node in self._graph.nodes() if to_node in self.to_nodes(node)]

def leaf_nodes(self) -> typing.Iterable[typing.Any]:
"""
:Return: Iterable for iterating over all leaf nodes
"""
return [
node
for node in self._graph.nodes()
if self._graph.in_degree(node) != 0 and self._graph.out_degree(node) == 0
]

def all_paths(self) -> list[list[typing.Any]]:
result = []

if self.nr_nodes() > 0:
graph = self._graph

root_nodes = [
node for node, degree in self._graph.in_degree() if degree == 0
]
leaf_nodes = self.leaf_nodes()

for root_node in root_nodes:
cutoff = None
result += nx.all_simple_paths(graph, root_node, leaf_nodes, cutoff)

return result

def set_attribute(self, name: str, value: typing.Any) -> None:
"""
Add / update attribute value to / of the graph

:param name: Name of attribute to set
:param value: Value of the attribute to set
"""
self.graph.graph[name] = value
22 changes: 22 additions & 0 deletions source/package/adaptation_pathways/graph/multi_rooted_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from .directed_graph import DirectedGraph


class MultiRootedGraph(DirectedGraph):
"""
Class for directed out-graphs in which there can be multiple graphs rooted at independent nodes.
"""

@property
def root_nodes(self):
"""
:return: The root nodes
"""

root_nodes = [node for node, degree in self._graph.in_degree() if degree == 0]

nr_root_nodes = len(root_nodes)

if nr_root_nodes == 0:
raise LookupError("Graph is empty")

return root_nodes
51 changes: 33 additions & 18 deletions source/package/adaptation_pathways/graph/pathway_map.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from ..action import Action
from ..action_combination import ActionCombination
from .multi_rooted_graph import MultiRootedGraph
from .node import ActionBegin, ActionEnd, TippingPoint
from .rooted_graph import RootedGraph


class PathwayMap(RootedGraph):
class PathwayMap(MultiRootedGraph):
"""
A PathwayMap represents a collection of adaptation pathways. These pathways are encoded
in a directed rooted graph in which the nodes represent...
Expand Down Expand Up @@ -37,24 +37,28 @@ def all_action_begins_and_ends(
return self.all_to_nodes(begin)

def all_action_begins(self) -> list[ActionBegin]:
assert isinstance(self.root_node, ActionBegin)
result = []

result = [self.root_node]
for root_node in self.root_nodes:
assert isinstance(root_node, ActionBegin)

for node in self.all_to_nodes(self.root_node):
if isinstance(node, ActionBegin):
result.append(node)
result.append(root_node)

for node in self.all_to_nodes(root_node):
if isinstance(node, ActionBegin):
result.append(node)

return result

def all_action_ends(self) -> list[ActionEnd]:
assert isinstance(self.root_node, ActionBegin)

result = []

for node in self.all_to_nodes(self.root_node):
if isinstance(node, ActionEnd):
result.append(node)
for root_node in self.root_nodes:
assert isinstance(root_node, ActionBegin)

for node in self.all_to_nodes(root_node):
if isinstance(node, ActionEnd):
result.append(node)

return result

Expand Down Expand Up @@ -135,20 +139,31 @@ def tipping_points(self) -> list[TippingPoint]:
return list(dict.fromkeys(result))

def tipping_point_range(self) -> tuple[TippingPoint, TippingPoint]:
result: tuple[TippingPoint, TippingPoint] = (0, 0)
min_tipping_point = 0.0
max_tipping_point = 0.0

if self.nr_nodes() > 0:
min_tipping_point = self.action_end(self.root_node).tipping_point
root_nodes = self.root_nodes

# Initialize min tipping point to a value in the actual range of the tipping points
min_tipping_point = self.action_end(root_nodes[0]).tipping_point

for root_node in root_nodes[1:]:
min_tipping_point = min(
min_tipping_point, self.action_end(root_node).tipping_point
)

# Initialize max tipping point to a value in the actual range of the tipping points
max_tipping_point = min_tipping_point

for end_node in self.leaf_nodes():
max_tipping_point = max(max_tipping_point, end_node.tipping_point)

result = (min_tipping_point, max_tipping_point)

assert result[0] <= result[1], result
assert (
min_tipping_point <= max_tipping_point
), f"{min_tipping_point} < {max_tipping_point}"

return result
return min_tipping_point, max_tipping_point

# def set_node_attribute(self, name: str, value: dict[Action, typing.Any]) -> None:
# """
Expand Down
Loading