Skip to content

Commit

Permalink
chore: traverse func container start and shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
KollaAdithya committed Nov 30, 2023
1 parent 3369798 commit a966d10
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 25 deletions.
243 changes: 235 additions & 8 deletions internal/pkg/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
// Package graph provides functionality for directed graphs.
package graph

import (
"context"
"sync"
)

// vertexStatus denotes the visiting status of a vertex when running DFS in a graph.
type vertexStatus int

Expand All @@ -17,6 +22,8 @@ const (
type Graph[V comparable] struct {
vertices map[V]neighbors[V] // Adjacency list for each vertex.
inDegrees map[V]int // Number of incoming edges for each vertex.
status map[V]string // status of each vertex.
lock sync.Mutex // lock used to mutate graph data.
}

// Edge represents one edge of a directed graph.
Expand All @@ -28,17 +35,34 @@ type Edge[V comparable] struct {
type neighbors[V comparable] map[V]bool

// New initiates a new Graph.
func New[V comparable](vertices ...V) *Graph[V] {
func New[V comparable](vertices []V, opts ...GraphOption[V]) *Graph[V] {
adj := make(map[V]neighbors[V])
inDegrees := make(map[V]int)
for _, vertex := range vertices {
adj[vertex] = make(neighbors[V])
inDegrees[vertex] = 0
}
return &Graph[V]{
g := &Graph[V]{
vertices: adj,
inDegrees: inDegrees,
}
for _, opt := range opts {
opt(g)
}
return g
}

// GraphOption allows you to initialize Graph with additional properties.
type GraphOption[V comparable] func(g *Graph[V])

// WithStatus sets the status of each vertex in the Graph.
func WithStatus[V comparable](status string) func(g *Graph[V]) {
return func(g *Graph[V]) {
g.status = make(map[V]string)
for vertex := range g.vertices {
g.status[vertex] = status
}
}
}

// Neighbors returns the list of connected vertices from vtx.
Expand Down Expand Up @@ -135,6 +159,77 @@ func (g *Graph[V]) Roots() []V {
return roots
}

// updateStatus updates the status of a vertex.
func (g *Graph[V]) updateStatus(vertex V, status string) {
g.lock.Lock()
defer g.lock.Unlock()
g.status[vertex] = status
}

// getStatus gets the status of a vertex.
func (g *Graph[V]) getStatus(vertex V) string {
g.lock.Lock()
defer g.lock.Unlock()
return g.status[vertex]
}

// getLeaves returns the leaves of a given vertex.
func (g *Graph[V]) leaves() []V {
g.lock.Lock()
defer g.lock.Unlock()
var leaves []V
for vtx := range g.vertices {
if len(g.vertices[vtx]) == 0 {
leaves = append(leaves, vtx)
}
}
return leaves
}

// getParents returns the parent vertices (incoming edges) of vertex.
func (g *Graph[V]) parents(vtx V) []V {
g.lock.Lock()
defer g.lock.Unlock()
var parents []V
for v, neighbors := range g.vertices {
if neighbors[vtx] {
parents = append(parents, v)
}
}
return parents
}

// getChildren returns the child vertices (outgoing edges) of vertex.
func (g *Graph[V]) children(vtx V) []V {
g.lock.Lock()
defer g.lock.Unlock()
return g.Neighbors(vtx)
}

// filterParents filters parents based on the vertex status.
func (g *Graph[V]) filterParents(vtx V, status string) []V {
parents := g.parents(vtx)
var filtered []V
for _, parent := range parents {
if g.getStatus(parent) == status {
filtered = append(filtered, parent)
}
}
return filtered
}

// filterChildren filters children based on the vertex status.
func (g *Graph[V]) filterChildren(vtx V, status string) []V {
children := g.children(vtx)
var filtered []V
for _, child := range children {
if g.getStatus(child) == status {
filtered = append(filtered, child)
}
}
return filtered
}

func (g *Graph[V]) hasCycles(temp *findCycleTempVars[V], currVertex V) bool {
temp.status[currVertex] = visiting
for vertex := range g.vertices[currVertex] {
Expand Down Expand Up @@ -196,12 +291,13 @@ func (alg *TopologicalSorter[V]) traverse(g *Graph[V]) {
//
// An example graph and their ranks is shown below to illustrate:
// .
//├── a rank: 0
//│ ├── c rank: 1
//│ │ └── f rank: 2
//│ └── d rank: 1
//└── b rank: 0
// └── e rank: 1
// ├── a rank: 0
// │ ├── c rank: 1
// │ │ └── f rank: 2
// │ └── d rank: 1
// └── b rank: 0
//
// └── e rank: 1
func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], error) {
if vertices, isAcyclic := digraph.IsAcyclic(); !isAcyclic {
return nil, &errCycle[V]{
Expand All @@ -215,3 +311,134 @@ func TopologicalOrder[V comparable](digraph *Graph[V]) (*TopologicalSorter[V], e
topo.traverse(digraph)
return topo, nil
}

/*
UpwardTraversal performs an upward traversal on the graph starting from leaves (nodes with no children)
and moving towards root nodes (nodes with children).
It applies the specified process function to each vertex in the graph, skipping vertices with the
"adjacentVertexSkipStatus" status, and continuing traversal until reaching vertices with the "requiredVertexStatus" status.
The traversal is concurrent and may process vertices in parallel.
Returns an error if the traversal encounters any issues, or nil if successful.
*/
func (g *Graph[V]) UpwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error {
traversal := &graphTraversal[V]{
mu: sync.Mutex{},
seen: make(map[V]struct{}),
findBoundaryVertices: func(g *Graph[V]) []V { return g.leaves() },
findAdjacentVertices: func(g *Graph[V], v V) []V { return g.parents(v) },
filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterChildren(v, status) },
requiredVertexStatus: requiredVertexStatus,
adjacentVertexSkipStatus: adjacentVertexSkipStatus,
processVertex: processVertexFunc,
}
return traversal.execute(ctx, g)
}

/*
DownwardTraversal performs a downward traversal on the graph starting from root nodes (nodes with no parents)
and moving towards leaf nodes (nodes with parents). It applies the specified process function to each
vertex in the graph, skipping vertices with the "adjacentVertexSkipStatus" status, and continuing traversal
until reaching vertices with the "requiredVertexStatus" status.
The traversal is concurrent and may process vertices in parallel.
Returns an error if the traversal encounters any issues.
*/
func (g *Graph[V]) DownwardTraversal(ctx context.Context, processVertexFunc func(context.Context, V) error, adjacentVertexSkipStatus, requiredVertexStatus string) error {
traversal := &graphTraversal[V]{
mu: sync.Mutex{},
seen: make(map[V]struct{}),
findBoundaryVertices: func(g *Graph[V]) []V { return g.Roots() },
findAdjacentVertices: func(g *Graph[V], v V) []V { return g.children(v) },
filterVerticesByStatus: func(g *Graph[V], v V, status string) []V { return g.filterParents(v, status) },
requiredVertexStatus: requiredVertexStatus,
adjacentVertexSkipStatus: adjacentVertexSkipStatus,
processVertex: processVertexFunc,
}
return traversal.execute(ctx, g)
}

type graphTraversal[V comparable] struct {
mu sync.Mutex
seen map[V]struct{}
findBoundaryVertices func(*Graph[V]) []V
findAdjacentVertices func(*Graph[V], V) []V
filterVerticesByStatus func(*Graph[V], V, string) []V
requiredVertexStatus string
adjacentVertexSkipStatus string
processVertex func(context.Context, V) error
}

func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

vertexCount := len(graph.vertices)
if vertexCount == 0 {
return nil
}

var wg sync.WaitGroup
vertexCh := make(chan V, vertexCount)
errCh := make(chan error, vertexCount)

processVertices := func(ctx context.Context, graph *Graph[V], wg *sync.WaitGroup, vertices []V, vertexCh chan V) {
for _, vertex := range vertices {
vertex := vertex
// Delay processing this vertex if any of its dependent vertices are yet to be processed.
if len(t.filterVerticesByStatus(graph, vertex, t.adjacentVertexSkipStatus)) != 0 {
continue
}
if !t.markAsSeen(vertex) {
// Skip this vertex if it's already been processed by another go routine.
continue
}
wg.Add(1)
go func() {
defer wg.Done()
if err := t.processVertex(ctx, vertex); err != nil {
errCh <- err
return
}
graph.updateStatus(vertex, t.requiredVertexStatus)
vertexCh <- vertex
}()
}
}

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case vertex := <-vertexCh:
vertexCount--
if vertexCount == 0 {
return
}
processVertices(ctx, graph, &wg, t.findAdjacentVertices(graph, vertex), vertexCh)
}
}
}()
processVertices(ctx, graph, &wg, t.findBoundaryVertices(graph), vertexCh)
wg.Wait()
close(errCh)

for err := range errCh {
if err != nil {
return err
}
}
return nil
}

func (t *graphTraversal[V]) markAsSeen(vertex V) bool {
t.mu.Lock()
defer t.mu.Unlock()
if _, seen := t.seen[vertex]; seen {
return false
}
t.seen[vertex] = struct{}{}
return true
}
Loading

0 comments on commit a966d10

Please sign in to comment.