Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(graph): add traverse methods container start and shutdown #5508

Merged
merged 14 commits into from
Dec 6, 2023
5 changes: 3 additions & 2 deletions internal/pkg/deploy/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ package deploy
import (
"errors"
"fmt"
"gopkg.in/yaml.v3"
"path"
"path/filepath"
"regexp"
"sort"
"strings"

"gopkg.in/yaml.v3"

"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/copilot-cli/internal/pkg/config"
"github.com/aws/copilot-cli/internal/pkg/graph"
Expand Down Expand Up @@ -780,7 +781,7 @@ func (stg *PipelineStage) buildActionsGraph(rankables []actionGraphNode) *graph.
for _, r := range rankables {
names = append(names, r.name)
}
digraph := graph.New(names...)
digraph := graph.New(names)

for _, r := range rankables {
if r.depends_on == nil {
Expand Down
233 changes: 225 additions & 8 deletions internal/pkg/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
// Package graph provides functionality for directed graphs.
package graph

import (
"context"
"sync"

"golang.org/x/sync/errgroup"
)

// vertexStatus denotes the visiting status of a vertex when running DFS in a graph.
type vertexStatus int

Expand All @@ -17,6 +24,8 @@ const (
type Graph[V comparable] struct {
vertices map[V]neighbors[V] // Adjacency list for each vertex.
inDegrees map[V]int // Number of incoming edges for each vertex.
status map[V]string // status of each vertex.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little odd for a generic graph to have a "status" field. Maybe consider creating a more specific struct and inherit from Graph[V comparable] in this pkg? This way we can also consolidate graphTraversal which is already very deeply coupled with this struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just created a new LabeledGraph[V comparable] which inherits *Graph[V] that would reduce the coupling between Graph[V comparable].
Does this address the concern on coupling and generic graph to have a "status" field 🤔

lock sync.Mutex // lock used to mutate graph data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a lock in the graph struct instead of defining it in the caller? It doesn't seem we apply the lock to all methods to make this struct truly atomic...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified the code where every method of LabeledGraph[V comparable] struct uses lock .

Couple of reasons why i want to use lock declaring over here instead of caller.

  1. If the lock is managed within the data structure, it simplifies the usage for the caller. The caller does not need to worry about acquiring and releasing the lock, making the API easier to use
  2. The caller may not have full context or understanding of the graph's internals, which increases the risk of incorrect or insufficient locking, leading to potential data races or deadlocks.

}

// Edge represents one edge of a directed graph.
Expand All @@ -28,17 +37,34 @@ type Edge[V comparable] struct {
type neighbors[V comparable] map[V]bool

// New initiates a new Graph.
func New[V comparable](vertices ...V) *Graph[V] {
func New[V comparable](vertices []V, opts ...GraphOption[V]) *Graph[V] {
adj := make(map[V]neighbors[V])
inDegrees := make(map[V]int)
for _, vertex := range vertices {
adj[vertex] = make(neighbors[V])
inDegrees[vertex] = 0
}
return &Graph[V]{
g := &Graph[V]{
vertices: adj,
inDegrees: inDegrees,
}
for _, opt := range opts {
opt(g)
}
return g
}

// GraphOption allows you to initialize Graph with additional properties.
type GraphOption[V comparable] func(g *Graph[V])

// WithStatus sets the status of each vertex in the Graph.
func WithStatus[V comparable](status string) func(g *Graph[V]) {
return func(g *Graph[V]) {
g.status = make(map[V]string)
for vertex := range g.vertices {
g.status[vertex] = status
}
}
}

// Neighbors returns the list of connected vertices from vtx.
Expand Down Expand Up @@ -135,6 +161,77 @@ func (g *Graph[V]) Roots() []V {
return roots
}

// updateStatus updates the status of a vertex.
func (g *Graph[V]) updateStatus(vertex V, status string) {
g.lock.Lock()
defer g.lock.Unlock()
g.status[vertex] = status
}

// getStatus gets the status of a vertex.
func (g *Graph[V]) getStatus(vertex V) string {
g.lock.Lock()
defer g.lock.Unlock()
return g.status[vertex]
}

// getLeaves returns the leaves of a given vertex.
func (g *Graph[V]) leaves() []V {
g.lock.Lock()
defer g.lock.Unlock()
var leaves []V
for vtx := range g.vertices {
if len(g.vertices[vtx]) == 0 {
leaves = append(leaves, vtx)
}
}
return leaves
}

// getParents returns the parent vertices (incoming edges) of vertex.
func (g *Graph[V]) parents(vtx V) []V {
g.lock.Lock()
defer g.lock.Unlock()
var parents []V
for v, neighbors := range g.vertices {
if neighbors[vtx] {
parents = append(parents, v)
}
}
return parents
}

// getChildren returns the child vertices (outgoing edges) of vertex.
func (g *Graph[V]) children(vtx V) []V {
g.lock.Lock()
defer g.lock.Unlock()
return g.Neighbors(vtx)
}

// filterParents filters parents based on the vertex status.
func (g *Graph[V]) filterParents(vtx V, status string) []V {
parents := g.parents(vtx)
var filtered []V
for _, parent := range parents {
if g.getStatus(parent) == status {
filtered = append(filtered, parent)
}
}
return filtered
}

// filterChildren filters children based on the vertex status.
func (g *Graph[V]) filterChildren(vtx V, status string) []V {
children := g.children(vtx)
var filtered []V
for _, child := range children {
if g.getStatus(child) == status {
filtered = append(filtered, child)
}
}
return filtered
}

func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool {
temp.status[currVertex] = visiting
for vertex := range g.vertices[currVertex] {
Expand Down Expand Up @@ -196,12 +293,13 @@ func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) {
//
// An example graph and their ranks is shown below to illustrate:
// .
//├── a rank: 0
//│ ├── c rank: 1
//│ │ └── f rank: 2
//│ └── d rank: 1
//└── b rank: 0
// └── e rank: 1
// ├── a rank: 0
// │ ├── c rank: 1
// │ │ └── f rank: 2
// │ └── d rank: 1
// └── b rank: 0
//
KollaAdithya marked this conversation as resolved.
Show resolved Hide resolved
// └── e rank: 1
func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) {
if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic {
return nil, &errCycle[V]{
Expand All @@ -215,3 +313,122 @@ func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], e
topo.traverse(digraph)
return topo, nil
}

/*
UpwardTraversal performs an upward traversal on the graph starting from leaves (nodes with no children)
and moving towards root nodes (nodes with children).
It applies the specified process function to each vertex in the graph, skipping vertices with the
"adjacentVertexSkipStatus" status, and continuing traversal until reaching vertices with the "requiredVertexStatus" status.
The traversal is concurrent and may process vertices in parallel.
Returns an error if the traversal encounters any issues, or nil if successful.
*/
func (g *Graph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error {
traversal := &graphTraversal[V]{
mu: sync.Mutex{},
seen: make(map[V]struct{}),
findBoundaryVertices: func(g *Graph[V]) []V { return g.leaves() },
findAdjacentVertices: func(g *Graph[V], v V) []V { return g.parents(v) },
filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterChildren(v, status) },
requiredVertexStatus: requiredVertexStatus,
adjacentVertexSkipStatus: adjacentVertexSkipStatus,
processVertex: processVertexFunc,
}
return traversal.execute(ctx, g)
}

/*
DownwardTraversal performs a downward traversal on the graph starting from root nodes (nodes with no parents)
and moving towards leaf nodes (nodes with parents). It applies the specified process function to each
vertex in the graph, skipping vertices with the "adjacentVertexSkipStatus" status, and continuing traversal
until reaching vertices with the "requiredVertexStatus" status.
The traversal is concurrent and may process vertices in parallel.
Returns an error if the traversal encounters any issues.
*/
func (g *Graph[V]) DownwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error {
traversal := &graphTraversal[V]{
mu: sync.Mutex{},
seen: make(map[V]struct{}),
findBoundaryVertices: func(g *Graph[V]) []V { return g.Roots() },
findAdjacentVertices: func(g *Graph[V], v V) []V { return g.children(v) },
filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterParents(v, status) },
requiredVertexStatus: requiredVertexStatus,
adjacentVertexSkipStatus: adjacentVertexSkipStatus,
processVertex: processVertexFunc,
}
return traversal.execute(ctx, g)
}

type graphTraversal[V comparable] struct {
mu sync.Mutex
seen map[V]struct{}
findBoundaryVertices func(*Graph[V]) []V
findAdjacentVertices func(*Graph[V], V) []V
filterVerticesByStatus func(*Graph[V], V, string) []V
requiredVertexStatus string
adjacentVertexSkipStatus string
processVertex func(context.Context, V) error
}

func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

vertexCount := len(graph.vertices)
if vertexCount == 0 {
return nil
}
eg, ctx := errgroup.WithContext(ctx)
vertexCh := make(chan V, vertexCount)
defer close(vertexCh)

processVertices := func(ctx context.Context, graph *Graph[V], eg *errgroup.Group, vertices []V, vertexCh chan V) {
for _, vertex := range vertices {
vertex := vertex
// Delay processing this vertex if any of its dependent vertices are yet to be processed.
if len(t.filterVerticesByStatus(graph, vertex, t.adjacentVertexSkipStatus)) != 0 {
continue
}
if !t.markAsSeen(vertex) {
// Skip this vertex if it's already been processed by another routine.
continue
}
eg.Go(func() error {
if err := t.processVertex(ctx, vertex); err != nil {
return err
}
// Assign new status to the vertex upon successful processing.
graph.updateStatus(vertex, t.requiredVertexStatus)
vertexCh <- vertex
return nil
})
}
}

eg.Go(func() error {
iamhopaul123 marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ctx.Done():
return nil
case vertex := <-vertexCh:
vertexCount--
if vertexCount == 0 {
return nil
}
processVertices(ctx, graph, eg, t.findAdjacentVertices(graph, vertex), vertexCh)
}
}
})
processVertices(ctx, graph, eg, t.findBoundaryVertices(graph), vertexCh)
return eg.Wait()
}

func (t *graphTraversal[V]) markAsSeen(vertex V) bool {
t.mu.Lock()
defer t.mu.Unlock()
if _, seen := t.seen[vertex]; seen {
return false
}
t.seen[vertex] = struct{}{}
return true
}
Loading
Loading