-
Notifications
You must be signed in to change notification settings - Fork 5
/
analyze.py
64 lines (55 loc) · 1.66 KB
/
analyze.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
### SPDX-License-Identifier: GPL-2.0-or-later
"""Analyze log messages from a single source."""
from argparse import ArgumentParser
import sys
from .common import (
open_input,
print_loj,
)
from .parsers import PARSERS
from .analyzers import (
ANALYZERS,
Config,
)
def main():
"""Analyze log messages from a single source.
Analyze data parsed from the log messages in input. Print the test result
and data analysis as JSON.
"""
aparser = ArgumentParser(description=main.__doc__)
aparser.add_argument(
'--canonical', action='store_true',
help="input contains canonical data",
)
aparser.add_argument(
'--config',
help="YAML file specifying test requirements and parameters",
)
aparser.add_argument(
'input',
help="input file, or '-' to read from stdin",
)
aparser.add_argument(
'analyzer', choices=tuple(ANALYZERS),
help="analyzer to run over input",
)
args = aparser.parse_args()
config = Config.from_yaml(args.config) if args.config else Config()
analyzer = ANALYZERS[args.analyzer](config)
parser = PARSERS[analyzer.parser]()
with open_input(args.input) as fid:
method = parser.canonical if args.canonical else parser.parse
for parsed in method(fid):
analyzer.collect(parsed)
dct = {
'result': analyzer.result,
'timestamp': analyzer.timestamp,
'duration': analyzer.duration,
'reason': analyzer.reason,
'analysis': analyzer.analysis,
}
# Python exits with error code 1 on EPIPE
if not print_loj(dct):
sys.exit(1)
if __name__ == '__main__':
main()