From 8dfa889d6fd408cdadf7b2c302eac2e0ef705456 Mon Sep 17 00:00:00 2001 From: Sveinn Date: Mon, 21 Oct 2024 11:53:35 -0400 Subject: [PATCH] New live-feed, new analysis options and CSV export (#18) --- README.md | 79 +++++--- client/client.go | 307 ++++++++++++++++++++++++++--- client/table.go | 120 +++++++++-- cmd/hperf/analyze.go | 126 +----------- cmd/hperf/csv.go | 54 +++++ cmd/hperf/{stat.go => download.go} | 20 +- cmd/hperf/main.go | 30 ++- cmd/hperf/server.go | 19 +- server/server.go | 72 +++++-- shared/shared.go | 43 +++- 10 files changed, 633 insertions(+), 237 deletions(-) create mode 100644 cmd/hperf/csv.go rename cmd/hperf/{stat.go => download.go} (71%) diff --git a/README.md b/README.md index 88471e5..b4f46e7 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,7 @@ hperf is a tool for active measurements of the maximum achievable bandwidth between N peers, measuring RX/TX bandwidth for each peers. ## What is hperf for -Hperf was made to test networks in large infrastructure. It's highly scalable and cabaple of running parallel tests over -a long period of time. +Hperf was made to test networks in large infrastructure. It's highly scalable and capable of running parallel tests over a long period of time. ## Common use cases - Debugging link/nic MTU issues @@ -19,18 +18,19 @@ a long period of time. The binary can act as both client and server. ### Client -The client part of hperf is responsible for orchestrating the servers. Its only job is to send commands to the -servers and receive incremental stats updates. It can be executed from any machine that can talk to the servers. +The client part of hperf is responsible for orchestrating the servers. Its only job is to send commands to the servers and receive incremental stats updates. It can be executed from any machine that can talk to the servers. ### Servers -Servers are the machines we are testing. To launch the hperf command in servers mode, simply use the `server` command: -NOTE: `server` is the only command you can execute on the servers. All other commands are executed from the client. +Servers are the machines we are testing. To launch the hperf command in servers mode use the `server` command: + ```bash $ ./hperf server --help ``` + This command will start an API and websocket on the given `--address` and save test results to `--storage-path`. WARNING: do not expose `--address` to the internet +NOTE: if the `--address` is not the same as your external IP addres used for communications between servers then you need to set `--real-ip`, otherwise the server will report internal IPs in the stats and it will run the test against itself, causing invalid results. ### The listen command Hperf can run tests without a specific `client` needing to be constantly connected. Once the `client` has started a test, the `client` can @@ -57,13 +57,14 @@ go install github.com/minio/hperf/cmd/hperf@latest ### Server Run server with default settings: -NOTE: this will place all test result files in the same directory. +NOTE: this will place all test result files in the same directory and use 0.0.0.0 as bind ip. We do not recommend this for larger tests. ```bash $ ./hperf server ``` -Run the server with custom `--address` and `--storage-path` + +Run the server with custom `--address`, `--real-ip` and `--storage-path` ```bash -$ ./hperf server --address 10.10.2.10:5000 --storage-path /tmp/hperf/ +$ ./hperf server --address 10.10.2.10:5000 --real-ip 150.150.20.2 --storage-path /tmp/hperf/ ``` ### Client @@ -85,45 +86,48 @@ host per file line. NOTE: Be careful not to re-use the ID's if you care about fetching results at a later date. ```bash -# get test results -./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] -# save test results -./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] --output /tmp/test.out - -# analyze test results -./hperf analyze --file /tmp/test.out - # listen in on a running test ./hperf listen --hosts 1.1.1.{1...100} --id [my_test_id] # stop a running test ./hperf stop --hosts 1.1.1.{1...100} --id [my_test_id] + +# download test results +./hperf download --hosts 1.1.1.{1...100} --id [my_test_id] --file /tmp/test.out + +# analyze test results +./hperf analyze --file /tmp/test.out +# analyze test results with full print output +./hperf analyze --file /tmp/test.out --print-stats --print-errors + +# Generate a .csv file from a .json test file +./hperf csv --file /tmp/test.out ``` ## Analysis -The analyze command will print statistics for the 10th and 90th percentiles and all datapoints in between. -The format used is: +The analyze command will print statistics for the 10th and 90th percentiles and all datapoints in between. Additionally, you can use the `--print-stats` and `--print-erros` flags for a more verbose output. + +The analysis will show: - 10th percentile: total, low, avarage, high - in between: total, low, avarage, high - 90th percentile: total, low, avarage, high -## Available Statistics +## Statistics - Payload Roundtrip (RMS high/low): - Payload transfer time (Microseconds) - Time to first byte (TTFB high/low): - This is the amount of time (Microseconds) it takes between a request being made and the first byte being requested by the receiver - - Transferred bytes (TX): + - Transferred bytes (TX high/low): - Bandwidth throughput in KB/s, MB/s, GB/s, etc.. + - Transferred bytes (TX total): + - Total transferred bytes (not per second) - Request count (#TX): - The number of HTTP/s requests made - Error Count (#ERR): - Number of encountered errors - Dropped Packets (#Dropped): - - Total dropped packets on the server (total for all time) - - Memory (MemUsed): - - Total memory in use (total for all time) - - CPU (CPUUsed): - - Total memory in use (total for all time) + - Memory (Mem high/low/used): + - CPU (CPU high/low/used): ## Example: 20 second HTTP payload transfer test using multiple sockets This test will use 12 concurrent workers to send http requests with a payload without any timeout between requests. @@ -138,11 +142,30 @@ This will perform a 20 second bandwidth test with 12 concurrent HTTP streams: $ ./hperf bandwidth --hosts file:./hosts --id http-test-2 --duration 20 --concurrency 12 ``` -## Example: 5 Minute latency test using a 2000 Byte buffer, with a delay of 50ms between requests +## Example: 5 Minute latency test using a 1000 Byte buffer, with a delay of 50ms between requests This test will send a single round trip request between servers to test base latency and reachability: ``` $ ./hperf latency --hosts file:./hosts --id http-test-2 --duration 360 --concurrency 1 --requestDelay 50 ---bufferSize 2000 --payloadSize 2000 +--bufferSize 1000 --payloadSize 1000 +``` + +# Full test scenario with analysis and csv export +## On the server +```bash +$ ./hperf server --address 10.10.2.10:5000 --real-ip 150.150.20.2 --storage-path /tmp/hperf/ ``` +## The client + + + + + + + + + + + + diff --git a/client/client.go b/client/client.go index 6c675bd..5f0f565 100644 --- a/client/client.go +++ b/client/client.go @@ -18,13 +18,17 @@ package client import ( + "bufio" "bytes" "context" + "encoding/csv" "encoding/json" "errors" "fmt" + "math" "net/http" "os" + "reflect" "runtime/debug" "slices" "strconv" @@ -209,7 +213,7 @@ func handleWSConnection(ctx context.Context, c *shared.Config, host string, id i } switch signal.SType { case shared.Stats: - go praseDataPoint(signal.DataPoint, c) + go collectDataPointv2(signal.DataPoint) case shared.ListTests: go parseTestList(signal.TestList) case shared.GetTest: @@ -242,28 +246,34 @@ func receiveJSONDataPoint(data []byte, _ *shared.Config) { responseLock.Lock() defer responseLock.Unlock() - if bytes.Contains(data, []byte("Error")) { + if bytes.HasPrefix(data, shared.ErrorPoint.String()) { dp := new(shared.TError) - err := json.Unmarshal(data, &dp) + err := json.Unmarshal(data[1:], &dp) if err != nil { PrintError(err) return } responseERR = append(responseERR, *dp) - } else { + } else if bytes.HasPrefix(data, shared.DataPoint.String()) { dp := new(shared.DP) - err := json.Unmarshal(data, &dp) + err := json.Unmarshal(data[1:], &dp) if err != nil { PrintError(err) return } responseDPS = append(responseDPS, *dp) + } else { + PrintError(fmt.Errorf("Uknown data point: %s", data)) } } -func keepAliveLoop(ctx context.Context, tickerfunc func() (shouldExit bool)) error { +func keepAliveLoop(ctx context.Context, c *shared.Config, tickerfunc func() (shouldExit bool)) error { + start := time.Now() for ctx.Err() == nil { time.Sleep(1 * time.Second) + if time.Since(start).Seconds() > float64(c.Duration)+20 { + return errors.New("Total duration reached 20 seconds past the configured duration") + } select { case <-ctx.Done(): @@ -298,7 +308,7 @@ func Listen(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + return keepAliveLoop(ctx, &c, nil) } func Stop(ctx context.Context, c shared.Config) (err error) { @@ -316,7 +326,7 @@ func Stop(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + return keepAliveLoop(ctx, &c, nil) } func RunTest(ctx context.Context, c shared.Config) (err error) { @@ -334,7 +344,74 @@ func RunTest(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + printCount := 0 + + printOnTick := func() bool { + if len(responseDPS) == 0 { + return false + } + printCount++ + + to := new(shared.TestOutput) + to.ErrCount = len(responseERR) + to.TXL = math.MaxInt64 + to.RMSL = math.MaxInt64 + to.TTFBL = math.MaxInt64 + to.ML = responseDPS[0].MemoryUsedPercent + to.CL = responseDPS[0].CPUUsedPercent + tt := responseDPS[0].Type + + for i := range responseDPS { + to.TXC += responseDPS[i].TXCount + to.TXT += responseDPS[i].TXTotal + to.DP += responseDPS[i].DroppedPackets + + if to.TXL > responseDPS[i].TX { + to.TXL = responseDPS[i].TX + } + if to.RMSL > responseDPS[i].RMSL { + to.RMSL = responseDPS[i].RMSL + } + if to.TTFBL > responseDPS[i].TTFBL { + to.TTFBL = responseDPS[i].TTFBL + } + if to.ML > responseDPS[i].MemoryUsedPercent { + to.ML = responseDPS[i].MemoryUsedPercent + } + if to.CL > responseDPS[i].CPUUsedPercent { + to.CL = responseDPS[i].CPUUsedPercent + } + + if to.TXH < responseDPS[i].TX { + to.TXH = responseDPS[i].TX + } + if to.RMSH < responseDPS[i].RMSH { + to.RMSH = responseDPS[i].RMSH + } + if to.TTFBH < responseDPS[i].TTFBH { + to.TTFBH = responseDPS[i].TTFBH + } + if to.MH < responseDPS[i].MemoryUsedPercent { + to.MH = responseDPS[i].MemoryUsedPercent + } + if to.CH < responseDPS[i].CPUUsedPercent { + to.CH = responseDPS[i].CPUUsedPercent + } + } + + for i := range responseERR { + fmt.Println(responseERR[i]) + } + + if printCount%10 == 1 { + printRealTimeHeaders(tt) + } + printRealTimeRow(SuccessStyle, to, tt) + + return false + } + + return keepAliveLoop(ctx, &c, printOnTick) } func ListTests(ctx context.Context, c shared.Config) (err error) { @@ -352,7 +429,7 @@ func ListTests(ctx context.Context, c shared.Config) (err error) { } }) - err = keepAliveLoop(ctx, nil) + err = keepAliveLoop(ctx, &c, nil) if err != nil { return } @@ -400,7 +477,7 @@ func DeleteTests(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + return keepAliveLoop(ctx, &c, nil) } func parseTestList(list []shared.TestInfo) { @@ -415,7 +492,7 @@ func parseTestList(list []shared.TestInfo) { } } -func GetTest(ctx context.Context, c shared.Config) (err error) { +func DownloadTest(ctx context.Context, c shared.Config) (err error) { cancelContext, cancel := context.WithCancel(ctx) defer cancel() err = initializeClient(cancelContext, &c) @@ -431,7 +508,7 @@ func GetTest(ctx context.Context, c shared.Config) (err error) { } }) - _ = keepAliveLoop(ctx, nil) + _ = keepAliveLoop(ctx, &c, nil) slices.SortFunc(responseERR, func(a shared.TError, b shared.TError) int { if a.Created.Before(b.Created) { @@ -449,39 +526,209 @@ func GetTest(ctx context.Context, c shared.Config) (err error) { } }) - if c.Output != "" { - f, err := os.Create(c.Output) + f, err := os.Create(c.File) + if err != nil { + return err + } + defer f.Close() + for i := range responseDPS { + _, err := shared.WriteStructAndNewLineToFile(f, shared.DataPoint, responseDPS[i]) if err != nil { return err } - for i := range responseDPS { - _, err := shared.WriteStructAndNewLineToFile(f, responseDPS[i]) + } + for i := range responseERR { + _, err := shared.WriteStructAndNewLineToFile(f, shared.ErrorPoint, responseERR[i]) + if err != nil { + return err + } + } + + return nil +} + +func AnalyzeTest(ctx context.Context, c shared.Config) (err error) { + _, cancel := context.WithCancel(ctx) + defer cancel() + + f, err := os.Open(c.File) + if err != nil { + return err + } + defer f.Close() + + dps := make([]shared.DP, 0) + errors := make([]shared.TError, 0) + + s := bufio.NewScanner(f) + for s.Scan() { + b := s.Bytes() + if bytes.HasPrefix(b[1:], shared.ErrorPoint.String()) { + dperr := new(shared.TError) + err := json.Unmarshal(b, dperr) if err != nil { return err } - } - for i := range responseERR { - _, err := shared.WriteStructAndNewLineToFile(f, responseERR[i]) + errors = append(errors, *dperr) + } else if bytes.HasPrefix(b, shared.DataPoint.String()) { + dp := new(shared.DP) + err := json.Unmarshal(b[1:], dp) if err != nil { return err } + dps = append(dps, *dp) + } else { + shared.DEBUG(ErrorStyle.Render("Unknown data point encountered: ", string(b))) } - return nil } - printDataPointHeaders(responseDPS[0].Type) - for i := range responseDPS { - dp := responseDPS[i] - sp1 := strings.Split(dp.Local, ":") - sp2 := strings.Split(sp1[0], ".") - s1 := lipgloss.NewStyle().Background(lipgloss.Color(getHex(sp2[len(sp2)-1]))) - printTableRow(s1, &dp, dp.Type) + if c.PrintFull { + printDataPointHeaders(dps[0].Type) + for i := range dps { + dp := dps[i] + sp1 := strings.Split(dp.Local, ":") + sp2 := strings.Split(sp1[0], ".") + s1 := lipgloss.NewStyle().Background(lipgloss.Color(getHex(sp2[len(sp2)-1]))) + printTableRow(s1, &dp, dp.Type) + } } - for i := range responseERR { - PrintTError(responseERR[i]) + if c.PrintErrors { + for i := range errors { + PrintTError(errors[i]) + } + } + + dps10 := math.Ceil((float64(len(dps)) / 100) * 10) + dps90 := math.Floor((float64(len(dps)) / 100) * 90) + + slices.SortFunc(dps, func(a shared.DP, b shared.DP) int { + if a.RMSH < b.RMSH { + return -1 + } else { + return 1 + } + }) + + dps10s := make([]shared.DP, 0) + dps50s := make([]shared.DP, 0) + dps90s := make([]shared.DP, 0) + + // total, sum, low, mean, high + dps10stats := []int64{0, 0, math.MaxInt64, 0, 0} + dps50stats := []int64{0, 0, math.MaxInt64, 0, 0} + dps90stats := []int64{0, 0, math.MaxInt64, 0, 0} + + for i := range dps { + if i <= int(dps10) { + dps10s = append(dps10s, dps[i]) + updateBracketStats(dps10stats, dps[i]) + } else if i >= int(dps90) { + dps90s = append(dps90s, dps[i]) + updateBracketStats(dps90stats, dps[i]) + } else { + dps50s = append(dps50s, dps[i]) + updateBracketStats(dps50stats, dps[i]) + } } + fmt.Println("") + fmt.Println("") + fmt.Println("") + + fmt.Println(" First 10% of data points") + printBracker(dps10stats, SuccessStyle) + fmt.Println("") + fmt.Println(" Between 10% and 90%") + printBracker(dps50stats, WarningStyle) + fmt.Println("") + fmt.Println(" Last 10% of data points") + printBracker(dps90stats, ErrorStyle) + fmt.Println("") return nil } + +func printBracker(b []int64, style lipgloss.Style) { + fmt.Println(style.Render( + fmt.Sprintf(" Total %d | Low %d | Avg %d | High %d | Microseconds ", + b[0], + b[2], + b[3], + b[4], + ), + )) +} + +func updateBracketStats(b []int64, dp shared.DP) { + b[0]++ + b[1] += dp.RMSH + if dp.RMSH < b[2] { + b[2] = dp.RMSH + } + b[3] = b[1] / b[0] + if dp.RMSH > b[4] { + b[4] = dp.RMSH + } +} + +func MakeCSV(ctx context.Context, c shared.Config) (err error) { + byteValue, err := os.ReadFile(c.File) + if err != nil { + return err + } + + file, err := os.Create(c.File + ".csv") + if err != nil { + return err + } + defer file.Close() + + fb := bytes.NewBuffer(byteValue) + scanner := bufio.NewScanner(fb) + + writer := csv.NewWriter(file) + defer writer.Flush() + if err := writer.Write(getStructFields(new(shared.DP))); err != nil { + return err + } + + for scanner.Scan() { + b := scanner.Bytes() + if bytes.HasPrefix(b, shared.DataPoint.String()) { + dp := new(shared.DP) + err = json.Unmarshal(b[1:], dp) + if err != nil { + return err + } + + if err := writer.Write(dpToSlice(dp)); err != nil { + return err + } + } + } + + return nil +} + +// Function to get field names of the struct +func getStructFields(s interface{}) []string { + t := reflect.TypeOf(s).Elem() + fields := make([]string, t.NumField()) + for i := 0; i < t.NumField(); i++ { + fields[i] = t.Field(i).Tag.Get("json") + if fields[i] == "" { + fields[i] = t.Field(i).Name + } + } + return fields +} + +func dpToSlice(dp *shared.DP) (data []string) { + v := reflect.ValueOf(dp).Elem() + data = make([]string, v.NumField()) + for i := 0; i < v.NumField(); i++ { + data[i] = fmt.Sprintf("%v", v.Field(i).Interface()) + } + return +} diff --git a/client/table.go b/client/table.go index aa34ff0..e4b1e9f 100644 --- a/client/table.go +++ b/client/table.go @@ -37,7 +37,7 @@ type column struct { width int } -var headerSlice = make([]header, 16) +var headerSlice = make([]header, header_length) type HeaderField int @@ -46,18 +46,26 @@ const ( Created Local Remote - PMSH - PMSL + RMSH + RMSL TTFBH TTFBL TX + TXH + TXL + TXT TXCount ErrCount DroppedPackets MemoryUsage + MemoryHigh + MemoryLow CPUUsage + CPUHigh + CPULow ID HumanTime + header_length ) func initHeaders() { @@ -65,16 +73,23 @@ func initHeaders() { headerSlice[Created] = header{"Created", 8} headerSlice[Local] = header{"Local", 15} headerSlice[Remote] = header{"Remote", 15} - headerSlice[PMSH] = header{"RMSH", 8} - headerSlice[PMSL] = header{"RMSL", 8} + headerSlice[RMSH] = header{"RMSH", 8} + headerSlice[RMSL] = header{"RMSL", 8} headerSlice[TTFBH] = header{"TTFBH", 8} headerSlice[TTFBL] = header{"TTFBL", 8} headerSlice[TX] = header{"TX", 10} + headerSlice[TXL] = header{"TXLow", 10} + headerSlice[TXH] = header{"TXHigh", 10} + headerSlice[TXT] = header{"TXTotal", 15} headerSlice[TXCount] = header{"#TX", 10} headerSlice[ErrCount] = header{"#ERR", 6} headerSlice[DroppedPackets] = header{"#Dropped", 9} headerSlice[MemoryUsage] = header{"MemUsed", 7} + headerSlice[MemoryHigh] = header{"MemHigh", 7} + headerSlice[MemoryLow] = header{"MemLow", 7} headerSlice[CPUUsage] = header{"CPUUsed", 7} + headerSlice[CPUHigh] = header{"CPUHigh", 7} + headerSlice[CPULow] = header{"CPULow", 7} headerSlice[ID] = header{"ID", 30} headerSlice[HumanTime] = header{"Time", 30} } @@ -89,9 +104,13 @@ func GenerateFormatString(columnCount int) (fs string) { var ( ListHeaders = []HeaderField{IntNumber, ID, HumanTime} BandwidthHeaders = []HeaderField{Created, Local, Remote, TX, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} - LatencyHeaders = []HeaderField{Created, Local, Remote, PMSH, PMSL, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} - HTTPHeaders = []HeaderField{Created, Local, Remote, PMSH, PMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} - FullDataPointHeaders = []HeaderField{Created, Local, Remote, PMSH, PMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + LatencyHeaders = []HeaderField{Created, Local, Remote, RMSH, RMSL, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + HTTPHeaders = []HeaderField{Created, Local, Remote, RMSH, RMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + FullDataPointHeaders = []HeaderField{Created, Local, Remote, RMSH, RMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + + RealTimeLatencyHeaders = []HeaderField{ErrCount, TXCount, RMSH, RMSL, DroppedPackets, MemoryHigh, MemoryLow, CPUHigh, CPULow} + RealTimeBandwidthHeaders = []HeaderField{ErrCount, TXCount, TXH, TXL, TXT, DroppedPackets, MemoryHigh, MemoryLow, CPUHigh, CPULow} + RealTimeHTTPHeaders = []HeaderField{ErrCount, TXCount, TXH, TXL, TXT, RMSH, RMSL, TTFBH, TTFBL, DroppedPackets, MemoryHigh, MemoryLow, CPUHigh, CPULow} ) var ( @@ -140,6 +159,71 @@ func printDataPointHeaders(t shared.TestType) { } } +func printRealTimeHeaders(t shared.TestType) { + switch t { + case shared.BandwidthTest: + printHeader(RealTimeBandwidthHeaders) + case shared.LatencyTest: + printHeader(RealTimeLatencyHeaders) + case shared.HTTPTest: + printHeader(RealTimeHTTPHeaders) + default: + } +} + +func printRealTimeRow(style lipgloss.Style, entry *shared.TestOutput, t shared.TestType) { + switch t { + case shared.LatencyTest: + PrintColumns( + style, + column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, + column{formatUint(entry.TXC), headerSlice[TXCount].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, + column{formatInt(int64(entry.DP)), headerSlice[DroppedPackets].width}, + column{formatInt(int64(entry.MH)), headerSlice[MemoryHigh].width}, + column{formatInt(int64(entry.ML)), headerSlice[MemoryLow].width}, + column{formatInt(int64(entry.CH)), headerSlice[CPUHigh].width}, + column{formatInt(int64(entry.CL)), headerSlice[CPULow].width}, + ) + return + case shared.BandwidthTest: + PrintColumns( + style, + column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, + column{formatUint(entry.TXH), headerSlice[TXH].width}, + column{formatUint(entry.TXL), headerSlice[TXL].width}, + column{formatUint(entry.TXT), headerSlice[TXT].width}, + column{formatInt(int64(entry.DP)), headerSlice[DroppedPackets].width}, + column{formatInt(int64(entry.MH)), headerSlice[MemoryHigh].width}, + column{formatInt(int64(entry.ML)), headerSlice[MemoryLow].width}, + column{formatInt(int64(entry.CH)), headerSlice[CPUHigh].width}, + column{formatInt(int64(entry.CL)), headerSlice[CPULow].width}, + ) + return + case shared.HTTPTest: + PrintColumns( + style, + column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, + column{formatUint(entry.TXC), headerSlice[TXCount].width}, + column{formatUint(entry.TXH), headerSlice[TXH].width}, + column{formatUint(entry.TXL), headerSlice[TXL].width}, + column{formatUint(entry.TXT), headerSlice[TXT].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, + column{formatInt(entry.TTFBH), headerSlice[TTFBH].width}, + column{formatInt(entry.TTFBL), headerSlice[TTFBL].width}, + column{formatInt(int64(entry.DP)), headerSlice[DroppedPackets].width}, + column{formatInt(int64(entry.MH)), headerSlice[MemoryHigh].width}, + column{formatInt(int64(entry.ML)), headerSlice[MemoryLow].width}, + column{formatInt(int64(entry.CH)), headerSlice[CPUHigh].width}, + column{formatInt(int64(entry.CL)), headerSlice[CPULow].width}, + ) + default: + shared.DEBUG("Unknown test type, not printing table") + } +} + func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { switch t { case shared.LatencyTest: @@ -148,8 +232,8 @@ func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { column{entry.Created.Format("15:04:05"), headerSlice[Created].width}, column{strings.Split(entry.Local, ":")[0], headerSlice[Local].width}, column{strings.Split(entry.Remote, ":")[0], headerSlice[Remote].width}, - column{formatInt(entry.RMSH), headerSlice[PMSH].width}, - column{formatInt(entry.RMSL), headerSlice[PMSL].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, column{formatUint(entry.TXCount), headerSlice[TXCount].width}, column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, column{formatInt(int64(entry.DroppedPackets)), headerSlice[DroppedPackets].width}, @@ -176,8 +260,8 @@ func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { column{entry.Created.Format("15:04:05"), headerSlice[Created].width}, column{strings.Split(entry.Local, ":")[0], headerSlice[Local].width}, column{strings.Split(entry.Remote, ":")[0], headerSlice[Remote].width}, - column{formatInt(entry.RMSH), headerSlice[PMSH].width}, - column{formatInt(entry.RMSL), headerSlice[PMSL].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, column{formatInt(entry.TTFBH), headerSlice[TTFBH].width}, column{formatInt(entry.TTFBL), headerSlice[TTFBH].width}, column{shared.BandwidthBytesToString(entry.TX), headerSlice[TX].width}, @@ -192,6 +276,18 @@ func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { } } +func collectDataPointv2(r *shared.DataReponseToClient) { + if r == nil { + return + } + + responseLock.Lock() + defer responseLock.Unlock() + + responseDPS = append(responseDPS, r.DPS...) + responseERR = append(responseERR, r.Errors...) +} + func praseDataPoint(r *shared.DataReponseToClient, c *shared.Config) { if r == nil { return diff --git a/cmd/hperf/analyze.go b/cmd/hperf/analyze.go index 25d935f..cc23f5e 100644 --- a/cmd/hperf/analyze.go +++ b/cmd/hperf/analyze.go @@ -18,19 +18,8 @@ package main import ( - "bufio" - "bytes" - "context" - "encoding/json" - "fmt" - "math" - "os" - "slices" - - "github.com/charmbracelet/lipgloss" "github.com/minio/cli" "github.com/minio/hperf/client" - "github.com/minio/hperf/shared" ) var analyzeCMD = cli.Command{ @@ -42,6 +31,8 @@ var analyzeCMD = cli.Command{ hostsFlag, portFlag, fileFlag, + printStatsFlag, + printErrFlag, }, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} @@ -55,6 +46,8 @@ FLAGS: EXAMPLES: 1. Analyze test results in file '/tmp/latency-test-1': {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1 --file latency-test-1 + 1. Analyze test results and print full output: + {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1 --file latency-test-1 --print-full `, } @@ -63,114 +56,5 @@ func runAnalyze(ctx *cli.Context) error { if err != nil { return err } - return AnalyzeTest(GlobalContext, *config) -} - -func AnalyzeTest(ctx context.Context, c shared.Config) (err error) { - _, cancel := context.WithCancel(ctx) - defer cancel() - - f, err := os.Open(c.File) - if err != nil { - return err - } - - dps := make([]shared.DP, 0) - errors := make([]shared.TError, 0) - - s := bufio.NewScanner(f) - for s.Scan() { - b := s.Bytes() - if !bytes.Contains(b, []byte("Error")) { - dp := new(shared.DP) - err := json.Unmarshal(b, dp) - if err != nil { - return err - } - dps = append(dps, *dp) - } else { - dperr := new(shared.TError) - err := json.Unmarshal(b, dperr) - if err != nil { - return err - } - errors = append(errors, *dperr) - } - } - - // adjust stats - for i := range dps { - // Highest RMSH can never be 0, but it's the default value of golang int64. - // if we find a 0 we just set it to an impossibly high value. - if dps[i].RMSH == 0 { - dps[i].RMSH = 999999999 - } - } - - dps10 := math.Ceil((float64(len(dps)) / 100) * 10) - dps90 := math.Floor((float64(len(dps)) / 100) * 90) - - slices.SortFunc(dps, func(a shared.DP, b shared.DP) int { - if a.RMSH < b.RMSH { - return -1 - } else { - return 1 - } - }) - - dps10s := make([]shared.DP, 0) - dps50s := make([]shared.DP, 0) - dps90s := make([]shared.DP, 0) - - // total, sum, low, mean, high - dps10stats := []int64{0, 0, 999999999, 0, 0} - dps50stats := []int64{0, 0, 999999999, 0, 0} - dps90stats := []int64{0, 0, 999999999, 0, 0} - - for i := range dps { - if i <= int(dps10) { - dps10s = append(dps10s, dps[i]) - updateBracketStats(dps10stats, dps[i]) - } else if i >= int(dps90) { - dps90s = append(dps90s, dps[i]) - updateBracketStats(dps90stats, dps[i]) - } else { - dps50s = append(dps50s, dps[i]) - updateBracketStats(dps50stats, dps[i]) - } - } - - for i := range errors { - client.PrintTError(errors[i]) - } - - printBracker(dps10stats, "? < 10%", client.SuccessStyle) - printBracker(dps50stats, "10% < ? < 90%", client.WarningStyle) - printBracker(dps90stats, "? > 90%", client.ErrorStyle) - - return nil -} - -func printBracker(b []int64, tag string, style lipgloss.Style) { - fmt.Println(style.Render( - fmt.Sprintf(" %s | Total %d | Low %d | Avg %d | High %d | Microseconds ", - tag, - b[0], - b[2], - b[3], - b[4], - ), - )) -} - -func updateBracketStats(b []int64, dp shared.DP) { - b[0]++ - b[1] += dp.RMSH - if dp.RMSH < b[2] { - b[2] = dp.RMSH - } - b[3] = b[1] / b[0] - if dp.RMSH > b[4] { - b[4] = dp.RMSH - } + return client.AnalyzeTest(GlobalContext, *config) } diff --git a/cmd/hperf/csv.go b/cmd/hperf/csv.go new file mode 100644 index 0000000..047d8a3 --- /dev/null +++ b/cmd/hperf/csv.go @@ -0,0 +1,54 @@ +// Copyright (c) 2015-2024 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "github.com/minio/cli" + "github.com/minio/hperf/client" +) + +var csvCMD = cli.Command{ + Name: "csv", + Usage: "Transform a test file to csv file", + Action: runCSV, + Flags: []cli.Flag{ + fileFlag, + }, + CustomHelpTemplate: `NAME: + {{.HelpName}} - {{.Usage}} + +USAGE: + {{.HelpName}} [FLAGS] + +FLAGS: + {{range .VisibleFlags}}{{.}} + {{end}} +EXAMPLES: + 1. Transform a test file to csv file: + {{.Prompt}} {{.HelpName}} --file /tmp/output-file +`, +} + +func runCSV(ctx *cli.Context) error { + config, err := parseConfig(ctx) + if err != nil { + return err + } + + return client.MakeCSV(GlobalContext, *config) +} diff --git a/cmd/hperf/stat.go b/cmd/hperf/download.go similarity index 71% rename from cmd/hperf/stat.go rename to cmd/hperf/download.go index 2fbd7bd..a541229 100644 --- a/cmd/hperf/stat.go +++ b/cmd/hperf/download.go @@ -22,16 +22,16 @@ import ( "github.com/minio/hperf/client" ) -var statTestsCMD = cli.Command{ - Name: "stat", - Usage: "print stats for a given test from the selected hosts", - Action: runStat, +var statDownloadCMD = cli.Command{ + Name: "download", + Usage: "Download stats for tests by ID", + Action: runDownload, Flags: []cli.Flag{ dnsServerFlag, hostsFlag, portFlag, testIDFlag, - outputFlag, + fileFlag, }, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} @@ -43,18 +43,16 @@ FLAGS: {{range .VisibleFlags}}{{.}} {{end}} EXAMPLES: - 1. Print stats by ID for hosts '10.10.10.1' and '10.10.10.2': - {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id - 2. Save stats by ID for hosts '10.10.10.1' and '10.10.10.2': - {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id --output /tmp/output-file + 1. Download test by ID for hosts '10.10.10.1' and '10.10.10.2': + {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id --file /tmp/output-file `, } -func runStat(ctx *cli.Context) error { +func runDownload(ctx *cli.Context) error { config, err := parseConfig(ctx) if err != nil { return err } - return client.GetTest(GlobalContext, *config) + return client.DownloadTest(GlobalContext, *config) } diff --git a/cmd/hperf/main.go b/cmd/hperf/main.go index 8212708..5b5b2a0 100644 --- a/cmd/hperf/main.go +++ b/cmd/hperf/main.go @@ -26,10 +26,10 @@ import ( "os" "os/signal" "runtime" + "strconv" "syscall" "time" - "github.com/google/uuid" "github.com/minio/cli" "github.com/minio/hperf/client" "github.com/minio/hperf/shared" @@ -137,10 +137,6 @@ var ( Name: "id", Usage: "specify custom ID per test", } - outputFlag = cli.StringFlag{ - Name: "output", - Usage: "set output file path/name", - } fileFlag = cli.StringFlag{ Name: "file", Usage: "input file path", @@ -155,6 +151,14 @@ var ( EnvVar: "HPERF_DNS_SERVER", Usage: "use a custom DNS server to resolve hosts", } + printStatsFlag = cli.BoolFlag{ + Name: "print-stats", + Usage: "Print stat points", + } + printErrFlag = cli.BoolFlag{ + Name: "print-errors", + Usage: "Print errors", + } ) var ( @@ -165,13 +169,14 @@ var ( Commands = []cli.Command{ analyzeCMD, bandwidthCMD, + csvCMD, deleteCMD, latencyCMD, listenCMD, listTestsCMD, requestsCMD, serverCMD, - statTestsCMD, + statDownloadCMD, stopCMD, } ) @@ -248,15 +253,22 @@ func parseConfig(ctx *cli.Context) (*shared.Config, error) { Save: ctx.BoolT(saveTestFlag.Name), TestID: ctx.String(testIDFlag.Name), RestartOnError: ctx.BoolT(restartOnErrorFlag.Name), - Output: ctx.String(outputFlag.Name), File: ctx.String(fileFlag.Name), + PrintFull: ctx.Bool(printStatsFlag.Name), + PrintErrors: ctx.Bool(printErrFlag.Name), } switch ctx.Command.Name { case "latency", "bandwidth", "http", "get": if ctx.String("id") == "" { - uid := uuid.NewString() - config.TestID = uid + "-" + time.Now().Format("2006-01-02-15-04-05") + config.TestID = strconv.Itoa(int(time.Now().Unix())) + } + case "download": + if ctx.String("id") == "" { + err = errors.New("--id is required") + } + if ctx.String("file") == "" { + err = errors.New("--file is required") } case "analyze": if ctx.String("file") == "" { diff --git a/cmd/hperf/server.go b/cmd/hperf/server.go index c7c0124..80c9f38 100644 --- a/cmd/hperf/server.go +++ b/cmd/hperf/server.go @@ -37,7 +37,12 @@ var ( Value: "0.0.0.0:9010", Usage: "bind to the specified address", } - + realIPFlag = cli.StringFlag{ + Name: "real-ip", + EnvVar: "HPERF_REAL_IP", + Value: "", + Usage: "The real IP used to connect to other servers. If the --address is bound to the real IP then this flag can be skipped.", + } storagePathFlag = cli.StringFlag{ Name: "storage-path", EnvVar: "HPERF_STORAGE_PATH", @@ -49,7 +54,7 @@ var ( Name: "server", Usage: "start an interactive server", Action: runServer, - Flags: []cli.Flag{addressFlag, storagePathFlag, debugFlag}, + Flags: []cli.Flag{addressFlag, realIPFlag, storagePathFlag, debugFlag}, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} @@ -68,11 +73,19 @@ EXAMPLES: 3. Run HPerf server with custom file path and custom address {{.Prompt}} {{.HelpName}} --storage-path /path/on/disk --address 0.0.0.0:9000 + + 4. Run HPerf server with custom file path and floating(real) ip + {{.Prompt}} {{.HelpName}} --storage-path /path/on/disk --address 0.0.0.0:9000 --real-ip 152.121.12.4 `, } ) func runServer(ctx *cli.Context) error { shared.DebugEnabled = debug - return server.RunServer(GlobalContext, ctx.String("address"), ctx.String("storage-path")) + return server.RunServer( + GlobalContext, + ctx.String("address"), + ctx.String("real-ip"), + ctx.String("storage-path"), + ) } diff --git a/server/server.go b/server/server.go index ad25caf..be7f4c3 100644 --- a/server/server.go +++ b/server/server.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "log" + "math" "net" "net/http" "os" @@ -54,6 +55,7 @@ var ( WriteBufferSize: 1000000, }) bindAddress = "0.0.0.0:9000" + realIP = "" testFolderSuffix = "hperf-tests" basePath = "./" tests = make([]*test, 0) @@ -97,7 +99,7 @@ func (t *test) AddError(err error, id string) { t.errMap[id] = struct{}{} } -func RunServer(ctx context.Context, address string, storagePath string) (err error) { +func RunServer(ctx context.Context, address string, rIP string, storagePath string) (err error) { cancelContext, cancel := context.WithCancel(ctx) defer cancel() @@ -128,6 +130,7 @@ func RunServer(ctx context.Context, address string, storagePath string) (err err } bindAddress = address + realIP = rIP shared.INFO("starting 'hperf' server on:", bindAddress) err = startAPIandWS(cancelContext) if err != nil { @@ -385,7 +388,11 @@ func newTest(c *shared.Config) (t *test, err error) { for i := range c.Hosts { - if net.JoinHostPort(c.Hosts[i], c.Port) == bindAddress { + joinedHostPort := net.JoinHostPort(c.Hosts[i], c.Port) + if realIP != "" && strings.Contains(joinedHostPort, realIP) { + continue + } + if joinedHostPort == bindAddress { continue } t.Readers = append(t.Readers, @@ -404,7 +411,8 @@ func newTest(c *shared.Config) (t *test, err error) { } type netPerfReader struct { - m sync.Mutex + hasStats bool + m sync.Mutex buf []byte @@ -436,18 +444,19 @@ type asyncReader struct { } func (a *asyncReader) Read(b []byte) (n int, err error) { + a.pr.m.Lock() if !a.ttfbRegistered { - a.ttfbRegistered = true since := time.Since(a.start).Microseconds() - a.pr.m.Lock() + a.ttfbRegistered = true if since > a.pr.TTFBH { a.pr.TTFBH = since } if since < a.pr.TTFBL { a.pr.TTFBL = since } - a.pr.m.Unlock() } + a.pr.hasStats = true + a.pr.m.Unlock() if a.ctx.Err() != nil { return 0, io.EOF @@ -488,7 +497,9 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) { go startPerformanceReader(test, test.Readers[i]) } - listenToLiveTests(con, signal) + conUID := uuid.NewString() + test.cons[conUID] = con + for { if test.ctx.Err() != nil { return @@ -551,28 +562,32 @@ func sendAndSaveData(t *test) (err error) { if err != nil { t.AddError(err, "datapoint-marshaling") } - t.DataFile.Write(append(fileb, []byte{10}...)) + t.DataFile.Write(shared.DataPoint.String()) + t.DataFile.Write(fileb) + t.DataFile.Write([]byte{10}) } } t.DPS = make([]shared.DP, 0) t.M.Lock() - errMapClone := make([]shared.TError, 0) + errorsClone := make([]shared.TError, 0) for _, v := range t.errors { - errMapClone = append(errMapClone, v) + errorsClone = append(errorsClone, v) } t.errors = make([]shared.TError, 0) t.errMap = make(map[string]struct{}) t.M.Unlock() - for i := range errMapClone { - wss.DataPoint.Errors = append(wss.DataPoint.Errors, errMapClone[i]) + for i := range errorsClone { + wss.DataPoint.Errors = append(wss.DataPoint.Errors, errorsClone[i]) if t.Config.Save { - fileb, err := json.Marshal(errMapClone[i]) + fileb, err := json.Marshal(errorsClone[i]) if err != nil { t.AddError(err, "error-marshaling") } - t.DataFile.Write(append(fileb, []byte{10}...)) + t.DataFile.Write(shared.ErrorPoint.String()) + t.DataFile.Write(fileb) + t.DataFile.Write([]byte{10}) } } @@ -600,10 +615,14 @@ func generateDataPoints(t *test) { continue } + if !rv.hasStats { + continue + } + r := t.Readers[ri] + tx := r.TX.Swap(0) - prevTime := time.Since(r.lastDataPointTime).Microseconds() - totalSecs := float64(prevTime) / 1000000 + totalSecs := time.Since(r.lastDataPointTime).Seconds() r.lastDataPointTime = time.Now() txtotal := float64(tx) / totalSecs @@ -612,8 +631,8 @@ func generateDataPoints(t *test) { TestID: t.ID, Created: time.Now(), TX: uint64(txtotal), + TXTotal: tx, TXCount: r.TXCount.Load(), - Local: bindAddress, Remote: r.addr, TTFBL: r.TTFBL, TTFBH: r.TTFBH, @@ -625,11 +644,18 @@ func generateDataPoints(t *test) { CPUUsedPercent: int(cpuPercent), } + if realIP != "" { + d.Local = realIP + } else { + d.Local = bindAddress + } + r.m.Lock() + r.hasStats = false r.TTFBH = 0 - r.TTFBL = 99999999 + r.TTFBL = math.MaxInt64 r.RMSH = 0 - r.RMSL = 99999999 + r.RMSL = math.MaxInt64 r.m.Unlock() t.DPS = append(t.DPS, d) @@ -669,7 +695,8 @@ func newPerformanceReaderForASingleHost(c *shared.Config, host string, port stri r.addr = net.JoinHostPort(host, port) r.ip = host r.buf = make([]byte, c.PayloadSize) - r.TTFBL = 99999999 + r.TTFBL = math.MaxInt64 + r.RMSL = math.MaxInt64 r.client = &http.Client{ Transport: newTransport(c), } @@ -749,6 +776,10 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) { proto+r.addr+route, body, ) + if err != nil { + t.AddError(err, "network-new-request") + return + } if t.Config.TestType == shared.BandwidthTest { req.ContentLength = -1 @@ -780,6 +811,7 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) { if done < r.RMSL { r.RMSL = done } + r.hasStats = true r.m.Unlock() io.Copy(io.Discard, resp.Body) diff --git a/shared/shared.go b/shared/shared.go index d3f34ef..57b29ad 100644 --- a/shared/shared.go +++ b/shared/shared.go @@ -50,12 +50,39 @@ type TestInfo struct { Time time.Time } +type TestOutput struct { + ErrCount int + TXC uint64 + TXL uint64 + TXH uint64 + TXT uint64 + RMSL int64 + RMSH int64 + TTFBL int64 + TTFBH int64 + DP int + ML int + MH int + CL int + CH int +} + type ( SignalType int SignalCode int TestType int + FilePrefix byte ) +const ( + DataPoint FilePrefix = iota + ErrorPoint +) + +func (f FilePrefix) String() []byte { + return []byte(strconv.Itoa(int(f))) +} + const ( Err SignalType = iota RunTest @@ -121,6 +148,7 @@ type DP struct { TTFBH int64 TTFBL int64 TX uint64 + TXTotal uint64 TXCount uint64 ErrCount int DroppedPackets int @@ -152,12 +180,13 @@ type Config struct { Save bool `json:"Save"` Insecure bool `json:"Insecure"` TestType TestType `json:"TestType"` - Output string `json:"Output"` File string `json:"File"` // AllowLocalInterface bool `json:"AllowLocalInterfaces"` // Client Only ResolveHosts string `json:"-"` + PrintFull bool `json:"-"` + PrintErrors bool `json:"-"` } func INFO(items ...any) { @@ -287,11 +316,19 @@ func GetInterfaceAddresses() (list []string, err error) { return } -func WriteStructAndNewLineToFile(f *os.File, s interface{}) (int, error) { +func WriteStructAndNewLineToFile(f *os.File, prefix FilePrefix, s interface{}) (int, error) { outb, err := json.Marshal(s) if err != nil { return 0, err } - n, err := f.Write(append(outb, []byte{10}...)) + n, err := f.Write(prefix.String()) + if err != nil { + return n, err + } + n, err = f.Write(outb) + if err != nil { + return n, err + } + n, err = f.Write([]byte{10}) return n, err }