forked from minio/sidekick
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache-utils.go
176 lines (160 loc) · 4.21 KB
/
cache-utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Copyright (c) 2020 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"io"
"net/http"
"strconv"
)
// ResponseRecorder is an http.ResponseWriter that exposes what a handler
// produces as an *http.Response (see Result). The status code and headers are
// captured on the recorder, while body bytes are streamed through an
// in-memory pipe instead of being buffered.
//
// The handler side (WriteHeader, Write, Flush, finish) and the consumer side
// (Result plus reads of the returned Body) are expected to run on different
// goroutines: the handoff channel is unbuffered and pipe writes block until
// they are read.
type ResponseRecorder struct {
	// StatusCode is the code given to the first WriteHeader call.
	// NewRecorder presets it to 200.
	StatusCode int

	// header is the map returned by Header().
	header http.Header

	// Flushed is whether Flush was called (finish also calls it).
	Flushed bool

	// ch is the unbuffered handoff channel: sendResponse sends on it once
	// and Result receives from it, so each side blocks until the other
	// side arrives.
	ch chan bool

	// sentResponse records that the single send on ch has been performed.
	sentResponse bool

	// wroteHeader records that the status code has been fixed.
	wroteHeader bool

	// pr is handed out as the Body of the response built by Result.
	pr *io.PipeReader

	// pw receives everything passed to Write and is closed by finish.
	pw *io.PipeWriter
}

// Compile-time checks that the recorder satisfies the interfaces it is used as.
var (
	_ http.ResponseWriter = (*ResponseRecorder)(nil)
	_ http.Flusher        = (*ResponseRecorder)(nil)
)

// NewRecorder returns an initialized ResponseRecorder with an empty header
// map, a default status code of 200 and a fresh pipe for the body.
func NewRecorder() *ResponseRecorder {
	pr, pw := io.Pipe()
	return &ResponseRecorder{
		ch:         make(chan bool),
		header:     make(http.Header),
		StatusCode: http.StatusOK,
		pw:         pw,
		pr:         pr,
	}
}

// Header implements http.ResponseWriter. It returns the recorder's own header
// map, so changes made through it are what Result later copies.
func (rw *ResponseRecorder) Header() http.Header {
	return rw.header
}

// writeHeader writes the default 200 status if no header was written yet.
func (rw *ResponseRecorder) writeHeader() {
	if rw.wroteHeader {
		return
	}
	rw.WriteHeader(http.StatusOK)
}

// Write implements http.ResponseWriter. It makes sure the header has been
// written and handed over to Result, then writes buf to the pipe. The pipe
// write blocks until the Body returned by Result has consumed the data or
// has been closed.
func (rw *ResponseRecorder) Write(buf []byte) (int, error) {
	rw.writeHeader()
	rw.sendResponse() // no-op when the handoff already happened
	return rw.pw.Write(buf)
}

// sendResponse signals Result, at most once, that the status code and headers
// are ready. The channel is unbuffered, so the first call blocks until Result
// is receiving.
func (rw *ResponseRecorder) sendResponse() {
	if rw.sentResponse {
		return
	}
	rw.sentResponse = true
	rw.ch <- true
}

// finish cleans up the pipe writer after the proxy's ServeHTTP finishes. A
// handler that wrote nothing still yields a 200 response, and closing the
// pipe writer makes the response Body report EOF.
func (rw *ResponseRecorder) finish() {
	rw.writeHeader()
	rw.sendResponse() // no-op when the handoff already happened
	rw.pw.Close()     // io.PipeWriter.Close always returns nil
	rw.Flush()
}

// WriteHeader implements http.ResponseWriter. Only the first call takes
// effect: it records code and hands the response over to Result, which means
// it blocks until Result is receiving. Later calls are ignored.
func (rw *ResponseRecorder) WriteHeader(code int) {
	if rw.wroteHeader {
		return
	}
	rw.StatusCode = code
	rw.wroteHeader = true
	if rw.header == nil {
		rw.header = make(http.Header)
	}
	rw.sendResponse()
}

// Flush implements http.Flusher. Nothing is buffered by the recorder, so it
// only makes sure the header has been written and marks the recorder as
// flushed.
func (rw *ResponseRecorder) Flush() {
	rw.writeHeader()
	rw.Flushed = true
}

// Result returns the response generated by the handler. It blocks until the
// header has been written, which happens on the first WriteHeader, Write or
// Flush call, or in finish. Because only one handoff is ever signalled, a
// second call on the same recorder would block forever.
//
// The returned Response has its protocol fields, StatusCode, a copy of the
// headers, Body and ContentLength set. Body is the read side of the pipe that
// Write feeds. ContentLength is taken from the Content-Length header and is
// -1 (unknown) when that header is absent, unparsable or negative.
func (rw *ResponseRecorder) Result() *http.Response {
	// block until header/writes are started
	<-rw.ch

	res := &http.Response{
		Proto:         "HTTP/1.1",
		ProtoMajor:    1,
		ProtoMinor:    1,
		StatusCode:    rw.StatusCode,
		Header:        rw.header.Clone(),
		Body:          rw.pr,
		ContentLength: -1, // unknown unless the handler declared a valid length
	}
	if cl := res.Header.Get("Content-Length"); cl != "" {
		if size, err := strconv.ParseInt(cl, 10, 64); err == nil && size >= 0 {
			res.ContentLength = size
		}
	}
	return res
}
// multiWriter writes to the backend and to the cache. The backend is
// authoritative: its result is what Write reports. The cache is best effort:
// if a cache write fails the pipe is closed and later writes go to the
// backend only.
type multiWriter struct {
	// backendWriter receives every write first.
	backendWriter io.Writer

	// cacheWriter receives a copy of each chunk the backend fully accepted.
	cacheWriter *io.PipeWriter

	// pipeClosed is set once cacheWriter has been closed with an error, so
	// that no further cache writes are attempted.
	pipeClosed bool
}

// Write implements io.Writer. p is written to the backend first. If the
// backend fails or accepts fewer than len(p) bytes (reported as
// io.ErrShortWrite), the cache pipe is closed with that error so the reading
// side sees a failure instead of a silently truncated stream, and the
// backend's byte count and the error are returned. Otherwise p is copied to
// the cache; a cache error closes the pipe but is not reported to the caller.
func (t *multiWriter) Write(p []byte) (int, error) {
	n, err := t.backendWriter.Write(p)
	if err == nil && n != len(p) {
		err = io.ErrShortWrite
	}
	if err != nil {
		t.closeCache(err)
		return n, err
	}
	if t.pipeClosed {
		return n, nil
	}
	// ignore errors writing to cache
	if _, cerr := t.cacheWriter.Write(p); cerr != nil {
		t.closeCache(cerr)
	}
	return n, nil
}

// closeCache closes the cache pipe with err, at most once, and records that
// no further cache writes should be attempted.
func (t *multiWriter) closeCache(err error) {
	if t.pipeClosed {
		return
	}
	t.pipeClosed = true
	t.cacheWriter.CloseWithError(err) // always returns nil
}

// cacheMultiWriter returns a writer that sends every write to w1 and copies
// the chunks w1 fully accepted to the cache pipe w2.
func cacheMultiWriter(w1 io.Writer, w2 *io.PipeWriter) io.Writer {
	return &multiWriter{backendWriter: w1, cacheWriter: w2}
}