Skip to content

Commit

Permalink
fix the issue of duplicate log distribution during step completion
Browse files Browse the repository at this point in the history
  • Loading branch information
zc2638 committed Jan 16, 2024
1 parent c7c5d1f commit 37871b4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
4 changes: 3 additions & 1 deletion core/handler/client/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,14 +509,16 @@ func handleLogUpload() http.HandlerFunc {
return
}

var opts []any
if isAll {
if err := ll.Reset(ctx, stepID); err != nil {
ctr.InternalError(w, err)
return
}
opts = []any{livelog.PublishOption(false)}
}
for _, line := range lines {
if err := ll.Write(ctx, stepID, line); err != nil {
if err := ll.Write(ctx, stepID, line, opts); err != nil {
ctr.InternalError(w, err)
return
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/livelog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (f *file) Watch(ctx context.Context, id string) (<-chan *Line, <-chan struc
return sub.handler, sub.waitForClose(), nil
}

func (f *file) Write(ctx context.Context, id string, line *Line) error {
func (f *file) Write(ctx context.Context, id string, line *Line, args ...any) error {
fi := f.get(id)
if fi == nil {
return fmt.Errorf("log stream not found for %s", id)
Expand All @@ -190,6 +190,11 @@ func (f *file) Write(ctx context.Context, id string, line *Line) error {
}
f.mux.Unlock()

for _, arg := range args {
if v, ok := arg.(PublishOption); ok && !bool(v) {
return nil
}
}
for client := range clients {
client.publish(line)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/livelog/livelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ const LineMaxBuffer = 3000
type Interface interface {
List(ctx context.Context, id string) ([]*Line, error)
Watch(ctx context.Context, id string) (<-chan *Line, <-chan struct{}, error)
Write(ctx context.Context, id string, line *Line) error
Write(ctx context.Context, id string, line *Line, args ...any) error
LineCount(ctx context.Context, id string) int
Reset(ctx context.Context, id string) error
Create(ctx context.Context, id string) error
Delete(ctx context.Context, id string) error
}

type PublishOption bool

type Config struct {
File *ConfigFile `json:"file,omitempty"`
}
Expand Down

0 comments on commit 37871b4

Please sign in to comment.