Skip to content

Commit b515758

Browse files
szuecsAlexanderYastrebov
authored andcommitted
feature: filter fifoWithBody (zalando#2685)
* feature: filter fifoWithBody that works similar to fifo(), but release deferred until body streaming to client was finished * fix: fifo() and fifoWithBody() with canceled requests Both filters did not check for canceled context from request before semaphore.Acquire, see golang/go#63615 Signed-off-by: Sandor Szücs <[email protected]>
1 parent df800e1 commit b515758

File tree

7 files changed

+460
-89
lines changed

7 files changed

+460
-89
lines changed

docs/reference/filters.md

+25
Original file line numberDiff line numberDiff line change
@@ -2819,6 +2819,31 @@ Example:
28192819
fifo(100, 150, "10s")
28202820
```
28212821
2822+
### fifoWithBody
2823+
2824+
This Filter is similar to the [lifo](#lifo) filter in regards to
2825+
parameters and status codes.
2826+
Performance considerations are similar to [fifo](#fifo).
2827+
2828+
The difference between fifo and fifoWithBody is that fifo will decrement
2829+
the concurrency as soon as the backend sent response headers and
2830+
fifoWithBody will decrement the concurrency if the response body was
2831+
served. Normally both are very similar, but if you have a fully async
2832+
component that serves multiple website fragments, this would decrement
2833+
concurrency too early.
2834+
2835+
Parameters:
2836+
2837+
* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int)
2838+
* MaxQueueSize sets the queue size (int)
2839+
* Timeout sets the timeout to get request scheduled (time)
2840+
2841+
Example:
2842+
2843+
```
2844+
fifoWithBody(100, 150, "10s")
2845+
```
2846+
28222847
### lifo
28232848
28242849
This Filter changes skipper to handle the route with a bounded last in

filters/builtin/builtin.go

+1
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ func Filters() []filters.Spec {
216216
auth.NewForwardToken(),
217217
auth.NewForwardTokenField(),
218218
scheduler.NewFifo(),
219+
scheduler.NewFifoWithBody(),
219220
scheduler.NewLIFO(),
220221
scheduler.NewLIFOGroup(),
221222
rfc.NewPath(),

filters/filters.go

+1
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ const (
327327
SetDynamicBackendUrl = "setDynamicBackendUrl"
328328
ApiUsageMonitoringName = "apiUsageMonitoring"
329329
FifoName = "fifo"
330+
FifoWithBodyName = "fifoWithBody"
330331
LifoName = "lifo"
331332
LifoGroupName = "lifoGroup"
332333
RfcPathName = "rfcPath"

filters/scheduler/fifo.go

+33-19
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,31 @@ import (
1111
"github.com/zalando/skipper/scheduler"
1212
)
1313

14-
const (
15-
fifoKey string = "fifo"
16-
)
17-
1814
type (
19-
fifoSpec struct{}
15+
fifoSpec struct {
16+
typ string
17+
}
2018
fifoFilter struct {
2119
config scheduler.Config
2220
queue *scheduler.FifoQueue
21+
typ string
2322
}
2423
)
2524

2625
func NewFifo() filters.Spec {
27-
return &fifoSpec{}
26+
return &fifoSpec{
27+
typ: filters.FifoName,
28+
}
29+
}
30+
31+
func NewFifoWithBody() filters.Spec {
32+
return &fifoSpec{
33+
typ: filters.FifoWithBodyName,
34+
}
2835
}
2936

30-
func (*fifoSpec) Name() string {
31-
return filters.FifoName
37+
func (s *fifoSpec) Name() string {
38+
return s.typ
3239
}
3340

3441
// CreateFilter creates a fifoFilter, that will use a semaphore based
@@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
6572
}
6673

6774
return &fifoFilter{
75+
typ: s.typ,
6876
config: scheduler.Config{
6977
MaxConcurrency: cc,
7078
MaxQueueSize: qs,
@@ -132,23 +140,29 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) {
132140
}
133141

134142
// ok
135-
pending, _ := ctx.StateBag()[fifoKey].([]func())
136-
ctx.StateBag()[fifoKey] = append(pending, done)
143+
pending, _ := ctx.StateBag()[f.typ].([]func())
144+
ctx.StateBag()[f.typ] = append(pending, done)
137145
}
138146

139147
// Response will decrease the number of inflight requests to release
140148
// the concurrency reservation for the request.
141149
func (f *fifoFilter) Response(ctx filters.FilterContext) {
142-
pending, ok := ctx.StateBag()[fifoKey].([]func())
143-
if !ok {
144-
return
145-
}
146-
last := len(pending) - 1
147-
if last < 0 {
148-
return
150+
switch f.typ {
151+
case filters.FifoName:
152+
pending, ok := ctx.StateBag()[f.typ].([]func())
153+
if !ok {
154+
return
155+
}
156+
last := len(pending) - 1
157+
if last < 0 {
158+
return
159+
}
160+
pending[last]()
161+
ctx.StateBag()[f.typ] = pending[:last]
162+
163+
case filters.FifoWithBodyName:
164+
// nothing to do here, handled in the proxy after copyStream()
149165
}
150-
pending[last]()
151-
ctx.StateBag()[fifoKey] = pending[:last]
152166
}
153167

154168
// HandleErrorResponse is to opt-in for filters to get called

0 commit comments

Comments
 (0)