@@ -588,6 +588,37 @@ func processLogHeader(reader *bufio.Reader) (*RunMessage, []string, error) {
588
588
return & runMessage , scenarios , nil
589
589
}
590
590
591
+ func processRemainingRecords (
592
+ ctx context.Context ,
593
+ wg * sync.WaitGroup ,
594
+ reader * bufio.Reader ,
595
+ runMessage RunMessage ,
596
+ scenarios []string ,
597
+ records chan <- interface {},
598
+ ) {
599
+ defer close (records )
600
+ defer wg .Done ()
601
+
602
+ for {
603
+ select {
604
+ case <- ctx .Done ():
605
+ l .Infoln ("Parser received closing signal. Processing stopped" )
606
+ return
607
+ default :
608
+ record , err := ReadNotHeaderRecord (reader , runMessage .Start , scenarios )
609
+ if err != nil {
610
+ if err == io .EOF {
611
+ time .Sleep (time .Duration (waitTime ) * time .Second ) // Wait if end of file
612
+ continue
613
+ }
614
+ l .Errorf ("Reading error: %v" , err )
615
+ continue
616
+ }
617
+
618
+ records <- record
619
+ }
620
+ }
621
+ }
591
622
592
623
func writeRecords (wg * sync.WaitGroup , records <- chan interface {}) {
593
624
defer wg .Done ()
@@ -602,6 +633,7 @@ func writeRecords(wg *sync.WaitGroup, records <-chan interface{}) {
602
633
l .Errorf ("Error creating new point with test start data: %v" , err )
603
634
}
604
635
influx .SendPoint (point )
636
+
605
637
case RequestRecord :
606
638
point , err := r .ToInfluxPoint ()
607
639
if err != nil {
@@ -633,48 +665,8 @@ func writeRecords(wg *sync.WaitGroup, records <-chan interface{}) {
633
665
}
634
666
}
635
667
636
- func processRemainingRecords (
637
- ctx context.Context ,
638
- wg * sync.WaitGroup ,
639
- reader * bufio.Reader ,
640
- runMessage RunMessage ,
641
- scenarios []string ,
642
- records chan <- interface {},
643
- activity chan <- struct {},
644
- ) {
645
- defer close (records )
646
- defer wg .Done ()
647
-
648
- for {
649
- select {
650
- case <- ctx .Done ():
651
- l .Infoln ("Parser received closing signal. Processing stopped" )
652
- return
653
- default :
654
- record , err := ReadNotHeaderRecord (reader , runMessage .Start , scenarios )
655
- if err != nil {
656
- if err == io .EOF {
657
- time .Sleep (500 * time .Millisecond ) // Wait if end of file
658
- continue
659
- }
660
- l .Errorf ("Reading error: %v" , err )
661
- continue
662
- }
663
-
664
- // Signal activity when new record is read
665
- activity <- struct {}{}
666
- records <- record
667
- }
668
- }
669
- }
670
-
671
668
func fileProcessorBinary (ctx context.Context , file * os.File ) {
672
669
defer func () { parserStopped <- struct {}{} }()
673
-
674
- // Create cancellable context
675
- ctx , cancel := context .WithCancel (ctx )
676
- defer cancel ()
677
-
678
670
reader := bufio .NewReader (file )
679
671
runMessage , scenarios , err := processLogHeader (reader )
680
672
if err != nil {
@@ -686,36 +678,13 @@ func fileProcessorBinary(ctx context.Context, file *os.File) {
686
678
records := make (chan interface {}, 10 )
687
679
records <- * runMessage
688
680
689
- activity := make (chan struct {}, 1 ) // Buffer of 1 to prevent blocking
690
-
691
681
wg .Add (2 )
692
- go processRemainingRecords (ctx , wg , reader , * runMessage , scenarios , records , activity )
682
+ go processRemainingRecords (ctx , wg , reader , * runMessage , scenarios , records )
693
683
go writeRecords (wg , records )
694
-
695
- // Start timeout monitor
696
- go func () {
697
- startWait := time .Now ()
698
- ticker := time .NewTicker (time .Second )
699
- defer ticker .Stop ()
700
-
701
- for {
702
- select {
703
- case <- activity :
704
- startWait = time .Now ()
705
- case <- ticker .C :
706
- if time .Now ().After (startWait .Add (time .Duration (waitTime ) * time .Second )) {
707
- l .Infof ("No new lines found for %d seconds. Stopping application..." , waitTime )
708
- cancel ()
709
- return
710
- }
711
- case <- ctx .Done ():
712
- return
713
- }
714
- }
715
- }()
716
-
717
684
wg .Wait ()
685
+
718
686
}
687
+
719
688
func parseStart (ctx context.Context , wg * sync.WaitGroup ) {
720
689
defer wg .Done ()
721
690
0 commit comments