@@ -591,44 +591,43 @@ func processRemainingRecords(
591
591
ctx context.Context ,
592
592
wg * sync.WaitGroup ,
593
593
reader * bufio.Reader ,
594
+ file * os.File ,
594
595
runMessage RunMessage ,
595
596
scenarios []string ,
596
597
records chan <- interface {},
597
598
) {
598
- defer func () {
599
- close (records )
600
- wg .Done ()
601
- }()
599
+ defer close (records )
600
+ defer wg .Done ()
602
601
603
602
latestReadTime := time .Now ()
603
+ lastSize := int64 (0 )
604
604
605
605
for {
606
606
select {
607
607
case <- ctx .Done ():
608
608
l .Infoln ("Parser received closing signal. Processing stopped" )
609
609
return
610
610
default :
611
+ // Check file size
612
+ if stat , _ := file .Stat (); stat .Size () == lastSize {
613
+ if time .Now ().After (latestReadTime .Add (time .Duration (waitTime ) * time .Second )) {
614
+ l .Infof ("File size unchanged for %d seconds. Stopping application..." , waitTime )
615
+ return
616
+ }
617
+ } else {
618
+ lastSize = stat .Size ()
619
+ latestReadTime = time .Now ()
620
+ }
621
+
611
622
record , err := ReadNotHeaderRecord (reader , runMessage .Start , scenarios )
612
623
if err != nil {
613
- if err == io .EOF {
614
- // If no new data read for more than value provided by 'stop-timeout' key then processing is stopped
615
- if time .Now ().After (latestReadTime .Add (time .Duration (waitTime ) * time .Second )) {
616
- l .Infof ("No new entries found for %d seconds. Stopping application..." , waitTime )
617
- return
618
- }
619
- time .Sleep (time .Second ) // Wait if end of file
620
- continue
621
- }
622
- l .Errorf ("Reading error: %v" , err )
624
+ time .Sleep (100 * time .Millisecond )
623
625
continue
624
626
}
625
-
626
627
records <- record
627
- latestReadTime = time .Now ()
628
628
}
629
629
}
630
630
}
631
-
632
631
type RecordsWriter interface {
633
632
writeAll (wg * sync.WaitGroup , records <- chan interface {})
634
633
}
@@ -718,9 +717,10 @@ func (w *SumRecordsWriter) writeAll(wg *sync.WaitGroup, records <-chan interface
718
717
l .Debugf ("msg = %d, users = %d, reqs = %d, groups = %d, errors = %d\n " , rumMessages , users , reqs , groups , errors )
719
718
}
720
719
721
- func fileProcessorBinary (ctx context.Context , file * os.File , recordsWriter RecordsWriter ) {
720
+ func fileProcessorBinary (ctx context.Context , file * os.File , recordsWriter RecordsWriter ) {
722
721
defer func () { parserStopped <- struct {}{} }()
723
722
reader := bufio .NewReader (file )
723
+
724
724
runMessage , scenarios , err := processLogHeader (reader )
725
725
if err != nil {
726
726
l .Errorf ("Log file %s reading error: %v" , file .Name (), err )
@@ -732,7 +732,7 @@ func fileProcessorBinary(ctx context.Context, file *os.File, recordsWriter Reco
732
732
records <- * runMessage
733
733
734
734
wg .Add (2 )
735
- go processRemainingRecords (ctx , wg , reader , * runMessage , scenarios , records )
735
+ go processRemainingRecords (ctx , wg , reader , file , * runMessage , scenarios , records )
736
736
go recordsWriter .writeAll (wg , records )
737
737
wg .Wait ()
738
738
}
0 commit comments