diff --git a/core/handler/server/log.go b/core/handler/server/log.go index e1d1717..1c76964 100644 --- a/core/handler/server/log.go +++ b/core/handler/server/log.go @@ -16,10 +16,9 @@ package server import ( "context" - "encoding/json" + "io" "net/http" "strconv" - "time" "github.com/99nil/gopkg/ctr" "github.com/99nil/gopkg/sse" @@ -105,52 +104,35 @@ func logWatch() http.HandlerFunc { return } - sender, err := sse.NewSender(w) - if err != nil { - wrapper.InternalError(w, err) - return - } - sender.Ping() - ctx, cancel := context.WithCancel(r.Context()) defer cancel() ll := livelog.FromRequest(r) lineCh, closeCh, err := ll.Watch(ctx, strconv.FormatUint(stepS.ID, 10)) if err != nil { - sender.SendError("", err) + wrapper.InternalError(w, err) return } if closeCh == nil { - sender.Close() + wrapper.InternalError(w, "already closed") return } - pingChan := time.After(30 * time.Second) - timeoutChan := time.After(24 * time.Hour) - L: - for { + sender, err := sse.NewSender(w) + if err != nil { + wrapper.InternalError(w, err) + return + } + + errCh := make(chan error) + go func() { select { case <-ctx.Done(): - break L + case <-sender.WaitForClose(): case <-closeCh: - break L - case <-timeoutChan: - break L - case <-pingChan: - sender.Ping() - case line := <-lineCh: - data, err := json.Marshal(line) - if err != nil { - sender.SendError("", err) - continue - } - sender.SendMessage(&sse.Message{ - Event: "data", - Data: string(data), - }) + errCh <- io.EOF } - } - sender.Close() + }() + _ = sse.SendLoop[*livelog.Line](ctx, sender, lineCh, errCh) } } diff --git a/go.mod b/go.mod index 157fb18..aed6b3a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/zc2638/ink go 1.21.0 require ( - github.com/99nil/gopkg v0.0.0-20240111070951-f7ccbb7471f3 + github.com/99nil/gopkg v0.0.0-20240112073009-b6f03c6c05fc github.com/docker/distribution v2.8.2+incompatible github.com/docker/docker v24.0.5+incompatible github.com/docker/go-units v0.5.0 diff --git a/go.sum b/go.sum index d58bef4..ac63945 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= -github.com/99nil/gopkg v0.0.0-20240111070951-f7ccbb7471f3 h1:/ov/x6XC6td/zd7vNVe4fmoqNF2zzEITzAsbUnIGyXA= -github.com/99nil/gopkg v0.0.0-20240111070951-f7ccbb7471f3/go.mod h1:yQy7Xa2r1VHYfgqfCuDE6VGDmBt8xJgDVTZ2Bw3xTW0= +github.com/99nil/gopkg v0.0.0-20240112073009-b6f03c6c05fc h1:BLGRvIONB+Q+hqwNfDzJE4iWTv2KX/1rufbEYBF/d08= +github.com/99nil/gopkg v0.0.0-20240112073009-b6f03c6c05fc/go.mod h1:yQy7Xa2r1VHYfgqfCuDE6VGDmBt8xJgDVTZ2Bw3xTW0= github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= diff --git a/pkg/livelog/file.go b/pkg/livelog/file.go index 7ef6e14..4f1a8a0 100644 --- a/pkg/livelog/file.go +++ b/pkg/livelog/file.go @@ -113,6 +113,7 @@ func NewFile(cfg ConfigFile) (Interface, error) { type file struct { dir string rs sync.Map + mux sync.Mutex clients map[string]map[*subscriber]struct{} } @@ -137,7 +138,9 @@ func (f *file) List(ctx context.Context, id string) ([]*Line, error) { } func (f *file) Watch(ctx context.Context, id string) (<-chan *Line, <-chan struct{}, error) { + f.mux.Lock() clients, ok := f.clients[id] + f.mux.Unlock() if !ok { return nil, nil, nil } @@ -178,11 +181,13 @@ func (f *file) Write(ctx context.Context, id string, line *Line) error { return fmt.Errorf("log line write failed: %v", err) } + f.mux.Lock() clients, ok := f.clients[id] if !ok { f.clients[id] = make(map[*subscriber]struct{}) - return nil } + f.mux.Unlock() + for client := range clients { client.publish(line) } @@ -199,13 +204,22 @@ func (f *file) Create(_ context.Context, id string) error { if err != nil { return err } + fi := &fileItem{file: ff} f.rs.Store(id, fi) + + f.mux.Lock() + if _, ok := f.clients[id]; !ok { + f.clients[id] = make(map[*subscriber]struct{}) + } + f.mux.Unlock() return nil } func (f *file) Delete(_ context.Context, id string) error { + f.mux.Lock() clients, ok := f.clients[id] + f.mux.Unlock() if !ok { return nil }