-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcache_flusher.py
75 lines (55 loc) · 1.86 KB
/
cache_flusher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import logging
import time
logger = logging.getLogger(__name__)
def enqueue_log_entry(ts_cache, queue):
# ts_cache = {
# 'source_file_xxx': {
# 123: { # 123 is line number
# ...
# },
# ...
# },
# ...
# }
for _, source_file_cache in ts_cache.iteritems():
for _, log_entry in source_file_cache.iteritems():
queue.put(log_entry)
def flush_cache(log_cache, queue, merge):
ts_list = log_cache.keys()
if len(ts_list) == 0:
return
if not merge:
for _, logs in log_cache.iteritems():
for entry in logs:
queue.put(entry)
else:
ts_list.sort()
# keep the latest log if it is generated in about 2 second.
if time.time() - ts_list[-1] < 2:
ts_list = ts_list[:-1]
# only enqueue logs of latest 5 ts.
for log_ts in ts_list[-5:]:
enqueue_log_entry(log_cache[log_ts], queue)
for log_ts in ts_list:
del log_cache[log_ts]
def one_flush(context):
for log_name in context['cache'].keys():
log_cache = context['cache'][log_name]
log_stat = context['stat'][log_name]
merge = context['conf'][log_name].get("merge", True)
try:
flush_cache(log_cache, context['queue'], merge)
log_stat['flush_cache_error'] = None
except Exception as e:
logger.exception('failed to flush cache of: %s, %s' %
(log_name, repr(e)))
log_stat['flush_cache_error'] = repr(e)
def run(context):
while True:
start_time = time.time()
with context['cache_lock']:
one_flush(context)
time_used = time.time() - start_time
logger.info('flush at: %f, time used: %f' %
(start_time, time_used))
time.sleep(1)