From 91bee05353c42f28ec457618ffa4fbe333f77388 Mon Sep 17 00:00:00 2001 From: Maciej Plonski Date: Mon, 15 Jan 2024 14:27:56 +0100 Subject: [PATCH] process cloudevents for live logs --- go.mod | 8 ++ go.sum | 18 +++ internal/imagerunner/async.go | 205 +++++------------------------ internal/imagerunner/async_test.go | 79 ----------- internal/saucecloud/imagerunner.go | 23 ++-- 5 files changed, 72 insertions(+), 261 deletions(-) delete mode 100644 internal/imagerunner/async_test.go diff --git a/go.mod b/go.mod index 13734aa35..f2fefcfda 100644 --- a/go.mod +++ b/go.mod @@ -34,17 +34,25 @@ require ( require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.1.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rivo/uniseg v0.4.4 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.8.0 // indirect + go.uber.org/zap v1.21.0 // indirect golang.org/x/tools v0.6.0 // indirect ) diff --git a/go.sum b/go.sum index 18225b3d5..f7f9dedb0 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,7 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA= @@ -70,6 +71,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s= +github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -188,6 +191,7 @@ github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -238,6 +242,8 @@ github.com/jedib0t/go-pretty/v6 v6.2.1 h1:O/3XdNfyWSyVLLIt1EeDhfP8AhNMjtBSh0MuZ4 github.com/jedib0t/go-pretty/v6 v6.2.1/go.mod h1:+nE9fyyHGil+PuISTCrp7avEdo6bqoMwqZnuiK2r2a0= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -294,8 +300,12 @@ github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -402,8 +412,15 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= +go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= +go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -658,6 +675,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/imagerunner/async.go b/internal/imagerunner/async.go index 00dd4b08f..a3c6c9041 100644 --- a/internal/imagerunner/async.go +++ b/internal/imagerunner/async.go @@ -4,150 +4,16 @@ import ( "bufio" "encoding/json" "fmt" - "log" "net/http" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/gorilla/websocket" - "github.com/santhosh-tekuri/jsonschema/v5" ) -var SCHEMA = ` -{ - "properties": { - "kind": { - "enum": [ - "notice", - "log", - "ping" - ] - }, - "runnerID": { - "type": "string" - } - }, - "allOf": [ - { - "if": { - "properties": { - "kind": { - "const": "log" - } - } - }, - "then": { - "properties": { - "lines": { - "type": "array", - "items": { - "type": "object", - "properties": { - "id": { - "type": "string" - }, - "containerName": { - "type": "string" - }, - "message": { - "type": "string" - } - } - } - } - }, - "additionalProperties": true - } - }, - { - "if": { - "properties": { - "kind": { - "const": "notice" - } - } - }, - "then": { - "properties": { - "severity": { - "enum": [ - "info", - "warning", - "error" - ] - }, - "message": { - "type": "string" - } - }, - "additionalProperties": true - } - }, - { - "if": { - "properties": { - "kind": { - "const": "ping" - } - } - }, - "then": { - "properties": { - "message": { - "type": "string" - } - }, - "additionalProperties": true - } - } - - ], - "additionalProperties": true -} -` - -const ( - NOTICE = "notice" - LOG = "log" - PING = "ping" -) - -type AsyncEventI interface { - GetKind() string - GetRunnerID() string -} - type AsyncEvent struct { - Kind string `json:"kind"` - RunnerID string `json:"runnerID"` -} - -func (a *AsyncEvent) GetKind() string { - return a.Kind -} - -func (a *AsyncEvent) GetRunnerID() string { - return a.RunnerID -} - -type LogLine struct { - ID string `json:"id"` - ContainerName string `json:"containerName"` - Message string `json:"message"` -} - -type LogEvent struct { - AsyncEvent - Lines []LogLine `json:"lines"` -} - -type PingEvent struct { - AsyncEvent - Message string `json:"message"` -} - -type NoticeEvent struct { - AsyncEvent - Severity string `json:"severity"` - Message string `json:"message"` + Type string + LineSequence string + Data map[string]string } type AsyncEventTransportI interface { @@ -205,55 +71,52 @@ func (aet *SseAsyncEventTransport) Close() error { } type AsyncEventManagerI interface { - ParseEvent(event string) (AsyncEventI, error) + ParseEvent(event string) (*AsyncEvent, error) } type AsyncEventManager struct { - schema *jsonschema.Schema } func NewAsyncEventManager() (*AsyncEventManager, error) { - schema, err := jsonschema.CompileString("schema.json", SCHEMA) - if err != nil { - return nil, err - } - - asyncEventManager := AsyncEventManager{ - schema: schema, - } + asyncEventManager := AsyncEventManager{} return &asyncEventManager, nil } -func (a *AsyncEventManager) ParseEvent(event string) (AsyncEventI, error) { - err := a.schema.Validate(event) +func parseLineSequence(cloudEvent *cloudevents.Event) (string, error) { + // The extension is not necessarily present, so ignore errors. + _lineseq, _ := cloudEvent.Context.GetExtension("linesequence") + lineseq, ok := _lineseq.(string) + if !ok { + return "", fmt.Errorf("linesequence is not a string") + } + return lineseq, nil +} + +func (a *AsyncEventManager) ParseEvent(event string) (*AsyncEvent, error) { + readEvent := cloudevents.NewEvent() + err := json.Unmarshal([]byte(event), &readEvent) + if err != nil { + return nil, err + } + + data := map[string]string{} + err = readEvent.DataAs(&data) if err != nil { return nil, err } - v := AsyncEvent{} - if err := json.Unmarshal([]byte(event), &v); err != nil { - log.Fatal(err) + + asyncEvent := AsyncEvent{ + Type: readEvent.Type(), + Data: data, } - if v.GetKind() == LOG { - logEvent := LogEvent{} - if err := json.Unmarshal([]byte(event), &logEvent); err != nil { - log.Fatal(err) - } - return &logEvent, nil - } else if v.GetKind() == NOTICE { - noticeEvent := NoticeEvent{} - if err := json.Unmarshal([]byte(event), ¬iceEvent); err != nil { - log.Fatal(err) - } - return ¬iceEvent, nil - } else if v.GetKind() == PING { - pingEvent := PingEvent{} - if err := json.Unmarshal([]byte(event), &pingEvent); err != nil { - log.Fatal(err) + if asyncEvent.Type == "com.saucelabs.so.v1.log" { + asyncEvent.LineSequence, err = parseLineSequence(&readEvent) + if err != nil { + return nil, err } - return &pingEvent, nil } - return nil, fmt.Errorf("unknown event type: %s", v.GetKind()) + return &asyncEvent, nil } diff --git a/internal/imagerunner/async_test.go b/internal/imagerunner/async_test.go deleted file mode 100644 index 70bedc8e8..000000000 --- a/internal/imagerunner/async_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package imagerunner - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestLogEvent(t *testing.T) { - manager, err := NewAsyncEventManager() - assert.NoError(t, err) - - eventMsg := `{ - "kind": "log", - "runnerID": "myrunner", - "lines": [ - { - "id": "1", - "containerName": "mycontainer", - "message": "hello" - } - ] - }` - - event, err := manager.ParseEvent(eventMsg) - assert.NoError(t, err) - assert.Equal(t, "log", event.GetKind()) - assert.Equal(t, "myrunner", event.GetRunnerID()) - - logEvent, ok := event.(*LogEvent) - assert.True(t, ok) - assert.Len(t, logEvent.Lines, 1) - assert.Equal(t, "1", logEvent.Lines[0].ID) - assert.Equal(t, "hello", logEvent.Lines[0].Message) - assert.Equal(t, "mycontainer", logEvent.Lines[0].ContainerName) -} - -func TestNoticeEvent(t *testing.T) { - manager, err := NewAsyncEventManager() - assert.NoError(t, err) - - eventMsg := `{ - "kind": "notice", - "runnerID": "myrunner", - "severity": "info", - "message": "hello" - }` - - event, err := manager.ParseEvent(eventMsg) - assert.NoError(t, err) - assert.Equal(t, "notice", event.GetKind()) - assert.Equal(t, "myrunner", event.GetRunnerID()) - - noticeEvent, ok := event.(*NoticeEvent) - assert.True(t, ok) - assert.Equal(t, "notice", noticeEvent.GetKind()) - assert.Equal(t, "hello", noticeEvent.Message) - assert.Equal(t, "info", noticeEvent.Severity) -} - -func TestPingEvent(t *testing.T) { - manager, err := NewAsyncEventManager() - assert.NoError(t, err) - - eventMsg := `{ - "kind": "ping", - "runnerID": "myrunner", - "message": "hello" - }` - - event, err := manager.ParseEvent(eventMsg) - assert.NoError(t, err) - assert.Equal(t, "ping", event.GetKind()) - assert.Equal(t, "myrunner", event.GetRunnerID()) - - pingEvent, ok := event.(*PingEvent) - assert.True(t, ok) - assert.Equal(t, "hello", pingEvent.Message) -} diff --git a/internal/saucecloud/imagerunner.go b/internal/saucecloud/imagerunner.go index 8bde4028d..873d8dddd 100644 --- a/internal/saucecloud/imagerunner.go +++ b/internal/saucecloud/imagerunner.go @@ -475,27 +475,28 @@ func (r *ImgRunner) handleAsyncEventsOneshot(ctx context.Context, id string, las case <-ctx.Done(): return lastseq, ctx.Err() default: - msg, err := transport.ReadMessage() + readMessage, err := transport.ReadMessage() if err != nil { return lastseq, err } - if msg == "" { + if readMessage == "" { return lastseq, errors.New("empty message") } - event, err := r.asyncEventManager.ParseEvent(msg) + event, err := r.asyncEventManager.ParseEvent(readMessage) if err != nil { return lastseq, err } - if event.GetKind() == "log" { - logEvent := event.(*imagerunner.LogEvent) - for _, line := range logEvent.Lines { - lastseq = line.ID - log.Info().Msgf("[%s] %s", line.ContainerName, line.Message) + switch event.Type { + case "com.saucelabs.so.v1.ping": + case "com.saucelabs.so.v1.log": + if event.LineSequence != "" { + lastseq = event.LineSequence } - } else if event.GetKind() == "notice" { - noticeEvent := event.(*imagerunner.NoticeEvent) - log.Info().Msgf("[%s] %s", noticeEvent.Severity, noticeEvent.Message) + log.Info().Msgf("[%s] %s", event.Data["containerName"], event.Data["line"]) + default: + err := errors.New("unknown event type") + log.Err(err).Msgf("unknown even type: %s", event.Type) } } }