@@ -588,37 +588,6 @@ func processLogHeader(reader *bufio.Reader) (*RunMessage, []string, error) {
588
588
return & runMessage , scenarios , nil
589
589
}
590
590
591
- func processRemainingRecords (
592
- ctx context.Context ,
593
- wg * sync.WaitGroup ,
594
- reader * bufio.Reader ,
595
- runMessage RunMessage ,
596
- scenarios []string ,
597
- records chan <- interface {},
598
- ) {
599
- defer close (records )
600
- defer wg .Done ()
601
-
602
- for {
603
- select {
604
- case <- ctx .Done ():
605
- l .Infoln ("Parser received closing signal. Processing stopped" )
606
- return
607
- default :
608
- record , err := ReadNotHeaderRecord (reader , runMessage .Start , scenarios )
609
- if err != nil {
610
- if err == io .EOF {
611
- time .Sleep (time .Duration (waitTime ) * time .Second ) // Wait if end of file
612
- continue
613
- }
614
- l .Errorf ("Reading error: %v" , err )
615
- continue
616
- }
617
-
618
- records <- record
619
- }
620
- }
621
- }
622
591
623
592
func writeRecords (wg * sync.WaitGroup , records <- chan interface {}) {
624
593
defer wg .Done ()
@@ -633,7 +602,6 @@ func writeRecords(wg *sync.WaitGroup, records <-chan interface{}) {
633
602
l .Errorf ("Error creating new point with test start data: %v" , err )
634
603
}
635
604
influx .SendPoint (point )
636
-
637
605
case RequestRecord :
638
606
point , err := r .ToInfluxPoint ()
639
607
if err != nil {
@@ -665,8 +633,48 @@ func writeRecords(wg *sync.WaitGroup, records <-chan interface{}) {
665
633
}
666
634
}
667
635
636
+ func processRemainingRecords (
637
+ ctx context.Context ,
638
+ wg * sync.WaitGroup ,
639
+ reader * bufio.Reader ,
640
+ runMessage RunMessage ,
641
+ scenarios []string ,
642
+ records chan <- interface {},
643
+ activity chan <- struct {},
644
+ ) {
645
+ defer close (records )
646
+ defer wg .Done ()
647
+
648
+ for {
649
+ select {
650
+ case <- ctx .Done ():
651
+ l .Infoln ("Parser received closing signal. Processing stopped" )
652
+ return
653
+ default :
654
+ record , err := ReadNotHeaderRecord (reader , runMessage .Start , scenarios )
655
+ if err != nil {
656
+ if err == io .EOF {
657
+ time .Sleep (500 * time .Millisecond ) // Wait if end of file
658
+ continue
659
+ }
660
+ l .Errorf ("Reading error: %v" , err )
661
+ continue
662
+ }
663
+
664
+ // Signal activity when new record is read
665
+ activity <- struct {}{}
666
+ records <- record
667
+ }
668
+ }
669
+ }
670
+
668
671
func fileProcessorBinary (ctx context.Context , file * os.File ) {
669
672
defer func () { parserStopped <- struct {}{} }()
673
+
674
+ // Create cancellable context
675
+ ctx , cancel := context .WithCancel (ctx )
676
+ defer cancel ()
677
+
670
678
reader := bufio .NewReader (file )
671
679
runMessage , scenarios , err := processLogHeader (reader )
672
680
if err != nil {
@@ -678,13 +686,36 @@ func fileProcessorBinary(ctx context.Context, file *os.File) {
678
686
records := make (chan interface {}, 10 )
679
687
records <- * runMessage
680
688
689
+ activity := make (chan struct {}, 1 ) // Buffer of 1 to prevent blocking
690
+
681
691
wg .Add (2 )
682
- go processRemainingRecords (ctx , wg , reader , * runMessage , scenarios , records )
692
+ go processRemainingRecords (ctx , wg , reader , * runMessage , scenarios , records , activity )
683
693
go writeRecords (wg , records )
684
- wg .Wait ()
685
694
686
- }
695
+ // Start timeout monitor
696
+ go func () {
697
+ startWait := time .Now ()
698
+ ticker := time .NewTicker (time .Second )
699
+ defer ticker .Stop ()
700
+
701
+ for {
702
+ select {
703
+ case <- activity :
704
+ startWait = time .Now ()
705
+ case <- ticker .C :
706
+ if time .Now ().After (startWait .Add (time .Duration (waitTime ) * time .Second )) {
707
+ l .Infof ("No new lines found for %d seconds. Stopping application..." , waitTime )
708
+ cancel ()
709
+ return
710
+ }
711
+ case <- ctx .Done ():
712
+ return
713
+ }
714
+ }
715
+ }()
687
716
717
+ wg .Wait ()
718
+ }
688
719
func parseStart (ctx context.Context , wg * sync.WaitGroup ) {
689
720
defer wg .Done ()
690
721
0 commit comments