-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathscanner.py
150 lines (103 loc) · 3.76 KB
/
scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import logging
import os
import time
from pykit import fsutil
# Module-level logger shared by every function in this scanner module.
logger = logging.getLogger(__name__)
class ParseLogError(Exception):
    """Raised when a log's configured parser fails on a log record."""
def build_entry(context, log_name, file_name, log_str, log_conf):
    """Build one log-entry dict for a (possibly multi-line) log record.

    Combines node identity taken from ``context`` with whatever fields the
    log's configured parser extracts from the raw text.

    Args:
        context: shared state dict; must provide 'node_id' and 'node_ip'.
        log_name: logical name of the log being scanned.
        file_name: base name of the file the record came from.
        log_str: raw log text to parse.
        log_conf: config dict whose 'parse' callable maps ``log_str`` to a
            dict of extra fields (e.g. 'log_ts') merged into the entry.

    Returns:
        dict with the base fields plus everything 'parse' returned.

    Raises:
        ParseLogError: if the configured parser raises any exception.
    """
    log_entry = {
        'log_name': log_name,
        'log_file': file_name,
        'content': log_str,
        'node_id': context['node_id'],
        'node_ip': context['node_ip'],
        # 'count' is bumped later when identical entries are merged in cache.
        'count': 1,
    }

    try:
        log_info = log_conf['parse'](log_str)
    except Exception as e:
        # Lazy %-args: the string is only formatted if the record is emitted.
        logger.exception('failed to parse log: %s, %s, %s',
                         log_name, log_str, repr(e))
        # Fix: the original message misspelled 'failed' as 'faild'.
        raise ParseLogError('failed to parse log: %s' % log_name)

    log_entry.update(log_info)

    return log_entry
def put_into_cache(log_cache, log_entry, merge=True, nlimit=None):
    """Insert a log entry into the in-memory cache.

    With ``merge=False`` entries are kept in a bounded per-source-file list
    (once the cap is reached, further entries are discarded).  With
    ``merge=True`` entries are deduplicated by (log_ts, source_file,
    line_number): a repeated entry replaces the stored one with an
    incremented 'count'.

    Args:
        log_cache: cache dict owned by the caller; mutated in place.
        log_entry: entry dict from build_entry; must carry 'source_file',
            and in merge mode also 'log_ts' and 'line_number'.
        merge: whether to deduplicate entries (default True).
        nlimit: list cap for non-merge mode; falsy means 10240.
    """
    source_file = log_entry['source_file']

    if not merge:
        cap = nlimit if nlimit else 10240
        bucket = log_cache.setdefault(source_file, [])
        bucket.append(log_entry)
        # Keep only the first `cap` entries; later ones are dropped.
        if len(bucket) > cap:
            del bucket[cap:]
        return

    ts_bucket = log_cache.setdefault(log_entry['log_ts'], {})
    by_line = ts_bucket.setdefault(source_file, {})

    line_number = log_entry['line_number']
    existing = by_line.get(line_number)

    if existing is not None:
        # Same record seen again: carry the accumulated occurrence count
        # over onto the newest entry.
        log_entry['count'] = existing['count'] + 1

    by_line[line_number] = log_entry
def _iter_log(log_conf):
    """Yield complete (possibly multi-line) log records from the log file.

    Tails ``log_conf['file_path']`` via fsutil.Cat, grouping continuation
    lines under the most recent first line, as decided by the configured
    'is_first_line' predicate.  At most 100 lines are buffered per record;
    extra continuation lines are silently dropped.  Any iteration error is
    logged, then the generator flushes the buffered record (if any) and
    ends.

    Yields:
        str: one joined log record at a time.
    """
    file_path = log_conf['file_path']

    log_lines = []
    try:
        # default_seek=-2MB: start near the end of the file instead of
        # replaying its history; timeout=3 bounds each wait for new data.
        for line in fsutil.Cat(file_path).iterate(
                timeout=3, default_seek=-1024 * 1024 * 2):

            if log_conf['is_first_line'](line):
                # A new record starts: flush the previous one, if any.
                if len(log_lines) > 0:
                    yield ''.join(log_lines)
                # Fix: removed the dead `log_lines = []` store that was
                # immediately overwritten by this assignment.
                log_lines = [line]
            elif len(log_lines) < 100:
                log_lines.append(line)

    except Exception as e:
        # Best-effort tailing: report, then fall through to flush the buffer.
        # Lazy %-args avoid formatting when INFO is disabled.
        logger.info('got exception: %s when iter lines of file: %s',
                    repr(e), file_path)

    if len(log_lines) > 0:
        yield ''.join(log_lines)
def iter_log(log_conf):
    """Endlessly yield log records for one configured log.

    Each pass delegates to _iter_log, which ends when tailing the file
    fails or times out; a one-second pause between passes keeps a broken
    file from causing a busy loop.  Never returns.
    """
    while True:
        for record in _iter_log(log_conf):
            yield record
        # Back off briefly before re-tailing the file.
        time.sleep(1)
def _scan(context, log_name):
    """Tail one configured log forever, filtering and caching its entries.

    Reads records via iter_log, drops those whose level is not enabled in
    the log's config, and stores the rest in the shared cache under the
    cache lock, while maintaining per-log counters in context['stat'].
    Never returns normally.
    """
    stat = context['stat'][log_name]
    conf = context['conf'][log_name]
    cache = context['cache'][log_name]

    base_name = os.path.basename(conf['file_path'])

    # Reset counters at the start of every scan pass.
    stat['total_n'] = 0
    stat['reported_n'] = 0

    for record in iter_log(conf):
        # Cap record size so one runaway multi-line log cannot bloat memory.
        record = record[:10240]
        stat['total_n'] += 1

        if conf['get_level'](record) not in conf['level']:
            continue

        entry = build_entry(context, log_name, base_name, record, conf)

        # NOTE(review): 'latence' is a pre-existing (misspelled) stat key,
        # kept byte-identical for whatever consumes these stats.
        stat['latence'] = time.time() - entry['log_ts']
        stat['reported_n'] += 1

        merge = conf.get("merge", True)
        cache_nlimit = conf.get("cache_nlimit", None)
        with context['cache_lock']:
            put_into_cache(cache, entry, merge, cache_nlimit)
def scan(context, log_name):
    """Run the scan loop for one log, restarting on any failure.

    Crash-guard around _scan: every exception is logged with traceback,
    recorded under context['stat'][log_name]['error'], and the scan is
    retried after a one-second pause.  Never returns.
    """
    while True:
        try:
            _scan(context, log_name)
        except Exception as err:
            detail = repr(err)
            logger.exception('failed to scan log: %s, %s' %
                             (log_name, detail))
            # Expose the latest failure to stat consumers.
            context['stat'][log_name]['error'] = detail

        time.sleep(1)