-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
87 lines (73 loc) · 2.12 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/davidroman0O/retrypool"
)
// TaskData is the payload handed to a worker for each submitted task.
type TaskData struct {
	// ID identifies the task; DynamicWorker.Run branches on its parity.
	ID int
}
// DynamicWorker is the stateless worker used by this example. A pointer to it
// is passed to the pool in main as a retrypool.Worker[TaskData].
type DynamicWorker struct{}

// Run handles a single task.
//
// Tasks with an odd ID are reported as processed on stdout and yield nil.
// Tasks with an even ID (including 0) always yield an error, simulating a
// failure. ctx is accepted but not consulted.
func (w *DynamicWorker) Run(ctx context.Context, data TaskData) error {
	evenID := data.ID%2 == 0
	if !evenID {
		fmt.Printf("Processed task ID: %d successfully.\n", data.ID)
		return nil
	}

	// Even IDs are the simulated failure case.
	return fmt.Errorf("failed to process task ID: %d", data.ID)
}
// main demonstrates a retry policy that is swapped while the pool is running.
//
// A single DynamicWorker processes ten tasks. The pool's delay function reads
// the current policy under a read lock, and a background monitor replaces the
// initial fixed 500ms delay with exponential backoff once the pool reports
// more than three failed tasks.
//
// NOTE(review): the original comment states that this program loops forever;
// presumably that relies on the pool retrying the always-failing even-ID tasks
// without limit — confirm against retrypool's defaults.
func main() {
	ctx := context.Background()

	// retryPolicy is shared between the pool's delay function (reader) and the
	// monitor goroutine (writer); policyMu guards every access to it.
	var (
		policyMu    sync.RWMutex
		retryPolicy retrypool.RetryPolicy[TaskData] = retrypool.FixedDelayRetryPolicy[TaskData]{Delay: 500 * time.Millisecond}
	)

	// Delegate delay computation to whichever policy is current at call time.
	delayFunc := func(retries int, err error, config *retrypool.Config[TaskData]) time.Duration {
		policyMu.RLock()
		defer policyMu.RUnlock()
		return retryPolicy.ComputeDelay(retries, err, config)
	}

	// Create the pool; the initial policy is reached through delayFunc.
	pool := retrypool.New[TaskData](
		ctx,
		[]retrypool.Worker[TaskData]{&DynamicWorker{}},
		retrypool.WithDelayFunc(delayFunc),
	)

	// The monitor polls the pool's metrics every two seconds. It gets its own
	// cancellable context and a done channel so main can stop it and wait for
	// it before closing the pool, instead of leaving it polling a closed pool.
	monitorCtx, stopMonitor := context.WithCancel(ctx)
	monitorDone := make(chan struct{})
	go func() {
		defer close(monitorDone)

		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-monitorCtx.Done():
				return
			case <-ticker.C:
			}

			if pool.GetMetricsSnapshot().TasksFailed <= 3 {
				continue
			}

			policyMu.Lock()
			fmt.Println("Adjusting retry policy to use exponential backoff.")
			retryPolicy = retrypool.ExponentialBackoffRetryPolicy[TaskData]{
				BaseDelay: time.Second,
				MaxDelay:  time.Second * 10,
				MaxJitter: time.Second,
			}
			policyMu.Unlock()

			// The switch is one-way, so there is nothing left to monitor.
			// Returning here avoids re-logging and re-assigning the same
			// policy on every subsequent tick.
			return
		}
	}()

	// Submit tasks with IDs 1 through 10.
	for id := 1; id <= 10; id++ {
		if err := pool.Submit(TaskData{ID: id}); err != nil {
			fmt.Printf("Error submitting task ID %d: %v\n", id, err)
		}
	}

	// Keep waiting, checking every half second, while anything is queued or
	// in flight.
	pool.WaitWithCallback(ctx, func(queueSize, processingCount, deadTaskCount int) bool {
		return queueSize > 0 || processingCount > 0
	}, time.Second/2)

	// Stop the monitor and wait for it to exit before closing the pool.
	stopMonitor()
	<-monitorDone

	// Close the pool
	pool.Close()
}