From a966d109ed4a74aac4749522424a37b936e4516d Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Wed, 29 Nov 2023 17:03:23 -0800 Subject: [PATCH 01/12] chore: traverse func container start and shutdown --- internal/pkg/graph/graph.go | 243 +++++++++++++++++++++++++++++- internal/pkg/graph/graph_test.go | 101 +++++++++++-- internal/pkg/manifest/validate.go | 2 +- 3 files changed, 321 insertions(+), 25 deletions(-) diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index 9f94c3ebcdd..b610129a59d 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -4,6 +4,11 @@ // Package graph provides functionality for directed graphs. package graph +import ( + "context" + "sync" +) + // vertexStatus denotes the visiting status of a vertex when running DFS in a graph. type vertexStatus int @@ -17,6 +22,8 @@ const ( type Graph[V comparable] struct { vertices map[V]neighbors[V] // Adjacency list for each vertex. inDegrees map[V]int // Number of incoming edges for each vertex. + status map[V]string // status of each vertex. + lock sync.Mutex // lock used to mutate graph data. } // Edge represents one edge of a directed graph. @@ -28,17 +35,34 @@ type Edge[V comparable] struct { type neighbors[V comparable] map[V]bool // New initiates a new Graph. -func New[V comparable](vertices ...V) *Graph[V] { +func New[V comparable](vertices []V, opts ...GraphOption[V]) *Graph[V] { adj := make(map[V]neighbors[V]) inDegrees := make(map[V]int) for _, vertex := range vertices { adj[vertex] = make(neighbors[V]) inDegrees[vertex] = 0 } - return &Graph[V]{ + g := &Graph[V]{ vertices: adj, inDegrees: inDegrees, } + for _, opt := range opts { + opt(g) + } + return g +} + +// GraphOption allows you to initialize Graph with additional properties. +type GraphOption[V comparable] func(g *Graph[V]) + +// WithStatus sets the status of each vertex in the Graph. +func WithStatus[V comparable](status string) func(g *Graph[V]) { + return func(g *Graph[V]) { + g.status = make(map[V]string) + for vertex := range g.vertices { + g.status[vertex] = status + } + } } // Neighbors returns the list of connected vertices from vtx. @@ -135,6 +159,77 @@ func (g *Graph[V]) Roots() []V { return roots } +// updateStatus updates the status of a vertex. +func (g *Graph[V]) updateStatus(vertex V, status string) { + g.lock.Lock() + defer g.lock.Unlock() + g.status[vertex] = status +} + +// getStatus gets the status of a vertex. +func (g *Graph[V]) getStatus(vertex V) string { + g.lock.Lock() + defer g.lock.Unlock() + return g.status[vertex] +} + +// getLeaves returns the leaves of a given vertex. +func (g *Graph[V]) leaves() []V { + g.lock.Lock() + defer g.lock.Unlock() + var leaves []V + for vtx := range g.vertices { + if len(g.vertices[vtx]) == 0 { + leaves = append(leaves, vtx) + } + } + return leaves +} + +// getParents returns the parent vertices (incoming edges) of vertex. +func (g *Graph[V]) parents(vtx V) []V { + g.lock.Lock() + defer g.lock.Unlock() + var parents []V + for v, neighbors := range g.vertices { + if neighbors[vtx] { + parents = append(parents, v) + } + } + return parents +} + +// getChildren returns the child vertices (outgoing edges) of vertex. +func (g *Graph[V]) children(vtx V) []V { + g.lock.Lock() + defer g.lock.Unlock() + return g.Neighbors(vtx) +} + +// filterParents filters parents based on the vertex status. +func (g *Graph[V]) filterParents(vtx V, status string) []V { + parents := g.parents(vtx) + var filtered []V + for _, parent := range parents { + if g.getStatus(parent) == status { + filtered = append(filtered, parent) + } + } + return filtered +} + +// filterChildren filters children based on the vertex status. +func (g *Graph[V]) filterChildren(vtx V, status string) []V { + children := g.children(vtx) + var filtered []V + for _, child := range children { + if g.getStatus(child) == status { + filtered = append(filtered, child) + } + } + return filtered +} + func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool { temp.status[currVertex] = visiting for vertex := range g.vertices[currVertex] { @@ -196,12 +291,13 @@ func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) { // // An example graph and their ranks is shown below to illustrate: // . -//├── a rank: 0 -//│ ├── c rank: 1 -//│ │ └── f rank: 2 -//│ └── d rank: 1 -//└── b rank: 0 -// └── e rank: 1 +// ├── a rank: 0 +// │ ├── c rank: 1 +// │ │ └── f rank: 2 +// │ └── d rank: 1 +// └── b rank: 0 +// +// └── e rank: 1 func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) { if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic { return nil, &errCycle[V]{ @@ -215,3 +311,134 @@ func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], e topo.traverse(digraph) return topo, nil } + +/* +UpwardTraversal performs an upward traversal on the graph starting from leaves (nodes with no children) +and moving towards root nodes (nodes with children). +It applies the specified process function to each vertex in the graph, skipping vertices with the +"adjacentVertexSkipStatus" status, and continuing traversal until reaching vertices with the "requiredVertexStatus" status. +The traversal is concurrent and may process vertices in parallel. +Returns an error if the traversal encounters any issues, or nil if successful. +*/ +func (g *Graph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { + traversal := &graphTraversal[V]{ + mu: sync.Mutex{}, + seen: make(map[V]struct{}), + findBoundaryVertices: func(g *Graph[V]) []V { return g.leaves() }, + findAdjacentVertices: func(g *Graph[V], v V) []V { return g.parents(v) }, + filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterChildren(v, status) }, + requiredVertexStatus: requiredVertexStatus, + adjacentVertexSkipStatus: adjacentVertexSkipStatus, + processVertex: processVertexFunc, + } + return traversal.execute(ctx, g) +} + +/* +DownwardTraversal performs a downward traversal on the graph starting from root nodes (nodes with no parents) +and moving towards leaf nodes (nodes with parents). It applies the specified process function to each +vertex in the graph, skipping vertices with the "adjacentVertexSkipStatus" status, and continuing traversal +until reaching vertices with the "requiredVertexStatus" status. +The traversal is concurrent and may process vertices in parallel. +Returns an error if the traversal encounters any issues. +*/ +func (g *Graph[V]) DownwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { + traversal := &graphTraversal[V]{ + mu: sync.Mutex{}, + seen: make(map[V]struct{}), + findBoundaryVertices: func(g *Graph[V]) []V { return g.Roots() }, + findAdjacentVertices: func(g *Graph[V], v V) []V { return g.children(v) }, + filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterParents(v, status) }, + requiredVertexStatus: requiredVertexStatus, + adjacentVertexSkipStatus: adjacentVertexSkipStatus, + processVertex: processVertexFunc, + } + return traversal.execute(ctx, g) +} + +type graphTraversal[V comparable] struct { + mu sync.Mutex + seen map[V]struct{} + findBoundaryVertices func(*Graph[V]) []V + findAdjacentVertices func(*Graph[V], V) []V + filterVerticesByStatus func(*Graph[V], V, string) []V + requiredVertexStatus string + adjacentVertexSkipStatus string + processVertex func(context.Context, V) error +} + +func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + vertexCount := len(graph.vertices) + if vertexCount == 0 { + return nil + } + + var wg sync.WaitGroup + vertexCh := make(chan V, vertexCount) + errCh := make(chan error, vertexCount) + + processVertices := func(ctx context.Context, graph *Graph[V], wg *sync.WaitGroup, vertices []V, vertexCh chan V) { + for _, vertex := range vertices { + vertex := vertex + // Delay processing this vertex if any of its dependent vertices are yet to be processed. + if len(t.filterVerticesByStatus(graph, vertex, t.adjacentVertexSkipStatus)) != 0 { + continue + } + if !t.markAsSeen(vertex) { + // Skip this vertex if it's already been processed by another go routine. + continue + } + wg.Add(1) + go func() { + defer wg.Done() + if err := t.processVertex(ctx, vertex); err != nil { + errCh <- err + return + } + graph.updateStatus(vertex, t.requiredVertexStatus) + vertexCh <- vertex + }() + } + } + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case vertex := <-vertexCh: + vertexCount-- + if vertexCount == 0 { + return + } + processVertices(ctx, graph, &wg, t.findAdjacentVertices(graph, vertex), vertexCh) + } + } + }() + processVertices(ctx, graph, &wg, t.findBoundaryVertices(graph), vertexCh) + wg.Wait() + close(errCh) + + for err := range errCh { + if err != nil { + return err + } + } + return nil +} + +func (t *graphTraversal[V]) markAsSeen(vertex V) bool { + t.mu.Lock() + defer t.mu.Unlock() + if _, seen := t.seen[vertex]; seen { + return false + } + t.seen[vertex] = struct{}{} + return true +} diff --git a/internal/pkg/graph/graph_test.go b/internal/pkg/graph/graph_test.go index 71121718314..4c857af9a02 100644 --- a/internal/pkg/graph/graph_test.go +++ b/internal/pkg/graph/graph_test.go @@ -4,6 +4,7 @@ package graph import ( + "context" "strings" "testing" @@ -13,7 +14,7 @@ import ( func TestGraph_Add(t *testing.T) { t.Run("success", func(t *testing.T) { // GIVEN - graph := New[string]() + graph := New[string]([]string{}) // WHEN // A <-> B @@ -44,7 +45,7 @@ func TestGraph_InDegree(t *testing.T) { wanted map[rune]int }{ "should return 0 for nodes that don't exist in the graph": { - graph: New[rune](), + graph: New[rune]([]rune{}), wanted: map[rune]int{ 'a': 0, @@ -52,7 +53,7 @@ func TestGraph_InDegree(t *testing.T) { }, "should return number of incoming edges for complex graph": { graph: func() *Graph[rune] { - g := New[rune]() + g := New[rune]([]rune{}) g.Add(Edge[rune]{'a', 'b'}) g.Add(Edge[rune]{'b', 'a'}) g.Add(Edge[rune]{'a', 'c'}) @@ -88,7 +89,7 @@ func TestGraph_Remove(t *testing.T) { }{ "edge deletion should be idempotent": { graph: func() *Graph[rune] { - g := New[rune]() + g := New[rune]([]rune{}) g.Add(Edge[rune]{'a', 'b'}) g.Add(Edge[rune]{'z', 'b'}) g.Remove(Edge[rune]{'a', 'b'}) @@ -123,13 +124,13 @@ func TestGraph_Remove(t *testing.T) { func TestGraph_IsAcyclic(t *testing.T) { testCases := map[string]struct { - graph Graph[string] + graph *Graph[string] isAcyclic bool cycle []string }{ "small non acyclic graph": { - graph: Graph[string]{ + graph: &Graph[string]{ vertices: map[string]neighbors[string]{ "A": {"B": true, "C": true}, "B": {"A": true}, @@ -140,7 +141,7 @@ func TestGraph_IsAcyclic(t *testing.T) { cycle: []string{"A", "B"}, }, "non acyclic": { - graph: Graph[string]{ + graph: &Graph[string]{ vertices: map[string]neighbors[string]{ "K": {"F": true}, "A": {"B": true, "C": true}, @@ -155,7 +156,7 @@ func TestGraph_IsAcyclic(t *testing.T) { cycle: []string{"A", "G", "E", "B"}, }, "acyclic": { - graph: Graph[string]{ + graph: &Graph[string]{ vertices: map[string]neighbors[string]{ "A": {"B": true, "C": true}, "B": {"D": true}, @@ -186,15 +187,15 @@ func TestGraph_Roots(t *testing.T) { wantedRoots []int }{ "should return nil if the graph is empty": { - graph: New[int](), + graph: New[int]([]int{}), }, "should return all the vertices if there are no edges in the graph": { - graph: New[int](1, 2, 3, 4, 5), + graph: New[int]([]int{1, 2, 3, 4, 5}), wantedRoots: []int{1, 2, 3, 4, 5}, }, "should return only vertices with no in degrees": { graph: func() *Graph[int] { - g := New[int]() + g := New[int]([]int{}) g.Add(Edge[int]{ From: 1, To: 3, @@ -231,7 +232,7 @@ func TestTopologicalOrder(t *testing.T) { "should return an error when a cycle is detected": { // frontend <-> backend graph: func() *Graph[string] { - g := New("frontend", "backend") + g := New([]string{"frontend", "backend"}) g.Add(Edge[string]{ From: "frontend", To: "backend", @@ -247,7 +248,7 @@ func TestTopologicalOrder(t *testing.T) { "should return the ranks for a graph that looks like a bus": { // vpc -> lb -> api graph: func() *Graph[string] { - g := New[string]() + g := New[string]([]string{}) g.Add(Edge[string]{ From: "vpc", To: "lb", @@ -270,7 +271,7 @@ func TestTopologicalOrder(t *testing.T) { // vpc -> rds -> backend // -> s3 -> api // -> frontend - g := New[string]() + g := New[string]([]string{}) g.Add(Edge[string]{ From: "vpc", To: "rds", @@ -307,7 +308,7 @@ func TestTopologicalOrder(t *testing.T) { graph: func() *Graph[string] { // warehouse -> orders -> frontend // payments -> - g := New[string]() + g := New[string]([]string{}) g.Add(Edge[string]{ From: "payments", To: "frontend", @@ -334,7 +335,7 @@ func TestTopologicalOrder(t *testing.T) { graph: func() *Graph[string] { // a -> b -> c -> d -> f // a -> e -> f - g := New[string]() + g := New[string]([]string{}) for _, edge := range []Edge[string]{{"a", "b"}, {"b", "c"}, {"c", "d"}, {"d", "f"}, {"a", "e"}, {"e", "f"}} { g.Add(edge) } @@ -369,3 +370,71 @@ func TestTopologicalOrder(t *testing.T) { }) } } + +func buildGraphWithSingleParent() *Graph[string] { + vertices := []string{"A", "B", "C", "D"} + graph := New[string](vertices, WithStatus[string]("started")) + graph.Add(Edge[string]{From: "D", To: "C"}) // D -> C + graph.Add(Edge[string]{From: "C", To: "B"}) // C -> B + graph.Add(Edge[string]{From: "B", To: "A"}) // B -> A + return graph +} + +func TestTraverseInDependencyOrder(t *testing.T) { + t.Run("graph with single root vertex", func(t *testing.T) { + graph := buildGraphWithSingleParent() + var visited []string + processFn := func(ctx context.Context, v string) error { + visited = append(visited, v) + return nil + } + err := graph.UpwardTraversal(context.Background(), processFn, "started", "stopped") + require.NoError(t, err) + expected := []string{"A", "B", "C", "D"} + require.Equal(t, expected, visited) + }) + t.Run("graph with multiple parents and boundary nodes", func(t *testing.T) { + vertices := []string{"A", "B", "C", "D"} + graph := New[string](vertices, WithStatus[string]("started")) + graph.Add(Edge[string]{From: "A", To: "C"}) + graph.Add(Edge[string]{From: "A", To: "D"}) + graph.Add(Edge[string]{From: "B", To: "D"}) + vtxChan := make(chan string, 4) + seen := make(map[string]int) + done := make(chan struct{}) + go func() { + for _, vtx := range vertices { + seen[vtx]++ + } + done <- struct{}{} + }() + + err := graph.DownwardTraversal(context.Background(), func(ctx context.Context, vertice string) error { + vtxChan <- vertice + return nil + }, "started", "stopped") + require.NoError(t, err, "Error during iteration") + close(vtxChan) + <-done + + require.Len(t, seen, 4) + for vtx, count := range seen { + require.Equal(t, 1, count, "%s", vtx) + } + }) +} + +func TestTraverseInReverseDependencyOrder(t *testing.T) { + t.Run("Graph with single root vertex", func(t *testing.T) { + graph := buildGraphWithSingleParent() + var visited []string + processFn := func(ctx context.Context, v string) error { + visited = append(visited, v) + return nil + } + err := graph.DownwardTraversal(context.Background(), processFn, "started", "stopped") + require.NoError(t, err) + expected := []string{"D", "C", "B", "A"} + require.Equal(t, expected, visited) + }) +} diff --git a/internal/pkg/manifest/validate.go b/internal/pkg/manifest/validate.go index a6598b3ae01..e3de0736184 100644 --- a/internal/pkg/manifest/validate.go +++ b/internal/pkg/manifest/validate.go @@ -2331,7 +2331,7 @@ func validateNoCircularDependencies(deps map[string]containerDependency) error { } func buildDependencyGraph(deps map[string]containerDependency) (*graph.Graph[string], error) { - dependencyGraph := graph.New[string]() + dependencyGraph := graph.New[string]([]string{}) for name, containerDep := range deps { for dep := range containerDep.dependsOn { if _, ok := deps[dep]; !ok { From 99482affa8ff6d014799f905dbc9d428c45b402f Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Wed, 29 Nov 2023 17:08:38 -0800 Subject: [PATCH 02/12] fix static check --- internal/pkg/deploy/pipeline.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/pkg/deploy/pipeline.go b/internal/pkg/deploy/pipeline.go index bee9a60c5e1..355951a69a0 100644 --- a/internal/pkg/deploy/pipeline.go +++ b/internal/pkg/deploy/pipeline.go @@ -8,13 +8,14 @@ package deploy import ( "errors" "fmt" - "gopkg.in/yaml.v3" "path" "path/filepath" "regexp" "sort" "strings" + "gopkg.in/yaml.v3" + "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/copilot-cli/internal/pkg/config" "github.com/aws/copilot-cli/internal/pkg/graph" @@ -780,7 +781,7 @@ func (stg *PipelineStage) buildActionsGraph(rankables []actionGraphNode) *graph. for _, r := range rankables { names = append(names, r.name) } - digraph := graph.New(names...) + digraph := graph.New(names) for _, r := range rankables { if r.depends_on == nil { From 4374f8d4f612d74c7fa03be5b6652e43e2d89812 Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Wed, 29 Nov 2023 19:17:28 -0800 Subject: [PATCH 03/12] use errorgroup instead of waitGroup --- internal/pkg/graph/graph.go | 46 +++++++++++++++---------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index b610129a59d..842e8f3b96a 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -7,6 +7,8 @@ package graph import ( "context" "sync" + + "golang.org/x/sync/errgroup" ) // vertexStatus denotes the visiting status of a vertex when running DFS in a graph. @@ -376,12 +378,11 @@ func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error if vertexCount == 0 { return nil } - - var wg sync.WaitGroup + eg, ctx := errgroup.WithContext(ctx) vertexCh := make(chan V, vertexCount) - errCh := make(chan error, vertexCount) + defer close(vertexCh) - processVertices := func(ctx context.Context, graph *Graph[V], wg *sync.WaitGroup, vertices []V, vertexCh chan V) { + processVertices := func(ctx context.Context, graph *Graph[V], eg *errgroup.Group, vertices []V, vertexCh chan V) { for _, vertex := range vertices { vertex := vertex // Delay processing this vertex if any of its dependent vertices are yet to be processed. @@ -389,48 +390,37 @@ func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error continue } if !t.markAsSeen(vertex) { - // Skip this vertex if it's already been processed by another go routine. + // Skip this vertex if it's already been processed by another routine. continue } - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { if err := t.processVertex(ctx, vertex); err != nil { - errCh <- err - return + return err } + // Assign new status to the vertex upon successful processing. graph.updateStatus(vertex, t.requiredVertexStatus) vertexCh <- vertex - }() + return nil + }) } } - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { for { select { case <-ctx.Done(): - return + return nil case vertex := <-vertexCh: vertexCount-- if vertexCount == 0 { - return + return nil } - processVertices(ctx, graph, &wg, t.findAdjacentVertices(graph, vertex), vertexCh) + processVertices(ctx, graph, eg, t.findAdjacentVertices(graph, vertex), vertexCh) } } - }() - processVertices(ctx, graph, &wg, t.findBoundaryVertices(graph), vertexCh) - wg.Wait() - close(errCh) - - for err := range errCh { - if err != nil { - return err - } - } - return nil + }) + processVertices(ctx, graph, eg, t.findBoundaryVertices(graph), vertexCh) + return eg.Wait() } func (t *graphTraversal[V]) markAsSeen(vertex V) bool { From 3c38bb1fb86a7040ec745d6a4c15b4a3004ad863 Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:24:10 -0800 Subject: [PATCH 04/12] embed graph into labeled graph for decoupling --- internal/pkg/deploy/pipeline.go | 2 +- internal/pkg/graph/graph.go | 236 ++++++++++++++++-------------- internal/pkg/graph/graph_test.go | 38 ++--- internal/pkg/manifest/validate.go | 2 +- 4 files changed, 147 insertions(+), 131 deletions(-) diff --git a/internal/pkg/deploy/pipeline.go b/internal/pkg/deploy/pipeline.go index 355951a69a0..0f1b55a9957 100644 --- a/internal/pkg/deploy/pipeline.go +++ b/internal/pkg/deploy/pipeline.go @@ -781,7 +781,7 @@ func (stg *PipelineStage) buildActionsGraph(rankables []actionGraphNode) *graph. for _, r := range rankables { names = append(names, r.name) } - digraph := graph.New(names) + digraph := graph.New[string]() for _, r := range rankables { if r.depends_on == nil { diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index 842e8f3b96a..f53d4869bd3 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -24,8 +24,6 @@ const ( type Graph[V comparable] struct { vertices map[V]neighbors[V] // Adjacency list for each vertex. inDegrees map[V]int // Number of incoming edges for each vertex. - status map[V]string // status of each vertex. - lock sync.Mutex // lock used to mutate graph data. } // Edge represents one edge of a directed graph. @@ -37,34 +35,17 @@ type Edge[V comparable] struct { type neighbors[V comparable] map[V]bool // New initiates a new Graph. -func New[V comparable](vertices []V, opts ...GraphOption[V]) *Graph[V] { +func New[V comparable](vertices ...V) *Graph[V] { adj := make(map[V]neighbors[V]) inDegrees := make(map[V]int) for _, vertex := range vertices { adj[vertex] = make(neighbors[V]) inDegrees[vertex] = 0 } - g := &Graph[V]{ + return &Graph[V]{ vertices: adj, inDegrees: inDegrees, } - for _, opt := range opts { - opt(g) - } - return g -} - -// GraphOption allows you to initialize Graph with additional properties. -type GraphOption[V comparable] func(g *Graph[V]) - -// WithStatus sets the status of each vertex in the Graph. -func WithStatus[V comparable](status string) func(g *Graph[V]) { - return func(g *Graph[V]) { - g.status = make(map[V]string) - for vertex := range g.vertices { - g.status[vertex] = status - } - } } // Neighbors returns the list of connected vertices from vtx. @@ -161,77 +142,6 @@ func (g *Graph[V]) Roots() []V { return roots } -// updateStatus updates the status of a vertex. -func (g *Graph[V]) updateStatus(vertex V, status string) { - g.lock.Lock() - defer g.lock.Unlock() - g.status[vertex] = status -} - -// getStatus gets the status of a vertex. -func (g *Graph[V]) getStatus(vertex V) string { - g.lock.Lock() - defer g.lock.Unlock() - return g.status[vertex] -} - -// getLeaves returns the leaves of a given vertex. -func (g *Graph[V]) leaves() []V { - g.lock.Lock() - defer g.lock.Unlock() - var leaves []V - for vtx := range g.vertices { - if len(g.vertices[vtx]) == 0 { - leaves = append(leaves, vtx) - } - } - return leaves -} - -// getParents returns the parent vertices (incoming edges) of vertex. -func (g *Graph[V]) parents(vtx V) []V { - g.lock.Lock() - defer g.lock.Unlock() - var parents []V - for v, neighbors := range g.vertices { - if neighbors[vtx] { - parents = append(parents, v) - } - } - return parents -} - -// getChildren returns the child vertices (outgoing edges) of vertex. -func (g *Graph[V]) children(vtx V) []V { - g.lock.Lock() - defer g.lock.Unlock() - return g.Neighbors(vtx) -} - -// filterParents filters parents based on the vertex status. -func (g *Graph[V]) filterParents(vtx V, status string) []V { - parents := g.parents(vtx) - var filtered []V - for _, parent := range parents { - if g.getStatus(parent) == status { - filtered = append(filtered, parent) - } - } - return filtered -} - -// filterChildren filters children based on the vertex status. -func (g *Graph[V]) filterChildren(vtx V, status string) []V { - children := g.children(vtx) - var filtered []V - for _, child := range children { - if g.getStatus(child) == status { - filtered = append(filtered, child) - } - } - return filtered -} - func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool { temp.status[currVertex] = visiting for vertex := range g.vertices[currVertex] { @@ -314,6 +224,112 @@ func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], e return topo, nil } +// LabeledGraph extends a generic Graph by associating a label (or status) with each vertex. +// It is concurrency-safe, utilizing a mutex lock for synchronized access. +type LabeledGraph[V comparable] struct { + *Graph[V] + status map[V]string + lock sync.Mutex +} + +// NewLabeledGraph initializes a LabeledGraph with specified vertices and optional configurations. +// It creates a base Graph with the vertices and applies any LabeledGraphOption to configure additional properties. +func NewLabeledGraph[V comparable](vertices []V, opts ...LabeledGraphOption[V]) *LabeledGraph[V] { + g := New(vertices...) + lg := &LabeledGraph[V]{ + Graph: g, + status: make(map[V]string), + } + for _, opt := range opts { + opt(lg) + } + return lg +} + +// LabeledGraphOption allows you to initialize Graph with additional properties. +type LabeledGraphOption[V comparable] func(g *LabeledGraph[V]) + +// WithStatus sets the status of each vertex in the Graph. +func WithStatus[V comparable](status string) func(g *LabeledGraph[V]) { + return func(g *LabeledGraph[V]) { + g.status = make(map[V]string) + for vertex := range g.vertices { + g.status[vertex] = status + } + } +} + +// updateStatus updates the status of a vertex. +func (lg *LabeledGraph[V]) updateStatus(vertex V, status string) { + lg.lock.Lock() + defer lg.lock.Unlock() + lg.status[vertex] = status +} + +// getStatus gets the status of a vertex. +func (lg *LabeledGraph[V]) getStatus(vertex V) string { + lg.lock.Lock() + defer lg.lock.Unlock() + return lg.status[vertex] +} + +// getLeaves returns the leaves of a given vertex. +func (lg *LabeledGraph[V]) leaves() []V { + lg.lock.Lock() + defer lg.lock.Unlock() + var leaves []V + for vtx := range lg.vertices { + if len(lg.vertices[vtx]) == 0 { + leaves = append(leaves, vtx) + } + } + return leaves +} + +// getParents returns the parent vertices (incoming edges) of vertex. +func (lg *LabeledGraph[V]) parents(vtx V) []V { + lg.lock.Lock() + defer lg.lock.Unlock() + var parents []V + for v, neighbors := range lg.vertices { + if neighbors[vtx] { + parents = append(parents, v) + } + } + return parents +} + +// getChildren returns the child vertices (outgoing edges) of vertex. +func (lg *LabeledGraph[V]) children(vtx V) []V { + lg.lock.Lock() + defer lg.lock.Unlock() + return lg.Neighbors(vtx) +} + +// filterParents filters parents based on the vertex status. +func (lg *LabeledGraph[V]) filterParents(vtx V, status string) []V { + parents := lg.parents(vtx) + var filtered []V + for _, parent := range parents { + if lg.getStatus(parent) == status { + filtered = append(filtered, parent) + } + } + return filtered +} + +// filterChildren filters children based on the vertex status. +func (lg *LabeledGraph[V]) filterChildren(vtx V, status string) []V { + children := lg.children(vtx) + var filtered []V + for _, child := range children { + if lg.getStatus(child) == status { + filtered = append(filtered, child) + } + } + return filtered +} + /* UpwardTraversal performs an upward traversal on the graph starting from leaves (nodes with no children) and moving towards root nodes (nodes with children). @@ -322,18 +338,18 @@ It applies the specified process function to each vertex in the graph, skipping The traversal is concurrent and may process vertices in parallel. Returns an error if the traversal encounters any issues, or nil if successful. */ -func (g *Graph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { +func (lg *LabeledGraph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { traversal := &graphTraversal[V]{ mu: sync.Mutex{}, seen: make(map[V]struct{}), - findBoundaryVertices: func(g *Graph[V]) []V { return g.leaves() }, - findAdjacentVertices: func(g *Graph[V], v V) []V { return g.parents(v) }, - filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterChildren(v, status) }, + findBoundaryVertices: func(lg *LabeledGraph[V]) []V { return lg.leaves() }, + findAdjacentVertices: func(lg *LabeledGraph[V], v V) []V { return lg.parents(v) }, + filterVerticesByStatus: func(g *LabeledGraph[V], v V, status string) []V { return g.filterChildren(v, status) }, requiredVertexStatus: requiredVertexStatus, adjacentVertexSkipStatus: adjacentVertexSkipStatus, processVertex: processVertexFunc, } - return traversal.execute(ctx, g) + return traversal.execute(ctx, lg) } /* @@ -344,37 +360,37 @@ until reaching vertices with the "requiredVertexStatus" status. The traversal is concurrent and may process vertices in parallel. Returns an error if the traversal encounters any issues. */ -func (g *Graph[V]) DownwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { +func (lg *LabeledGraph[V]) DownwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { traversal := &graphTraversal[V]{ mu: sync.Mutex{}, seen: make(map[V]struct{}), - findBoundaryVertices: func(g *Graph[V]) []V { return g.Roots() }, - findAdjacentVertices: func(g *Graph[V], v V) []V { return g.children(v) }, - filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterParents(v, status) }, + findBoundaryVertices: func(lg *LabeledGraph[V]) []V { return lg.Roots() }, + findAdjacentVertices: func(lg *LabeledGraph[V], v V) []V { return lg.children(v) }, + filterVerticesByStatus: func(lg *LabeledGraph[V], v V, status string) []V { return lg.filterParents(v, status) }, requiredVertexStatus: requiredVertexStatus, adjacentVertexSkipStatus: adjacentVertexSkipStatus, processVertex: processVertexFunc, } - return traversal.execute(ctx, g) + return traversal.execute(ctx, lg) } type graphTraversal[V comparable] struct { mu sync.Mutex seen map[V]struct{} - findBoundaryVertices func(*Graph[V]) []V - findAdjacentVertices func(*Graph[V], V) []V - filterVerticesByStatus func(*Graph[V], V, string) []V + findBoundaryVertices func(*LabeledGraph[V]) []V + findAdjacentVertices func(*LabeledGraph[V], V) []V + filterVerticesByStatus func(*LabeledGraph[V], V, string) []V requiredVertexStatus string adjacentVertexSkipStatus string processVertex func(context.Context, V) error } -func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error { +func (t *graphTraversal[V]) execute(ctx context.Context, lg *LabeledGraph[V]) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - vertexCount := len(graph.vertices) + vertexCount := len(lg.vertices) if vertexCount == 0 { return nil } @@ -382,7 +398,7 @@ func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error vertexCh := make(chan V, vertexCount) defer close(vertexCh) - processVertices := func(ctx context.Context, graph *Graph[V], eg *errgroup.Group, vertices []V, vertexCh chan V) { + processVertices := func(ctx context.Context, graph *LabeledGraph[V], eg *errgroup.Group, vertices []V, vertexCh chan V) { for _, vertex := range vertices { vertex := vertex // Delay processing this vertex if any of its dependent vertices are yet to be processed. @@ -415,11 +431,11 @@ func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error if vertexCount == 0 { return nil } - processVertices(ctx, graph, eg, t.findAdjacentVertices(graph, vertex), vertexCh) + processVertices(ctx, lg, eg, t.findAdjacentVertices(lg, vertex), vertexCh) } } }) - processVertices(ctx, graph, eg, t.findBoundaryVertices(graph), vertexCh) + processVertices(ctx, lg, eg, t.findBoundaryVertices(lg), vertexCh) return eg.Wait() } diff --git a/internal/pkg/graph/graph_test.go b/internal/pkg/graph/graph_test.go index 4c857af9a02..8cbee84c37c 100644 --- a/internal/pkg/graph/graph_test.go +++ b/internal/pkg/graph/graph_test.go @@ -14,7 +14,7 @@ import ( func TestGraph_Add(t *testing.T) { t.Run("success", func(t *testing.T) { // GIVEN - graph := New[string]([]string{}) + graph := New[string]() // WHEN // A <-> B @@ -45,7 +45,7 @@ func TestGraph_InDegree(t *testing.T) { wanted map[rune]int }{ "should return 0 for nodes that don't exist in the graph": { - graph: New[rune]([]rune{}), + graph: New[rune](), wanted: map[rune]int{ 'a': 0, @@ -53,7 +53,7 @@ func TestGraph_InDegree(t *testing.T) { }, "should return number of incoming edges for complex graph": { graph: func() *Graph[rune] { - g := New[rune]([]rune{}) + g := New[rune]() g.Add(Edge[rune]{'a', 'b'}) g.Add(Edge[rune]{'b', 'a'}) g.Add(Edge[rune]{'a', 'c'}) @@ -89,7 +89,7 @@ func TestGraph_Remove(t *testing.T) { }{ "edge deletion should be idempotent": { graph: func() *Graph[rune] { - g := New[rune]([]rune{}) + g := New[rune]() g.Add(Edge[rune]{'a', 'b'}) g.Add(Edge[rune]{'z', 'b'}) g.Remove(Edge[rune]{'a', 'b'}) @@ -124,13 +124,13 @@ func TestGraph_Remove(t *testing.T) { func TestGraph_IsAcyclic(t *testing.T) { testCases := map[string]struct { - graph *Graph[string] + graph Graph[string] isAcyclic bool cycle []string }{ "small non acyclic graph": { - graph: &Graph[string]{ + graph: Graph[string]{ vertices: map[string]neighbors[string]{ "A": {"B": true, "C": true}, "B": {"A": true}, @@ -141,7 +141,7 @@ func TestGraph_IsAcyclic(t *testing.T) { cycle: []string{"A", "B"}, }, "non acyclic": { - graph: &Graph[string]{ + graph: Graph[string]{ vertices: map[string]neighbors[string]{ "K": {"F": true}, "A": {"B": true, "C": true}, @@ -156,7 +156,7 @@ func TestGraph_IsAcyclic(t *testing.T) { cycle: []string{"A", "G", "E", "B"}, }, "acyclic": { - graph: &Graph[string]{ + graph: Graph[string]{ vertices: map[string]neighbors[string]{ "A": {"B": true, "C": true}, "B": {"D": true}, @@ -187,15 +187,15 @@ func TestGraph_Roots(t *testing.T) { wantedRoots []int }{ "should return nil if the graph is empty": { - graph: New[int]([]int{}), + graph: New[int](), }, "should return all the vertices if there are no edges in the graph": { - graph: New[int]([]int{1, 2, 3, 4, 5}), + graph: New[int](1, 2, 3, 4, 5), wantedRoots: []int{1, 2, 3, 4, 5}, }, "should return only vertices with no in degrees": { graph: func() *Graph[int] { - g := New[int]([]int{}) + g := New[int]() g.Add(Edge[int]{ From: 1, To: 3, @@ -232,7 +232,7 @@ func TestTopologicalOrder(t *testing.T) { "should return an error when a cycle is detected": { // frontend <-> backend graph: func() *Graph[string] { - g := New([]string{"frontend", "backend"}) + g := New("frontend", "backend") g.Add(Edge[string]{ From: "frontend", To: "backend", @@ -248,7 +248,7 @@ func TestTopologicalOrder(t *testing.T) { "should return the ranks for a graph that looks like a bus": { // vpc -> lb -> api graph: func() *Graph[string] { - g := New[string]([]string{}) + g := New[string]() g.Add(Edge[string]{ From: "vpc", To: "lb", @@ -271,7 +271,7 @@ func TestTopologicalOrder(t *testing.T) { // vpc -> rds -> backend // -> s3 -> api // -> frontend - g := New[string]([]string{}) + g := New[string]() g.Add(Edge[string]{ From: "vpc", To: "rds", @@ -308,7 +308,7 @@ func TestTopologicalOrder(t *testing.T) { graph: func() *Graph[string] { // warehouse -> orders -> frontend // payments -> - g := New[string]([]string{}) + g := New[string]() g.Add(Edge[string]{ From: "payments", To: "frontend", @@ -335,7 +335,7 @@ func TestTopologicalOrder(t *testing.T) { graph: func() *Graph[string] { // a -> b -> c -> d -> f // a -> e -> f - g := New[string]([]string{}) + g := New[string]() for _, edge := range []Edge[string]{{"a", "b"}, {"b", "c"}, {"c", "d"}, {"d", "f"}, {"a", "e"}, {"e", "f"}} { g.Add(edge) } @@ -371,9 +371,9 @@ func TestTopologicalOrder(t *testing.T) { } } -func buildGraphWithSingleParent() *Graph[string] { +func buildGraphWithSingleParent() *LabeledGraph[string] { vertices := []string{"A", "B", "C", "D"} - graph := New[string](vertices, WithStatus[string]("started")) + graph := NewLabeledGraph[string](vertices, WithStatus[string]("started")) graph.Add(Edge[string]{From: "D", To: "C"}) // D -> C graph.Add(Edge[string]{From: "C", To: "B"}) // C -> B graph.Add(Edge[string]{From: "B", To: "A"}) // B -> A @@ -395,7 +395,7 @@ func TestTraverseInDependencyOrder(t *testing.T) { }) t.Run("graph with multiple parents and boundary nodes", func(t *testing.T) { vertices := []string{"A", "B", "C", "D"} - graph := New[string](vertices, WithStatus[string]("started")) + graph := NewLabeledGraph[string](vertices, WithStatus[string]("started")) graph.Add(Edge[string]{From: "A", To: "C"}) graph.Add(Edge[string]{From: "A", To: "D"}) graph.Add(Edge[string]{From: "B", To: "D"}) diff --git a/internal/pkg/manifest/validate.go b/internal/pkg/manifest/validate.go index e3de0736184..a6598b3ae01 100644 --- a/internal/pkg/manifest/validate.go +++ b/internal/pkg/manifest/validate.go @@ -2331,7 +2331,7 @@ func validateNoCircularDependencies(deps map[string]containerDependency) error { } func buildDependencyGraph(deps map[string]containerDependency) (*graph.Graph[string], error) { - dependencyGraph := graph.New[string]([]string{}) + dependencyGraph := graph.New[string]() for name, containerDep := range deps { for dep := range containerDep.dependsOn { if _, ok := deps[dep]; !ok { From 0e6dede3c6e3d37bf178531788c95bd950fa8661 Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:26:05 -0800 Subject: [PATCH 05/12] remove empty line in the comment --- internal/pkg/graph/graph.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index f53d4869bd3..61a589b7994 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -208,8 +208,7 @@ func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) { // │ │ └── f rank: 2 // │ └── d rank: 1 // └── b rank: 0 -// -// └── e rank: 1 +// └── e rank: 1 func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) { if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic { return nil, &errCycle[V]{ From 1aff8965541c797272b7884ef3c5001526e9c369 Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:29:06 -0800 Subject: [PATCH 06/12] add names --- internal/pkg/deploy/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/deploy/pipeline.go b/internal/pkg/deploy/pipeline.go index 0f1b55a9957..7b63af2dd2d 100644 --- a/internal/pkg/deploy/pipeline.go +++ b/internal/pkg/deploy/pipeline.go @@ -781,7 +781,7 @@ func (stg *PipelineStage) buildActionsGraph(rankables []actionGraphNode) *graph. for _, r := range rankables { names = append(names, r.name) } - digraph := graph.New[string]() + digraph := graph.New[string](names...) for _, r := range rankables { if r.depends_on == nil { From b27beec09014c61335f7084b871fc8731b126239 Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:30:35 -0800 Subject: [PATCH 07/12] return ctx.err --- internal/pkg/graph/graph.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index 61a589b7994..a483d5c27c4 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -424,7 +424,7 @@ func (t *graphTraversal[V]) execute(ctx context.Context, lg *LabeledGraph[V]) er for { select { case <-ctx.Done(): - return nil + return ctx.Err() case vertex := <-vertexCh: vertexCount-- if vertexCount == 0 { From 6e91d2b2293b54ef44bb35ee8a5b89472c3dfcbb Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:34:47 -0800 Subject: [PATCH 08/12] fix comment --- internal/pkg/graph/graph.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index a483d5c27c4..85e13537902 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -203,12 +203,12 @@ func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) { // // An example graph and their ranks is shown below to illustrate: // . -// ├── a rank: 0 -// │ ├── c rank: 1 -// │ │ └── f rank: 2 -// │ └── d rank: 1 -// └── b rank: 0 -// └── e rank: 1 +//├── a rank: 0 +//│ ├── c rank: 1 +//│ │ └── f rank: 2 +//│ └── d rank: 1 +//└── b rank: 0 +// └── e rank: 1 func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) { if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic { return nil, &errCycle[V]{ From db6ae4d3af0b70d31033a31b5cc4ee53af736f61 Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:40:29 -0800 Subject: [PATCH 09/12] cleanup code --- internal/pkg/deploy/pipeline.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/deploy/pipeline.go b/internal/pkg/deploy/pipeline.go index 7b63af2dd2d..88e106f6655 100644 --- a/internal/pkg/deploy/pipeline.go +++ b/internal/pkg/deploy/pipeline.go @@ -8,20 +8,20 @@ package deploy import ( "errors" "fmt" + "gopkg.in/yaml.v3" "path" "path/filepath" "regexp" "sort" "strings" - "gopkg.in/yaml.v3" - "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/copilot-cli/internal/pkg/config" "github.com/aws/copilot-cli/internal/pkg/graph" "github.com/aws/copilot-cli/internal/pkg/manifest" ) + const ( fmtInvalidRepo = "unable to parse the repository from the URL %+v" fmtErrMissingProperty = "missing `%s` in properties" @@ -781,7 +781,7 @@ func (stg *PipelineStage) buildActionsGraph(rankables []actionGraphNode) *graph. for _, r := range rankables { names = append(names, r.name) } - digraph := graph.New[string](names...) + digraph := graph.New(names...) for _, r := range rankables { if r.depends_on == nil { From e9934ac136abc09f8a9657ef622c3e39cdf5736f Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:41:57 -0800 Subject: [PATCH 10/12] remove empty line --- internal/pkg/deploy/pipeline.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/pkg/deploy/pipeline.go b/internal/pkg/deploy/pipeline.go index 88e106f6655..bee9a60c5e1 100644 --- a/internal/pkg/deploy/pipeline.go +++ b/internal/pkg/deploy/pipeline.go @@ -21,7 +21,6 @@ import ( "github.com/aws/copilot-cli/internal/pkg/manifest" ) - const ( fmtInvalidRepo = "unable to parse the repository from the URL %+v" fmtErrMissingProperty = "missing `%s` in properties" From e9ec3e9f7083f4dbed5292c41b3ddd3639bddbfd Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Fri, 1 Dec 2023 13:45:10 -0800 Subject: [PATCH 11/12] change vertice to vtx --- internal/pkg/graph/graph_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/graph/graph_test.go b/internal/pkg/graph/graph_test.go index 8cbee84c37c..4193f3c8338 100644 --- a/internal/pkg/graph/graph_test.go +++ b/internal/pkg/graph/graph_test.go @@ -409,8 +409,8 @@ func TestTraverseInDependencyOrder(t *testing.T) { done <- struct{}{} }() - err := graph.DownwardTraversal(context.Background(), func(ctx context.Context, vertice string) error { - vtxChan <- vertice + err := graph.DownwardTraversal(context.Background(), func(ctx context.Context, vtx string) error { + vtxChan <- vtx return nil }, "started", "stopped") require.NoError(t, err, "Error during iteration") From 36dde936b47f4093152918e52b1a9afac11cce51 Mon Sep 17 00:00:00 2001 From: AdithyaKolla Date: Wed, 6 Dec 2023 12:14:24 -0800 Subject: [PATCH 12/12] address fb --- internal/pkg/graph/graph.go | 56 ++++++++++++++++---------------- internal/pkg/graph/graph_test.go | 2 +- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/internal/pkg/graph/graph.go b/internal/pkg/graph/graph.go index 85e13537902..0eb69f059dc 100644 --- a/internal/pkg/graph/graph.go +++ b/internal/pkg/graph/graph.go @@ -337,16 +337,16 @@ It applies the specified process function to each vertex in the graph, skipping The traversal is concurrent and may process vertices in parallel. Returns an error if the traversal encounters any issues, or nil if successful. */ -func (lg *LabeledGraph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { +func (lg *LabeledGraph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, nextVertexSkipStatus, requiredVertexStatus string) error { traversal := &graphTraversal[V]{ - mu: sync.Mutex{}, - seen: make(map[V]struct{}), - findBoundaryVertices: func(lg *LabeledGraph[V]) []V { return lg.leaves() }, - findAdjacentVertices: func(lg *LabeledGraph[V], v V) []V { return lg.parents(v) }, - filterVerticesByStatus: func(g *LabeledGraph[V], v V, status string) []V { return g.filterChildren(v, status) }, - requiredVertexStatus: requiredVertexStatus, - adjacentVertexSkipStatus: adjacentVertexSkipStatus, - processVertex: processVertexFunc, + mu: sync.Mutex{}, + seen: make(map[V]struct{}), + findStartVertices: func(lg *LabeledGraph[V]) []V { return lg.leaves() }, + findNextVertices: func(lg *LabeledGraph[V], v V) []V { return lg.parents(v) }, + filterPreviousVerticesByStatus: func(g *LabeledGraph[V], v V, status string) []V { return g.filterChildren(v, status) }, + requiredVertexStatus: requiredVertexStatus, + nextVertexSkipStatus: nextVertexSkipStatus, + processVertex: processVertexFunc, } return traversal.execute(ctx, lg) } @@ -361,27 +361,27 @@ Returns an error if the traversal encounters any issues. */ func (lg *LabeledGraph[V]) DownwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error { traversal := &graphTraversal[V]{ - mu: sync.Mutex{}, - seen: make(map[V]struct{}), - findBoundaryVertices: func(lg *LabeledGraph[V]) []V { return lg.Roots() }, - findAdjacentVertices: func(lg *LabeledGraph[V], v V) []V { return lg.children(v) }, - filterVerticesByStatus: func(lg *LabeledGraph[V], v V, status string) []V { return lg.filterParents(v, status) }, - requiredVertexStatus: requiredVertexStatus, - adjacentVertexSkipStatus: adjacentVertexSkipStatus, - processVertex: processVertexFunc, + mu: sync.Mutex{}, + seen: make(map[V]struct{}), + findStartVertices: func(lg *LabeledGraph[V]) []V { return lg.Roots() }, + findNextVertices: func(lg *LabeledGraph[V], v V) []V { return lg.children(v) }, + filterPreviousVerticesByStatus: func(lg *LabeledGraph[V], v V, status string) []V { return lg.filterParents(v, status) }, + requiredVertexStatus: requiredVertexStatus, + nextVertexSkipStatus: adjacentVertexSkipStatus, + processVertex: processVertexFunc, } return traversal.execute(ctx, lg) } type graphTraversal[V comparable] struct { - mu sync.Mutex - seen map[V]struct{} - findBoundaryVertices func(*LabeledGraph[V]) []V - findAdjacentVertices func(*LabeledGraph[V], V) []V - filterVerticesByStatus func(*LabeledGraph[V], V, string) []V - requiredVertexStatus string - adjacentVertexSkipStatus string - processVertex func(context.Context, V) error + mu sync.Mutex + seen map[V]struct{} + findStartVertices func(*LabeledGraph[V]) []V + findNextVertices func(*LabeledGraph[V], V) []V + filterPreviousVerticesByStatus func(*LabeledGraph[V], V, string) []V + requiredVertexStatus string + nextVertexSkipStatus string + processVertex func(context.Context, V) error } func (t *graphTraversal[V]) execute(ctx context.Context, lg *LabeledGraph[V]) error { @@ -401,7 +401,7 @@ func (t *graphTraversal[V]) execute(ctx context.Context, lg *LabeledGraph[V]) er for _, vertex := range vertices { vertex := vertex // Delay processing this vertex if any of its dependent vertices are yet to be processed. - if len(t.filterVerticesByStatus(graph, vertex, t.adjacentVertexSkipStatus)) != 0 { + if len(t.filterPreviousVerticesByStatus(graph, vertex, t.nextVertexSkipStatus)) != 0 { continue } if !t.markAsSeen(vertex) { @@ -430,11 +430,11 @@ func (t *graphTraversal[V]) execute(ctx context.Context, lg *LabeledGraph[V]) er if vertexCount == 0 { return nil } - processVertices(ctx, lg, eg, t.findAdjacentVertices(lg, vertex), vertexCh) + processVertices(ctx, lg, eg, t.findNextVertices(lg, vertex), vertexCh) } } }) - processVertices(ctx, lg, eg, t.findBoundaryVertices(lg), vertexCh) + processVertices(ctx, lg, eg, t.findStartVertices(lg), vertexCh) return eg.Wait() } diff --git a/internal/pkg/graph/graph_test.go b/internal/pkg/graph/graph_test.go index 4193f3c8338..1f4a818e1a1 100644 --- a/internal/pkg/graph/graph_test.go +++ b/internal/pkg/graph/graph_test.go @@ -406,7 +406,7 @@ func TestTraverseInDependencyOrder(t *testing.T) { for _, vtx := range vertices { seen[vtx]++ } - done <- struct{}{} + close(done) }() err := graph.DownwardTraversal(context.Background(), func(ctx context.Context, vtx string) error {