From 26a6f6b16a4f74d04b20636852231c2b2c5ecd32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 15 Feb 2024 18:53:29 +0800 Subject: [PATCH] feat(coordinator): Added Coordinator (#97) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Flcă‚› --- coordinator/README.md | 46 ++++++++++++++++++++++++ coordinator/coordinator.go | 24 +++++++++++++ coordinator/coordinator_test.go | 63 +++++++++++++++++++++++++++++++++ coordinator/manager.go | 63 +++++++++++++++++++++++++++++++++ coordinator/manager_test.go | 42 ++++++++++++++++++++++ 5 files changed, 238 insertions(+) create mode 100644 coordinator/README.md create mode 100644 coordinator/coordinator.go create mode 100644 coordinator/coordinator_test.go create mode 100644 coordinator/manager.go create mode 100644 coordinator/manager_test.go diff --git a/coordinator/README.md b/coordinator/README.md new file mode 100644 index 00000000..d366d570 --- /dev/null +++ b/coordinator/README.md @@ -0,0 +1,46 @@ +# Coordinator + +## Usage + +```go +package main + +import ( + "fmt" + "sync" + + "github.com/go-kratos-ecosystem/components/v2/coordinator" +) + +func main() { + var wg sync.WaitGroup + wg.Add(3) //nolint:gomnd + + go func() { + defer wg.Done() + if <-coordinator.Until("foo").Done(); true { + fmt.Println("foo") + } + }() + + go func() { + defer wg.Done() + if <-coordinator.Until("foo").Done(); true { + fmt.Println("foo 2") + } + }() + + go func() { + defer wg.Done() + if <-coordinator.Until("bar").Done(); true { + fmt.Println("bar") + } + }() + + coordinator.Until("foo").Close() + coordinator.Until("bar").Close() + + wg.Wait() +} + +``` \ No newline at end of file diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go new file mode 100644 index 00000000..c59ebe08 --- /dev/null +++ b/coordinator/coordinator.go @@ -0,0 +1,24 @@ +package coordinator + +import "sync" + +type Coordinator struct { + c chan struct{} + once sync.Once +} + +func NewCoordinator() *Coordinator { + return &Coordinator{ + c: make(chan struct{}), + } +} + +func (c *Coordinator) Done() <-chan struct{} { + return c.c +} + +func (c *Coordinator) Close() { + c.once.Do(func() { + close(c.c) + }) +} diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go new file mode 100644 index 00000000..6c1bedc0 --- /dev/null +++ b/coordinator/coordinator_test.go @@ -0,0 +1,63 @@ +package coordinator + +import ( + "sync" + "testing" + "time" +) + +func TestCoordinator(t *testing.T) { + c := NewCoordinator() + var wg sync.WaitGroup + + wg.Add(1) + + go func() { + defer wg.Done() + + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() + + for { + select { + case <-c.Done(): + return + case <-timer.C: + t.Error("timeout") + return + } + } + }() + + c.Close() + wg.Wait() +} + +func TestCoordinator2(t *testing.T) { + c := NewCoordinator() + var wg sync.WaitGroup + + wg.Add(1) + + go func() { + defer wg.Done() + + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() + + for { + select { + case <-c.Done(): + t.Error("timeout") + return + case <-timer.C: + return + } + } + }() + + time.Sleep(2 * time.Second) + + c.Close() + wg.Wait() +} diff --git a/coordinator/manager.go b/coordinator/manager.go new file mode 100644 index 00000000..3ce5ff7c --- /dev/null +++ b/coordinator/manager.go @@ -0,0 +1,63 @@ +package coordinator + +import "sync" + +type Manager struct { + coordinators map[string]*Coordinator + mu sync.RWMutex +} + +func NewManager() *Manager { + return &Manager{ + coordinators: make(map[string]*Coordinator), + } +} + +func (m *Manager) Until(identifier string) *Coordinator { + m.mu.Lock() + defer m.mu.Unlock() + + if c, ok := m.coordinators[identifier]; ok { + return c + } + + c := NewCoordinator() + m.coordinators[identifier] = c + + return c +} + +func (m *Manager) Close(identifier string) { + m.mu.Lock() + defer m.mu.Unlock() + + if c, ok := m.coordinators[identifier]; ok { + c.Close() + delete(m.coordinators, identifier) + } +} + +func (m *Manager) Clear() { + m.mu.Lock() + defer m.mu.Unlock() + + for _, c := range m.coordinators { + c.Close() + } + + m.coordinators = make(map[string]*Coordinator) +} + +var defaultManager = NewManager() + +func Until(identifier string) *Coordinator { + return defaultManager.Until(identifier) +} + +func Close(identifier string) { + defaultManager.Close(identifier) +} + +func Clear() { + defaultManager.Clear() +} diff --git a/coordinator/manager_test.go b/coordinator/manager_test.go new file mode 100644 index 00000000..96268c9f --- /dev/null +++ b/coordinator/manager_test.go @@ -0,0 +1,42 @@ +package coordinator + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestManager(t *testing.T) { + wg := sync.WaitGroup{} + ch := make(chan struct{}, 2) + + c1 := Until("foo") + c2 := Until("foo") + + assert.Same(t, c1, c2) + assert.Equal(t, 0, len(ch)) + + wg.Add(2) + go func() { + defer wg.Done() + + if <-c1.Done(); true { + ch <- struct{}{} + return + } + }() + go func() { + defer wg.Done() + + if <-c2.Done(); true { + ch <- struct{}{} + return + } + }() + + c1.Close() + + wg.Wait() + assert.Equal(t, 2, len(ch)) +}