-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroute.go
113 lines (91 loc) · 2.2 KB
/
route.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package flightorder
import (
"context"
"fmt"
"sync"
)
// Route is responsible for tickets processing.
type Route struct {
allocator TicketAllocator
last *Ticket
recorder *recorder
mux sync.Mutex
}
// RouteParams sets route parameters.
type RouteParams struct {
// TicketAllocator sets custom ticket allocator.
// Optional.
TicketAllocator TicketAllocator
}
// NewRoute creates new route for tickets processing.
func NewRoute(params RouteParams) *Route {
if params.TicketAllocator == nil {
params.TicketAllocator = StdAllocator{}
}
return &Route{
allocator: params.TicketAllocator,
}
}
// Ticket takes a new ticket.
func (r *Route) Ticket() *Ticket {
r.mux.Lock()
defer r.mux.Unlock()
ticket := r.allocator.AcquireTicket()
ticket.prev = r.last
r.last = ticket
if r.recorder != nil {
r.recorder.takeCall(ticket)
}
return ticket
}
// CompleteTicket completes a ticket.
// Waits for previous taken tickets to complete first, if any.
// Completion function is optional.
func (r *Route) CompleteTicket(ctx context.Context, t *Ticket, completion func(ctx context.Context) error) error {
if completion == nil {
completion = func(ctx context.Context) error { return nil }
}
if r.recorder != nil {
r.recorder.completeCall(t)
}
if t.prev == nil {
return r.completeTail(ctx, t, completion)
}
if err := r.waitFor(ctx, t.prev); err != nil {
return fmt.Errorf("wait for previous ticket: %w", err)
}
r.allocator.ReleaseTicket(t.prev)
t.prev = nil
return r.completeTail(ctx, t, completion)
}
func (r *Route) completeTail(ctx context.Context, t *Ticket, f func(ctx context.Context) error) error {
r.mux.Lock()
defer r.mux.Unlock()
// Last ticket on a trip. No tickets ahead.
if r.last == t {
r.last = nil
r.allocator.ReleaseTicket(t)
if r.recorder != nil {
r.recorder.recordCompleted(t)
}
return f(ctx)
}
if err := f(ctx); err != nil {
return err
}
// There is a ticket ahead.
// Mark current ticket as completed.
t.completed <- struct{}{}
if r.recorder != nil {
r.recorder.recordCompleted(t)
}
return nil
}
func (r *Route) waitFor(ctx context.Context, f *Ticket) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-f.completed:
return nil
}
}