@@ -44,6 +44,7 @@ import (
44
44
"github.com/perfana/x2i/influx"
45
45
l "github.com/perfana/x2i/logger"
46
46
"github.com/spf13/cobra"
47
+ "golang.org/x/mod/semver"
47
48
)
48
49
49
50
const (
79
80
groupLine = regexp .MustCompile (`GROUP\s` )
80
81
runLine = regexp .MustCompile (`^RUN\s` )
81
82
errorLine = regexp .MustCompile (`^ERROR\s` )
82
- // for checking gatling version
83
- gatlingVersion313x = regexp .MustCompile (`3\.13\..*` )
84
83
85
84
parserStopped = make (chan struct {})
86
85
)
@@ -596,8 +595,12 @@ func processRemainingRecords(
596
595
scenarios []string ,
597
596
records chan <- interface {},
598
597
) {
599
- defer close (records )
600
- defer wg .Done ()
598
+ defer func () {
599
+ close (records )
600
+ wg .Done ()
601
+ }()
602
+
603
+ latestReadTime := time .Now ()
601
604
602
605
for {
603
606
select {
@@ -608,19 +611,31 @@ func processRemainingRecords(
608
611
record , err := ReadNotHeaderRecord (reader , runMessage .Start , scenarios )
609
612
if err != nil {
610
613
if err == io .EOF {
611
- time .Sleep (time .Duration (waitTime ) * time .Second ) // Wait if end of file
614
+ // If no new data read for more than value provided by 'stop-timeout' key then processing is stopped
615
+ if time .Now ().After (latestReadTime .Add (time .Duration (waitTime ) * time .Second )) {
616
+ l .Infof ("No new entries found for %d seconds. Stopping application..." , waitTime )
617
+ return
618
+ }
619
+ time .Sleep (time .Second ) // Wait if end of file
612
620
continue
613
621
}
614
622
l .Errorf ("Reading error: %v" , err )
615
623
continue
616
624
}
617
625
618
626
records <- record
627
+ latestReadTime = time .Now ()
619
628
}
620
629
}
621
630
}
622
631
623
- func writeRecords (wg * sync.WaitGroup , records <- chan interface {}) {
632
+ type RecordsWriter interface {
633
+ writeAll (wg * sync.WaitGroup , records <- chan interface {})
634
+ }
635
+
636
+ type InfluxRecordsWriter struct {}
637
+
638
+ func (w * InfluxRecordsWriter ) writeAll (wg * sync.WaitGroup , records <- chan interface {}) {
624
639
defer wg .Done ()
625
640
for record := range records {
626
641
switch r := record .(type ) {
@@ -665,7 +680,45 @@ func writeRecords(wg *sync.WaitGroup, records <-chan interface{}) {
665
680
}
666
681
}
667
682
668
- func fileProcessorBinary (ctx context.Context , file * os.File ) {
683
+ type SumRecordsWriter struct {}
684
+
685
+ func (w * SumRecordsWriter ) writeAll (wg * sync.WaitGroup , records <- chan interface {}) {
686
+ defer wg .Done ()
687
+ var (
688
+ users = 0
689
+ reqs = 0
690
+ rumMessages = 0
691
+ errors = 0
692
+ groups = 0
693
+ )
694
+
695
+ for record := range records {
696
+ // l.Infoln(record)
697
+ switch r := record .(type ) {
698
+ case RunMessage :
699
+ rumMessages += 1
700
+
701
+ case RequestRecord :
702
+ reqs += 1
703
+
704
+ case GroupRecord :
705
+ groups += 1
706
+
707
+ case UserRecord :
708
+ users += 1
709
+
710
+ case ErrorRecord :
711
+ errors += 1
712
+
713
+ default :
714
+ l .Errorf ("Unknown record type: %T" , r )
715
+ }
716
+
717
+ }
718
+ l .Debugf ("msg = %d, users = %d, reqs = %d, groups = %d, errors = %d\n " , rumMessages , users , reqs , groups , errors )
719
+ }
720
+
721
+ func fileProcessorBinary (ctx context.Context , file * os.File , recordsWriter RecordsWriter ) {
669
722
defer func () { parserStopped <- struct {}{} }()
670
723
reader := bufio .NewReader (file )
671
724
runMessage , scenarios , err := processLogHeader (reader )
@@ -675,14 +728,13 @@ func fileProcessorBinary(ctx context.Context, file *os.File) {
675
728
}
676
729
677
730
wg := & sync.WaitGroup {}
678
- records := make (chan interface {}, 10 )
731
+ records := make (chan interface {}, 100 )
679
732
records <- * runMessage
680
733
681
734
wg .Add (2 )
682
735
go processRemainingRecords (ctx , wg , reader , * runMessage , scenarios , records )
683
- go writeRecords (wg , records )
736
+ go recordsWriter . writeAll (wg , records )
684
737
wg .Wait ()
685
-
686
738
}
687
739
688
740
func parseStart (ctx context.Context , wg * sync.WaitGroup ) {
@@ -703,8 +755,12 @@ func parseStart(ctx context.Context, wg *sync.WaitGroup) {
703
755
if err != nil {
704
756
l .Errorf ("Failed to read %s file: %v\n " , simulationLogFileName , err )
705
757
}
706
- if gatlingVersion313x .MatchString (ver ) {
707
- fileProcessorBinary (ctx , file )
758
+ if ! semver .IsValid (ver ) {
759
+ ver = "v" + ver
760
+ }
761
+ if semver .Compare (ver , "v3.12.1" ) >= 0 {
762
+ writer := InfluxRecordsWriter {}
763
+ fileProcessorBinary (ctx , file , & writer )
708
764
} else {
709
765
fileProcessor (ctx , file )
710
766
}
0 commit comments