diff --git a/core/clients/server.go b/core/clients/server.go index 8e050f1..d8ed098 100644 --- a/core/clients/server.go +++ b/core/clients/server.go @@ -21,6 +21,7 @@ import ( "io" "strconv" "strings" + "time" "github.com/99nil/gopkg/sse" "github.com/go-resty/resty/v2" @@ -74,7 +75,7 @@ type server struct { func (s *server) V1() ServerV1 { addr := strings.TrimSuffix(s.Address, "/") - rc := resty.New().SetBaseURL(addr + "/api/core/v1") + rc := resty.New().SetBaseURL(addr + "/api/core/v1").SetTimeout(time.Minute) return &serverV1{rc: rc} } @@ -363,7 +364,8 @@ func (c *serverV1) LogWatch(ctx context.Context, namespace, name string, number, if strings.ToLower(message.Data) == "eof" { return io.EOF } - return errors.New(message.Data) + errCh <- errors.New(message.Data) + return nil } if message.Event != "data" { return nil diff --git a/core/command/daemon.go b/core/command/daemon.go index 3fb581d..e6d2a07 100644 --- a/core/command/daemon.go +++ b/core/command/daemon.go @@ -82,7 +82,6 @@ func NewDaemon() *cobra.Command { srv.WriteTimeout = 0 srv.Handler = handler.New(log, db, ll, sched) log.Info(fmt.Sprintf("Daemon listen on %s", srv.Addr)) - return srv.RunAndStop(context.Background()) }, } diff --git a/core/handler/handler.go b/core/handler/handler.go index a52e93c..e6e3941 100644 --- a/core/handler/handler.go +++ b/core/handler/handler.go @@ -18,6 +18,7 @@ import ( "context" "log/slog" "net/http" + "regexp" "time" "github.com/99nil/gopkg/ctr" @@ -102,11 +103,17 @@ func serviceMiddleware(log *wslog.Logger, ll livelog.Interface, sched scheduler. } } +var logWatchRe = regexp.MustCompile(`/api/core/.+/box/.+/.+/build/.+/logs/.+/.+`) + func timeoutMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if logWatchRe.MatchString(r.URL.Path) { + next.ServeHTTP(w, r) + return + } + ctx, cancel := context.WithTimeout(r.Context(), constant.DefaultHTTPTimeout) defer cancel() - next.ServeHTTP(w, r.WithContext(ctx)) }) } diff --git a/core/handler/server/log.go b/core/handler/server/log.go index 1c76964..9664baa 100644 --- a/core/handler/server/log.go +++ b/core/handler/server/log.go @@ -16,6 +16,7 @@ package server import ( "context" + "fmt" "io" "net/http" "strconv" @@ -114,6 +115,7 @@ func logWatch() http.HandlerFunc { return } if closeCh == nil { + // TODO step pending 时 livelog 未创建,导致 close nil 的处理 wrapper.InternalError(w, "already closed") return } @@ -128,9 +130,12 @@ func logWatch() http.HandlerFunc { go func() { select { case <-ctx.Done(): + fmt.Println("============= Context Done =============") case <-sender.WaitForClose(): + fmt.Println("============= Sender Done =============") case <-closeCh: errCh <- io.EOF + fmt.Println("============= Watch Close =============") } }() _ = sse.SendLoop[*livelog.Line](ctx, sender, lineCh, errCh)