Skip to content

Commit

Permalink
stdinservice: Simplify (Remove resource usage monitoring feature)
Browse files Browse the repository at this point in the history
We are not aware of anyone using this.  If you do, please speak up!

PiperOrigin-RevId: 704283192
  • Loading branch information
gnoack authored and copybara-github committed Dec 9, 2024
1 parent c218ff9 commit 1edb5d5
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 107 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package fleetspeak.stdinservice;

import "fleetspeak/src/common/proto/fleetspeak_monitoring/resource.proto";
import "google/protobuf/timestamp.proto";

option go_package = "github.com/google/fleetspeak/fleetspeak/src/client/stdinservice/proto/fleetspeak_stdinservice";
Expand All @@ -19,8 +18,8 @@ message OutputMessage {
bytes stdout = 1;
bytes stderr = 2;

fleetspeak.monitoring.AggregatedResourceUsage resource_usage = 3;

// When the message was generated.
google.protobuf.Timestamp timestamp = 4;

reserved 3;
}
55 changes: 16 additions & 39 deletions fleetspeak/src/client/stdinservice/stdinservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"fmt"
"os/exec"

log "github.com/golang/glog"
anypb "google.golang.org/protobuf/types/known/anypb"
tspb "google.golang.org/protobuf/types/known/timestamppb"

"github.com/google/fleetspeak/fleetspeak/src/client/internal/monitoring"
"github.com/google/fleetspeak/fleetspeak/src/client/service"

sspb "github.com/google/fleetspeak/fleetspeak/src/client/stdinservice/proto/fleetspeak_stdinservice"
Expand Down Expand Up @@ -72,8 +70,6 @@ func (s *StdinService) Start(sc service.Context) error {

// ProcessMessage processes an incoming message from the server side.
func (s *StdinService) ProcessMessage(ctx context.Context, m *fspb.Message) error {
om := &sspb.OutputMessage{}

if m.MessageType != "StdinServiceInputMessage" {
return fmt.Errorf("unrecognized common.Message.message_type: %q", m.MessageType)
}
Expand All @@ -83,42 +79,23 @@ func (s *StdinService) ProcessMessage(ctx context.Context, m *fspb.Message) erro
return fmt.Errorf("error while unmarshalling common.Message.data: %v", err)
}

cmd := exec.CommandContext(ctx, s.ssConf.Cmd, im.Args...)

inBuf := bytes.NewBuffer(im.Input)
var outBuf, errBuf bytes.Buffer
var stdout, stderr bytes.Buffer

cmd.Stdin = inBuf
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf

if err := cmd.Start(); err != nil {
return fmt.Errorf("error while starting a process: %v", err)
}

ruf := monitoring.ResourceUsageFetcher{}
initialRU, ruErr := ruf.ResourceUsageForPID(cmd.Process.Pid)
if ruErr != nil {
log.Errorf("Failed to get resource usage for process: %v", ruErr)
}
cmd := exec.CommandContext(ctx, s.ssConf.Cmd, im.Args...)
cmd.Stdin = bytes.NewBuffer(im.Input)
cmd.Stdout = &stdout
cmd.Stderr = &stdout

err := cmd.Wait()
err := cmd.Run()
if err != nil {
return err
}

om.Stdout = outBuf.Bytes()
om.Stderr = errBuf.Bytes()

finalRU := ruf.ResourceUsageFromFinishedCmd(cmd)
aggRU, ruErr := monitoring.AggregateResourceUsageForFinishedCmd(initialRU, finalRU)
if ruErr != nil {
log.Errorf("Aggregation of resource-usage data failed: %v", ruErr)
} else {
om.ResourceUsage = aggRU
om := &sspb.OutputMessage{
Stdout: stdout.Bytes(),
Stderr: stderr.Bytes(),
Timestamp: tspb.Now(),
}

om.Timestamp = tspb.Now()
if err := s.respond(ctx, om); err != nil {
return fmt.Errorf("error while trying to send a response to the requesting server: %v", err)
}
Expand All @@ -133,17 +110,17 @@ func (s *StdinService) Stop() error {

func (s *StdinService) respond(ctx context.Context, om *sspb.OutputMessage) error {
// TODO: Chunk the response into ~1MB parts.
data, err := anypb.New(om)
if err != nil {
return err
}

m := &fspb.Message{
Destination: &fspb.Address{
ServiceName: s.conf.Name,
},
MessageType: "StdinServiceOutputMessage",
}

var err error
m.Data, err = anypb.New(om)
if err != nil {
return err
Data: data,
}

return s.sc.Send(ctx, service.AckMessage{M: m})
Expand Down
12 changes: 0 additions & 12 deletions fleetspeak/src/client/stdinservice/stdinservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,6 @@ while time.time() - t0 < 1.:
t.Fatal(err)
}

if om.ResourceUsage == nil {
t.Fatalf("unexpected output; StdinServiceOutputMessage.resource_usage not set: %q", om)
}

if om.ResourceUsage.MeanUserCpuRate <= 0 {
t.Fatalf("unexpected output; StdinServiceOutputMessage.resource_usage.mean_user_cpu_rate not set: %q", om)
}

if om.ResourceUsage.MeanSystemCpuRate <= 0 {
t.Fatalf("unexpected output; StdinServiceOutputMessage.resource_usage.mean_system_cpu_rate not set: %q", om)
}

// We don't test for ResourceUsage.MeanResidentMemory because memory is currently not being
// queried after the process has terminated. It's only queried right after launching the command
// in which case it can be recorded as "0" which would be indistinguishable from it not being set
Expand Down

0 comments on commit 1edb5d5

Please sign in to comment.