Skip to content

Commit

Permalink
feat(cli): add --follow flag to logs command (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh committed Oct 1, 2024
1 parent 955bda8 commit 88cfd8a
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 30 deletions.
192 changes: 176 additions & 16 deletions cmd/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ package cmd
import (
"encoding/json"
"fmt"
"io"
"reanahub/reana-client-go/client"
"reanahub/reana-client-go/client/operations"
"reanahub/reana-client-go/pkg/config"
"reanahub/reana-client-go/pkg/displayer"
"reanahub/reana-client-go/pkg/filterer"
"strings"
"time"

"github.com/jedib0t/go-pretty/v6/text"
log "github.com/sirupsen/logrus"

"github.com/spf13/cobra"
"golang.org/x/exp/slices"
Expand All @@ -27,15 +30,18 @@ import (
const logsDesc = `
Get workflow logs.
The ` + "``logs``" + ` command allows to retrieve logs of running workflow. Note that
only finished steps of the workflow are returned, the logs of the currently
processed step is not returned until it is finished.
The ` + "``logs``" + ` command allows to retrieve logs of a running workflow.
Either retrive logs and print the result or follow the logs of a running workflow/job.
Examples:
$ reana-client logs -w myanalysis.42
$ reana-client logs -w myanalysis.42 -s 1st_ste
$ reana-client logs -w myanalysis.42 --json
$ reana-client logs -w myanalysis.42 --filter status=running
$ reana-client logs -w myanalysis.42 --filter step=1st_step --follow
`

const logsFilterFlagDesc = `Filter job logs to include only those steps that
Expand Down Expand Up @@ -65,13 +71,22 @@ type jobLogItem struct {
FinishedAt *string `json:"finished_at"`
}

// logsOptions struct that contains the options of the logs command.
type logsOptions struct {
token string
workflow string
jsonOutput bool
filters []string
page int64
size int64
follow bool
interval int64
}

// logsCommandRunner struct that executes logs command.
type logsCommandRunner struct {
api *client.API
options *logsOptions
}

// newLogsCmd creates a command to get workflow logs.
Expand All @@ -84,7 +99,12 @@ func newLogsCmd() *cobra.Command {
Long: logsDesc,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return o.run(cmd)
api, err := client.ApiClient()
if err != nil {
log.Error(err)

Check warning on line 104 in cmd/logs.go

View check run for this annotation

Codecov / codecov/patch

cmd/logs.go#L104

Added line #L104 was not covered by tests
}
runner := newLogsCommandRunner(api, o)
return runner.run(cmd)
},
}

Expand All @@ -101,12 +121,33 @@ func newLogsCmd() *cobra.Command {
f.StringSliceVar(&o.filters, "filter", []string{}, logsFilterFlagDesc)
f.Int64Var(&o.page, "page", 1, "Results page number (to be used with --size).")
f.Int64Var(&o.size, "size", 0, "Size of results per page (to be used with --page).")
f.BoolVar(
&o.follow,
"follow",
false,
"Follow the logs of the of running workflow or job (similar to `tail -f`).",
)
f.Int64VarP(
&o.interval,
"interval",
"i",
10,
"Sleep time in seconds between log polling if log following is enabled. [default=10]",
)

return cmd
}

func (o *logsOptions) run(cmd *cobra.Command) error {
filters, err := parseLogsFilters(o.filters)
// newLogsCommandRunner creates a new logs command runner.
func newLogsCommandRunner(api *client.API, options *logsOptions) *logsCommandRunner {
return &logsCommandRunner{api: api, options: options}
}

// run executes the logs command.
func (r *logsCommandRunner) run(cmd *cobra.Command) error {
r.validateOptions(cmd.OutOrStdout())

filters, err := parseLogsFilters(r.options.filters)
if err != nil {
return err
}
Expand All @@ -116,25 +157,136 @@ func (o *logsOptions) run(cmd *cobra.Command) error {
}

logsParams := operations.NewGetWorkflowLogsParams()
logsParams.SetAccessToken(&o.token)
logsParams.SetWorkflowIDOrName(o.workflow)
logsParams.SetPage(&o.page)
logsParams.SetAccessToken(&r.options.token)
logsParams.SetWorkflowIDOrName(r.options.workflow)
logsParams.SetPage(&r.options.page)
logsParams.SetSteps(steps)
if cmd.Flags().Changed("size") {
logsParams.SetSize(&o.size)
logsParams.SetSize(&r.options.size)
}

if r.options.follow {
return r.followLogs(logsParams, cmd, steps)
}

return r.retrieveLogs(filters, logsParams, cmd, steps)
}

// followLogs follows the logs of a running workflow or job.
func (r *logsCommandRunner) followLogs(
logsParams *operations.GetWorkflowLogsParams,
cmd *cobra.Command,
steps []string,
) error {
stepLength := len(steps)
var step, previousLogs string
stdout := cmd.OutOrStdout()

if stepLength > 0 {
step = steps[0]
}

if stepLength > 1 {
displayer.DisplayMessage(
"Only one step can be followed at a time, ignoring additional steps.",
displayer.Warning,
false,
stdout,
)
logsParams.SetSteps([]string{step})
}

msg := "Following logs for workflow: " + r.options.workflow
if step != "" {
msg += ", step: " + step
}
displayer.DisplayMessage(msg, displayer.Info, false, stdout)

workflowStatusParams := operations.NewGetWorkflowStatusParams()
workflowStatusParams.SetAccessToken(&r.options.token)
workflowStatusParams.SetWorkflowIDOrName(r.options.workflow)

for {
newLogs, status, err := r.getLogsWithStatus(step, logsParams, workflowStatusParams)
if err != nil {
return err
}

fmt.Fprint(stdout, strings.TrimPrefix(newLogs, previousLogs))

if slices.Contains(config.WorkflowCompletedStatuses, status) {
fmt.Fprintln(stdout)
displayer.DisplayMessage("Finished, status: "+status, displayer.Info, false, stdout)
return nil
}

time.Sleep(time.Duration(r.options.interval) * time.Second)
previousLogs = newLogs
}
}

api, err := client.ApiClient()
// getData retrieves logs and status of a workflow or a job.
func (r *logsCommandRunner) getLogsWithStatus(
step string,
logsParams *operations.GetWorkflowLogsParams,
workflowStatusParams *operations.GetWorkflowStatusParams,
) (string, string, error) {
workflowLogs, err := r.getLogs(logsParams)
if err != nil {
return err
return "", "", err
}
logsResp, err := api.Operations.GetWorkflowLogs(logsParams)

if step != "" {
job := getFirstJob(workflowLogs.JobLogs)
if job == nil {
return "", "", fmt.Errorf("step %s not found", step)
}
return job.Logs, job.Status, nil
}

statusResponse, err := r.api.Operations.GetWorkflowStatus(workflowStatusParams)
if err != nil {
return err
return "", "", err
}

return *workflowLogs.WorkflowLogs, statusResponse.GetPayload().Status, nil
}

// getLogs retrieves logs of a workflow and unmarshals data into logs structure.
func (r *logsCommandRunner) getLogs(logsParams *operations.GetWorkflowLogsParams) (logs, error) {
var workflowLogs logs
logsResp, err := r.api.Operations.GetWorkflowLogs(logsParams)
if err != nil {
return workflowLogs, err
}

err = json.Unmarshal([]byte(logsResp.GetPayload().Logs), &workflowLogs)
if err != nil {
return workflowLogs, err
}
return workflowLogs, nil
}

// validateOptions validates the options of the logs command.
func (r *logsCommandRunner) validateOptions(writer io.Writer) {
if r.options.jsonOutput && r.options.follow {
displayer.DisplayMessage(
"Ignoring --json as it cannot be used together with --follow.",
displayer.Warning,
false,
writer,
)
}
}

// retrieveLogs retrieves and prints logs of a workflow.
func (r *logsCommandRunner) retrieveLogs(
filters filterer.Filters,
logsParams *operations.GetWorkflowLogsParams,
cmd *cobra.Command,
steps []string,
) error {
workflowLogs, err := r.getLogs(logsParams)
if err != nil {
return err
}
Expand All @@ -144,15 +296,23 @@ func (o *logsOptions) run(cmd *cobra.Command) error {
return err
}

if o.jsonOutput {
if r.options.jsonOutput {
err := displayer.DisplayJsonOutput(workflowLogs, cmd.OutOrStdout())
if err != nil {
return err
}
} else {
displayHumanFriendlyLogs(cmd, workflowLogs, steps)
}
return nil
}

// getFirstJob returns the first job in the given map,
// or nil if the map is empty.
func getFirstJob(items map[string]jobLogItem) *jobLogItem {
for _, item := range items {
return &item
}
return nil
}

Expand Down
109 changes: 109 additions & 0 deletions cmd/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,115 @@ func TestLogs(t *testing.T) {
expected: []string{"Field 'page': Must be at least 1."},
wantError: true,
},
"follow workflow": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_complete.json",
},
fmt.Sprintf(statusPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "status_finished.json",
},
},
args: []string{"-w", workflowName, "--follow", "-i", "0"},
expected: []string{
"==> Following logs for workflow: my_workflow",
"workflow logs",
"==> Finished, status: finished",
},
unwanted: []string{
"job1",
"step",
},
},
"follow job with multiple steps, size, interval and json flags": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_running.json",
additionalResponseFiles: []string{"logs_complete.json"},
},
},
args: []string{
"-w",
workflowName,
"--follow",
"--json",
"--filter",
"step=job1",
"--filter",
"step=job2",
"--size",
"1",
"-i",
"0",
},
expected: []string{
"Ignoring --json as it cannot be used together with --follow.",
"Only one step can be followed at a time, ignoring additional steps.",
"==> Following logs for workflow: my_workflow, step: job1",
"workflow 1 logs",
"==> Finished, status: finished",
},
},
"follow job that does not exist": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_empty.json",
},
},
args: []string{"-w", workflowName, "--follow", "--filter", "step=job1"},
expected: []string{
"step job1 not found",
},
wantError: true,
},
"follow logs when server returns logs error": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusInternalServerError,
},
},
args: []string{"-w", workflowName, "--follow"},
wantError: true,
},
"follow logs when server returns status error": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_complete.json",
},
fmt.Sprintf(statusPathTemplate, workflowName): {
statusCode: http.StatusInternalServerError,
},
},
args: []string{"-w", workflowName, "--follow"},
wantError: true,
},
"follow logs when server returns html response": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "page.html",
},
},
args: []string{"-w", workflowName, "--follow"},
expected: []string{"invalid character '<' looking for beginning of value"},
wantError: true,
},
"follow logs when server returns malformed logs": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_malformed.json",
},
},
args: []string{"-w", workflowName, "--follow"},
expected: []string{"invalid character 'm' looking for beginning of value"},
wantError: true,
},
}

for name, params := range tests {
Expand Down
Loading

0 comments on commit 88cfd8a

Please sign in to comment.