From 674fa5fb06a5130651f318192828563339b29501 Mon Sep 17 00:00:00 2001 From: NoneBack Date: Fri, 27 Sep 2024 14:53:26 +0800 Subject: [PATCH] fix: rc panic when run tf multiple times --- TODO | 4 -- executor.go | 7 ++- flow.go | 2 +- graph.go | 4 +- graph_test.go | 144 ++++++++++++++++++++++++----------------------- node.go | 14 ++--- taskflow.go | 2 +- taskflow_test.go | 18 +++--- typology.go | 3 - visualizer.go | 4 -- 10 files changed, 98 insertions(+), 104 deletions(-) delete mode 100644 TODO delete mode 100644 typology.go diff --git a/TODO b/TODO deleted file mode 100644 index 5734cd3..0000000 --- a/TODO +++ /dev/null @@ -1,4 +0,0 @@ -- executor -- stateful task -- condition task schedule -- DAG observer & profilier \ No newline at end of file diff --git a/executor.go b/executor.go index 997e868..30cd102 100644 --- a/executor.go +++ b/executor.go @@ -73,8 +73,10 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node) { case *Static: e.pool.Go(func() { defer e.wg.Done() - p.handle(ctx) + node.state.Store(kNodeStateRunning) + defer node.state.Store(kNodeStateFinished) + p.handle(ctx) node.drop() for _, n := range node.successors { // fmt.Println("put", n.Name) @@ -87,6 +89,8 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node) { case *Subflow: e.pool.Go(func() { defer e.wg.Done() + node.state.Store(kNodeStateRunning) + defer node.state.Store(kNodeStateFinished) if !p.g.instancelized { p.handle(p) @@ -113,6 +117,7 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node) { func (e *ExecutorImpl) schedule(node *Node) { e.wg.Add(1) e.wq.Put(node) + node.state.Store(kNodeStateWaiting) node.g.scheCond.Signal() } diff --git a/flow.go b/flow.go index a99b81d..1ad673c 100644 --- a/flow.go +++ b/flow.go @@ -17,7 +17,7 @@ type Subflow struct { func (sf *Subflow) Push(tasks ...*Task) { for _, task := range tasks { - sf.g.Push(task.node) + sf.g.push(task.node) } } diff --git a/graph.go b/graph.go index 3af6f79..f2fdac0 100644 --- a/graph.go +++ b/graph.go @@ -35,7 +35,7 @@ func (g *Graph) reset() { } } -func (g *Graph) Push(n ...*Node) { +func (g *Graph) push(n ...*Node) { g.nodes = append(g.nodes, n...) for _, node := range n { node.g = g @@ -43,6 +43,8 @@ func (g *Graph) Push(n ...*Node) { } func (g *Graph) setup() { + g.reset() + for _, node := range g.nodes { g.joinCounter.Increase() node.joinCounter.Set(len(node.dependents)) diff --git a/graph_test.go b/graph_test.go index 6c81bfc..7c2ed40 100644 --- a/graph_test.go +++ b/graph_test.go @@ -1,77 +1,79 @@ package gotaskflow -// func TestTopologicalSort(t *testing.T) { -// t.Run("TestEmptyGraph", func(t *testing.T) { -// graph := newGraph("empty") -// sorted, ok := graph.TopologicalSort() -// if !ok || len(sorted) != 0 { -// t.Errorf("expected true and an empty slice, got %v and %v", ok, sorted) -// } -// }) +import "testing" -// t.Run("TestSingleNodeGraph", func(t *testing.T) { -// graph := newGraph("single node") -// nodeA := newNode("A") -// graph.push(nodeA) -// sorted, ok := graph.TopologicalSort() -// if !ok || len(sorted) != 1 || sorted[0] != nodeA { -// t.Errorf("expected true and the single node, got %v and %v", ok, sorted) -// } -// }) +func TestTopologicalSort(t *testing.T) { + t.Run("TestEmptyGraph", func(t *testing.T) { + graph := newGraph("empty") + sorted, ok := graph.topologicalSort() + if !ok || len(sorted) != 0 { + t.Errorf("expected true and an empty slice, got %v and %v", ok, sorted) + } + }) -// t.Run("TestSimpleDAG", func(t *testing.T) { -// graph := newGraph("simple DAG") -// nodeA := newNode("A") -// nodeB := newNode("B") -// nodeC := newNode("C") -// nodeA.precede(nodeB) -// nodeB.precede(nodeC) -// graph.push(nodeA, nodeB, nodeC) -// sorted, ok := graph.TopologicalSort() -// if !ok || len(sorted) != 3 || sorted[0] != nodeA || sorted[1] != nodeB || sorted[2] != nodeC { -// t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted) -// } -// }) + t.Run("TestSingleNodeGraph", func(t *testing.T) { + graph := newGraph("single node") + nodeA := newNode("A") + graph.push(nodeA) + sorted, ok := graph.topologicalSort() + if !ok || len(sorted) != 1 || sorted[0] != nodeA { + t.Errorf("expected true and the single node, got %v and %v", ok, sorted) + } + }) -// t.Run("TestComplexDAG", func(t *testing.T) { -// graph := newGraph("complex DAG") -// nodeA := newNode("A") -// nodeB := newNode("B") -// nodeC := newNode("C") -// nodeD := newNode("D") -// nodeE := newNode("E") -// nodeA.precede(nodeB) -// nodeA.precede(nodeC) -// nodeB.precede(nodeD) -// nodeC.precede(nodeD) -// nodeD.precede(nodeE) -// graph.push(nodeA, nodeB, nodeC, nodeD, nodeE) -// sorted, ok := graph.TopologicalSort() -// if !ok || len(sorted) != 5 { -// t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted) -// } -// // Further check the ordering -// nodeIndex := make(map[*Node]int) -// for i, node := range sorted { -// nodeIndex[node] = i -// } -// if nodeIndex[nodeA] > nodeIndex[nodeB] || nodeIndex[nodeC] > nodeIndex[nodeD] { -// t.Errorf("unexpected sort order for complex DAG") -// } -// }) + t.Run("TestSimpleDAG", func(t *testing.T) { + graph := newGraph("simple DAG") + nodeA := newNode("A") + nodeB := newNode("B") + nodeC := newNode("C") + nodeA.precede(nodeB) + nodeB.precede(nodeC) + graph.push(nodeA, nodeB, nodeC) + sorted, ok := graph.topologicalSort() + if !ok || len(sorted) != 3 || sorted[0] != nodeA || sorted[1] != nodeB || sorted[2] != nodeC { + t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted) + } + }) -// t.Run("TestGraphWithCycle", func(t *testing.T) { -// graph := newGraph("graph with cycle") -// nodeA := newNode("A") -// nodeB := newNode("B") -// nodeC := newNode("C") -// nodeA.precede(nodeB) -// nodeB.precede(nodeC) -// nodeC.precede(nodeA) // Creates a cycle -// graph.push(nodeA, nodeB, nodeC) -// _, ok := graph.TopologicalSort() -// if ok { -// t.Errorf("expected false due to cycle, got %v", ok) -// } -// }) -// } + t.Run("TestComplexDAG", func(t *testing.T) { + graph := newGraph("complex DAG") + nodeA := newNode("A") + nodeB := newNode("B") + nodeC := newNode("C") + nodeD := newNode("D") + nodeE := newNode("E") + nodeA.precede(nodeB) + nodeA.precede(nodeC) + nodeB.precede(nodeD) + nodeC.precede(nodeD) + nodeD.precede(nodeE) + graph.push(nodeA, nodeB, nodeC, nodeD, nodeE) + sorted, ok := graph.topologicalSort() + if !ok || len(sorted) != 5 { + t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted) + } + // Further check the ordering + nodeIndex := make(map[*Node]int) + for i, node := range sorted { + nodeIndex[node] = i + } + if nodeIndex[nodeA] > nodeIndex[nodeB] || nodeIndex[nodeC] > nodeIndex[nodeD] { + t.Errorf("unexpected sort order for complex DAG") + } + }) + + t.Run("TestGraphWithCycle", func(t *testing.T) { + graph := newGraph("graph with cycle") + nodeA := newNode("A") + nodeB := newNode("B") + nodeC := newNode("C") + nodeA.precede(nodeB) + nodeB.precede(nodeC) + nodeC.precede(nodeA) // Creates a cycle + graph.push(nodeA, nodeB, nodeC) + _, ok := graph.topologicalSort() + if ok { + t.Errorf("expected false due to cycle, got %v", ok) + } + }) +} diff --git a/node.go b/node.go index 95d8fa2..365158e 100644 --- a/node.go +++ b/node.go @@ -2,16 +2,16 @@ package gotaskflow import ( "sync" + "sync/atomic" "github.com/noneback/go-taskflow/utils" ) -type kNodeState int32 - const ( - kNodeStateWaiting = 1 - kNodeStateRunning = 2 - kNodeStateFinished = 3 + kNodeStateIdle = int32(0) + kNodeStateWaiting = int32(1) + kNodeStateRunning = int32(2) + kNodeStateFinished = int32(3) ) type NodeType string @@ -28,7 +28,7 @@ type Node struct { Typ NodeType ptr interface{} rw *sync.RWMutex - state kNodeState + state atomic.Int32 joinCounter utils.RC g *Graph } @@ -55,7 +55,7 @@ func (n *Node) precede(v *Node) { func newNode(name string) *Node { return &Node{ name: name, - state: kNodeStateWaiting, + state: atomic.Int32{}, successors: make([]*Node, 0), dependents: make([]*Node, 0), rw: &sync.RWMutex{}, diff --git a/taskflow.go b/taskflow.go index 7936ae9..d9fb765 100644 --- a/taskflow.go +++ b/taskflow.go @@ -25,7 +25,7 @@ func NewTaskFlow(name string) *TaskFlow { func (tf *TaskFlow) Push(tasks ...*Task) { for _, task := range tasks { - tf.graph.Push(task.node) + tf.graph.push(task.node) } } diff --git a/taskflow_test.go b/taskflow_test.go index 5371b59..10102a5 100644 --- a/taskflow_test.go +++ b/taskflow_test.go @@ -4,13 +4,10 @@ import ( "context" "fmt" "log" - "net/http" _ "net/http/pprof" "os" "testing" - "github.com/felixge/fgprof" - gotaskflow "github.com/noneback/go-taskflow" ) @@ -53,19 +50,18 @@ func TestTaskFlow(t *testing.T) { panic(err) } }) + err := exector.Run(tf) if err != nil { panic(err) } + exector.Wait() + fmt.Print("########### second times") + exector.Run(tf) + exector.Wait() } func TestSubflow(t *testing.T) { - http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler()) - - go func() { - log.Println(http.ListenAndServe("localhost:6060", nil)) - }() - A, B, C := gotaskflow.NewTask("A", func(ctx *context.Context) { fmt.Println("A") @@ -137,8 +133,8 @@ func TestSubflow(t *testing.T) { if err := gotaskflow.Visualizer.Visualize(tf, os.Stdout); err != nil { log.Fatal(err) } - // tf.Reset() - // exector.Run(tf) + + exector.Run(tf) // exector.Wait() // if err := tf.Visualize(os.Stdout); err != nil { diff --git a/typology.go b/typology.go deleted file mode 100644 index 30182ab..0000000 --- a/typology.go +++ /dev/null @@ -1,3 +0,0 @@ -package gotaskflow - -// TODO: with task control diff --git a/visualizer.go b/visualizer.go index dc9a343..df08d20 100644 --- a/visualizer.go +++ b/visualizer.go @@ -42,12 +42,10 @@ func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph nodeMap[node.name] = vNode case *Subflow: vSubGraph := vGraph.SubGraph("cluster_"+node.name, 1) - // fmt.Println("vSubGraph", vSubGraph.Name(), node.name) err := v.visualizeG(gv, p.g, vSubGraph) if err != nil { return fmt.Errorf("graph %v visualize -> %w", g.name, ErrGraphIsCyclic) } - // fmt.Println("vSubGraph firstNode", vSubGraph.FirstNode().Name(), node.name) nodeMap[node.name] = vSubGraph.FirstNode() } @@ -55,8 +53,6 @@ func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph for _, node := range nodes { for _, deps := range node.dependents { - // fmt.Println("add edge", deps.name, "->", node.name) - if _, err := vGraph.CreateEdge("", nodeMap[deps.name], nodeMap[node.name]); err != nil { return fmt.Errorf("add edge %v - %v -> %w", deps.name, node.name, err) }