Skip to content

Commit

Permalink
fix: rc panic when run tf multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
noneback committed Sep 27, 2024
1 parent e4be081 commit 674fa5f
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 104 deletions.
4 changes: 0 additions & 4 deletions TODO

This file was deleted.

7 changes: 6 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node) {
case *Static:
e.pool.Go(func() {
defer e.wg.Done()
p.handle(ctx)
node.state.Store(kNodeStateRunning)
defer node.state.Store(kNodeStateFinished)

p.handle(ctx)
node.drop()
for _, n := range node.successors {
// fmt.Println("put", n.Name)
Expand All @@ -87,6 +89,8 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node) {
case *Subflow:
e.pool.Go(func() {
defer e.wg.Done()
node.state.Store(kNodeStateRunning)
defer node.state.Store(kNodeStateFinished)

if !p.g.instancelized {
p.handle(p)
Expand All @@ -113,6 +117,7 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node) {
func (e *ExecutorImpl) schedule(node *Node) {
e.wg.Add(1)
e.wq.Put(node)
node.state.Store(kNodeStateWaiting)
node.g.scheCond.Signal()
}

Expand Down
2 changes: 1 addition & 1 deletion flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Subflow struct {

func (sf *Subflow) Push(tasks ...*Task) {
for _, task := range tasks {
sf.g.Push(task.node)
sf.g.push(task.node)
}
}

Expand Down
4 changes: 3 additions & 1 deletion graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ func (g *Graph) reset() {
}
}

func (g *Graph) Push(n ...*Node) {
func (g *Graph) push(n ...*Node) {
g.nodes = append(g.nodes, n...)
for _, node := range n {
node.g = g
}
}

func (g *Graph) setup() {
g.reset()

for _, node := range g.nodes {
g.joinCounter.Increase()
node.joinCounter.Set(len(node.dependents))
Expand Down
144 changes: 73 additions & 71 deletions graph_test.go
Original file line number Diff line number Diff line change
@@ -1,77 +1,79 @@
package gotaskflow

// func TestTopologicalSort(t *testing.T) {
// t.Run("TestEmptyGraph", func(t *testing.T) {
// graph := newGraph("empty")
// sorted, ok := graph.TopologicalSort()
// if !ok || len(sorted) != 0 {
// t.Errorf("expected true and an empty slice, got %v and %v", ok, sorted)
// }
// })
import "testing"

// t.Run("TestSingleNodeGraph", func(t *testing.T) {
// graph := newGraph("single node")
// nodeA := newNode("A")
// graph.push(nodeA)
// sorted, ok := graph.TopologicalSort()
// if !ok || len(sorted) != 1 || sorted[0] != nodeA {
// t.Errorf("expected true and the single node, got %v and %v", ok, sorted)
// }
// })
func TestTopologicalSort(t *testing.T) {
t.Run("TestEmptyGraph", func(t *testing.T) {
graph := newGraph("empty")
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 0 {
t.Errorf("expected true and an empty slice, got %v and %v", ok, sorted)
}
})

// t.Run("TestSimpleDAG", func(t *testing.T) {
// graph := newGraph("simple DAG")
// nodeA := newNode("A")
// nodeB := newNode("B")
// nodeC := newNode("C")
// nodeA.precede(nodeB)
// nodeB.precede(nodeC)
// graph.push(nodeA, nodeB, nodeC)
// sorted, ok := graph.TopologicalSort()
// if !ok || len(sorted) != 3 || sorted[0] != nodeA || sorted[1] != nodeB || sorted[2] != nodeC {
// t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted)
// }
// })
t.Run("TestSingleNodeGraph", func(t *testing.T) {
graph := newGraph("single node")
nodeA := newNode("A")
graph.push(nodeA)
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 1 || sorted[0] != nodeA {
t.Errorf("expected true and the single node, got %v and %v", ok, sorted)
}
})

// t.Run("TestComplexDAG", func(t *testing.T) {
// graph := newGraph("complex DAG")
// nodeA := newNode("A")
// nodeB := newNode("B")
// nodeC := newNode("C")
// nodeD := newNode("D")
// nodeE := newNode("E")
// nodeA.precede(nodeB)
// nodeA.precede(nodeC)
// nodeB.precede(nodeD)
// nodeC.precede(nodeD)
// nodeD.precede(nodeE)
// graph.push(nodeA, nodeB, nodeC, nodeD, nodeE)
// sorted, ok := graph.TopologicalSort()
// if !ok || len(sorted) != 5 {
// t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted)
// }
// // Further check the ordering
// nodeIndex := make(map[*Node]int)
// for i, node := range sorted {
// nodeIndex[node] = i
// }
// if nodeIndex[nodeA] > nodeIndex[nodeB] || nodeIndex[nodeC] > nodeIndex[nodeD] {
// t.Errorf("unexpected sort order for complex DAG")
// }
// })
t.Run("TestSimpleDAG", func(t *testing.T) {
graph := newGraph("simple DAG")
nodeA := newNode("A")
nodeB := newNode("B")
nodeC := newNode("C")
nodeA.precede(nodeB)
nodeB.precede(nodeC)
graph.push(nodeA, nodeB, nodeC)
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 3 || sorted[0] != nodeA || sorted[1] != nodeB || sorted[2] != nodeC {
t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted)
}
})

// t.Run("TestGraphWithCycle", func(t *testing.T) {
// graph := newGraph("graph with cycle")
// nodeA := newNode("A")
// nodeB := newNode("B")
// nodeC := newNode("C")
// nodeA.precede(nodeB)
// nodeB.precede(nodeC)
// nodeC.precede(nodeA) // Creates a cycle
// graph.push(nodeA, nodeB, nodeC)
// _, ok := graph.TopologicalSort()
// if ok {
// t.Errorf("expected false due to cycle, got %v", ok)
// }
// })
// }
t.Run("TestComplexDAG", func(t *testing.T) {
graph := newGraph("complex DAG")
nodeA := newNode("A")
nodeB := newNode("B")
nodeC := newNode("C")
nodeD := newNode("D")
nodeE := newNode("E")
nodeA.precede(nodeB)
nodeA.precede(nodeC)
nodeB.precede(nodeD)
nodeC.precede(nodeD)
nodeD.precede(nodeE)
graph.push(nodeA, nodeB, nodeC, nodeD, nodeE)
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 5 {
t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted)
}
// Further check the ordering
nodeIndex := make(map[*Node]int)
for i, node := range sorted {
nodeIndex[node] = i
}
if nodeIndex[nodeA] > nodeIndex[nodeB] || nodeIndex[nodeC] > nodeIndex[nodeD] {
t.Errorf("unexpected sort order for complex DAG")
}
})

t.Run("TestGraphWithCycle", func(t *testing.T) {
graph := newGraph("graph with cycle")
nodeA := newNode("A")
nodeB := newNode("B")
nodeC := newNode("C")
nodeA.precede(nodeB)
nodeB.precede(nodeC)
nodeC.precede(nodeA) // Creates a cycle
graph.push(nodeA, nodeB, nodeC)
_, ok := graph.topologicalSort()
if ok {
t.Errorf("expected false due to cycle, got %v", ok)
}
})
}
14 changes: 7 additions & 7 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package gotaskflow

import (
"sync"
"sync/atomic"

"github.com/noneback/go-taskflow/utils"
)

type kNodeState int32

const (
kNodeStateWaiting = 1
kNodeStateRunning = 2
kNodeStateFinished = 3
kNodeStateIdle = int32(0)
kNodeStateWaiting = int32(1)
kNodeStateRunning = int32(2)
kNodeStateFinished = int32(3)
)

type NodeType string
Expand All @@ -28,7 +28,7 @@ type Node struct {
Typ NodeType
ptr interface{}
rw *sync.RWMutex
state kNodeState
state atomic.Int32
joinCounter utils.RC
g *Graph
}
Expand All @@ -55,7 +55,7 @@ func (n *Node) precede(v *Node) {
func newNode(name string) *Node {
return &Node{
name: name,
state: kNodeStateWaiting,
state: atomic.Int32{},
successors: make([]*Node, 0),
dependents: make([]*Node, 0),
rw: &sync.RWMutex{},
Expand Down
2 changes: 1 addition & 1 deletion taskflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewTaskFlow(name string) *TaskFlow {

func (tf *TaskFlow) Push(tasks ...*Task) {
for _, task := range tasks {
tf.graph.Push(task.node)
tf.graph.push(task.node)
}
}

Expand Down
18 changes: 7 additions & 11 deletions taskflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ import (
"context"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"testing"

"github.com/felixge/fgprof"

gotaskflow "github.com/noneback/go-taskflow"
)

Expand Down Expand Up @@ -53,19 +50,18 @@ func TestTaskFlow(t *testing.T) {
panic(err)
}
})

err := exector.Run(tf)
if err != nil {
panic(err)
}
exector.Wait()
fmt.Print("########### second times")
exector.Run(tf)
exector.Wait()
}

func TestSubflow(t *testing.T) {
http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())

go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()

A, B, C :=
gotaskflow.NewTask("A", func(ctx *context.Context) {
fmt.Println("A")
Expand Down Expand Up @@ -137,8 +133,8 @@ func TestSubflow(t *testing.T) {
if err := gotaskflow.Visualizer.Visualize(tf, os.Stdout); err != nil {
log.Fatal(err)
}
// tf.Reset()
// exector.Run(tf)

exector.Run(tf)
// exector.Wait()

// if err := tf.Visualize(os.Stdout); err != nil {
Expand Down
3 changes: 0 additions & 3 deletions typology.go

This file was deleted.

4 changes: 0 additions & 4 deletions visualizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,17 @@ func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph
nodeMap[node.name] = vNode
case *Subflow:
vSubGraph := vGraph.SubGraph("cluster_"+node.name, 1)
// fmt.Println("vSubGraph", vSubGraph.Name(), node.name)
err := v.visualizeG(gv, p.g, vSubGraph)
if err != nil {
return fmt.Errorf("graph %v visualize -> %w", g.name, ErrGraphIsCyclic)
}
// fmt.Println("vSubGraph firstNode", vSubGraph.FirstNode().Name(), node.name)

nodeMap[node.name] = vSubGraph.FirstNode()
}
}

for _, node := range nodes {
for _, deps := range node.dependents {
// fmt.Println("add edge", deps.name, "->", node.name)

if _, err := vGraph.CreateEdge("", nodeMap[deps.name], nodeMap[node.name]); err != nil {
return fmt.Errorf("add edge %v - %v -> %w", deps.name, node.name, err)
}
Expand Down

0 comments on commit 674fa5f

Please sign in to comment.