From b7307e0e2be53d7ba8f8391a8ccb8590d4948e55 Mon Sep 17 00:00:00 2001 From: zveinn Date: Wed, 16 Oct 2024 17:56:06 +0000 Subject: [PATCH] Work in progress --- README.md | 25 ++-- client/client.go | 229 +++++++++++++++++++++++++---- client/table.go | 119 +++++++++++++-- cmd/hperf/analyze.go | 126 +--------------- cmd/hperf/{stat.go => download.go} | 20 ++- cmd/hperf/main.go | 29 ++-- cmd/hperf/server.go | 19 ++- server/server.go | 64 +++++--- shared/shared.go | 21 ++- 9 files changed, 438 insertions(+), 214 deletions(-) rename cmd/hperf/{stat.go => download.go} (71%) diff --git a/README.md b/README.md index 88471e5..9d2602a 100644 --- a/README.md +++ b/README.md @@ -85,13 +85,13 @@ host per file line. NOTE: Be careful not to re-use the ID's if you care about fetching results at a later date. ```bash -# get test results -./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] -# save test results -./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] --output /tmp/test.out +# download test results +./hperf download --hosts 1.1.1.{1...100} --id [my_test_id] --file /tmp/test.out # analyze test results ./hperf analyze --file /tmp/test.out +# analyze test results with full print output +./hperf analyze --file /tmp/test.out --print-stats --print-errors # listen in on a running test ./hperf listen --hosts 1.1.1.{1...100} --id [my_test_id] @@ -107,23 +107,22 @@ The format used is: - in between: total, low, avarage, high - 90th percentile: total, low, avarage, high -## Available Statistics +## Statistics - Payload Roundtrip (RMS high/low): - Payload transfer time (Microseconds) - Time to first byte (TTFB high/low): - This is the amount of time (Microseconds) it takes between a request being made and the first byte being requested by the receiver - - Transferred bytes (TX): + - Transferred bytes (TX high/low): - Bandwidth throughput in KB/s, MB/s, GB/s, etc.. + - Transferred bytes (TX total): + - Total transferred bytes (not per second) - Request count (#TX): - The number of HTTP/s requests made - Error Count (#ERR): - Number of encountered errors - Dropped Packets (#Dropped): - - Total dropped packets on the server (total for all time) - - Memory (MemUsed): - - Total memory in use (total for all time) - - CPU (CPUUsed): - - Total memory in use (total for all time) + - Memory (Mem high/low/used): + - CPU (CPU high/low/used): ## Example: 20 second HTTP payload transfer test using multiple sockets This test will use 12 concurrent workers to send http requests with a payload without any timeout between requests. @@ -138,11 +137,11 @@ This will perform a 20 second bandwidth test with 12 concurrent HTTP streams: $ ./hperf bandwidth --hosts file:./hosts --id http-test-2 --duration 20 --concurrency 12 ``` -## Example: 5 Minute latency test using a 2000 Byte buffer, with a delay of 50ms between requests +## Example: 5 Minute latency test using a 1000 Byte buffer, with a delay of 50ms between requests This test will send a single round trip request between servers to test base latency and reachability: ``` $ ./hperf latency --hosts file:./hosts --id http-test-2 --duration 360 --concurrency 1 --requestDelay 50 ---bufferSize 2000 --payloadSize 2000 +--bufferSize 1000 --payloadSize 1000 ``` diff --git a/client/client.go b/client/client.go index 6c675bd..e0780c1 100644 --- a/client/client.go +++ b/client/client.go @@ -18,11 +18,13 @@ package client import ( + "bufio" "bytes" "context" "encoding/json" "errors" "fmt" + "math" "net/http" "os" "runtime/debug" @@ -209,7 +211,7 @@ func handleWSConnection(ctx context.Context, c *shared.Config, host string, id i } switch signal.SType { case shared.Stats: - go praseDataPoint(signal.DataPoint, c) + go collectDataPointv2(signal.DataPoint) case shared.ListTests: go parseTestList(signal.TestList) case shared.GetTest: @@ -261,9 +263,13 @@ func receiveJSONDataPoint(data []byte, _ *shared.Config) { } } -func keepAliveLoop(ctx context.Context, tickerfunc func() (shouldExit bool)) error { +func keepAliveLoop(ctx context.Context, c *shared.Config, tickerfunc func() (shouldExit bool)) error { + start := time.Now() for ctx.Err() == nil { time.Sleep(1 * time.Second) + if time.Since(start).Seconds() > float64(c.Duration)+20 { + return errors.New("Total duration reached 20 seconds past the configured duration") + } select { case <-ctx.Done(): @@ -298,7 +304,7 @@ func Listen(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + return keepAliveLoop(ctx, &c, nil) } func Stop(ctx context.Context, c shared.Config) (err error) { @@ -316,7 +322,7 @@ func Stop(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + return keepAliveLoop(ctx, &c, nil) } func RunTest(ctx context.Context, c shared.Config) (err error) { @@ -334,7 +340,74 @@ func RunTest(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + printCount := 0 + + printOnTick := func() bool { + if len(responseDPS) == 0 { + return false + } + printCount++ + + to := new(shared.TestOutput) + to.ErrCount = len(responseERR) + to.TXL = math.MaxInt64 + to.RMSL = math.MaxInt64 + to.TTFBL = math.MaxInt64 + to.ML = responseDPS[0].MemoryUsedPercent + to.CL = responseDPS[0].CPUUsedPercent + tt := responseDPS[0].Type + + for i := range responseDPS { + to.TXC += responseDPS[i].TXCount + to.TXT += responseDPS[i].TXTotal + to.DP += responseDPS[i].DroppedPackets + + if to.TXL > responseDPS[i].TX { + to.TXL = responseDPS[i].TX + } + if to.RMSL > responseDPS[i].RMSL { + to.RMSL = responseDPS[i].RMSL + } + if to.TTFBL > responseDPS[i].TTFBL { + to.TTFBL = responseDPS[i].TTFBL + } + if to.ML > responseDPS[i].MemoryUsedPercent { + to.ML = responseDPS[i].MemoryUsedPercent + } + if to.CL > responseDPS[i].CPUUsedPercent { + to.CL = responseDPS[i].CPUUsedPercent + } + + if to.TXH < responseDPS[i].TX { + to.TXH = responseDPS[i].TX + } + if to.RMSH < responseDPS[i].RMSH { + to.RMSH = responseDPS[i].RMSH + } + if to.TTFBH < responseDPS[i].TTFBH { + to.TTFBH = responseDPS[i].TTFBH + } + if to.MH < responseDPS[i].MemoryUsedPercent { + to.MH = responseDPS[i].MemoryUsedPercent + } + if to.CH < responseDPS[i].CPUUsedPercent { + to.CH = responseDPS[i].CPUUsedPercent + } + } + + for i := range responseERR { + fmt.Println(responseERR[i]) + } + + if printCount%10 == 1 { + printRealTimeHeaders(tt) + } + printRealTimeRow(SuccessStyle, to, tt) + + return false + } + + return keepAliveLoop(ctx, &c, printOnTick) } func ListTests(ctx context.Context, c shared.Config) (err error) { @@ -352,7 +425,7 @@ func ListTests(ctx context.Context, c shared.Config) (err error) { } }) - err = keepAliveLoop(ctx, nil) + err = keepAliveLoop(ctx, &c, nil) if err != nil { return } @@ -400,7 +473,7 @@ func DeleteTests(ctx context.Context, c shared.Config) (err error) { } }) - return keepAliveLoop(ctx, nil) + return keepAliveLoop(ctx, &c, nil) } func parseTestList(list []shared.TestInfo) { @@ -415,7 +488,7 @@ func parseTestList(list []shared.TestInfo) { } } -func GetTest(ctx context.Context, c shared.Config) (err error) { +func DownloadTest(ctx context.Context, c shared.Config) (err error) { cancelContext, cancel := context.WithCancel(ctx) defer cancel() err = initializeClient(cancelContext, &c) @@ -431,7 +504,7 @@ func GetTest(ctx context.Context, c shared.Config) (err error) { } }) - _ = keepAliveLoop(ctx, nil) + _ = keepAliveLoop(ctx, &c, nil) slices.SortFunc(responseERR, func(a shared.TError, b shared.TError) int { if a.Created.Before(b.Created) { @@ -449,39 +522,143 @@ func GetTest(ctx context.Context, c shared.Config) (err error) { } }) - if c.Output != "" { - f, err := os.Create(c.Output) + f, err := os.Create(c.File) + if err != nil { + return err + } + for i := range responseDPS { + _, err := shared.WriteStructAndNewLineToFile(f, responseDPS[i]) if err != nil { return err } - for i := range responseDPS { - _, err := shared.WriteStructAndNewLineToFile(f, responseDPS[i]) + } + for i := range responseERR { + _, err := shared.WriteStructAndNewLineToFile(f, responseERR[i]) + if err != nil { + return err + } + } + + return nil +} + +func AnalyzeTest(ctx context.Context, c shared.Config) (err error) { + _, cancel := context.WithCancel(ctx) + defer cancel() + + f, err := os.Open(c.File) + if err != nil { + return err + } + + dps := make([]shared.DP, 0) + errors := make([]shared.TError, 0) + + s := bufio.NewScanner(f) + for s.Scan() { + b := s.Bytes() + if !bytes.Contains(b, []byte("Error")) { + dp := new(shared.DP) + err := json.Unmarshal(b, dp) if err != nil { return err } - } - for i := range responseERR { - _, err := shared.WriteStructAndNewLineToFile(f, responseERR[i]) + dps = append(dps, *dp) + } else { + dperr := new(shared.TError) + err := json.Unmarshal(b, dperr) if err != nil { return err } + errors = append(errors, *dperr) } + } - return nil + if c.PrintFull { + printDataPointHeaders(dps[0].Type) + for i := range dps { + dp := dps[i] + sp1 := strings.Split(dp.Local, ":") + sp2 := strings.Split(sp1[0], ".") + s1 := lipgloss.NewStyle().Background(lipgloss.Color(getHex(sp2[len(sp2)-1]))) + printTableRow(s1, &dp, dp.Type) + } } - printDataPointHeaders(responseDPS[0].Type) - for i := range responseDPS { - dp := responseDPS[i] - sp1 := strings.Split(dp.Local, ":") - sp2 := strings.Split(sp1[0], ".") - s1 := lipgloss.NewStyle().Background(lipgloss.Color(getHex(sp2[len(sp2)-1]))) - printTableRow(s1, &dp, dp.Type) + if c.PrintErrors { + for i := range errors { + PrintTError(errors[i]) + } } - for i := range responseERR { - PrintTError(responseERR[i]) + dps10 := math.Ceil((float64(len(dps)) / 100) * 10) + dps90 := math.Floor((float64(len(dps)) / 100) * 90) + + slices.SortFunc(dps, func(a shared.DP, b shared.DP) int { + if a.RMSH < b.RMSH { + return -1 + } else { + return 1 + } + }) + + dps10s := make([]shared.DP, 0) + dps50s := make([]shared.DP, 0) + dps90s := make([]shared.DP, 0) + + // total, sum, low, mean, high + dps10stats := []int64{0, 0, math.MaxInt64, 0, 0} + dps50stats := []int64{0, 0, math.MaxInt64, 0, 0} + dps90stats := []int64{0, 0, math.MaxInt64, 0, 0} + + for i := range dps { + if i <= int(dps10) { + dps10s = append(dps10s, dps[i]) + updateBracketStats(dps10stats, dps[i]) + } else if i >= int(dps90) { + dps90s = append(dps90s, dps[i]) + updateBracketStats(dps90stats, dps[i]) + } else { + dps50s = append(dps50s, dps[i]) + updateBracketStats(dps50stats, dps[i]) + } } + fmt.Println("") + fmt.Println("") + fmt.Println("") + + fmt.Println(" First 10% of data points") + printBracker(dps10stats, SuccessStyle) + fmt.Println("") + fmt.Println(" Between 10% and 90%") + printBracker(dps50stats, WarningStyle) + fmt.Println("") + fmt.Println(" Last 10% of data points") + printBracker(dps90stats, ErrorStyle) + fmt.Println("") return nil } + +func printBracker(b []int64, style lipgloss.Style) { + fmt.Println(style.Render( + fmt.Sprintf(" Total %d | Low %d | Avg %d | High %d | Microseconds ", + b[0], + b[2], + b[3], + b[4], + ), + )) +} + +func updateBracketStats(b []int64, dp shared.DP) { + b[0]++ + b[1] += dp.RMSH + if dp.RMSH < b[2] { + b[2] = dp.RMSH + } + b[3] = b[1] / b[0] + if dp.RMSH > b[4] { + b[4] = dp.RMSH + } +} diff --git a/client/table.go b/client/table.go index aa34ff0..f1144f3 100644 --- a/client/table.go +++ b/client/table.go @@ -37,7 +37,7 @@ type column struct { width int } -var headerSlice = make([]header, 16) +var headerSlice = make([]header, 23) type HeaderField int @@ -46,16 +46,23 @@ const ( Created Local Remote - PMSH - PMSL + RMSH + RMSL TTFBH TTFBL TX + TXH + TXL + TXT TXCount ErrCount DroppedPackets MemoryUsage + MemoryHigh + MemoryLow CPUUsage + CPUHigh + CPULow ID HumanTime ) @@ -65,16 +72,23 @@ func initHeaders() { headerSlice[Created] = header{"Created", 8} headerSlice[Local] = header{"Local", 15} headerSlice[Remote] = header{"Remote", 15} - headerSlice[PMSH] = header{"RMSH", 8} - headerSlice[PMSL] = header{"RMSL", 8} + headerSlice[RMSH] = header{"RMSH", 8} + headerSlice[RMSL] = header{"RMSL", 8} headerSlice[TTFBH] = header{"TTFBH", 8} headerSlice[TTFBL] = header{"TTFBL", 8} headerSlice[TX] = header{"TX", 10} + headerSlice[TXL] = header{"TXLow", 10} + headerSlice[TXH] = header{"TXHigh", 10} + headerSlice[TXT] = header{"TXTotal", 15} headerSlice[TXCount] = header{"#TX", 10} headerSlice[ErrCount] = header{"#ERR", 6} headerSlice[DroppedPackets] = header{"#Dropped", 9} headerSlice[MemoryUsage] = header{"MemUsed", 7} + headerSlice[MemoryHigh] = header{"MemHigh", 7} + headerSlice[MemoryLow] = header{"MemLow", 7} headerSlice[CPUUsage] = header{"CPUUsed", 7} + headerSlice[CPUHigh] = header{"CPUHigh", 7} + headerSlice[CPULow] = header{"CPULow", 7} headerSlice[ID] = header{"ID", 30} headerSlice[HumanTime] = header{"Time", 30} } @@ -89,9 +103,13 @@ func GenerateFormatString(columnCount int) (fs string) { var ( ListHeaders = []HeaderField{IntNumber, ID, HumanTime} BandwidthHeaders = []HeaderField{Created, Local, Remote, TX, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} - LatencyHeaders = []HeaderField{Created, Local, Remote, PMSH, PMSL, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} - HTTPHeaders = []HeaderField{Created, Local, Remote, PMSH, PMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} - FullDataPointHeaders = []HeaderField{Created, Local, Remote, PMSH, PMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + LatencyHeaders = []HeaderField{Created, Local, Remote, RMSH, RMSL, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + HTTPHeaders = []HeaderField{Created, Local, Remote, RMSH, RMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + FullDataPointHeaders = []HeaderField{Created, Local, Remote, RMSH, RMSL, TTFBH, TTFBL, TX, TXCount, ErrCount, DroppedPackets, MemoryUsage, CPUUsage} + + RealTimeLatencyHeaders = []HeaderField{ErrCount, TXCount, RMSH, RMSL, DroppedPackets, MemoryHigh, MemoryLow, CPUHigh, CPULow} + RealTimeBandwidthHeaders = []HeaderField{ErrCount, TXCount, TXH, TXL, TXT, DroppedPackets, MemoryHigh, MemoryLow, CPUHigh, CPULow} + RealTimeHTTPHeaders = []HeaderField{ErrCount, TXCount, TXH, TXL, TXT, RMSH, RMSL, TTFBH, TTFBL, DroppedPackets, MemoryHigh, MemoryLow, CPUHigh, CPULow} ) var ( @@ -140,6 +158,71 @@ func printDataPointHeaders(t shared.TestType) { } } +func printRealTimeHeaders(t shared.TestType) { + switch t { + case shared.BandwidthTest: + printHeader(RealTimeBandwidthHeaders) + case shared.LatencyTest: + printHeader(RealTimeLatencyHeaders) + case shared.HTTPTest: + printHeader(RealTimeHTTPHeaders) + default: + } +} + +func printRealTimeRow(style lipgloss.Style, entry *shared.TestOutput, t shared.TestType) { + switch t { + case shared.LatencyTest: + PrintColumns( + style, + column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, + column{formatUint(entry.TXC), headerSlice[TXCount].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, + column{formatInt(int64(entry.DP)), headerSlice[DroppedPackets].width}, + column{formatInt(int64(entry.MH)), headerSlice[MemoryHigh].width}, + column{formatInt(int64(entry.ML)), headerSlice[MemoryLow].width}, + column{formatInt(int64(entry.CH)), headerSlice[CPUHigh].width}, + column{formatInt(int64(entry.CL)), headerSlice[CPULow].width}, + ) + return + case shared.BandwidthTest: + PrintColumns( + style, + column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, + column{formatUint(entry.TXH), headerSlice[TXH].width}, + column{formatUint(entry.TXL), headerSlice[TXL].width}, + column{formatUint(entry.TXT), headerSlice[TXT].width}, + column{formatInt(int64(entry.DP)), headerSlice[DroppedPackets].width}, + column{formatInt(int64(entry.MH)), headerSlice[MemoryHigh].width}, + column{formatInt(int64(entry.ML)), headerSlice[MemoryLow].width}, + column{formatInt(int64(entry.CH)), headerSlice[CPUHigh].width}, + column{formatInt(int64(entry.CL)), headerSlice[CPULow].width}, + ) + return + case shared.HTTPTest: + PrintColumns( + style, + column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, + column{formatUint(entry.TXC), headerSlice[TXCount].width}, + column{formatUint(entry.TXH), headerSlice[TXH].width}, + column{formatUint(entry.TXL), headerSlice[TXL].width}, + column{formatUint(entry.TXT), headerSlice[TXT].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, + column{formatInt(entry.TTFBH), headerSlice[TTFBH].width}, + column{formatInt(entry.TTFBL), headerSlice[TTFBL].width}, + column{formatInt(int64(entry.DP)), headerSlice[DroppedPackets].width}, + column{formatInt(int64(entry.MH)), headerSlice[MemoryHigh].width}, + column{formatInt(int64(entry.ML)), headerSlice[MemoryLow].width}, + column{formatInt(int64(entry.CH)), headerSlice[CPUHigh].width}, + column{formatInt(int64(entry.CL)), headerSlice[CPULow].width}, + ) + default: + shared.DEBUG("Unknown test type, not printing table") + } +} + func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { switch t { case shared.LatencyTest: @@ -148,8 +231,8 @@ func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { column{entry.Created.Format("15:04:05"), headerSlice[Created].width}, column{strings.Split(entry.Local, ":")[0], headerSlice[Local].width}, column{strings.Split(entry.Remote, ":")[0], headerSlice[Remote].width}, - column{formatInt(entry.RMSH), headerSlice[PMSH].width}, - column{formatInt(entry.RMSL), headerSlice[PMSL].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, column{formatUint(entry.TXCount), headerSlice[TXCount].width}, column{formatInt(int64(entry.ErrCount)), headerSlice[ErrCount].width}, column{formatInt(int64(entry.DroppedPackets)), headerSlice[DroppedPackets].width}, @@ -176,8 +259,8 @@ func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { column{entry.Created.Format("15:04:05"), headerSlice[Created].width}, column{strings.Split(entry.Local, ":")[0], headerSlice[Local].width}, column{strings.Split(entry.Remote, ":")[0], headerSlice[Remote].width}, - column{formatInt(entry.RMSH), headerSlice[PMSH].width}, - column{formatInt(entry.RMSL), headerSlice[PMSL].width}, + column{formatInt(entry.RMSH), headerSlice[RMSH].width}, + column{formatInt(entry.RMSL), headerSlice[RMSL].width}, column{formatInt(entry.TTFBH), headerSlice[TTFBH].width}, column{formatInt(entry.TTFBL), headerSlice[TTFBH].width}, column{shared.BandwidthBytesToString(entry.TX), headerSlice[TX].width}, @@ -192,6 +275,18 @@ func printTableRow(style lipgloss.Style, entry *shared.DP, t shared.TestType) { } } +func collectDataPointv2(r *shared.DataReponseToClient) { + if r == nil { + return + } + + responseLock.Lock() + defer responseLock.Unlock() + + responseDPS = append(responseDPS, r.DPS...) + responseERR = append(responseERR, r.Errors...) +} + func praseDataPoint(r *shared.DataReponseToClient, c *shared.Config) { if r == nil { return diff --git a/cmd/hperf/analyze.go b/cmd/hperf/analyze.go index 25d935f..cc23f5e 100644 --- a/cmd/hperf/analyze.go +++ b/cmd/hperf/analyze.go @@ -18,19 +18,8 @@ package main import ( - "bufio" - "bytes" - "context" - "encoding/json" - "fmt" - "math" - "os" - "slices" - - "github.com/charmbracelet/lipgloss" "github.com/minio/cli" "github.com/minio/hperf/client" - "github.com/minio/hperf/shared" ) var analyzeCMD = cli.Command{ @@ -42,6 +31,8 @@ var analyzeCMD = cli.Command{ hostsFlag, portFlag, fileFlag, + printStatsFlag, + printErrFlag, }, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} @@ -55,6 +46,8 @@ FLAGS: EXAMPLES: 1. Analyze test results in file '/tmp/latency-test-1': {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1 --file latency-test-1 + 1. Analyze test results and print full output: + {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1 --file latency-test-1 --print-full `, } @@ -63,114 +56,5 @@ func runAnalyze(ctx *cli.Context) error { if err != nil { return err } - return AnalyzeTest(GlobalContext, *config) -} - -func AnalyzeTest(ctx context.Context, c shared.Config) (err error) { - _, cancel := context.WithCancel(ctx) - defer cancel() - - f, err := os.Open(c.File) - if err != nil { - return err - } - - dps := make([]shared.DP, 0) - errors := make([]shared.TError, 0) - - s := bufio.NewScanner(f) - for s.Scan() { - b := s.Bytes() - if !bytes.Contains(b, []byte("Error")) { - dp := new(shared.DP) - err := json.Unmarshal(b, dp) - if err != nil { - return err - } - dps = append(dps, *dp) - } else { - dperr := new(shared.TError) - err := json.Unmarshal(b, dperr) - if err != nil { - return err - } - errors = append(errors, *dperr) - } - } - - // adjust stats - for i := range dps { - // Highest RMSH can never be 0, but it's the default value of golang int64. - // if we find a 0 we just set it to an impossibly high value. - if dps[i].RMSH == 0 { - dps[i].RMSH = 999999999 - } - } - - dps10 := math.Ceil((float64(len(dps)) / 100) * 10) - dps90 := math.Floor((float64(len(dps)) / 100) * 90) - - slices.SortFunc(dps, func(a shared.DP, b shared.DP) int { - if a.RMSH < b.RMSH { - return -1 - } else { - return 1 - } - }) - - dps10s := make([]shared.DP, 0) - dps50s := make([]shared.DP, 0) - dps90s := make([]shared.DP, 0) - - // total, sum, low, mean, high - dps10stats := []int64{0, 0, 999999999, 0, 0} - dps50stats := []int64{0, 0, 999999999, 0, 0} - dps90stats := []int64{0, 0, 999999999, 0, 0} - - for i := range dps { - if i <= int(dps10) { - dps10s = append(dps10s, dps[i]) - updateBracketStats(dps10stats, dps[i]) - } else if i >= int(dps90) { - dps90s = append(dps90s, dps[i]) - updateBracketStats(dps90stats, dps[i]) - } else { - dps50s = append(dps50s, dps[i]) - updateBracketStats(dps50stats, dps[i]) - } - } - - for i := range errors { - client.PrintTError(errors[i]) - } - - printBracker(dps10stats, "? < 10%", client.SuccessStyle) - printBracker(dps50stats, "10% < ? < 90%", client.WarningStyle) - printBracker(dps90stats, "? > 90%", client.ErrorStyle) - - return nil -} - -func printBracker(b []int64, tag string, style lipgloss.Style) { - fmt.Println(style.Render( - fmt.Sprintf(" %s | Total %d | Low %d | Avg %d | High %d | Microseconds ", - tag, - b[0], - b[2], - b[3], - b[4], - ), - )) -} - -func updateBracketStats(b []int64, dp shared.DP) { - b[0]++ - b[1] += dp.RMSH - if dp.RMSH < b[2] { - b[2] = dp.RMSH - } - b[3] = b[1] / b[0] - if dp.RMSH > b[4] { - b[4] = dp.RMSH - } + return client.AnalyzeTest(GlobalContext, *config) } diff --git a/cmd/hperf/stat.go b/cmd/hperf/download.go similarity index 71% rename from cmd/hperf/stat.go rename to cmd/hperf/download.go index 2fbd7bd..a541229 100644 --- a/cmd/hperf/stat.go +++ b/cmd/hperf/download.go @@ -22,16 +22,16 @@ import ( "github.com/minio/hperf/client" ) -var statTestsCMD = cli.Command{ - Name: "stat", - Usage: "print stats for a given test from the selected hosts", - Action: runStat, +var statDownloadCMD = cli.Command{ + Name: "download", + Usage: "Download stats for tests by ID", + Action: runDownload, Flags: []cli.Flag{ dnsServerFlag, hostsFlag, portFlag, testIDFlag, - outputFlag, + fileFlag, }, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} @@ -43,18 +43,16 @@ FLAGS: {{range .VisibleFlags}}{{.}} {{end}} EXAMPLES: - 1. Print stats by ID for hosts '10.10.10.1' and '10.10.10.2': - {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id - 2. Save stats by ID for hosts '10.10.10.1' and '10.10.10.2': - {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id --output /tmp/output-file + 1. Download test by ID for hosts '10.10.10.1' and '10.10.10.2': + {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id --file /tmp/output-file `, } -func runStat(ctx *cli.Context) error { +func runDownload(ctx *cli.Context) error { config, err := parseConfig(ctx) if err != nil { return err } - return client.GetTest(GlobalContext, *config) + return client.DownloadTest(GlobalContext, *config) } diff --git a/cmd/hperf/main.go b/cmd/hperf/main.go index 8212708..577ceb4 100644 --- a/cmd/hperf/main.go +++ b/cmd/hperf/main.go @@ -26,10 +26,10 @@ import ( "os" "os/signal" "runtime" + "strconv" "syscall" "time" - "github.com/google/uuid" "github.com/minio/cli" "github.com/minio/hperf/client" "github.com/minio/hperf/shared" @@ -137,10 +137,6 @@ var ( Name: "id", Usage: "specify custom ID per test", } - outputFlag = cli.StringFlag{ - Name: "output", - Usage: "set output file path/name", - } fileFlag = cli.StringFlag{ Name: "file", Usage: "input file path", @@ -155,6 +151,14 @@ var ( EnvVar: "HPERF_DNS_SERVER", Usage: "use a custom DNS server to resolve hosts", } + printStatsFlag = cli.BoolFlag{ + Name: "print-stats", + Usage: "Print stat points", + } + printErrFlag = cli.BoolFlag{ + Name: "print-errors", + Usage: "Print errors", + } ) var ( @@ -171,7 +175,7 @@ var ( listTestsCMD, requestsCMD, serverCMD, - statTestsCMD, + statDownloadCMD, stopCMD, } ) @@ -248,15 +252,22 @@ func parseConfig(ctx *cli.Context) (*shared.Config, error) { Save: ctx.BoolT(saveTestFlag.Name), TestID: ctx.String(testIDFlag.Name), RestartOnError: ctx.BoolT(restartOnErrorFlag.Name), - Output: ctx.String(outputFlag.Name), File: ctx.String(fileFlag.Name), + PrintFull: ctx.Bool(printStatsFlag.Name), + PrintErrors: ctx.Bool(printErrFlag.Name), } switch ctx.Command.Name { case "latency", "bandwidth", "http", "get": if ctx.String("id") == "" { - uid := uuid.NewString() - config.TestID = uid + "-" + time.Now().Format("2006-01-02-15-04-05") + config.TestID = strconv.Itoa(int(time.Now().Unix())) + } + case "download": + if ctx.String("id") == "" { + err = errors.New("--id is required") + } + if ctx.String("file") == "" { + err = errors.New("--file is required") } case "analyze": if ctx.String("file") == "" { diff --git a/cmd/hperf/server.go b/cmd/hperf/server.go index c7c0124..80c9f38 100644 --- a/cmd/hperf/server.go +++ b/cmd/hperf/server.go @@ -37,7 +37,12 @@ var ( Value: "0.0.0.0:9010", Usage: "bind to the specified address", } - + realIPFlag = cli.StringFlag{ + Name: "real-ip", + EnvVar: "HPERF_REAL_IP", + Value: "", + Usage: "The real IP used to connect to other servers. If the --address is bound to the real IP then this flag can be skipped.", + } storagePathFlag = cli.StringFlag{ Name: "storage-path", EnvVar: "HPERF_STORAGE_PATH", @@ -49,7 +54,7 @@ var ( Name: "server", Usage: "start an interactive server", Action: runServer, - Flags: []cli.Flag{addressFlag, storagePathFlag, debugFlag}, + Flags: []cli.Flag{addressFlag, realIPFlag, storagePathFlag, debugFlag}, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} @@ -68,11 +73,19 @@ EXAMPLES: 3. Run HPerf server with custom file path and custom address {{.Prompt}} {{.HelpName}} --storage-path /path/on/disk --address 0.0.0.0:9000 + + 4. Run HPerf server with custom file path and floating(real) ip + {{.Prompt}} {{.HelpName}} --storage-path /path/on/disk --address 0.0.0.0:9000 --real-ip 152.121.12.4 `, } ) func runServer(ctx *cli.Context) error { shared.DebugEnabled = debug - return server.RunServer(GlobalContext, ctx.String("address"), ctx.String("storage-path")) + return server.RunServer( + GlobalContext, + ctx.String("address"), + ctx.String("real-ip"), + ctx.String("storage-path"), + ) } diff --git a/server/server.go b/server/server.go index ad25caf..ee049a5 100644 --- a/server/server.go +++ b/server/server.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "log" + "math" "net" "net/http" "os" @@ -54,6 +55,7 @@ var ( WriteBufferSize: 1000000, }) bindAddress = "0.0.0.0:9000" + realIP = "" testFolderSuffix = "hperf-tests" basePath = "./" tests = make([]*test, 0) @@ -97,7 +99,7 @@ func (t *test) AddError(err error, id string) { t.errMap[id] = struct{}{} } -func RunServer(ctx context.Context, address string, storagePath string) (err error) { +func RunServer(ctx context.Context, address string, rIP string, storagePath string) (err error) { cancelContext, cancel := context.WithCancel(ctx) defer cancel() @@ -128,6 +130,7 @@ func RunServer(ctx context.Context, address string, storagePath string) (err err } bindAddress = address + realIP = rIP shared.INFO("starting 'hperf' server on:", bindAddress) err = startAPIandWS(cancelContext) if err != nil { @@ -385,7 +388,11 @@ func newTest(c *shared.Config) (t *test, err error) { for i := range c.Hosts { - if net.JoinHostPort(c.Hosts[i], c.Port) == bindAddress { + joinedHostPort := net.JoinHostPort(c.Hosts[i], c.Port) + if realIP != "" && strings.Contains(joinedHostPort, realIP) { + continue + } + if joinedHostPort == bindAddress { continue } t.Readers = append(t.Readers, @@ -404,7 +411,8 @@ func newTest(c *shared.Config) (t *test, err error) { } type netPerfReader struct { - m sync.Mutex + hasStats bool + m sync.Mutex buf []byte @@ -436,18 +444,19 @@ type asyncReader struct { } func (a *asyncReader) Read(b []byte) (n int, err error) { + a.pr.m.Lock() if !a.ttfbRegistered { - a.ttfbRegistered = true since := time.Since(a.start).Microseconds() - a.pr.m.Lock() + a.ttfbRegistered = true if since > a.pr.TTFBH { a.pr.TTFBH = since } if since < a.pr.TTFBL { a.pr.TTFBL = since } - a.pr.m.Unlock() } + a.pr.hasStats = true + a.pr.m.Unlock() if a.ctx.Err() != nil { return 0, io.EOF @@ -488,7 +497,9 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) { go startPerformanceReader(test, test.Readers[i]) } - listenToLiveTests(con, signal) + conUID := uuid.NewString() + test.cons[conUID] = con + for { if test.ctx.Err() != nil { return @@ -557,18 +568,18 @@ func sendAndSaveData(t *test) (err error) { t.DPS = make([]shared.DP, 0) t.M.Lock() - errMapClone := make([]shared.TError, 0) + errorsClone := make([]shared.TError, 0) for _, v := range t.errors { - errMapClone = append(errMapClone, v) + errorsClone = append(errorsClone, v) } t.errors = make([]shared.TError, 0) t.errMap = make(map[string]struct{}) t.M.Unlock() - for i := range errMapClone { - wss.DataPoint.Errors = append(wss.DataPoint.Errors, errMapClone[i]) + for i := range errorsClone { + wss.DataPoint.Errors = append(wss.DataPoint.Errors, errorsClone[i]) if t.Config.Save { - fileb, err := json.Marshal(errMapClone[i]) + fileb, err := json.Marshal(errorsClone[i]) if err != nil { t.AddError(err, "error-marshaling") } @@ -600,10 +611,14 @@ func generateDataPoints(t *test) { continue } + if !rv.hasStats { + continue + } + r := t.Readers[ri] + tx := r.TX.Swap(0) - prevTime := time.Since(r.lastDataPointTime).Microseconds() - totalSecs := float64(prevTime) / 1000000 + totalSecs := time.Since(r.lastDataPointTime).Seconds() r.lastDataPointTime = time.Now() txtotal := float64(tx) / totalSecs @@ -612,8 +627,8 @@ func generateDataPoints(t *test) { TestID: t.ID, Created: time.Now(), TX: uint64(txtotal), + TXTotal: tx, TXCount: r.TXCount.Load(), - Local: bindAddress, Remote: r.addr, TTFBL: r.TTFBL, TTFBH: r.TTFBH, @@ -625,11 +640,18 @@ func generateDataPoints(t *test) { CPUUsedPercent: int(cpuPercent), } + if realIP != "" { + d.Local = realIP + } else { + d.Local = bindAddress + } + r.m.Lock() + r.hasStats = false r.TTFBH = 0 - r.TTFBL = 99999999 + r.TTFBL = math.MaxInt64 r.RMSH = 0 - r.RMSL = 99999999 + r.RMSL = math.MaxInt64 r.m.Unlock() t.DPS = append(t.DPS, d) @@ -669,7 +691,8 @@ func newPerformanceReaderForASingleHost(c *shared.Config, host string, port stri r.addr = net.JoinHostPort(host, port) r.ip = host r.buf = make([]byte, c.PayloadSize) - r.TTFBL = 99999999 + r.TTFBL = math.MaxInt64 + r.RMSL = math.MaxInt64 r.client = &http.Client{ Transport: newTransport(c), } @@ -749,6 +772,10 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) { proto+r.addr+route, body, ) + if err != nil { + t.AddError(err, "network-new-request") + return + } if t.Config.TestType == shared.BandwidthTest { req.ContentLength = -1 @@ -780,6 +807,7 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) { if done < r.RMSL { r.RMSL = done } + r.hasStats = true r.m.Unlock() io.Copy(io.Discard, resp.Body) diff --git a/shared/shared.go b/shared/shared.go index d3f34ef..adede35 100644 --- a/shared/shared.go +++ b/shared/shared.go @@ -50,6 +50,23 @@ type TestInfo struct { Time time.Time } +type TestOutput struct { + ErrCount int + TXC uint64 + TXL uint64 + TXH uint64 + TXT uint64 + RMSL int64 + RMSH int64 + TTFBL int64 + TTFBH int64 + DP int + ML int + MH int + CL int + CH int +} + type ( SignalType int SignalCode int @@ -121,6 +138,7 @@ type DP struct { TTFBH int64 TTFBL int64 TX uint64 + TXTotal uint64 TXCount uint64 ErrCount int DroppedPackets int @@ -152,12 +170,13 @@ type Config struct { Save bool `json:"Save"` Insecure bool `json:"Insecure"` TestType TestType `json:"TestType"` - Output string `json:"Output"` File string `json:"File"` // AllowLocalInterface bool `json:"AllowLocalInterfaces"` // Client Only ResolveHosts string `json:"-"` + PrintFull bool `json:"-"` + PrintErrors bool `json:"-"` } func INFO(items ...any) {