-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcollector.py
53 lines (38 loc) · 1.33 KB
/
collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import logging
import Queue
import threading
import time
from datetime import datetime
from pykit import threadutil
from pykit.logcollector import cache_flusher
from pykit.logcollector import scanner
from pykit.logcollector import sender
logger = logging.getLogger(__name__)
def run(**kwargs):
context = {
'node_id': kwargs['node_id'],
'node_ip': kwargs['node_ip'],
'send_log': kwargs['send_log'],
'conf': kwargs['conf'],
'cache_lock': threading.RLock(),
'cache': {},
'stat': {},
'queue': Queue.Queue(1024 * 10),
}
# strptime not thread safe, need to call it manually before
# initiating any thread
datetime.strptime("2011-04-05", "%Y-%m-%d")
for log_name in context['conf'].keys():
context['cache'][log_name] = {}
context['stat'][log_name] = {}
threadutil.start_daemon_thread(
scanner.scan,
args=(context, log_name))
threadutil.start_daemon_thread(cache_flusher.run, args=(context,))
threadutil.start_daemon_thread(sender.run, args=(context,))
while True:
# actually it is not an error log, but normally we only report
# error log, and we want to report this log even it is not
# an error log.
logger.error('stat: %s' % context['stat'])
time.sleep(100)