-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchunked_io.go
132 lines (124 loc) · 2.6 KB
/
chunked_io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package netconf
import (
"bufio"
"fmt"
"io"
"strconv"
)
// NewChunkedRdr reads "chunked" xml messages according to RFC6242 so that individual xml
// messages can be sent over a ssh stream.
//
// see https://datatracker.ietf.org/doc/html/rfc6242#section-4.1
//
// Chunked message framing example where xxx is replaced with NETCONF messages:
//
//	#4
//	xxxx
//	#12
//	xxxxxxxxxxxx
//	##
//
// This has 2 chunks of sizes 4 and 12 bytes.
//
// The returned channel delivers one io.Reader per message. A reader is sent
// before any input has been consumed and a fresh one is sent after every
// end-of-chunks marker, so the last reader delivered before the channel is
// closed carries no data when the input ends right after a marker.
//
// When the input ends with io.EOF at a chunk header the current reader
// reports EOF and the channel is closed. Any other failure (illegal framing,
// a read error, input that ends inside a chunk) is reported by the current
// reader's Read and the channel is then closed.
//
// The background goroutine blocks until the consumer receives each reader
// from the channel and drains it.
func NewChunkedRdr(in io.Reader) <-chan io.Reader {
	buf := make([]byte, 4096)
	rdrs := make(chan io.Reader)
	chunked := bufio.NewReader(in)
	go func() {
		defer close(rdrs)
		rdr, wtr := io.Pipe()
		rdrs <- rdr
		for {
			// RFC max size is 4.3 billion (uint32)
			chunkSize, err := readChunkSize(chunked)
			if err == io.EOF {
				wtr.Close()
				return
			}
			if err != nil {
				// Hand the framing/read error to whoever is reading the
				// current message instead of mistaking it for a marker.
				wtr.CloseWithError(err)
				return
			}
			if chunkSize == 0 {
				// End of message: finish this reader and open the next one.
				// The underlying input stays open.
				wtr.Close()
				rdr, wtr = io.Pipe()
				rdrs <- rdr
				continue
			}
			for remaining := chunkSize; remaining > 0; {
				// Never ask for more than what is left of this chunk,
				// otherwise a short read would let the next call run into
				// the following chunk header.
				grab := minInt64(remaining, int64(len(buf)))
				n, err := chunked.Read(buf[:grab])
				if n > 0 {
					// A pipe write only fails once the consumer has closed
					// its end; keep draining so the input stays aligned on
					// the next chunk header.
					_, _ = wtr.Write(buf[:n])
					remaining -= int64(n)
				}
				if err != nil && remaining > 0 {
					if err == io.EOF {
						// Input ended inside a chunk.
						err = io.ErrUnexpectedEOF
					}
					wtr.CloseWithError(err)
					return
				}
			}
		}
	}()
	return rdrs
}
// readChunkSize consumes one RFC 6242 chunk header from r and reports what
// it announced.
//
// Two headers are recognised:
//
//	LF HASH chunk-size LF   -> returns the size, always in 1..4294967295
//	LF HASH HASH LF         -> end-of-chunks, returns 0
//
// A nil error with a zero size therefore always means end-of-chunks; a
// literal size of zero, a size with a leading zero, a size above the RFC
// maximum and any byte out of place are reported as errors. Errors from
// r.ReadByte, including io.EOF, are returned unchanged.
func readChunkSize(r io.ByteReader) (int64, error) {
	// Every header, whether it carries a size or ends the message, opens
	// with LF HASH.
	for _, want := range [...]byte{'\n', '#'} {
		b, err := r.ReadByte()
		if err != nil {
			return 0, err
		}
		if b != want {
			return 0, fmt.Errorf("illegal framing char %d", b)
		}
	}

	b, err := r.ReadByte()
	if err != nil {
		return 0, err
	}
	if b == '#' {
		// A second HASH must be followed directly by LF.
		if b, err = r.ReadByte(); err != nil {
			return 0, err
		}
		if b != '\n' {
			return 0, fmt.Errorf("illegal chunked delimitor")
		}
		return 0, nil
	}

	// The largest legal size, 4294967295, has ten digits; refusing longer
	// runs keeps a hostile peer from growing the buffer without bound.
	const maxDigits = 10
	d := make([]byte, 0, maxDigits)
	for b != '\n' {
		if b < '0' || b > '9' {
			return 0, fmt.Errorf("illegal framing char %d", b)
		}
		if len(d) == maxDigits {
			return 0, fmt.Errorf("chunk size has more than %d digits", maxDigits)
		}
		d = append(d, b)
		if b, err = r.ReadByte(); err != nil {
			return 0, err
		}
	}
	// chunk-size is [1-9][0-9]*: not empty, no leading zero.
	if len(d) == 0 || d[0] == '0' {
		return 0, fmt.Errorf("illegal chunk size %q", d)
	}
	// bitSize 32 enforces the RFC upper bound.
	size, err := strconv.ParseUint(string(d), 10, 32)
	if err != nil {
		return 0, fmt.Errorf("illegal chunk size %q: %w", d, err)
	}
	return int64(size), nil
}
// minInt64 returns the smaller of a and b; when they are equal that value is
// returned.
func minInt64(a, b int64) int64 {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
// chunkedWtr frames everything written to it as RFC 6242 chunks on raw.
type chunkedWtr struct {
	raw io.Writer
}

// Close ends the current message by writing the end-of-chunks marker
// "\n##\n". The underlying writer is not closed; it stays open for the next
// chunked writer.
func (cw chunkedWtr) Close() error {
	_, err := fmt.Fprint(cw.raw, "\n##\n")
	return err
}

// Write sends p as a chunk: a "\n#<len>\n" header followed by the bytes of p.
//
// An empty p writes nothing, because a zero-length chunk is not legal framing
// and a reader would take it for something else. A p longer than the largest
// chunk-size RFC 6242 allows (4294967295) is sent as several chunks.
//
// The returned count is the number of payload bytes of p handed to the
// underlying writer; header bytes are not counted.
func (cw chunkedWtr) Write(p []byte) (int, error) {
	// Held in a variable rather than a constant so the conversion to int
	// below is a runtime conversion and still compiles where int is 32 bits.
	var maxChunk uint64 = 1<<32 - 1

	total := 0
	for len(p) > 0 {
		n := len(p)
		if uint64(n) > maxChunk {
			n = int(maxChunk)
		}
		if _, err := fmt.Fprintf(cw.raw, "\n#%d\n", n); err != nil {
			return total, err
		}
		wrote, err := cw.raw.Write(p[:n])
		total += wrote
		if err != nil {
			return total, err
		}
		// Defensive: a writer that reports a short write without an error
		// would leave the announced chunk size wrong.
		if wrote != n {
			return total, fmt.Errorf("only wrote %d of %d bytes", wrote, n)
		}
		p = p[n:]
	}
	return total, nil
}

// NewChunkedWtr is the write-side counterpart to NewChunkedRdr: each Write on
// the returned value emits a chunk on w and Close emits the end-of-chunks
// marker without closing w.
func NewChunkedWtr(w io.Writer) io.WriteCloser {
	return chunkedWtr{raw: w}
}