Skip to content

Commit dc7abf6

Browse files
author
Andrew Hare
committed
Create core and coretest packages
Signed-off-by: Andrew Hare <[email protected]>
1 parent 2a82ef3 commit dc7abf6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+959
-900
lines changed

.testignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
github.com/rename-this/vhs/cmd/vhs
22
github.com/rename-this/vhs/internal/smoke
3+
github.com/rename-this/vhs/coretest

capture/listener.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
"github.com/google/gopacket"
1010
"github.com/google/gopacket/pcap"
11-
"github.com/rename-this/vhs/session"
11+
"github.com/rename-this/vhs/core"
1212

1313
// See https://pkg.go.dev/github.com/google/gopacket?tab=doc#hdr-A_Final_Note
1414
_ "github.com/google/gopacket/layers"
@@ -26,7 +26,7 @@ func NewListener(cap *Capture) Listener {
2626
// given address and port.
2727
type Listener interface {
2828
Packets() <-chan gopacket.Packet
29-
Listen(session.Context)
29+
Listen(core.Context)
3030
Close()
3131
}
3232

@@ -46,9 +46,9 @@ func (l *listener) Packets() <-chan gopacket.Packet {
4646
}
4747

4848
// Listen starts listening.
49-
func (l *listener) Listen(ctx session.Context) {
49+
func (l *listener) Listen(ctx core.Context) {
5050
ctx.Logger = ctx.Logger.With().
51-
Str(session.LoggerKeyComponent, "listener").
51+
Str(core.LoggerKeyComponent, "listener").
5252
Logger()
5353

5454
for _, i := range l.Capture.Interfaces {
@@ -62,7 +62,7 @@ func (l *listener) Listen(ctx session.Context) {
6262

6363
type activateFn func(inactive *pcap.InactiveHandle) (*pcap.Handle, error)
6464

65-
func (l *listener) newHandle(ctx session.Context, i pcap.Interface, activate activateFn) (*pcap.Handle, error) {
65+
func (l *listener) newHandle(ctx core.Context, i pcap.Interface, activate activateFn) (*pcap.Handle, error) {
6666
ctx.Logger = ctx.Logger.With().
6767
Interface("interface", i).
6868
Logger()
@@ -98,7 +98,7 @@ func (l *listener) newHandle(ctx session.Context, i pcap.Interface, activate act
9898
return handle, nil
9999
}
100100

101-
func (l *listener) readPackets(ctx session.Context, dataSource gopacket.PacketDataSource, decoder gopacket.Decoder) {
101+
func (l *listener) readPackets(ctx core.Context, dataSource gopacket.PacketDataSource, decoder gopacket.Decoder) {
102102
source := gopacket.NewPacketSource(dataSource, decoder)
103103
source.Lazy = true
104104
source.NoCopy = true

capture/listener_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"github.com/google/gopacket"
99
"github.com/google/gopacket/pcap"
10-
"github.com/rename-this/vhs/session"
10+
"github.com/rename-this/vhs/core"
1111
"gotest.tools/v3/assert"
1212
)
1313

@@ -36,7 +36,7 @@ func TestNewHandle(t *testing.T) {
3636
}
3737
for _, c := range cases {
3838
t.Run(c.desc, func(t *testing.T) {
39-
ctx := session.NewContexts(&session.Config{}, &session.FlowConfig{}, nil)
39+
ctx := core.NewContext(&core.Config{}, &core.FlowConfig{}, nil)
4040
l := NewListener(&Capture{})
4141
_, err := l.(*listener).newHandle(ctx, c.i, c.activate)
4242
if c.errContains == "" {
@@ -107,7 +107,7 @@ func TestReadPackets(t *testing.T) {
107107
t.Run(c.desc, func(t *testing.T) {
108108
packets := c.listener.Packets()
109109

110-
ctx := session.NewContexts(&session.Config{DebugPackets: true}, &session.FlowConfig{}, nil)
110+
ctx := core.NewContext(&core.Config{DebugPackets: true}, &core.FlowConfig{}, nil)
111111

112112
go c.listener.(*listener).readPackets(ctx, c.source, c.decoder)
113113
for _, d := range c.source.data {

cmd/vhs/main.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,16 @@ import (
1515
"time"
1616

1717
"github.com/rename-this/vhs/capture"
18+
"github.com/rename-this/vhs/core"
1819
"github.com/rename-this/vhs/file"
1920
"github.com/rename-this/vhs/flow"
2021
"github.com/rename-this/vhs/gcs"
2122
"github.com/rename-this/vhs/gzipx"
2223
"github.com/rename-this/vhs/httpx"
2324
"github.com/rename-this/vhs/internal/ioutilx"
2425
"github.com/rename-this/vhs/jsonx"
25-
"github.com/rename-this/vhs/middleware"
2626
"github.com/rename-this/vhs/plugin"
2727
"github.com/rename-this/vhs/s3compat"
28-
"github.com/rename-this/vhs/session"
2928
"github.com/rename-this/vhs/tcp"
3029

3130
_ "net/http/pprof"
@@ -48,8 +47,8 @@ func newRootCmd() *cobra.Command {
4847
Short: "A tool for capturing and recording network traffic.",
4948
}
5049

51-
cfg = &session.Config{}
52-
flowCfg = &session.FlowConfig{}
50+
cfg = &core.Config{}
51+
flowCfg = &core.FlowConfig{}
5352
inputLine string
5453
outputLines []string
5554
)
@@ -99,10 +98,10 @@ func newRootCmd() *cobra.Command {
9998
return cmd
10099
}
101100

102-
func root(cfg *session.Config, flowCfg *session.FlowConfig, inputLine string, outputLines []string, parser *flow.Parser, logWriter io.Writer) error {
101+
func root(cfg *core.Config, flowCfg *core.FlowConfig, inputLine string, outputLines []string, parser *flow.Parser, logWriter io.Writer) error {
103102
var (
104103
errs = make(chan error, errBufSize)
105-
ctx = session.NewContextsForWriter(cfg, flowCfg, errs, logWriter)
104+
ctx = core.NewContextForWriter(cfg, flowCfg, errs, logWriter)
106105
)
107106

108107
go func() {
@@ -210,13 +209,13 @@ func root(cfg *session.Config, flowCfg *session.FlowConfig, inputLine string, ou
210209
return nil
211210
}
212211

213-
func startMiddleware(ctx session.Context) (middleware.Middleware, error) {
212+
func startMiddleware(ctx core.Context) (core.Middleware, error) {
214213
if ctx.FlowConfig.Middleware == "" {
215214
ctx.Logger.Debug().Msg("no middleware configured")
216215
return nil, nil
217216
}
218217

219-
m, err := middleware.New(ctx, ctx.FlowConfig.Middleware)
218+
m, err := core.NewMiddleware(ctx, ctx.FlowConfig.Middleware)
220219
if err != nil {
221220
return nil, fmt.Errorf("failed to create middleware: %w", err)
222221
}
@@ -259,10 +258,10 @@ func defaultParser() *flow.Parser {
259258

260259
p.LoadSink("gcs", gcs.NewSink)
261260
p.LoadSink("s3compat", s3compat.NewSink)
262-
p.LoadSink("stdout", func(_ session.Context) (flow.Sink, error) {
261+
p.LoadSink("stdout", func(_ core.Context) (core.Sink, error) {
263262
return os.Stdout, nil
264263
})
265-
p.LoadSink("discard", func(_ session.Context) (flow.Sink, error) {
264+
p.LoadSink("discard", func(_ core.Context) (core.Sink, error) {
266265
return ioutilx.NopWriteCloser(ioutil.Discard), nil
267266
})
268267
p.LoadSink("tcp", tcp.NewSink)

cmd/vhs/main_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestRoot(t *testing.T) {
6666
}
6767
}()
6868
69-
return &session.FlowConfig{
69+
return &core.FlowConfig{
7070
7171
SourceDuration: 10 * time.Second,
7272
DrainDuration: 5 * time.Second,
@@ -114,7 +114,7 @@ func TestRoot(t *testing.T) {
114114
}
115115
}()
116116
117-
return &session.FlowConfig{
117+
return &core.FlowConfig{
118118
SourceDuration: 10 * time.Second,
119119
DrainDuration: 5 * time.Second,
120120
Addr: strings.TrimLeft(s.URL, "http://"),
@@ -153,7 +153,7 @@ func TestRoot(t *testing.T) {
153153
{
154154
desc: "missing middleware",
155155
init: func(ctx context.Context) *session.FlowConfig {
156-
return &session.FlowConfig{
156+
return &core.FlowConfig{
157157
Middleware: "../../testdata/no_such_file",
158158
}
159159
},
@@ -162,7 +162,7 @@ func TestRoot(t *testing.T) {
162162
{
163163
desc: "middleware crash immediately",
164164
init: func(ctx context.Context) *session.FlowConfig {
165-
return &session.FlowConfig{
165+
return &core.FlowConfig{
166166
SourceDuration: time.Second,
167167
Middleware: "../../testdata/crash_immediately.bash",
168168
}
@@ -173,7 +173,7 @@ func TestRoot(t *testing.T) {
173173
{
174174
desc: "middleware crash eventually",
175175
init: func(ctx context.Context) *session.FlowConfig {
176-
return &session.FlowConfig{
176+
return &core.FlowConfig{
177177
SourceDuration: 2 * time.Second,
178178
Middleware: "../../testdata/crash_eventually.bash",
179179
}
@@ -184,7 +184,7 @@ func TestRoot(t *testing.T) {
184184
{
185185
desc: "bad input line",
186186
init: func(ctx context.Context) *session.FlowConfig {
187-
return &session.FlowConfig{}
187+
return &core.FlowConfig{}
188188
},
189189
inputLine: "---",
190190
initializeErrContains: "invalid source: ---",
@@ -201,13 +201,13 @@ func TestRoot(t *testing.T) {
201201
buf bytes.Buffer
202202
)
203203
204-
parser.Sinks["testout"] = func(session.Context) (flow.Sink, error) {
204+
parser.Sinks["testout"] = func(core.Context) (flow.Sink, error) {
205205
return ioutilx.NopWriteCloser(&buf), nil
206206
}
207207
208208
var (
209209
logBuf bytes.Buffer
210-
cfg = &session.Config{
210+
cfg = &core.Config{
211211
ProfileHTTPAddr: ":81112",
212212
PrometheusAddr: ":8080",
213213
ProfilePathCPU: "/tmp/vhs_cpu_test.prof",

core/close.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package core
2+
3+
import "io"
4+
5+
// CloseSequentially closes a and if there is no
6+
// error, closes b.
7+
func CloseSequentially(a, b io.Closer) error {
8+
if err := a.Close(); err != nil {
9+
return err
10+
}
11+
if b != nil {
12+
return b.Close()
13+
}
14+
return nil
15+
}

core/close_test.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package core
2+
3+
import (
4+
"bytes"
5+
"errors"
6+
"io"
7+
"testing"
8+
9+
"github.com/rename-this/vhs/internal/ioutilx"
10+
"gotest.tools/assert"
11+
)
12+
13+
type TestDoubleOutputModifier struct {
14+
OptCloseErr error
15+
}
16+
17+
// Wrap wraps.
18+
func (m *TestDoubleOutputModifier) Wrap(w OutputWriter) (OutputWriter, error) {
19+
tdom := &testDoubleOutputModifier{w: w}
20+
if m.OptCloseErr == nil {
21+
return tdom, nil
22+
}
23+
24+
return &errWriteCloser{
25+
Writer: tdom,
26+
err: m.OptCloseErr,
27+
}, nil
28+
}
29+
30+
type testDoubleOutputModifier struct {
31+
w io.WriteCloser
32+
}
33+
34+
func (o *testDoubleOutputModifier) Write(p []byte) (int, error) {
35+
return o.w.Write(append(p, p...))
36+
}
37+
38+
func (o *testDoubleOutputModifier) Close() error {
39+
return o.w.Close()
40+
}
41+
42+
type errWriteCloser struct {
43+
io.Writer
44+
err error
45+
}
46+
47+
func (n *errWriteCloser) Close() error { return n.err }
48+
49+
func TestCloseSequentially(t *testing.T) {
50+
cases := []struct {
51+
desc string
52+
a OutputModifier
53+
b OutputModifier
54+
errContains string
55+
}{
56+
{
57+
desc: "no error",
58+
a: &TestDoubleOutputModifier{},
59+
b: &TestDoubleOutputModifier{},
60+
},
61+
{
62+
desc: "error a",
63+
a: &TestDoubleOutputModifier{OptCloseErr: errors.New("111")},
64+
b: &TestDoubleOutputModifier{},
65+
errContains: "111",
66+
},
67+
{
68+
desc: "error b",
69+
a: &TestDoubleOutputModifier{},
70+
b: &TestDoubleOutputModifier{OptCloseErr: errors.New("222")},
71+
errContains: "222",
72+
},
73+
}
74+
for _, c := range cases {
75+
t.Run(c.desc, func(t *testing.T) {
76+
var bufA bytes.Buffer
77+
a, err := c.a.Wrap(ioutilx.NopWriteCloser(&bufA))
78+
assert.NilError(t, err)
79+
80+
var bufB bytes.Buffer
81+
b, err := c.b.Wrap(ioutilx.NopWriteCloser(&bufB))
82+
assert.NilError(t, err)
83+
84+
err = CloseSequentially(a, b)
85+
if c.errContains == "" {
86+
assert.NilError(t, err)
87+
} else {
88+
assert.ErrorContains(t, err, c.errContains)
89+
}
90+
})
91+
}
92+
}

session/config.go core/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package session
1+
package core
22

33
import "time"
44

session/context.go core/context.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package session
1+
package core
22

33
import (
44
"context"
@@ -15,14 +15,14 @@ const (
1515
LoggerKeyComponent = "component"
1616
)
1717

18-
// NewContexts creates a new set of contexts.
19-
func NewContexts(cfg *Config, flowCfg *FlowConfig, errs chan error) Context {
20-
return NewContextsForWriter(cfg, flowCfg, errs, os.Stderr)
18+
// NewContext creates a new context.
19+
func NewContext(cfg *Config, flowCfg *FlowConfig, errs chan error) Context {
20+
return NewContextForWriter(cfg, flowCfg, errs, os.Stderr)
2121
}
2222

23-
// NewContextsForWriter creates a new set of contexts
23+
// NewContextForWriter creates a new context
2424
// with logs written to a specific writer.
25-
func NewContextsForWriter(cfg *Config, flowCfg *FlowConfig, errs chan error, w io.Writer) Context {
25+
func NewContextForWriter(cfg *Config, flowCfg *FlowConfig, errs chan error, w io.Writer) Context {
2626
var (
2727
sessionID = ksuid.New().String()
2828
registry = envelope.NewRegistry()

session/context_test.go core/context_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package session
1+
package core
22

33
import (
44
"errors"
@@ -12,7 +12,7 @@ func TestContexts(t *testing.T) {
1212
var (
1313
canceled = make(chan struct{}, 3)
1414
errs = make(chan error, 3)
15-
ctx1 = NewContexts(&Config{}, &FlowConfig{}, errs)
15+
ctx = NewContext(&Config{}, &FlowConfig{}, errs)
1616
)
1717

1818
fn := func(ctx Context) {
@@ -28,9 +28,9 @@ func TestContexts(t *testing.T) {
2828
}
2929
}
3030

31-
go fn(ctx1)
31+
go fn(ctx)
3232

33-
ctx1.Cancel()
33+
ctx.Cancel()
3434

3535
time.Sleep(500 * time.Millisecond)
3636

0 commit comments

Comments
 (0)