diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index 9f94c3ebcdd..b610129a59d 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -4,6 +4,11 @@ // Package graph provides functionality for directed graphs. package graph +import ( + "context" + "sync" +) + // vertexStatus denotes the visiting status of a vertex when running DFS in a graph. type vertexStatus int @@ -17,6 +22,8 @@ const ( type Graph[V comparable] struct { vertices map[V]neighbors[V] // Adjacency list for each vertex. inDegrees map[V]int // Number of incoming edges for each vertex. + status map[V]string // status of each vertex. + lock sync.Mutex // lock used to mutate graph data. } // Edge represents one edge of a directed graph. @@ -28,17 +35,34 @@ type Edge[V comparable] struct { type neighbors[V comparable] map[V]bool // New initiates a new Graph. -func New[V comparable](vertices ...V) *Graph[V] { +func New[V comparable](vertices []V, opts ...GraphOption[V]) *Graph[V] { adj := make(map[V]neighbors[V]) inDegrees := make(map[V]int) for _, vertex := range vertices { adj[vertex] = make(neighbors[V]) inDegrees[vertex] = 0 } - return &Graph[V]{ + g := &Graph[V]{ vertices: adj, inDegrees: inDegrees, } + for _, opt := range opts { + opt(g) + } + return g +} + +// GraphOption allows you to initialize Graph with additional properties. +type GraphOption[V comparable] func(g *Graph[V]) + +// WithStatus sets the status of each vertex in the Graph. +func WithStatus[V comparable](status string) func(g *Graph[V]) { + return func(g *Graph[V]) { + g.status = make(map[V]string) + for vertex := range g.vertices { + g.status[vertex] = status + } + } } // Neighbors returns the list of connected vertices from vtx. @@ -135,6 +159,77 @@ func (g *Graph[V]) Roots() []V { return roots } +// updateStatus updates the status of a vertex. 
+func (g *Graph[V]) updateStatus(vertex V, status string) { + g.lock.Lock() + defer g.lock.Unlock() + g.status[vertex] = status +} + +// getStatus gets the status of a vertex. +func (g *Graph[V]) getStatus(vertex V) string { + g.lock.Lock() + defer g.lock.Unlock() + return g.status[vertex] +} + +// leaves returns the vertices of the graph that have no outgoing edges. +func (g *Graph[V]) leaves() []V { + g.lock.Lock() + defer g.lock.Unlock() + var leaves []V + for vtx := range g.vertices { + if len(g.vertices[vtx]) == 0 { + leaves = append(leaves, vtx) + } + } + return leaves +} + +// parents returns the vertices that have an edge pointing to vtx. +func (g *Graph[V]) parents(vtx V) []V { + g.lock.Lock() + defer g.lock.Unlock() + var parents []V + for v, neighbors := range g.vertices { + if neighbors[vtx] { + parents = append(parents, v) + } + } + return parents +} + +// children returns the vertices that vtx has an edge pointing to. +func (g *Graph[V]) children(vtx V) []V { + g.lock.Lock() + defer g.lock.Unlock() + return g.Neighbors(vtx) +} + +// filterParents returns the parents of vtx whose status equals status. +func (g *Graph[V]) filterParents(vtx V, status string) []V { + parents := g.parents(vtx) + var filtered []V + for _, parent := range parents { + if g.getStatus(parent) == status { + filtered = append(filtered, parent) + } + } + return filtered +} + +// filterChildren returns the children of vtx whose status equals status. +func (g *Graph[V]) filterChildren(vtx V, status string) []V { + children := g.children(vtx) + var filtered []V + for _, child := range children { + if g.getStatus(child) == status { + filtered = append(filtered, child) + } + } + return filtered +} + func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool { temp.status[currVertex] = visiting for vertex := range g.vertices[currVertex] { @@ -196,12 +291,13 @@ func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) { // // An example graph and their ranks is shown below to illustrate: // . 
-//├── a rank: 0 -//│ ├── c rank: 1 -//│ │ └── f rank: 2 -//│ └── d rank: 1 -//└── b rank: 0 -// └── e rank: 1 +// ├── a rank: 0 +// │ ├── c rank: 1 +// │ │ └── f rank: 2 +// │ └── d rank: 1 +// └── b rank: 0 +// +// └── e rank: 1 func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) { if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic { return nil, &errCycle[V]{ @@ -215,3 +311,134 @@ func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], e topo.traverse(digraph) return topo, nil } + +/* +UpwardTraversal performs an upward traversal on the graph starting from leaves (nodes with no children) +and moving towards root nodes (nodes with no parents). +It applies the specified process function to each vertex in the graph, skipping vertices with the +"adjacentVertexSkipStatus" status, and continuing traversal until reaching vertices with the "requiredVertexStatus" status. +The traversal is concurrent and may process vertices in parallel. +Returns an error if the traversal encounters any issues, or nil if successful. +*/ +func (g *Graph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { + traversal := &graphTraversal[V]{ + mu: sync.Mutex{}, + seen: make(map[V]struct{}), + findBoundaryVertices: func(g *Graph[V]) []V { return g.leaves() }, + findAdjacentVertices: func(g *Graph[V], v V) []V { return g.parents(v) }, + filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterChildren(v, status) }, + requiredVertexStatus: requiredVertexStatus, + adjacentVertexSkipStatus: adjacentVertexSkipStatus, + processVertex: processVertexFunc, + } + return traversal.execute(ctx, g) +} + +/* +DownwardTraversal performs a downward traversal on the graph starting from root nodes (nodes with no parents) +and moving towards leaf nodes (nodes with no children). 
It applies the specified process function to each +vertex in the graph, skipping vertices with the "adjacentVertexSkipStatus" status, and continuing traversal +until reaching vertices with the "requiredVertexStatus" status. +The traversal is concurrent and may process vertices in parallel. +Returns an error if the traversal encounters any issues. +*/ +func (g *Graph[V]) DownwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { + traversal := &graphTraversal[V]{ + mu: sync.Mutex{}, + seen: make(map[V]struct{}), + findBoundaryVertices: func(g *Graph[V]) []V { return g.Roots() }, + findAdjacentVertices: func(g *Graph[V], v V) []V { return g.children(v) }, + filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterParents(v, status) }, + requiredVertexStatus: requiredVertexStatus, + adjacentVertexSkipStatus: adjacentVertexSkipStatus, + processVertex: processVertexFunc, + } + return traversal.execute(ctx, g) +} + +type graphTraversal[V comparable] struct { + mu sync.Mutex + seen map[V]struct{} + findBoundaryVertices func(*Graph[V]) []V + findAdjacentVertices func(*Graph[V], V) []V + filterVerticesByStatus func(*Graph[V], V, string) []V + requiredVertexStatus string + adjacentVertexSkipStatus string + processVertex func(context.Context, V) error +} + +func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + vertexCount := len(graph.vertices) + if vertexCount == 0 { + return nil + } + + var wg sync.WaitGroup + vertexCh := make(chan V, vertexCount) + errCh := make(chan error, vertexCount) + + processVertices := func(ctx context.Context, graph *Graph[V], wg *sync.WaitGroup, vertices []V, vertexCh chan V) { + for _, vertex := range vertices { + vertex := vertex + // Delay processing this vertex if any of its dependent vertices are yet to be processed. 
+ if len(t.filterVerticesByStatus(graph, vertex, t.adjacentVertexSkipStatus)) != 0 { + continue + } + if !t.markAsSeen(vertex) { + // Skip this vertex if it's already been processed by another go routine. + continue + } + wg.Add(1) + go func() { + defer wg.Done() + if err := t.processVertex(ctx, vertex); err != nil { + errCh <- err + return + } + graph.updateStatus(vertex, t.requiredVertexStatus) + vertexCh <- vertex + }() + } + } + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case vertex := <-vertexCh: + vertexCount-- + if vertexCount == 0 { + return + } + processVertices(ctx, graph, &wg, t.findAdjacentVertices(graph, vertex), vertexCh) + } + } + }() + processVertices(ctx, graph, &wg, t.findBoundaryVertices(graph), vertexCh) + wg.Wait() + close(errCh) + + for err := range errCh { + if err != nil { + return err + } + } + return nil +} + +func (t *graphTraversal[V]) markAsSeen(vertex V) bool { + t.mu.Lock() + defer t.mu.Unlock() + if _, seen := t.seen[vertex]; seen { + return false + } + t.seen[vertex] = struct{}{} + return true +} diff --git a/internal/pkg/graph/graph_test.go b/internal/pkg/graph/graph_test.go index 71121718314..4c857af9a02 100644 --- a/internal/pkg/graph/graph_test.go +++ b/internal/pkg/graph/graph_test.go @@ -4,6 +4,7 @@ package graph import ( + "context" "strings" "testing" @@ -13,7 +14,7 @@ import ( func TestGraph_Add(t *testing.T) { t.Run("success", func(t *testing.T) { // GIVEN - graph := New[string]() + graph := New[string]([]string{}) // WHEN // A <-> B @@ -44,7 +45,7 @@ func TestGraph_InDegree(t *testing.T) { wanted map[rune]int }{ "should return 0 for nodes that don't exist in the graph": { - graph: New[rune](), + graph: New[rune]([]rune{}), wanted: map[rune]int{ 'a': 0, @@ -52,7 +53,7 @@ func TestGraph_InDegree(t *testing.T) { }, "should return number of incoming edges for complex graph": { graph: func() *Graph[rune] { - g := New[rune]() + g := New[rune]([]rune{}) g.Add(Edge[rune]{'a', 
'b'}) g.Add(Edge[rune]{'b', 'a'}) g.Add(Edge[rune]{'a', 'c'}) @@ -88,7 +89,7 @@ func TestGraph_Remove(t *testing.T) { }{ "edge deletion should be idempotent": { graph: func() *Graph[rune] { - g := New[rune]() + g := New[rune]([]rune{}) g.Add(Edge[rune]{'a', 'b'}) g.Add(Edge[rune]{'z', 'b'}) g.Remove(Edge[rune]{'a', 'b'}) @@ -123,13 +124,13 @@ func TestGraph_Remove(t *testing.T) { func TestGraph_IsAcyclic(t *testing.T) { testCases := map[string]struct { - graph Graph[string] + graph *Graph[string] isAcyclic bool cycle []string }{ "small non acyclic graph": { - graph: Graph[string]{ + graph: &Graph[string]{ vertices: map[string]neighbors[string]{ "A": {"B": true, "C": true}, "B": {"A": true}, @@ -140,7 +141,7 @@ func TestGraph_IsAcyclic(t *testing.T) { cycle: []string{"A", "B"}, }, "non acyclic": { - graph: Graph[string]{ + graph: &Graph[string]{ vertices: map[string]neighbors[string]{ "K": {"F": true}, "A": {"B": true, "C": true}, @@ -155,7 +156,7 @@ func TestGraph_IsAcyclic(t *testing.T) { cycle: []string{"A", "G", "E", "B"}, }, "acyclic": { - graph: Graph[string]{ + graph: &Graph[string]{ vertices: map[string]neighbors[string]{ "A": {"B": true, "C": true}, "B": {"D": true}, @@ -186,15 +187,15 @@ func TestGraph_Roots(t *testing.T) { wantedRoots []int }{ "should return nil if the graph is empty": { - graph: New[int](), + graph: New[int]([]int{}), }, "should return all the vertices if there are no edges in the graph": { - graph: New[int](1, 2, 3, 4, 5), + graph: New[int]([]int{1, 2, 3, 4, 5}), wantedRoots: []int{1, 2, 3, 4, 5}, }, "should return only vertices with no in degrees": { graph: func() *Graph[int] { - g := New[int]() + g := New[int]([]int{}) g.Add(Edge[int]{ From: 1, To: 3, @@ -231,7 +232,7 @@ func TestTopologicalOrder(t *testing.T) { "should return an error when a cycle is detected": { // frontend <-> backend graph: func() *Graph[string] { - g := New("frontend", "backend") + g := New([]string{"frontend", "backend"}) g.Add(Edge[string]{ From: "frontend", 
To: "backend", @@ -247,7 +248,7 @@ func TestTopologicalOrder(t *testing.T) { "should return the ranks for a graph that looks like a bus": { // vpc -> lb -> api graph: func() *Graph[string] { - g := New[string]() + g := New[string]([]string{}) g.Add(Edge[string]{ From: "vpc", To: "lb", @@ -270,7 +271,7 @@ func TestTopologicalOrder(t *testing.T) { // vpc -> rds -> backend // -> s3 -> api // -> frontend - g := New[string]() + g := New[string]([]string{}) g.Add(Edge[string]{ From: "vpc", To: "rds", @@ -307,7 +308,7 @@ func TestTopologicalOrder(t *testing.T) { graph: func() *Graph[string] { // warehouse -> orders -> frontend // payments -> - g := New[string]() + g := New[string]([]string{}) g.Add(Edge[string]{ From: "payments", To: "frontend", @@ -334,7 +335,7 @@ func TestTopologicalOrder(t *testing.T) { graph: func() *Graph[string] { // a -> b -> c -> d -> f // a -> e -> f - g := New[string]() + g := New[string]([]string{}) for _, edge := range []Edge[string]{{"a", "b"}, {"b", "c"}, {"c", "d"}, {"d", "f"}, {"a", "e"}, {"e", "f"}} { g.Add(edge) } @@ -369,3 +370,71 @@ func TestTopologicalOrder(t *testing.T) { }) } } + +func buildGraphWithSingleParent() *Graph[string] { + vertices := []string{"A", "B", "C", "D"} + graph := New[string](vertices, WithStatus[string]("started")) + graph.Add(Edge[string]{From: "D", To: "C"}) // D -> C + graph.Add(Edge[string]{From: "C", To: "B"}) // C -> B + graph.Add(Edge[string]{From: "B", To: "A"}) // B -> A + return graph +} + +func TestTraverseInDependencyOrder(t *testing.T) { + t.Run("graph with single root vertex", func(t *testing.T) { + graph := buildGraphWithSingleParent() + var visited []string + processFn := func(ctx context.Context, v string) error { + visited = append(visited, v) + return nil + } + err := graph.UpwardTraversal(context.Background(), processFn, "started", "stopped") + require.NoError(t, err) + expected := []string{"A", "B", "C", "D"} + require.Equal(t, expected, visited) + }) + t.Run("graph with multiple parents and 
boundary nodes", func(t *testing.T) { + vertices := []string{"A", "B", "C", "D"} + graph := New[string](vertices, WithStatus[string]("started")) + graph.Add(Edge[string]{From: "A", To: "C"}) + graph.Add(Edge[string]{From: "A", To: "D"}) + graph.Add(Edge[string]{From: "B", To: "D"}) + vtxChan := make(chan string, 4) + seen := make(map[string]int) + done := make(chan struct{}) + go func() { + for _, vtx := range vertices { + seen[vtx]++ + } + done <- struct{}{} + }() + + err := graph.DownwardTraversal(context.Background(), func(ctx context.Context, vertice string) error { + vtxChan <- vertice + return nil + }, "started", "stopped") + require.NoError(t, err, "Error during iteration") + close(vtxChan) + <-done + + require.Len(t, seen, 4) + for vtx, count := range seen { + require.Equal(t, 1, count, "%s", vtx) + } + }) +} + +func TestTraverseInReverseDependencyOrder(t *testing.T) { + t.Run("Graph with single root vertex", func(t *testing.T) { + graph := buildGraphWithSingleParent() + var visited []string + processFn := func(ctx context.Context, v string) error { + visited = append(visited, v) + return nil + } + err := graph.DownwardTraversal(context.Background(), processFn, "started", "stopped") + require.NoError(t, err) + expected := []string{"D", "C", "B", "A"} + require.Equal(t, expected, visited) + }) +} diff --git a/internal/pkg/manifest/validate.go b/internal/pkg/manifest/validate.go index a6598b3ae01..e3de0736184 100644 --- a/internal/pkg/manifest/validate.go +++ b/internal/pkg/manifest/validate.go @@ -2331,7 +2331,7 @@ func validateNoCircularDependencies(deps map[string]containerDependency) error { } func buildDependencyGraph(deps map[string]containerDependency) (*graph.Graph[string], error) { - dependencyGraph := graph.New[string]() + dependencyGraph := graph.New[string]([]string{}) for name, containerDep := range deps { for dep := range containerDep.dependsOn { if _, ok := deps[dep]; !ok {