@@ -99,11 +99,14 @@ def _wait_flush(self, timeout, callback):
99
99
# type: (float, Optional[Any]) -> None
100
100
initial_timeout = min (0.1 , timeout )
101
101
if not self ._timed_queue_join (initial_timeout ):
102
- pending = self ._queue .qsize ()
102
+ pending = self ._queue .qsize () + 1
103
103
logger .debug ("%d event(s) pending on flush" , pending )
104
104
if callback is not None :
105
105
callback (pending , timeout )
106
- self ._timed_queue_join (timeout - initial_timeout )
106
+
107
+ if not self ._timed_queue_join (timeout - initial_timeout ):
108
+ pending = self ._queue .qsize () + 1
109
+ logger .error ("flush timed out, dropped %s events" , pending )
107
110
108
111
def submit (self , callback ):
109
112
# type: (Callable[[], None]) -> None
@@ -115,7 +118,7 @@ def submit(self, callback):
115
118
116
119
def on_full_queue (self , callback ):
117
120
# type: (Optional[Any]) -> None
118
- logger .debug ("background worker queue full, dropping event" )
121
+ logger .error ("background worker queue full, dropping event" )
119
122
120
123
def _target (self ):
121
124
# type: () -> None
0 commit comments