Skip to content

Commit

Permalink
refactor sse
Browse files Browse the repository at this point in the history
  • Loading branch information
zc2638 committed Jan 11, 2024
1 parent f3a2764 commit 720f240
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 184 deletions.
2 changes: 1 addition & 1 deletion core/clients/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"strconv"
"strings"

"github.com/99nil/gopkg/sse"
"github.com/go-resty/resty/v2"

v1 "github.com/zc2638/ink/pkg/api/core/v1"
"github.com/zc2638/ink/pkg/livelog"
"github.com/zc2638/ink/pkg/sse"
)

type Server interface {
Expand Down
45 changes: 18 additions & 27 deletions core/handler/server/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ package server
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/99nil/gopkg/ctr"
"github.com/99nil/gopkg/sse"

"github.com/zc2638/ink/core/handler/wrapper"
storageV1 "github.com/zc2638/ink/pkg/api/storage/v1"
Expand Down Expand Up @@ -106,37 +105,27 @@ func logWatch() http.HandlerFunc {
return
}

h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "keep-alive")
h.Set("X-Accel-Buffering", "no")

f, ok := w.(http.Flusher)
if !ok {
sender, err := sse.NewSender(w)
if err != nil {
wrapper.InternalError(w, err)
return
}

_, _ = io.WriteString(w, ": ping\n\n")
f.Flush()
sender.Ping()

ctx, cancel := context.WithCancel(r.Context())
defer cancel()

ll := livelog.FromRequest(r)
lineCh, closeCh, err := ll.Watch(ctx, strconv.FormatUint(stepS.ID, 10))
if err != nil {
_, _ = fmt.Fprintf(w, "event: error\ndata: %v\n\n", err)
f.Flush()
sender.SendError("", err)
return
}
if closeCh == nil {
_, _ = io.WriteString(w, "event: error\ndata: eof\n\n")
f.Flush()
sender.Close()
return
}

enc := json.NewEncoder(w)
pingChan := time.After(30 * time.Second)
timeoutChan := time.After(24 * time.Hour)
L:
Expand All @@ -149,17 +138,19 @@ func logWatch() http.HandlerFunc {
case <-timeoutChan:
break L
case <-pingChan:
_, _ = io.WriteString(w, ": ping\n\n")
f.Flush()
sender.Ping()
case line := <-lineCh:
_, _ = io.WriteString(w, "event: data\ndata: ")
_ = enc.Encode(line)
_, _ = io.WriteString(w, "\n\n")
f.Flush()
data, err := json.Marshal(line)
if err != nil {
sender.SendError("", err)
continue
}
sender.SendMessage(&sse.Message{
Event: "data",
Data: string(data),
})
}
}

_, _ = io.WriteString(w, "event: error\ndata: eof\n\n")
f.Flush()
sender.Close()
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/zc2638/ink
go 1.21.0

require (
github.com/99nil/gopkg v0.0.0-20231117031104-b92c1eee778d
github.com/99nil/gopkg v0.0.0-20240111070951-f7ccbb7471f3
github.com/docker/distribution v2.8.2+incompatible
github.com/docker/docker v24.0.5+incompatible
github.com/docker/go-units v0.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
github.com/99nil/gopkg v0.0.0-20231117031104-b92c1eee778d h1:aeQmaBZiFhZ+vigb8enMx1pIw0xkRHKgrEaFMO/iMcQ=
github.com/99nil/gopkg v0.0.0-20231117031104-b92c1eee778d/go.mod h1:yQy7Xa2r1VHYfgqfCuDE6VGDmBt8xJgDVTZ2Bw3xTW0=
github.com/99nil/gopkg v0.0.0-20240111070951-f7ccbb7471f3 h1:/ov/x6XC6td/zd7vNVe4fmoqNF2zzEITzAsbUnIGyXA=
github.com/99nil/gopkg v0.0.0-20240111070951-f7ccbb7471f3/go.mod h1:yQy7Xa2r1VHYfgqfCuDE6VGDmBt8xJgDVTZ2Bw3xTW0=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
Expand Down
153 changes: 0 additions & 153 deletions pkg/sse/sse.go

This file was deleted.

0 comments on commit 720f240

Please sign in to comment.