-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDOOPRunner.py
75 lines (63 loc) · 2.87 KB
/
DOOPRunner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# ECSTATIC: Extensible, Customizable STatic Analysis Tester Informed by Configuration
#
# Copyright (c) 2022.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import os
import shutil
import subprocess
import time
from typing import List
from src.ecstatic.runners.CommandLineToolRunner import CommandLineToolRunner
from src.ecstatic.util.UtilClasses import BenchmarkRecord, FuzzingJob
# Module-level logger for this runner, registered under the fixed name "DOOPRunner".
logger = logging.getLogger("DOOPRunner")
class DOOPRunner(CommandLineToolRunner):
    """Runner that invokes the ``doop`` command-line tool and collects its call graph.

    The option getters each return a list of argv tokens; ``run_from_cmd``
    assembles them onto the supplied command, runs it, and relocates the
    ``CallGraphEdge.csv`` file that the run leaves in its output directory.
    """

    def get_timeout_option(self) -> List[str]:
        """Return the ``-t <timeout>`` arguments built from ``self.timeout``."""
        return ["-t", str(self.timeout)]

    def get_whole_program(self) -> List[str]:
        """Return the flag that is appended when ``self.whole_program`` is set."""
        return ["--ignore-main-method"]

    def get_input_option(self, benchmark_record: BenchmarkRecord) -> List[str]:
        """Return the ``-i <input>`` arguments for *benchmark_record*.

        The name is passed as one argv token so that a benchmark path
        containing spaces is not broken into several arguments.
        """
        return ["-i", str(benchmark_record.name)]

    def get_output_option(self, output_file: str) -> List[str]:
        """Return no arguments.

        No output-file option is passed on the command line; ``run_from_cmd``
        moves the result to *output_file* after the run instead.
        """
        return []

    def get_task_option(self, task: str) -> List[str]:
        """Return the extra arguments needed for *task*.

        Only the ``'cg'`` task is accepted, and it needs no extra arguments.

        Raises:
            NotImplementedError: if *task* is anything other than ``'cg'``.
        """
        if task == 'cg':
            return []
        raise NotImplementedError(f'DOOP does not support task {task}.')

    def get_base_command(self) -> List[str]:
        """Return the executable name and the flags used on every invocation."""
        return ["doop", "--dont-cache-facts", "--thorough-fact-gen"]

    def run_from_cmd(self, cmd: List[str], job: FuzzingJob, output_file: str) -> str:
        """Run DOOP for *job* and move its call-graph CSV to *output_file*.

        Args:
            cmd: Command to run. It is extended in place with the input,
                timeout and whole-program options.
            job: Job whose ``target`` supplies the input option.
            output_file: Destination path for ``CallGraphEdge.csv``.

        Returns:
            The combined stdout/stderr text captured from the process.

        Raises:
            RuntimeError: if the output contains no line starting with
                "Making database available"; the message is the captured
                output.
        """
        cmd.extend(self.get_input_option(job.target))
        if self.timeout is not None:
            cmd.extend(self.get_timeout_option())
        if self.whole_program:
            cmd.extend(self.get_whole_program())
        logger.info(f"Cmd is {cmd}")
        ps = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

        # The output directory is the last token of the first
        # "Making database available ..." line of the tool's output.
        for line in ps.stdout.split("\n"):
            if line.startswith("Making database available"):
                # Whitespace-aware split: trailing spaces or a "\r" must not
                # yield an empty or corrupted path, because the directory is
                # removed recursively below.
                output_dir = line.split()[-1]
                logger.info(f"Output directory: {output_dir}")
                break
        else:
            # No such line was printed, so there is nothing to collect;
            # surface the tool's output to the caller.
            raise RuntimeError(ps.stdout)

        intermediate_file = os.path.join(output_dir, "CallGraphEdge.csv")
        shutil.move(intermediate_file, output_file)
        logger.info(f'Moved {intermediate_file} to {output_file}')
        logger.info(f'Now removing directory {output_dir}')
        # Resolve symlinks first so the real directory is what gets deleted.
        shutil.rmtree(os.path.realpath(output_dir))
        return ps.stdout