From 406bc07c281729b47e6521690b1fcdb0b57afd4f Mon Sep 17 00:00:00 2001 From: 1bitbool <1bitbool@gmail.com> Date: Wed, 21 Aug 2024 22:00:51 +0800 Subject: [PATCH] Add: ExplanationGenerator --- ExplanationGenerator/__init__.py | 0 ExplanationGenerator/config.py | 134 ++++ ExplanationGenerator/exceptions.py | 51 ++ ExplanationGenerator/explainer.py | 261 +++++++ ExplanationGenerator/extractor.py | 285 ++++++++ ExplanationGenerator/my_types.py | 9 + ExplanationGenerator/precheck.py | 396 +++++++++++ ExplanationGenerator/prompt.py | 667 ++++++++++++++++++ ExplanationGenerator/report_fill.py | 124 ++++ ExplanationGenerator/report_generator.py | 119 ++++ ExplanationGenerator/report_template.md | 25 + ExplanationGenerator/run.py | 72 ++ .../survery_global_summary_generator.py | 152 ++++ ExplanationGenerator/utils/__init__.py | 0 ExplanationGenerator/utils/cg.py | 130 ++++ .../utils/constraint_checker.py | 157 +++++ ExplanationGenerator/utils/helper.py | 36 + ExplanationGenerator/utils/llm.py | 100 +++ ExplanationGenerator/utils/parser.py | 504 +++++++++++++ 19 files changed, 3222 insertions(+) create mode 100644 ExplanationGenerator/__init__.py create mode 100644 ExplanationGenerator/config.py create mode 100644 ExplanationGenerator/exceptions.py create mode 100644 ExplanationGenerator/explainer.py create mode 100644 ExplanationGenerator/extractor.py create mode 100644 ExplanationGenerator/my_types.py create mode 100644 ExplanationGenerator/precheck.py create mode 100644 ExplanationGenerator/prompt.py create mode 100644 ExplanationGenerator/report_fill.py create mode 100644 ExplanationGenerator/report_generator.py create mode 100644 ExplanationGenerator/report_template.md create mode 100644 ExplanationGenerator/run.py create mode 100644 ExplanationGenerator/survery_global_summary_generator.py create mode 100644 ExplanationGenerator/utils/__init__.py create mode 100644 ExplanationGenerator/utils/cg.py create mode 100644 ExplanationGenerator/utils/constraint_checker.py create mode 100644 ExplanationGenerator/utils/helper.py create mode 100644 ExplanationGenerator/utils/llm.py create mode 100644 ExplanationGenerator/utils/parser.py diff --git a/ExplanationGenerator/__init__.py b/ExplanationGenerator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ExplanationGenerator/config.py b/ExplanationGenerator/config.py new file mode 100644 index 00000000..7a6cbcdf --- /dev/null +++ b/ExplanationGenerator/config.py @@ -0,0 +1,134 @@ +# Run Configuration +from enum import Enum +from pathlib import Path + + +class RunMode(Enum): + ALL = "all" + PENDING = "pending" + + +RUN_MODE = RunMode.ALL +PENDING_REPORTS = ["com.netease.newsreader.activity-481", "com.myung.snsday-45"] + +ATTEMPT_TIMES = 3 +bypass_signature = { + # " `{method}`" + terminal_api = call_methods[0] + + reason = KEY_VAR_TERMINAL_PROMPT(framework_entry_api, call_chain_to_entry) + elif reason_type == ReasonType.KEY_VAR_NON_TERMINAL.value: + if terminal_api is not None: + call_methods = candidate['Reasons'][0]['M_app Trace to Crash API'] + call_chain_to_terminal = f"`{call_methods[0]}`" + for method in call_methods[1:]: + call_chain_to_terminal += f" -> `{method}`" + if method == terminal_api: + break + reason = KEY_VAR_NON_TERMINAL_AFTER_TERMINAL_PROMPT(framework_entry_api, terminal_api, call_chain_to_terminal) + else: + raise NotImplementedError("Non-terminal key variable explanation is not implemented yet") + elif reason_type == ReasonType.KEY_API_INVOKED.value: + key_api = candidate['Reasons'][0]['M_frame Triggered KeyAPI'] 
+ key_field = candidate['Reasons'][0]['M_frame Influenced Field'] + if not key_api in key_api_effects: + effect = summarize_key_api_effect(candidate, report_info) + key_api_effects[key_api] = effect + reason = KEY_API_INVOKED_PROMPT(key_api, key_field, key_api_effects[key_api]) + elif reason_type == ReasonType.KEY_API_EXECUTED.value: + reason = KEY_API_EXECUTED_PROMPT() + elif reason_type == ReasonType.KEY_VAR_MODIFIED_FIELD.value: + _, api, field = _extract_var4_field_and_passed_method(candidate['Reasons'][0]['Explanation Info']) + reason = KEY_VAR_MODIFIED_FIELD_PROMPT(field, api) + else: + raise NotImplementedError(f"Unknown explanation type {reason_type}") + + messages.append( + { + "role": "user", + "content": EXPLAINER_USER_PROMPT(method_code, reason) + } + ) + messages, system_fingerprint = send_message(messages) + return messages, terminal_api, key_api_effects + + +def prepare_init_message(report_info, constraint): + messages = EXPLAINER_INIT_PROMPT.copy() + crash_info = { + "Stack Trace": report_info["stack_trace"], + "Crash Message": report_info["crash_message"], + "Android Version": report_info["android_version"], + } + crash_info = json.dumps(crash_info, indent=4) + messages.append( + { + "role": "user", + "content": EXPLAINER_CRASH_PROMPT(crash_info, constraint) + } + ) + return messages + + +def write_explanation(explanations: list, result_dir): + with open(f"{result_dir}/explanation.txt", "w") as f: + for explanation in explanations: + f.write(f"Candidate Name: {explanation['Candidate_Name']}\n\n") + f.write(f"Analysis: ```\n{explanation['Analysis']}\n```\n\n") + if "Android_Knowledge" in explanation: + f.write(f"Android_Knowledge: ```\n{explanation['Android_Knowledge']}\n```\n\n") + f.write(f"Explanation: ```\n{explanation['Explanation']}\n```\n\n") + f.write("-----------------------------------\n") + + +def explain_candidates(report_info, constraint, result_dir): + import os + from ExplanationGenerator.utils.parser import NodeNotFoundException, MultipleNodeException + candidates = sort_candidates(report_info) + terminal_api = find_terminal_api(candidates) + + def dump_result(messages, key_api_effects, explanations): + write_conversation(messages, f"{result_dir}/candidate_conversation.txt") + json.dump(messages, open(f"{result_dir}/candidate_conversation.json", "w"), indent=4) + json.dump(key_api_effects, open(f"{result_dir}/key_api_effects.json", "w"), indent=4) + json.dump(explanations, open(f"{result_dir}/explanation.json", "w"), indent=4) + write_explanation(explanations, result_dir) + + # Cache explanation + if os.path.exists(f"{result_dir}/candidate_conversation.json"): + messages = json.load(open(f"{result_dir}/candidate_conversation.json", "r")) + key_api_effects = json.load(open(f"{result_dir}/key_api_effects.json", "r")) + explanations = json.load(open(f"{result_dir}/explanation.json", "r")) + candidates_len = len(candidates) + messages_len = (len(messages) - 2) / 2 + if candidates_len == messages_len: + return + candidates_new = [] + for candidate in candidates: + if candidate["Candidate Name"] not in [explanation["Candidate_Name"] for explanation in explanations]: + candidates_new.append(candidate) + candidates = candidates_new + else: + messages = prepare_init_message(report_info, constraint) + explanations = [] + key_api_effects = {} + if len(candidates) > 6: + raise TooMuchCandidateError("Too much candidates to explain") + for index, candidate in enumerate(candidates): + logger.info(f"Explaining {candidate['Candidate Name']}, candidate: {index + 
1}/{len(candidates)}") + attempt = 0 + MAX_ATTEMPT = 3 + + while attempt < MAX_ATTEMPT: + try: + messages, _, key_api_effects = llm_explain(messages, candidate, report_info, terminal_api, key_api_effects) + + parsed = parse_message(messages[-1]["content"]) + if "Analysis" not in parsed or "Explanation" not in parsed: + raise LLMOutputFormatError("Invalid output format from LLM") + explanation = { + "Candidate_Name": candidate["Candidate Name"], + "Analysis": parsed["Analysis"], + "Explanation": parsed["Explanation"], + } + if "Android_Knowledge" in parsed: + explanation["Android_Knowledge"] = parsed["Android_Knowledge"] + logger.info(f"Explanation for {candidate['Candidate Name']} is generated!") + except NodeNotFoundException as e: + logger.error(f"Failed to generate explanation for {candidate['Candidate Name']}: {e}") + explanation = { + "Candidate_Name": candidate["Candidate Name"], + "Analysis": "Method not found", + "Explanation": "Method not found", + } + break + except MultipleNodeException as e: + logger.error(f"Failed to generate explanation for {candidate['Candidate Name']}: {e}") + explanation = { + "Candidate_Name": candidate["Candidate Name"], + "Analysis": "Multiple nodes found", + "Explanation": "Multiple nodes found", + } + break + except FileNotFoundError as e: + logger.error(f"Failed to generate explanation for {candidate['Candidate Name']}: {e}") + explanation = { + "Candidate_Name": candidate["Candidate Name"], + "Analysis": "File not found", + "Explanation": "File not found", + } + break + except LLMOutputFormatError as e: + logger.error(f"Failed to generate explanation for {candidate['Candidate Name']}: {e}") + explanation = { + "Candidate_Name": candidate["Candidate Name"], + "Analysis": "Invalid output format from LLM", + "Explanation": "Invalid output format from LLM", + } + attempt += 1 + logger.error(f"Retrying... 
{attempt}/{MAX_ATTEMPT}") + else: + break + + explanations.append(explanation) + dump_result(messages, key_api_effects, explanations) + + dump_result(messages, key_api_effects, explanations) + + return + \ No newline at end of file diff --git a/ExplanationGenerator/extractor.py b/ExplanationGenerator/extractor.py new file mode 100644 index 00000000..72d97bc8 --- /dev/null +++ b/ExplanationGenerator/extractor.py @@ -0,0 +1,285 @@ +from ExplanationGenerator.config import logger +from ExplanationGenerator.prompt import ( + EXTRACTOR_INIT_PROMPT, + EXTRACTOR_USER_PROMPT, + INFERRER_INIT_PROMPT, + INFERRER_USER_PROMPT, +) +from ExplanationGenerator.exceptions import LLMOutputFormatError, ConstraintBasicCheckError, ConstraintStaticAnalysisCheckError, ConstraintCheckError +from ExplanationGenerator.utils.parser import get_framework_method_snippet, get_method_snippet +from ExplanationGenerator.utils.llm import send_message, pretty_log_conversation, save_messages +from ExplanationGenerator.utils.parser import parse_message +import os +import json + + +def extracted_message_hash_helper(messages, regression_message): + import re + + replaced_messages = messages.copy() + last_message = replaced_messages[-1]["content"] + safe_regression_message = re.escape(regression_message) + pattern = r"Exception Message: .*\s```" + last_message = re.sub(pattern, f"Exception Message: {safe_regression_message}\n```", last_message) + replaced_messages[-1] = { + "role": messages[-1]["role"], + "content": last_message + } + + return replaced_messages + + +def extract_constraint(apk_name, method_signature, android_version, exception_name, crash_message, framework_reference_fields, regression_message, allow_cache=True, init_prompt=None, check=True): + code_snippet = get_method_snippet(method_signature, apk_name, android_version, framework_reference_fields) + logger.debug(f"code_snippet for {method_signature}:\n {code_snippet}") + + if init_prompt is None: + messages = EXTRACTOR_INIT_PROMPT.copy() + else: + messages = init_prompt.copy() + messages.append( + {"role": "user", "content": EXTRACTOR_USER_PROMPT(code_snippet, exception_name, crash_message)} + ) + + # Check if these is cache + import json + import hashlib + from ExplanationGenerator.config import EXTRACT_CACHE_DIRECTORY, EXTRACT_CACHE_CONSTRAINT_PATH, EXTRACT_CACHE_MESSAGES_PATH + + hit_cache = False + query_messages = extracted_message_hash_helper(messages, regression_message) + query_str = json.dumps([json.dumps(x, sort_keys=True) for x in query_messages]) + query_hash = hashlib.sha256(query_str.encode()).hexdigest() + if os.path.exists(EXTRACT_CACHE_DIRECTORY(query_hash)) and allow_cache: + result_messages = json.load(open(EXTRACT_CACHE_MESSAGES_PATH(query_hash), "r")) + constraint = open(EXTRACT_CACHE_CONSTRAINT_PATH(query_hash), "r").read() + hit_cache = True + logger.info(f"Hit cache for query {query_hash}") + else: + result_messages, system_fingerprint = send_message(messages, json_mode=False) + pretty_log_conversation(result_messages) + if check: + extracted = parse_message(result_messages[-1]["content"]) + if "Constraint" not in extracted or "Analysis" not in extracted: + raise LLMOutputFormatError("Invalid output format from LLM") + constraint = extracted["Constraint"] + else: + constraint = result_messages[-1]["content"] + return constraint, result_messages, hit_cache + + +def infer_constraint( + inferred_method_signature, + android_version, + original_constraint, + framework_reference_fields, + received_messages, + apk_name +): + code_snippet = 
get_method_snippet(inferred_method_signature, apk_name, android_version, framework_reference_fields) + + messages = received_messages.copy() + messages.append( + { + "role": "user", + "content": INFERRER_USER_PROMPT(code_snippet, original_constraint), + } + ) + + # Check if these is cache + import json + import hashlib + from ExplanationGenerator.config import EXTRACT_CACHE_DIRECTORY, EXTRACT_CACHE_CONSTRAINT_PATH, EXTRACT_CACHE_MESSAGES_PATH + + hit_cache = False + query_messages = messages.copy() + query_str = json.dumps([json.dumps(x, sort_keys=True) for x in query_messages]) + query_hash = hashlib.sha256(query_str.encode()).hexdigest() + + if os.path.exists(EXTRACT_CACHE_DIRECTORY(query_hash)): + result_messages = json.load(open(EXTRACT_CACHE_MESSAGES_PATH(query_hash), "r")) + constraint = open(EXTRACT_CACHE_CONSTRAINT_PATH(query_hash), "r").read() + hit_cache = True + logger.info(f"Hit cache for query {query_hash}") + else: + result_messages, system_fingerprint = send_message(messages, json_mode=False) + pretty_log_conversation(result_messages) + extracted = parse_message(result_messages[-1]["content"]) + if "Constraint" not in extracted or "Analysis" not in extracted: + raise LLMOutputFormatError("Invalid output format from LLM") + constraint = extracted["Constraint"] + return constraint, result_messages, hit_cache + + +def query_framework_constraint(report_info, result_path): + def save_constraint( + constraint, extract_constraint_messages, infer_constraint_messages + ): + with open(f"{result_path}/constraint.txt", "w") as f: + f.write(constraint) + save_messages( + extract_constraint_messages, + result_path, + "extract_conversation", + ) + save_messages( + infer_constraint_messages, + result_path, + "infer_conversation", + ) + + def cache_extract_constraint(constraint, extract_message, regression_message): + from ExplanationGenerator.config import EXTRACT_CACHE_DIRECTORY, EXTRACT_CACHE_CONSTRAINT_PATH + import hashlib + import json + + query_messages = extracted_message_hash_helper(extract_message[:-1], regression_message) + query_str = json.dumps([json.dumps(x, sort_keys=True) for x in query_messages]) + query_hash = hashlib.sha256(query_str.encode()).hexdigest() + + if not os.path.exists(EXTRACT_CACHE_DIRECTORY(query_hash)): + os.makedirs(EXTRACT_CACHE_DIRECTORY(query_hash), exist_ok=True) + with open(EXTRACT_CACHE_CONSTRAINT_PATH(query_hash), "w") as f: + f.write(constraint) + save_messages(extract_message, EXTRACT_CACHE_DIRECTORY(query_hash), "messages") + logger.info(f"Cache extracted constraint for query {query_hash}") + + def cache_infer_constraint(constraint, infer_messages): + from ExplanationGenerator.config import EXTRACT_CACHE_DIRECTORY, EXTRACT_CACHE_CONSTRAINT_PATH + import hashlib + import json + + query_messages = infer_messages[:-1].copy() + query_str = json.dumps([json.dumps(x, sort_keys=True) for x in query_messages]) + query_hash = hashlib.sha256(query_str.encode()).hexdigest() + + if not os.path.exists(EXTRACT_CACHE_DIRECTORY(query_hash)): + os.makedirs(EXTRACT_CACHE_DIRECTORY(query_hash), exist_ok=True) + with open(EXTRACT_CACHE_CONSTRAINT_PATH(query_hash), "w") as f: + f.write(constraint) + save_messages(infer_messages, EXTRACT_CACHE_DIRECTORY(query_hash), "messages") + logger.info(f"Cache inferred constraint for query {query_hash}") + + # Cache constraint + if os.path.exists(f"{result_path}/constraint.txt"): + with open(f"{result_path}/constraint.txt", "r") as f: + constraint = f.read() + query_count = len(report_info["framework_trace"]) + return constraint, 
0, query_count, query_count + + from ExplanationGenerator.extractor import extract_constraint, infer_constraint + from ExplanationGenerator.utils.constraint_checker import fill_full_signature, parse_constraint, constraint_basic_check, constraint_static_analysis_check + from ExplanationGenerator.config import ATTEMPT_TIMES + from itertools import zip_longest + + apk_name = report_info["apk_name"] + framework_trace = report_info["framework_trace"] + android_version = report_info["android_version"] + exception_type = report_info["exception_type"] + crash_message = report_info["crash_message"] + framework_pass_chain = report_info["framework_pass_chain"] + framework_reference_fields = report_info["framework_reference_fields"] + regression_message = report_info["regression_message"] + bypass_count = 0 + cache_hit_count = 0 + + first = True + infer_messages = INFERRER_INIT_PROMPT.copy() + for index, (method_signature, pass_chain_indexes) in enumerate(zip_longest( + framework_trace, framework_pass_chain + )): + logger.info(f"Querying constraint of {method_signature}, framework trace: {index + 1}/{len(framework_trace)}") + attempts = 0 + successful = False + constraints = [] + messages = [] + while attempts < ATTEMPT_TIMES: + try: + if first: + constraint_unchecked, message_unchecked, hit_cache = extract_constraint(apk_name, method_signature, android_version, exception_type, crash_message, framework_reference_fields, regression_message + ) + else: + constraint_unchecked, message_unchecked, hit_cache = infer_constraint( + method_signature, + android_version, + constraint, + framework_reference_fields, + infer_messages, + apk_name + ) + # TODO: Check system_fingerprint + constraint_basic_check(constraint_unchecked, method_signature, android_version, apk_name) + constraints.append(constraint_unchecked) + messages.append(message_unchecked) + if hit_cache is False: + if (pass_chain_indexes is not None and len(pass_chain_indexes) > 0) or len(framework_reference_fields) > 0: + constraint_static_analysis_check( + constraint_unchecked, method_signature, android_version, pass_chain_indexes, framework_reference_fields + ) + elif hit_cache is True: + cache_hit_count += 1 + logger.info("Hit cache, skip static analysis check") + else: + raise ValueError("What happened?") + except LLMOutputFormatError as e: + attempts += 1 + logger.error(f"LLM output format error: {e}") + continue + except ConstraintBasicCheckError as e: + attempts += 1 + logger.error(f"Constraint basic check failed: {e}, attempts: {attempts}/{ATTEMPT_TIMES}, constraint: {constraint_unchecked}") + continue + except ConstraintStaticAnalysisCheckError as e: + attempts += 1 + logger.error(f"Constraint static analysis check failed: {e}, attempts: {attempts}/{ATTEMPT_TIMES}, constraint: {constraint_unchecked}, pass_chain_indexes: {pass_chain_indexes}") + continue + else: + successful = True + break + + constraint_idx = -1 + if not successful: + if len(constraints) == 0: + raise ConstraintCheckError(f"No constraint pass basic check after {ATTEMPT_TIMES} attempts in {apk_name}") + # If all attempts failed, try to check all constraint is the same + constraint_items_list = [] + for constraint in constraints: + _, constraint_items = parse_constraint(constraint) + constraint_items_list.append(set(constraint_items)) + longest_constraint_items = constraint_items_list[0] + longest_index = 0 + for index, constraint_items in enumerate(constraint_items_list): + if len(constraint_items) > len(longest_constraint_items): + longest_constraint_items = constraint_items 
+ longest_index = index + for constraint_items in constraint_items_list: + if constraint_items & longest_constraint_items != longest_constraint_items: + failed_path = f"Meta/Results/failed_list/{apk_name}" + os.makedirs(failed_path, exist_ok=True) + with open(f"{failed_path}/constraints.json", "w") as f: + json.dump(constraints, f, indent=4) + raise ConstraintCheckError(f"Cannot pass constraint consistent check for {len(constraints)} constraints after {ATTEMPT_TIMES} attempts in {apk_name}") + bypass_count += 1 + constraint_idx = longest_index + logger.info("All attempts failed, but constraints are consistent, bypassing") + + msg = messages[constraint_idx] + constraint = constraints[constraint_idx] + if not first: + infer_messages = msg + cache_infer_constraint(constraint, infer_messages) + else: + extract_message = msg + cache_extract_constraint(constraint, extract_message, regression_message) + first = False + constraint = fill_full_signature(framework_trace[-1], constraint) + save_constraint(constraint, extract_message, infer_messages) + + query_count = len(framework_trace) + return constraint, bypass_count, query_count, cache_hit_count + + +if __name__ == "__main__": + extract_constraint( + "android.app.ContextImpl.startActivity", 2.3, "AndroidRuntimeException" + ) diff --git a/ExplanationGenerator/my_types.py b/ExplanationGenerator/my_types.py new file mode 100644 index 00000000..c323d5f4 --- /dev/null +++ b/ExplanationGenerator/my_types.py @@ -0,0 +1,9 @@ +from enum import Enum + +class ReasonType(Enum): + KEY_VAR_TERMINAL = "Key Variable Related 1" + KEY_VAR_NON_TERMINAL = "Key Variable Related 2" + KEY_API_INVOKED = "Key API Related 1" + KEY_API_EXECUTED = "Key API Related 2 (Executed)" + KEY_VAR_MODIFIED_FIELD = "Key Variable Related 4" # The method change some field value which is passed into crash API + \ No newline at end of file diff --git a/ExplanationGenerator/precheck.py b/ExplanationGenerator/precheck.py new file mode 100644 index 00000000..a60a8d47 --- /dev/null +++ b/ExplanationGenerator/precheck.py @@ -0,0 +1,396 @@ +from ExplanationGenerator.config import ( + RunMode, + PENDING_REPORTS, + CRASH_REPORT_PATH, + logger, + PRE_CHECK_RESULT_PATH, + RESULT_SUMMARY_PATH +) +from ExplanationGenerator.utils.helper import MethodType, get_method_type +import json +import os +import shutil +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm + +class InvalidFrameworkMethodException(Exception): + pass + +class NoFrameworkMethodException(Exception): + pass + +class EmptyExceptionInfoException(Exception): + pass + +class InvalidStateException(Exception): + pass + +class InvalidApplicationMethodException(Exception): + pass + + +def report_completion(report): + """ + Complete the full signature stack trace of report. 
+ """ + apk_name = report["Crash Info in Dataset"]["Apk name"] + android_version = report["Fault Localization by CrashTracker"]["Exception Info"]["Target Version of Framework"] + stack_trace = report["Crash Info in Dataset"]["stack trace signature"] + + def complete_self_invoke_trace(stack_trace, apk_name, android_version): + stack_trace_reverse = list(reversed(stack_trace)) + for index, (first_sig, second_sig) in enumerate(zip(stack_trace_reverse, stack_trace_reverse[1:])): + if first_sig != second_sig: + continue + if ';' not in first_sig: + continue + + end_index = index + 1 + while end_index < (len(stack_trace_reverse) - 1) and stack_trace_reverse[end_index + 1] == first_sig: + end_index += 1 + + methods = set([s.strip().strip("<>") for s in first_sig.split(";")]) + + valid_count = 0 + next_method = {} + for m in methods: + called_methods = get_called_methods(m, apk_name, android_version) + if len(called_methods) == 1: + called_method = called_methods.pop() + if called_method in methods: + next_method[m] = called_method + valid_count += 1 + + if valid_count != len(methods) - 1: + continue + invoke_list = [] + for m in methods: + if m not in set(next_method.values()): + invoke_list.append(m) + break + + while invoke_list[-1] in next_method: + invoke_list.append(next_method[invoke_list[-1]]) + + for i in range(end_index, index - 1, -1): + stack_trace_reverse[i] = invoke_list.pop() + + return list(reversed(stack_trace_reverse)) + + return None + + def complete_stack_trace(stack_trace, apk_name, android_version, call_func): + from ExplanationGenerator.utils.parser import parse_signature, InvalidSignatureException + + for index, (first_sig, second_sig) in enumerate(zip(stack_trace, stack_trace[1:])): + try: + parse_signature(first_sig) + except InvalidSignatureException: + continue + if ';' not in second_sig: + continue + + called_methods = call_func(first_sig, apk_name, android_version) + second_sig_set = set([s.strip().strip("<>") for s in second_sig.split(";")]) + common_methods = called_methods & second_sig_set + + if len(common_methods) == 1: + stack_trace[index + 1] = common_methods.pop() + return True + return False + + from ExplanationGenerator.utils.cg import get_called_methods, get_callers_method + + while True: + new_trace = complete_self_invoke_trace(stack_trace, apk_name, android_version) + if new_trace is not None: + stack_trace = new_trace + continue + break + + while True: + stack_trace_reverse = list(reversed(stack_trace)) + if complete_stack_trace(stack_trace_reverse, apk_name, android_version, get_called_methods): + stack_trace = list(reversed(stack_trace_reverse)) + continue + elif complete_stack_trace(stack_trace, apk_name, android_version, get_callers_method): + continue + break + + report["Crash Info in Dataset"]["stack trace signature"] = stack_trace + + +def fetch_new_pass_chain(report_name): + from ExplanationGenerator.config import PASS_CHAIN_REPORT_PATH + + report_path = PASS_CHAIN_REPORT_PATH(report_name) + with open(report_path, "r") as f: + new_report = json.load(f) + pass_chain = new_report["Fault Localization by CrashTracker"]["Exception Info"]["Framework Variable PassChain Info"] + return pass_chain + + +def fetch_information(report, write_report=True): + from ExplanationGenerator.utils.parser import parse_signature, get_framework_method_snippet, get_application_method_snippet, NodeNotFoundException, InvalidSignatureException, MultipleNodeException + from ExplanationGenerator.config import bypass_signature + from javalang.parser import JavaSyntaxError + + 
first_or_none = lambda l: l[0] if len(l) > 0 else None + bypass_signature_result = lambda method: first_or_none([value for key, value in bypass_signature.items() if method.startswith(key)]) + + apk_name = report["Crash Info in Dataset"]["Apk name"] + if len(report["Fault Localization by CrashTracker"]["Exception Info"]) == 0: + raise EmptyExceptionInfoException(f"Empty exception info for {apk_name}") + report_completion(report) + android_version = report["Fault Localization by CrashTracker"]["Exception Info"][ + "Target Version of Framework" + ] + regression_message = report["Fault Localization by CrashTracker"]["Exception Info"]["Regression Message"] + exception_type = report["Crash Info in Dataset"]["Exception Type"].split(".")[-1] + if '$' in exception_type: + exception_type = exception_type.split("$")[-1] + ets_related_type = report["Fault Localization by CrashTracker"]["Exception Info"]["ETS-related Type"] + related_variable_type = report["Fault Localization by CrashTracker"]["Exception Info"]["Related Variable Type"] + related_condition_type = report["Fault Localization by CrashTracker"]["Exception Info"]["Related Condition Type"] + stack_trace = [method if bypass_signature_result(method) is None else bypass_signature_result(method) for method in report["Crash Info in Dataset"]["stack trace signature"]] + stack_trace = [method.strip("<>") for method in stack_trace] + stack_trace_short_api = report["Crash Info in Dataset"]["stack trace"] + candidates = report["Fault Localization by CrashTracker"]["Buggy Method Candidates"] + crash_message = report["Crash Info in Dataset"]["Crash Message"] + + framework_trace = [] + framework_short_trace = [] + divider_index = None + + + for index, (method, method_short_api) in enumerate(zip(stack_trace, stack_trace_short_api)): + try: + method_type = get_method_type(method) + if method_type == MethodType.ANDROID: + framework_trace.append(method) + framework_short_trace.append(method_short_api) + get_framework_method_snippet(method, android_version) + elif method_type == MethodType.ANDROID_SUPPORT: + framework_trace.append(method) + framework_short_trace.append(method_short_api) + get_application_method_snippet(method, apk_name) + elif method_type == MethodType.JAVA: + raise Exception("Java method in framework stack trace") + elif method_type == MethodType.APPLICATION: + divider_index = index + break + except JavaSyntaxError: + raise InvalidFrameworkMethodException( + f"Occur Java syntax error in {method_short_api}, full signature: {method}" + ) + except InvalidSignatureException: + raise InvalidFrameworkMethodException( + f"Invalid signature for {method_short_api}, full signature: {method}" + ) + except (NodeNotFoundException, FileNotFoundError, MultipleNodeException): + raise InvalidFrameworkMethodException( + f"Failed to find framework method snippet for {method_short_api}" + ) + if len(framework_trace) == 0: + raise NoFrameworkMethodException("Failed to find any framework method in stack trace") + + if divider_index is None: + raise InvalidStateException("Failed to find divider index for stack trace") + + pending_application_trace = stack_trace[divider_index:] + pending_application_short_trace = stack_trace_short_api[divider_index:] + application_trace = [] + application_short_trace = [] + for method, method_short_api in zip(pending_application_trace, pending_application_short_trace): + try: + method_type = get_method_type(method) + if method_type == MethodType.ANDROID: + break + elif method_type == MethodType.ANDROID_SUPPORT: + break + elif 
method_type == MethodType.JAVA: + break + elif method_type == MethodType.APPLICATION: + # get_application_method_snippet(method, apk_name) + application_trace.append(method) + application_short_trace.append(method_short_api) + except JavaSyntaxError: + raise InvalidApplicationMethodException( + f"Occur Java syntax error in {method_short_api}, full signature: {method}" + ) + except InvalidSignatureException: + raise InvalidApplicationMethodException( + f"Invalid signature for {method_short_api}, full signature: {method}" + ) + except (NodeNotFoundException, FileNotFoundError, MultipleNodeException): + raise InvalidApplicationMethodException( + f"Failed to find application method snippet for {method_short_api}" + ) + + if "Framework Variable PassChain Info" in report["Fault Localization by CrashTracker"]["Exception Info"]: + try: + pass_chain = fetch_new_pass_chain(apk_name) + except KeyError: + pass_chain = report["Fault Localization by CrashTracker"]["Exception Info"]["Framework Variable PassChain Info"] + + # Check consistency between pass chain and framework trace + if len(pass_chain) > len(framework_trace): + msg = f"Length of pass chain {len(pass_chain)} is greater than length of framework trace {len(framework_trace)}" + logger.error(msg) + raise InvalidStateException(msg) + for i in range(len(pass_chain)): + method_signature, pass_indexes = pass_chain[i].split("@") + parsed_item_from_pass_chain = parse_signature(method_signature) + parsed_item_from_framework_trace = parse_signature(framework_trace[i]) + is_matched = True + for j in range(len(parsed_item_from_pass_chain)): + if ( + parsed_item_from_pass_chain[j] is None + or parsed_item_from_framework_trace[j] is None + ): + continue + if parsed_item_from_pass_chain[j] != parsed_item_from_framework_trace[j]: + is_matched = False + break + + if is_matched: + pass_indexes = json.loads(pass_indexes.strip()) + pass_chain[i] = pass_indexes + else: + pass_chain = pass_chain[:i] + break + else: + pass_chain = [] + + framework_reference_fields = [] + if "Field2InitialMethod" in report["Fault Localization by CrashTracker"]["Exception Info"]: + framework_reference_fields = [key.strip("<>") for key in report["Fault Localization by CrashTracker"]["Exception Info"]["Field2InitialMethod"]] + for candidate in report["Fault Localization by CrashTracker"]["Buggy Method Candidates"]: + for reason in candidate["Reasons"]: + if "M_frame Influenced Field" in reason: + for field in reason["M_frame Influenced Field"]: + framework_reference_fields.append(field) + framework_reference_fields = list(set(framework_reference_fields)) + + report_info = { + "apk_name": apk_name, + "android_version": android_version, + "regression_message": regression_message, + "exception_type": exception_type, + "crash_message": crash_message, + "stack_trace": stack_trace, + "stack_trace_short_api": stack_trace_short_api, + "framework_trace": framework_trace, + "framework_short_trace": framework_short_trace, + "application_trace": application_trace, + "application_short_trace": application_short_trace, + "framework_pass_chain": pass_chain, + "framework_entry_api": framework_trace[-1], + "framework_reference_fields": framework_reference_fields, + "candidates": candidates, + "ets_related_type": ets_related_type, + "related_variable_type": related_variable_type, + "related_condition_type": related_condition_type, + } + if write_report: + with open(f"{PRE_CHECK_RESULT_PATH(apk_name)}/report_info.json", "w") as f: + json.dump(report_info, f, indent=4) + return report_info + + +def 
init_report(report_name): + """ + Initialize report and result directory. + + Load report from directory which specified in configuration and create result directory. + + Copy the report to result directory as well. + """ + result_dir = PRE_CHECK_RESULT_PATH(report_name) + os.makedirs(result_dir, exist_ok=True) + + report_path = CRASH_REPORT_PATH(report_name) + with open(report_path, "r") as f: + report = json.load(f) + # Copy report to result directory + shutil.copy(report_path, f"{result_dir}/{report_name}.json") + return report, result_dir + + + +def check_snippet(report_info): + pass + + + +def pre_check(report_name, result_summary): + """ + Pre-check the report to see if it's valid for further processing. + + If the report is valid, it will create corresponding folder and update the result summary with the statistic information. + """ + from ExplanationGenerator.config import RUN_MODE + + def statistic(report_info, result_summary): + # Update statistic information + sum_ets_related_type = result_summary["statistic"]["ets_related_type"] + sum_related_variable_type = result_summary["statistic"]["related_variable_type"] + sum_related_condition_type = result_summary["statistic"]["related_condition_type"] + candidate_reason_type = result_summary["statistic"]["candidate_reason_type"] + + result_summary["statistic"]["sum"] += 1 + result_summary["statistic"]["sum_candidate"] += len(report_info["candidates"]) + sum_ets_related_type[report_info["ets_related_type"]] = sum_ets_related_type.get(report_info["ets_related_type"], 0) + 1 + sum_related_variable_type[report_info["related_variable_type"]] = sum_related_variable_type.get(report_info["related_variable_type"], 0) + 1 + sum_related_condition_type[report_info["related_condition_type"]] = sum_related_condition_type.get(report_info["related_condition_type"], 0) + 1 + for candidate in report_info["candidates"]: + reason = candidate["Reasons"][0]["Explanation Type"] + candidate_reason_type[reason] = candidate_reason_type.get(reason, 0) + 1 + result_summary["pending_list"][report_name]["candidate_reason_types"].append(reason) + + report, result_dir = init_report(report_name) + try: + report_info = fetch_information(report) + result_summary["pending_list"][report_name] = { + "candidate_reason_types": [] + } + statistic(report_info, result_summary) + logger.info(f"Pre-check successfully finished for {report_name}") + except (InvalidFrameworkMethodException, EmptyExceptionInfoException, InvalidStateException, InvalidApplicationMethodException) as e: + logger.error(e) + result_summary["failed_list"][report_name] = { + "status": "failed", + "reason": str(e), + } + if RUN_MODE == RunMode.ALL: + shutil.rmtree(result_dir) + +def main(): + from ExplanationGenerator.config import RUN_MODE, GPT_MODEL, SEED, TEMPERATURE, CRASH_REPORT_DIRECTORY + if not os.path.exists(RESULT_SUMMARY_PATH): + result_summary = {"run mode": RUN_MODE.name, "model": GPT_MODEL, "seed": SEED, "temperature": TEMPERATURE, "statistic":{"sum": 0, "sum_candidate": 0, "ets_related_type": {}, "related_variable_type": {}, "related_condition_type": {}, "candidate_reason_type": {}}, "reports": {}, "pending_list": {}, "failed_list": {}} + else: + result_summary = json.loads(open(RESULT_SUMMARY_PATH, 'r').read()) + + work_list = [] + if RUN_MODE == RunMode.ALL: + work_list = os.listdir(CRASH_REPORT_DIRECTORY) + elif RUN_MODE == RunMode.PENDING: + work_list = PENDING_REPORTS + + with logging_redirect_tqdm(): + for report_name in tqdm(work_list): + if any(report_name in collection for collection 
in [result_summary["pending_list"], result_summary["failed_list"]]): + continue + if not os.path.exists(CRASH_REPORT_PATH(report_name)): + logger.error(f"Report {report_name} not found") + continue + pre_check(report_name, result_summary) + with open(RESULT_SUMMARY_PATH, "w") as f: + json.dump(result_summary, f, indent=4) + + logger.info(f"Pre-check finished, {len(result_summary['pending_list'])} reports are waiting for processing") + + +if __name__ == "__main__": + main() diff --git a/ExplanationGenerator/prompt.py b/ExplanationGenerator/prompt.py new file mode 100644 index 00000000..f7db3018 --- /dev/null +++ b/ExplanationGenerator/prompt.py @@ -0,0 +1,667 @@ +EXTRACTOR_SYSTEM_PROMPT = """ +Your task is to extract the precondition constraint of the target exception in Java methods and convert them into constraint related to method parameters or class field. + +Following are rules for formatting the constraints, you should replace the content in the brackets with the corresponding information: + +You should describe parameter using this format: +Describe class field in this format: +Describe constraint in this format: [{Constrained method name}]: {Constraint} + +// Note: If the variable is a parameter of the method provided in `Code` or a field of the class to which it belongs, it must follow the specified format. +// Note: We will use static analysis tool to check the result, so the parameter type must match the method signature, and the field type must match its declaration. +// Note: You cannot use this format for other content(like local variable, method etc.) + +Please use the following format for the conversation: +Code: ``` +Java method code + +// Note: The class code is incomplete, other methods which is not related to this method will be omitted. +``` + +Exception: ``` +Target exception type name and a possible exception message + +// Note: The crash message is only used to determine which target exception it is when there are multiple exceptions of the same type, for example. +// Do not attempt to resolve exceptions that trigger the same crash message alone. +``` + +Analysis: ``` +Your detailed analysis process, should be step by step. + +// You can start at control flow analysis +``` + +Constraint: ``` +Constraint related to method parameters. The method will throw **target exception** when this constraint is met. + +// Note: The constraint should not include other method, You should describe the specific effects of other methods in the constraints. +// Note: The content of this section is the final result; this section should be independent, and you cannot reference content from other sections. +// Note: Constraints should only be extracted using information from the current method snippet, without incorporating your Android knowledge. Constraints should be as formalized as possible. +``` +""" + +EXTRACTOR_USER_EXAMPLE1 = """ +Code: ``` +public class SELinux { + + public static boolean restorecon(String pathname) throws NullPointerException { + if (pathname == null) { throw new NullPointerException("pathname is null"); } + return native_restorecon(pathname, 0); + } +} +``` + +Exception: ``` +Exception Type: NullPointerException +Exception Message: pathname is null +``` +""" + +EXTRACTOR_ASSISTANT_EXAMPLE1 = """ +Analysis: ``` +The provided Java method 'restorecon' from the class 'SELinux' throws a 'NullPointerException' if a specific condition is met. The condition for throwing the exception is checked at the beginning of the method. 
+ +Here is the step-by-step analysis of the control flow: + +1. The method 'restorecon' takes a single parameter 'pathname' of type 'String'. + +2. The first operation in the method is a conditional check to see if 'pathname' is null. + +3. If 'pathname' is found to be null, the method explicitly throws a 'NullPointerException'. + +4. There is no message provided with the thrown 'NullPointerException', as the exception is thrown using the default constructor. + +From this analysis, we can deduce that the precondition constraint for throwing a 'NullPointerException' is that the 'pathname' parameter must be null. +``` + +Constraint: ``` +[restorecon]: is null. +``` +""" + +EXTRACTOR_USER_EXAMPLE2 = """ +Code: ``` +public final class Pools { + + public boolean release(T instance) { + if (isInPool(instance)) { + throw new IllegalStateException("Already in the pool!"); + } + if (mPoolSize < mPool.length) { + mPool[mPoolSize] = instance; + mPoolSize++; + return true; + } + return false; + } +} +``` + +Exception: ``` +Exception Type: IllegalStateException +Exception Message: Already in the pool! +``` +""" + +EXTRACTOR_ASSISTANT_EXAMPLE2 = """ +Analysis: ``` +The provided Java method 'release' from the final class 'Pools' throws an 'IllegalStateException' if the object 'instance' is already in the pool. To determine the precondition constraint for the target exception, we need to analyze the control flow that leads to the exception. + +Here is the step-by-step analysis: + +1. The method 'release' takes a single parameter 'instance' of generic type 'T'. + +2. The first operation in the method is a call to the method 'isInPool(instance)' which checks if the 'instance' is already in the pool. + +3. If 'isInPool(instance)' returns true, meaning the 'instance' is already in the pool, the method throws an 'IllegalStateException' with the message "Already in the pool!". + +4. There is no other information provided about the 'isInPool' method within the given code snippet. However, based on the exception message, we can infer that the 'isInPool' method is used to check for the presence of the 'instance' in the pool. + +From this analysis, we can deduce that the precondition constraint for throwing an 'IllegalStateException' with the message "Already in the pool!" is that the method 'isInPool' must return true for the given 'instance'. 
+``` + +Constraint: ``` +[release]: isInPool() returns true +``` +""" + +EXTRACTOR_USER_EXAMPLE3 = """ +Code: ``` +public class Instrumentation { + /*package*/ static void checkStartActivityResult(int res, Object intent) { + if (res >= ActivityManager.START_SUCCESS) { + return; + } + + switch (res) { + case ActivityManager.START_INTENT_NOT_RESOLVED: + case ActivityManager.START_CLASS_NOT_FOUND: + if (intent instanceof Intent && ((Intent)intent).getComponent() != null) + throw new ActivityNotFoundException( + "Unable to find explicit activity class " + + ((Intent)intent).getComponent().toShortString() + + "; have you declared this activity in your AndroidManifest.xml?"); + throw new ActivityNotFoundException( + "No Activity found to handle " + intent); + case ActivityManager.START_PERMISSION_DENIED: + throw new SecurityException("Not allowed to start activity " + + intent); + case ActivityManager.START_FORWARD_AND_REQUEST_CONFLICT: + throw new AndroidRuntimeException( + "FORWARD_RESULT_FLAG used while also requesting a result"); + case ActivityManager.START_NOT_ACTIVITY: + throw new IllegalArgumentException( + "PendingIntent is not an activity"); + default: + throw new AndroidRuntimeException("Unknown error code " + + res + " when starting " + intent); + } + } +} +``` + +Exception: ``` +Exception Type: ActivityNotFoundException +Exception Message: No Activity found to handle Intent { act=android.intent.action.VIEW dat=market://details?id=com.xtreme_.hairstyleswiththeirhands } +``` +""" + +EXTRACTOR_ASSISTANT_EXAMPLE3 = """ +Analysis: ``` +The provided Java method 'checkStartActivityResult' from the class 'Instrumentation' throws an 'ActivityNotFoundException' under certain conditions. To determine the precondition constraint for the target exception, we need to analyze the control flow that leads to the exception. + +The target exception 'ActivityNotFoundException' with the message "No Activity found to handle {intent}" is thrown in the following case: + +1. The method checks if the 'res' parameter is less than 'ActivityManager.START_SUCCESS'. If 'res' is greater than or equal to 'ActivityManager.START_SUCCESS', the method returns early and no exception is thrown. + +2. If the 'res' parameter is less than 'ActivityManager.START_SUCCESS', the method enters a switch statement based on the value of 'res'. + +3. The specific case for the target exception message is 'ActivityManager.START_INTENT_NOT_RESOLVED' or 'ActivityManager.START_CLASS_NOT_FOUND'. If the 'res' matches one of these cases, the method further checks if 'intent' is an instance of 'Intent' and if it has a non-null component via 'getComponent()'. + +4. If 'intent' is not an instance of 'Intent' or 'getComponent()' returns null, the 'ActivityNotFoundException' is thrown with the message "No Activity found to handle {intent}". This is the case that matches our target exception message. + +Therefore, the precondition constraint for the target exception is that 'res' must be one of the specific error codes ('ActivityManager.START_INTENT_NOT_RESOLVED' or 'ActivityManager.START_CLASS_NOT_FOUND') and 'intent' should not be an instance of 'Intent' or 'intent' as an 'Intent' should have a null component. +``` + +Constraint: ``` +[checkStartActivityResult]: is either ActivityManager.START_INTENT_NOT_RESOLVED or ActivityManager.START_CLASS_NOT_FOUND, and ( is not an instance of Intent or as Intent has a null getComponent() result). 
+``` +""" + +EXTRACTOR_INIT_PROMPT = [] +EXTRACTOR_INIT_PROMPT.append({"role": "system", "content": EXTRACTOR_SYSTEM_PROMPT}) +EXTRACTOR_INIT_PROMPT.append({"role": "user", "content": EXTRACTOR_USER_EXAMPLE1}) +EXTRACTOR_INIT_PROMPT.append({"role": "assistant", "content": EXTRACTOR_ASSISTANT_EXAMPLE1}) +EXTRACTOR_INIT_PROMPT.append({"role": "user", "content": EXTRACTOR_USER_EXAMPLE2}) +EXTRACTOR_INIT_PROMPT.append({"role": "assistant", "content": EXTRACTOR_ASSISTANT_EXAMPLE2}) +EXTRACTOR_INIT_PROMPT.append({"role": "user", "content": EXTRACTOR_USER_EXAMPLE3}) +EXTRACTOR_INIT_PROMPT.append({"role": "assistant", "content": EXTRACTOR_ASSISTANT_EXAMPLE3}) + +EXTRACTOR_USER_PROMPT = ( + lambda code, exception_name, crash_message: f""" +Code: ``` +{code} +``` + +Exception: ``` +Exception Name: {exception_name} +Exception Message: {crash_message} +``` +""" +) + +INFERRER_SYSTEM_PROMPT = """ +You are an Android expert that assist with inferring the triggering constraint of the target exception in Java methods + +We will provide you with the Java method code which may trigger an exception. We will also provide a constraint of method which is invoked in code. A exception will be triggered when this constraint is met. + +Your task is to convert the constraint related with original method into constraint related to current method parameters and class field. + +All the code comes from the Android framework, for methods we have not provided, you can assume they are the Android framework methods that you are familiar with. + +Following are rules for formatting the constraints, you should replace the content in the brackets with the corresponding information: + +You should describe parameter using this format: +Describe class field in this format: +Describe constraint in this format: [{Constrained method name}]: {Constraint} + +// Note: If the variable is a parameter of the method provided in `Code` or a field of the class to which it belongs, it must follow the specified format. +// Note: We will use static analysis tool to check the result, so the parameter type must match the method signature, and the field type must match its declaration. +// Note: You cannot use this format for other content(like local variable, method etc.) + +Please answer in the following format: +Code: ``` +Java method code +``` + +Original_Constraint: ``` +The constrained method and constraint content. A exception will be triggered when this constraint is met. +``` + +Analysis: ``` +Your detailed analysis process, should be step by step. + +// You can start at control flow analysis +``` + +Constraint: ``` +Conditions related to current method parameters or class field. Original constraint will be met if this condition is met. + +// Note: The constraint should not include other method, You should describe the specific effects of other methods in the constraints. +// Note: The content of this section is the final result; this section should be independent, and you cannot reference content from other sections. +``` +""" + +INFERRER_USER_EXAMPLE1 = """ +Code: ``` +public abstract class AbstractWindowedCursor extends AbstractCursor { + + protected void checkPosition() { + super.checkPosition(); + + if (mWindow == null) { + throw new StaleDataException("Attempting to access a closed CursorWindow." 
+ + "Most probable cause: cursor is deactivated prior to calling this method."); + } + } + + +} +``` + +Original_Constraint: ``` +[checkPosition]: == -1 +``` +""" + +INFERRER_ASSISTANT_EXAMPLE1 = """ +Analysis: ``` +The method `checkPosition` in the `AbstractWindowedCursor` class calls `super.checkPosition()`, which is a method from its superclass `AbstractCursor`. + +The constraint on `super.checkPosition()` states that an exception will be triggered if the field `mPos` in the `AbstractCursor` class is equal to -1. + +In the context of `checkPosition` in the `AbstractWindowedCursor` class, there is no change to the value of `mPos` before or after the call to `super.checkPosition()`. Therefore, the constraint from `super.checkPosition()` applies directly to the `checkPosition` method in the `AbstractWindowedCursor` class without any modification. + +Since `mWindow` is checked for null after the call to `super.checkPosition()`, it does not affect the constraint related to `mPos`. +``` + +Constraint: ``` +[checkPosition]: == -1 +``` +""" + +INFERRER_USER_EXAMPLE2 = """ +Code: ``` +public class Resources { + + public String getString(int id) throws NotFoundException { + CharSequence res = getText(id); + if (res != null) { + return res.toString(); + } + throw new NotFoundException("String resource ID #0x" + + Integer.toHexString(id)); + } + +} +``` + +Original_Constraint: ``` +[getText]: correspond to a invalid resource ID that not exists in . +``` +""" + +INFERRER_ASSISTANT_EXAMPLE2 = """ +Analysis: ``` +The method `getString` in the `Resources` class calls the method `getText` with the parameter `id`. + +The constraint on `getText` states that an exception will be triggered if the parameter `id` corresponds to an invalid resource ID that does not exist in the `AssetManager` instance referenced by the field `mAssets` in the `Resources` class. + +In the context of `getString`, the variable `id` passed to `getText` is the same `id` received by `getString`. There are no transformations or conditions applied to `id` before calling `getText`. + +Therefore, the constraint from `getText` can be directly translated to the parameters of `getString` with respect to the `id` variable. +``` + +Constraint: ``` +[getString]: corresponds to an invalid resource ID that does not exist in . +``` +""" + +INFERRER_USER_EXAMPLE3 = """ +Code: ``` +public class Instrumentation { + + public ActivityResult execStartActivity( + Context who, IBinder contextThread, IBinder token, Activity target, + Intent intent, int requestCode, Bundle options) { + IApplicationThread whoThread = (IApplicationThread) contextThread; + if (mActivityMonitors != null) { + synchronized (mSync) { + final int N = mActivityMonitors.size(); + for (int i=0; i= 0 ? am.getResult() : null; + } + break; + } + } + } + } + try { + intent.migrateExtraStreamToClipData(); + intent.prepareToLeaveProcess(); + int result = ActivityManagerNative.getDefault() + .startActivity(whoThread, who.getBasePackageName(), intent, + intent.resolveTypeIfNeeded(who.getContentResolver()), + token, target != null ? 
target.mEmbeddedID : null, + requestCode, 0, null, null, options); + checkStartActivityResult(result, intent); + } catch (RemoteException e) { + } + return null; + } + +} +``` + +Original_Constraint: ``` +[checkStartActivityResult]: ( is ActivityManager.START_INTENT_NOT_RESOLVED OR is ActivityManager.START_CLASS_NOT_FOUND) AND (NOT ( is an instance of Intent with a non-null component)) +``` +""" + +INFERRER_ASSISTANT_EXAMPLE3 = """ +Analysis: ``` +The method `execStartActivity` in the `Instrumentation` class calls the method `checkStartActivityResult` with the parameters `result` and `intent`. + +The constraint on `checkStartActivityResult` states that an exception will be triggered if the first parameter (`res`) is either `ActivityManager.START_INTENT_NOT_RESOLVED` or `ActivityManager.START_CLASS_NOT_FOUND`, and the second parameter (`intent`) is not an instance of `Intent` or if it is an instance of `Intent`, its `getComponent()` method returns null. + +In the context of `execStartActivity`, the `result` variable that is passed as the first parameter to `checkStartActivityResult` is the return value of the `startActivity` method called on `ActivityManagerNative.getDefault()`. + +The `intent` variable passed as the second parameter to `checkStartActivityResult` is the same `intent` that is received by `execStartActivity`. + +There are no transformations or conditions applied to `result` or `intent` before calling `checkStartActivityResult`, except for the calls to `intent.migrateExtraStreamToClipData()` and `intent.prepareToLeaveProcess()`, which do not affect the type or component of the `intent`. + +Therefore, the constraint from `checkStartActivityResult` can be directly translated to the parameters of `execStartActivity` with respect to the `result` and `intent` variables. +``` + +Constraint: ``` +[execStartActivity]: is not an instance of Intent or as Intent has a null getComponent() result, and the return value of ActivityManagerNative.getDefault().startActivity(...) is either or . +``` +""" + +INFERRER_INIT_PROMPT = [] +INFERRER_INIT_PROMPT.append({"role": "system", "content": INFERRER_SYSTEM_PROMPT}) +INFERRER_INIT_PROMPT.append({"role": "user", "content": INFERRER_USER_EXAMPLE1}) +INFERRER_INIT_PROMPT.append( + {"role": "assistant", "content": INFERRER_ASSISTANT_EXAMPLE1} +) + +INFERRER_USER_PROMPT = ( + lambda code, constraint: f""" +Code: ``` +{code} +``` + +Original_Constraint: ``` +{constraint} +``` +""" +) + +EXPLAINER_SYSTEM_PROMPT = """ +You are an Android expert that assist with explaining the crash of Android application. + +For the crash, we will provide you with the following information: +1. The crash information which include crash message and stack trace +2. A constraint that applies to specific Android API, triggering this crash when this constraint is met. + +Then, we will provide a group of suspicious method with following information: +1. The code snippet of a suspicious method which is detected by our static analysis tool and may cause the crash +2. The reason why the suspicious method is detected + +Your task is to help the developers explain how these method caused the crash using information provided and your Android professional knowledge. + +Please use the following format for the conversation. 
+Crash_Information: ``` +Crash message and stack trace +``` + +Constraint: ``` +The constraint that applies to a specific Android API and triggers this crash +``` + +Suspicious_Method: ``` +The code snippet of the suspicious method +``` + +Reason: ``` +The reason why the suspicious method is detected +``` + +Analysis: ``` +Your detailed analysis process. + +This part will be hidden from developers. Do not quote content from this part. +``` + +Android_Knowledge: ``` +You need to assume that the developer has no relevant knowledge of Android frameworks. If your explanation requires any knowledge of Android frameworks, please briefly describe it in this section. + +This part is optional. +``` + +Explanation: ``` +A clear and concise explanation for developers in no more than 3 sentences. +``` +""" + +EXPLAINER_INIT_PROMPT = [] +EXPLAINER_INIT_PROMPT.append({"role": "system", "content": EXPLAINER_SYSTEM_PROMPT}) + +EXPLAINER_CRASH_PROMPT = ( + lambda crash_info, constraint: f""" +Crash_Information: ``` +{crash_info} +``` + +Constraint: ``` +{constraint} +``` +""" +) + +EXPLAINER_USER_PROMPT = ( + lambda method_code, reason: f""" +Suspicious_Method: ``` +{method_code} +``` + +Reason: ``` +{reason} +``` +""" +) + +KEY_VAR_TERMINAL_PROMPT = lambda framework_entry_api, call_chain: f""" +Our static analysis tool detects that some buggy parameter value is passed to `{framework_entry_api}` by the call chain {call_chain}. + +The buggy parameter meets the crash constraint which was described in the `Constraint` part. +""" + +KEY_VAR_NON_TERMINAL_AFTER_TERMINAL_PROMPT = lambda entry_api, terminal_api, call_chain: f""" +Our static analysis tool detects that the method invokes `{terminal_api}` by the call chain {call_chain}. + +The `{terminal_api}` method passes a buggy parameter to `{entry_api}`. + +The buggy parameter meets the crash constraint which was described in the `Constraint` part. +""" + +KEY_VAR_NON_TERMINAL_BEFORE_TERMINAL_PROMPT = lambda entry_api, call_chain: f""" +Our static analysis detects that some parameter meets the crash constraints described in the 'Constraint' section. + +This parameter is passed to `{entry_api}` through the call chain `{call_chain}`. +""" + +KEY_VAR_MODIFIED_FIELD_PROMPT = lambda field, passed_method: f""" +Our static analysis detects that the method changes the value of the field `{field}`. + +The field was passed to the method `{passed_method}` and meets the crash constraint, resulting in the crash. +""" + +KEY_API_INVOKED_PROMPT = lambda key_api, field, effect: f""" +We detect that the method `{key_api}` is invoked in this method. + +The invoked method can affect the `{field}` field in the Android Framework and thus cause a constraint violation. + +The invoked method has the following effect on the field: +{effect} +""" + +KEY_API_EXECUTED_PROMPT = lambda: f""" +This method was detected because it was executed during the process of the application crashing. +""" + +# Extract KeyAPI role + +KEY_API_ROLE_SUMMARIZER_SYSTEM_PROMPT = """ +You are an Android expert that assists with summarizing the effect of a method on specific fields in the Android Framework. + +You will be provided with the necessary information in the following format: + +Target Method: ``` +The full signature of the target method. +``` + +Target Field: ``` +The full signature of the target field. +``` + +Call Chain: ``` +The call chain from the target method to the target field. +``` + +Code Snippet: ``` +The code snippets of the methods in the call chain. 
+``` + +Your task is to summarize the effect of the method on the target fields in the Android Framework using the following format: + +Analysis: ``` +Your detailed analysis process, step by step. + +Only you can see this part. Do not quote content from this part. +``` + +Effect: ``` +The effect of the method on the target fields. +``` +""" + +KEY_API_ROLE_SUMMARIZER_USER_PROMPT = ( + lambda target_method, target_field, call_chain, code_snippet: f""" +Target Method: ``` +{target_method} +``` + +Target Field: ``` +{target_field} +``` + +Call Chain: ``` +{call_chain} +``` + +Code Snippet: ``` +{code_snippet} +``` +""" +) + +KEY_API_ROLE_SUMMARIZER_INIT_PROMPT = [ + {"role": "system", "content": KEY_API_ROLE_SUMMARIZER_SYSTEM_PROMPT} +] + +EXTRACTOR_SYSTEM_PLAIN_PROMPT = """ +Your task is to extract the precondition constraints of the target exception in Java methods and convert them into constraints related to method parameters or class fields. + +Please use the following format for the conversation: +Code: ``` +Java method code +``` + +Exception: ``` +Target exception type name and a possible exception message +``` + +Constraint: ``` +Constraint related to method parameters. The method will throw the **target exception** when this constraint is met. + +// Note: Constraints should only be extracted using information from the current method snippet, without incorporating your Android knowledge. Constraints should be as formalized as possible. +``` +""" +EXTRACTOR_PLAIN_INIT_PROMPT = [] +EXTRACTOR_PLAIN_INIT_PROMPT.append({"role": "system", "content": EXTRACTOR_SYSTEM_PLAIN_PROMPT}) + +plain_explanation_prompt = """ +You are a computer expert who assists with explaining crashes of Android applications. + +For the crash, we will provide you with the following information: +1. The crash information, which includes the crash message and stack trace + +Then, we will provide a group of suspicious methods with the following information: +1. The code snippet of a suspicious method + +Your task is to help developers explain how these methods caused the crash using the information provided. + +Please use the following format for the conversation. +Crash_Information: ``` +Crash message and stack trace +``` + +Suspicious_Method: ``` +The code snippet of the suspicious method +``` + +Explanation: ``` +A clear and concise explanation for developers in no more than 3 sentences. +``` +""" + +GLOBAL_SUMMARY_SYSTEM_PROMPT = """ +You are a computer expert who assists with explaining crashes of Android applications. + +For the crash, we will provide you with the following information: +1. The crash information, which includes the crash message and stack trace +2. A group of candidate methods that may cause the crash, the reason why they were detected, and an explanation of each candidate if one exists. + +Your task is to help developers summarize all these candidate methods and explain how they caused the crash using the information provided. + +Please use the following format for the conversation. + +Summary: ``` +Summary of how all these candidates caused the crash.
+``` +""" + +GLOBAL_SUMMARY_USER_CRASH_INFORMATION_PROMPT = ( + lambda exception_name, crash_message, stack_trace: f""" +Crash Information: ``` +Exception Name: {exception_name} +Exception Message: {crash_message} +Stack Trace: {stack_trace} +``` +""" +) + +GLOBAL_SUMMARY_USER_CANDIDATE_PROMPT = ( + lambda num, name, reason, explanation: f""" +Candidate {num}: ``` +Method Name: {name} +Reason: {reason} +{f"Explanation: {explanation}" if explanation else ""} +``` +""" +) \ No newline at end of file diff --git a/ExplanationGenerator/report_fill.py b/ExplanationGenerator/report_fill.py new file mode 100644 index 00000000..151233d3 --- /dev/null +++ b/ExplanationGenerator/report_fill.py @@ -0,0 +1,124 @@ +from panflute import run_filter, Header, Doc, Str, stringify, OrderedList, ListItem, Para, RawBlock, convert_text, BulletList +from pathlib import Path +import json +from copy import deepcopy +import re +import os + +FINAL_REPORT_PATH = None +REPORT_INFO_PATH = None +pattern = re.compile(r"\$([A-Za-z_]+)") + +def markdown_list(l): + return OrderedList(*[ListItem(Para(Str(item))) for item in l]) + +def markdown_bullet_list(l): + return BulletList(*[ListItem(Para(Str(item))) for item in l]) + +def sa_explanation(report_name, method_name): + with open(REPORT_INFO_PATH(report_name), mode="r") as f: + report_info = json.load(f) + for candidate in report_info["candidates"]: + candidate_method_name = candidate["Candidate Name"].split(".")[-1] + if candidate_method_name == method_name: + return candidate["Reasons"][0]["Explanation Info"] + raise ValueError(f"Method {method_name} not found in report {report_name}") + + +class Report: + def __init__(self, report_name, report_type) -> None: + self.report_name = report_name + with open(REPORT_INFO_PATH(report_name), mode="r") as f: + self.report_info = json.load(f) + self.data = { + "Exception_Type": Para(Str(self.report_info["exception_type"])), + "Exception_Message": Para(Str(self.report_info["crash_message"])), + "Stack_Trace": markdown_list(self.report_info["stack_trace_short_api"]), + } + survey_path: Path = FINAL_REPORT_PATH(report_name) + self.candidate_num = sum(os.path.isdir(survey_path / i) for i in os.listdir(survey_path)) + self.ordered_data = { + "Candidate_Name": [], + "Method_Code": [], + "Method_Explanation": [] + } + for index in range(self.candidate_num): + self.ordered_data["Candidate_Name"].append(Header(Str(f"Candidate {index + 1}"), level=2)) + with open(FINAL_REPORT_PATH(report_name) / str(index) / "snippet.txt", mode="r") as f: + snippet = f.read() + self.ordered_data["Method_Code"].append(RawBlock(f"```java\n{snippet}\n```", format="markdown")) + + if report_type == "llm": + with open(FINAL_REPORT_PATH(report_name) / str(index) / "explanation.txt", mode="r") as f: + explanation = f.read() + elif report_type == "sa": + match = re.match("\s*(\S+\s+)*(\S+)\(.*\).*\{", snippet.split("\n")[0]) + if match is None: + raise ValueError(f"Method name not found in snippet {snippet}") + method_name = match.group(2) + explanation = sa_explanation(report_name, method_name) + + self.ordered_data["Method_Explanation"].append(RawBlock(explanation, format="markdown")) + + + + def get_data(self, key): + if key in self.data: + return self.data[key] + if key in self.ordered_data: + return self.ordered_data[key].pop(0) + return None + +def find_candidates(report_info): + candidates = [] + for candidate in report_info["candidates"]: + candidate_name = candidate["Candidate Name"] + candidates.append(candidate_name) + return 
markdown_bullet_list(candidates) + +def init_doc(doc: Doc): + report_name = doc.get_metadata("report_name", None) + if report_name is None: + raise ValueError("report_name must be provided in metadata") + report_type = doc.get_metadata("report_type", None) + if report_type is None or report_type not in ["llm", "sa"]: + raise ValueError("report_type must be provided and must be either 'llm' or 'sa'") + report_dir = doc.get_metadata("report_dir", None) + global FINAL_REPORT_PATH, REPORT_INFO_PATH + FINAL_REPORT_PATH = lambda name: Path(report_dir) / name + REPORT_INFO_PATH = lambda name: FINAL_REPORT_PATH(name) / "report_info.json" + + doc.report = Report(report_name, report_type) + candidate = doc.content[8:] + for _ in range(doc.report.candidate_num - 1): + doc.content.extend(deepcopy(candidate)) + global_summary_path: Path = Path("RQ/global_summary") / report_name / "global_summary.txt" + c = [] + c.extend([Header(Str("Candidate List"), level=2), find_candidates(doc.report.report_info)]) + if global_summary_path.exists() and report_type == "llm": + global_summary = global_summary_path.read_text() + c.append(Header(Str("Global Explanation"), level=2)) + c.extend(convert_text(global_summary)) + doc.content[8:8] = c + doc.content.append(Header(Str("Note"), level=1)) + doc.content.extend(convert_text(r"The [Codes](./Codes/) folder contains the complete Java code for the relevant candidate methods, the [LLM_Appendix](./LLM_Appendix/) folder contains supplementary information related to the LLM report, and the [StaticAnalysis_Appendix](./StaticAnalysis_Appendix/) folder contains supplementary information related to the static analysis report.")) + + +def end_doc(doc: Doc): + del doc.report + del doc.metadata["report_name"] + + +def replace_data(elem, doc: Doc): + if type(elem)==Para or type(elem)==Header: + m = pattern.match(stringify(elem)) + if m: + key = m.group(1) + return doc.report.get_data(key) + + +def main(doc=None): + return run_filter(replace_data, doc=doc, prepare=init_doc, finalize=end_doc) + +if __name__ == "__main__": + main() diff --git a/ExplanationGenerator/report_generator.py b/ExplanationGenerator/report_generator.py new file mode 100644 index 00000000..ab44f935 --- /dev/null +++ b/ExplanationGenerator/report_generator.py @@ -0,0 +1,119 @@ +from ExplanationGenerator.utils.parser import parse_message +from ExplanationGenerator.config import logger +from ExplanationGenerator.config import EXPLANATION_RESULT_PATH, EXPLANATION_RESULT_DIRECTORY, FINAL_REPORT_PATH, APK_CODE_PATH, REPORT_TEMPLATE_PATH, ROOT_PATH +from ExplanationGenerator import survery_global_summary_generator +from pathlib import Path +import shutil +import json +import subprocess + + +def write_full_class_file(report_name, snippet, survey_path: Path): + """ + write related code file + """ + import re + class_file_path = survey_path + class_file_path.mkdir(parents=True, exist_ok=True) + signatures = re.findall(r"(public|private)(\s\w+)+\s(\w+)\(", snippet) + if len(signatures) == 0: + return + visibility, other, method_name = signatures[0] + + report_info_path = Path(EXPLANATION_RESULT_PATH(report_name)) / "report_info.json" + report_info = json.load(report_info_path.open()) + for candidate in report_info["candidates"]: + if candidate["Candidate Name"].split(".")[-1] == method_name: + full_name: str = candidate["Candidate Name"] + names = full_name.split(".") + path = APK_CODE_PATH(report_name) + for name in names: + if "$" in name: + name = name.split("$")[0] + path = path / name + path_with_java = 
path.with_suffix(".java") + if path_with_java.exists() and path_with_java.is_file(): + shutil.copy2(path_with_java, class_file_path) + return + logger.error(f"Cannot find the class file, path: {path}") + logger.error(f"Cannot find the full class file for {method_name} in {report_name}") + + +def generate_global_explanation(): + pass + + +def pairwise(iterable): + "s -> (s0, s1), (s2, s3), (s4, s5), ..." + if len(iterable) % 2 != 0: + raise ValueError("Length of iterable must be even") + a = iter(iterable) + return zip(a, a) + +def main(): + for dir in EXPLANATION_RESULT_DIRECTORY.iterdir(): + if dir.is_file(): + continue + report_name = dir.name + result_path = dir + survey_path = FINAL_REPORT_PATH(report_name) + survey_path.mkdir(parents=True, exist_ok=True) + + report_info = result_path / "report_info.json" + shutil.copy2(report_info, survey_path) + + candidate_conversation_path = result_path / "explanation" / "candidate_conversation.json" + with open(candidate_conversation_path, mode='r') as f: + candidate_conversation = json.load(f) + candidate_conversation = candidate_conversation[2:] + if len(candidate_conversation) % 2 != 0: + raise ValueError("Length of candidate_conversation must be even") + candidate_num = len(candidate_conversation) // 2 + if candidate_num == 0: + shutil.rmtree(survey_path) + logger.error(f"No candidate explanation found in {report_name}") + continue + for index, (user_message, assistant_message) in enumerate(pairwise(candidate_conversation)): + user_parsed = parse_message(user_message['content']) + assistant_parsed = parse_message(assistant_message['content']) + candidate_path = survey_path / str(index) + candidate_path.mkdir(parents=True, exist_ok=True) + + snippet = user_parsed["Suspicious_Method"] + write_full_class_file(report_name, snippet, candidate_path) + explanation = assistant_parsed["Explanation"] + with open(candidate_path / "snippet.txt", mode='w') as f: + f.write(snippet) + with open(candidate_path / "explanation.txt", mode='w') as f: + f.write(explanation) + + constraint_path = result_path / "constraint" / "constraint.txt" + survey_constraint_path = survey_path / "constraint.txt" + shutil.copy2(constraint_path, survey_constraint_path) + + if (survey_path / "global_summary.txt").exists(): + logger.info(f"Global summary for {report_name} already exists, skipping") + survery_global_summary_generator.main(report_name) + + command = [ + "pandoc", + "-s", + "-f", + "markdown", + "--wrap=none", + "-t", + "commonmark", + f"--metadata=report_name:{report_name}", + "--metadata=report_type:llm", + f"--metadata=report_dir:{survey_path.parent}", + "--filter", + ROOT_PATH / "report_fill.py", + "-o", + survey_path / "report.md", + REPORT_TEMPLATE_PATH + ] + + subprocess.run(command) + +if __name__ == "__main__": + main() diff --git a/ExplanationGenerator/report_template.md b/ExplanationGenerator/report_template.md new file mode 100644 index 00000000..eff845bc --- /dev/null +++ b/ExplanationGenerator/report_template.md @@ -0,0 +1,25 @@ +# Crash Information + +## Exception Type + +$Exception_Type + +## Exception Message + +$Exception_Message + +## Stack Trace + +$Stack_Trace + +# Locating Candidates + +## $Candidate_Name + +### Method Code + +$Method_Code + +### Explanation + +$Method_Explanation diff --git a/ExplanationGenerator/run.py b/ExplanationGenerator/run.py new file mode 100644 index 00000000..6e222979 --- /dev/null +++ b/ExplanationGenerator/run.py @@ -0,0 +1,72 @@ +import os +import shutil +from ExplanationGenerator.config import PRE_CHECK_RESULT_PATH, 
RESULT_SUMMARY_PATH, EXPLANATION_RESULT_PATH, logger, add_file_handler, EXPLANATION_RESULT_DIRECTORY +from ExplanationGenerator.extractor import query_framework_constraint +from ExplanationGenerator.explainer import explain_candidates +from ExplanationGenerator.exceptions import ConstraintCheckError, TooMuchCandidateError +import json + +def result_dirs(report_name): + result_dir = EXPLANATION_RESULT_PATH(report_name) + os.makedirs(result_dir, exist_ok=True) + shutil.copy(f"{PRE_CHECK_RESULT_PATH(report_name)}/report_info.json", f"{result_dir}/report_info.json") + constraint_result_dir = f"{result_dir}/constraint" + os.makedirs(constraint_result_dir, exist_ok=True) + explanation_result_dir = f"{result_dir}/explanation" + os.makedirs(explanation_result_dir, exist_ok=True) + report_info_path = f"{result_dir}/report_info.json" + return result_dir, constraint_result_dir, explanation_result_dir, report_info_path + +def main(): + add_file_handler(EXPLANATION_RESULT_DIRECTORY / "explanation.log") + if not os.path.exists(RESULT_SUMMARY_PATH): + logger.error(f"Result summary file {RESULT_SUMMARY_PATH} does not exist") + exit(1) + else: + result_summary = json.loads(open(RESULT_SUMMARY_PATH, 'r').read()) + for report_name in result_summary["pending_list"].keys(): + if report_name in result_summary["reports"]: + continue + + result_dir, constraint_result_dir, explanation_result_dir, report_info_path = result_dirs(report_name) + with open(report_info_path, "r") as f: + report_info = json.load(f) + logger.info(f"Start processing {report_name}") + + try: + logger.info("Querying framework constraint") + constraint, bypass_count, query_count, cache_hit_count = query_framework_constraint(report_info, constraint_result_dir) + logger.info(f"Successfully query constraint! 
Constraint: {constraint}") + explain_candidates(report_info, constraint, explanation_result_dir) + logger.info(f"Successfully explained candidates!") + result_summary["reports"][report_name] = { + "status": "finished", + "query_count": query_count, + "bypass_count": bypass_count, + "cache_hit_count": cache_hit_count + } + result_summary["statistic"]["query_count"] = result_summary["statistic"].get("query_count", 0) + query_count + result_summary["statistic"]["bypass_count"] = result_summary["statistic"].get("bypass_count", 0) + bypass_count + result_summary["statistic"]["cache_hit_count"] = result_summary["statistic"].get("cache_hit_count", 0) + cache_hit_count + except ConstraintCheckError as e: + logger.error(e) + result_summary["reports"][report_name] = { + "status": "failed", + "error": str(e) + } + shutil.rmtree(result_dir) + except TooMuchCandidateError as e: + logger.error(e) + result_summary["reports"][report_name] = { + "status": "failed", + "error": str(e) + } + shutil.rmtree(result_dir) + + with open(RESULT_SUMMARY_PATH, "w") as f: + json.dump(result_summary, f, indent=4) + logger.info(f"Finished processing {report_name}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ExplanationGenerator/survery_global_summary_generator.py b/ExplanationGenerator/survery_global_summary_generator.py new file mode 100644 index 00000000..12871cb1 --- /dev/null +++ b/ExplanationGenerator/survery_global_summary_generator.py @@ -0,0 +1,152 @@ +# 生成问卷的整体解释 +from ExplanationGenerator.config import EXPLANATION_RESULT_PATH, logger, FINAL_REPORT_DIRECTORY, FINAL_REPORT_PATH +from pathlib import Path +import json + +# SURVEY_ROOT = Path("RQ/survey") +# SURVEY_PATH: Path = lambda report_name: SURVEY_ROOT / report_name +# PENDING_SURVEY_PATH = Path("RQ/pending_report.json") +SUMMARY_FILE_NAME = "global_summary.txt" +SUMMARY_MESSAGE_FILE_NAME = "global_summary_message.json" + +def candidate_reason(candidate, key_api_effects, report_info): + from ExplanationGenerator.my_types import ReasonType + from ExplanationGenerator.prompt import KEY_VAR_NON_TERMINAL_AFTER_TERMINAL_PROMPT, KEY_VAR_TERMINAL_PROMPT, KEY_API_INVOKED_PROMPT, KEY_API_EXECUTED_PROMPT, KEY_VAR_MODIFIED_FIELD_PROMPT + from ExplanationGenerator.explainer import summarize_key_api_effect, _extract_var4_field_and_passed_method, find_terminal_api + + terminal_api = find_terminal_api(report_info["candidates"]) + reason_type = candidate['Reasons'][0]['Explanation Type'] + framework_entry_api = report_info["framework_entry_api"] + reason_type = candidate['Reasons'][0]['Explanation Type'] + if reason_type == ReasonType.KEY_VAR_TERMINAL.value: + call_methods = candidate['Reasons'][0]['M_app Trace to Crash API'] + call_chain_to_entry = f"`{call_methods[0]}`" + for method in call_methods[1:]: + call_chain_to_entry += f" -> `{method}`" + terminal_api = call_methods[0] + + reason = KEY_VAR_TERMINAL_PROMPT(framework_entry_api, call_chain_to_entry) + elif reason_type == ReasonType.KEY_VAR_NON_TERMINAL.value: + if terminal_api is not None: + call_methods = candidate['Reasons'][0]['M_app Trace to Crash API'] + call_chain_to_terminal = f"`{call_methods[0]}`" + for method in call_methods[1:]: + call_chain_to_terminal += f" -> `{method}`" + if method == terminal_api: + break + reason = KEY_VAR_NON_TERMINAL_AFTER_TERMINAL_PROMPT(framework_entry_api, terminal_api, call_chain_to_terminal) + else: + raise NotImplementedError("Non-terminal key variable explanation is not implemented yet") + elif reason_type == 
ReasonType.KEY_API_INVOKED.value: + key_api = candidate['Reasons'][0]['M_frame Triggered KeyAPI'] + key_field = candidate['Reasons'][0]['M_frame Influenced Field'] + if not key_api in key_api_effects: + effect = summarize_key_api_effect(candidate, report_info) + key_api_effects[key_api] = effect + reason = KEY_API_INVOKED_PROMPT(key_api, key_field, key_api_effects[key_api]) + elif reason_type == ReasonType.KEY_API_EXECUTED.value: + reason = KEY_API_EXECUTED_PROMPT() + elif reason_type == ReasonType.KEY_VAR_MODIFIED_FIELD.value: + _, api, field = _extract_var4_field_and_passed_method(candidate['Reasons'][0]['Explanation Info']) + reason = KEY_VAR_MODIFIED_FIELD_PROMPT(field, api) + else: + raise NotImplementedError(f"Unknown explanation type {reason_type}") + + return reason + +def find_explanation(report_name, candidate_name): + explanation_path = Path(EXPLANATION_RESULT_PATH(report_name)) / "explanation" / "explanation.json" + explanations = json.load(explanation_path.open()) + for explanation in explanations: + if explanation["Candidate_Name"] == candidate_name: + return explanation + return None + +def generate_global_summary(report_info, key_api_effects): + from ExplanationGenerator.prompt import GLOBAL_SUMMARY_SYSTEM_PROMPT, GLOBAL_SUMMARY_USER_CANDIDATE_PROMPT, GLOBAL_SUMMARY_USER_CRASH_INFORMATION_PROMPT + from ExplanationGenerator.utils.llm import send_message + from ExplanationGenerator.utils.parser import parse_message + + report_name = report_info["apk_name"] + survey_path = FINAL_REPORT_PATH(report_name) + messages = [] + messages.append({"role": "system", "content": GLOBAL_SUMMARY_SYSTEM_PROMPT}) + crash_prompt = GLOBAL_SUMMARY_USER_CRASH_INFORMATION_PROMPT(report_info["exception_type"], report_info["crash_message"], report_info["stack_trace_short_api"]) + messages.append({"role": "user", "content": crash_prompt}) + for index, candidate in enumerate(report_info["candidates"]): + candidate_name = candidate["Candidate Name"] + reason = candidate_reason(candidate, key_api_effects, report_info) + explanation = find_explanation(report_info["apk_name"], candidate_name) + candidate_prompt = GLOBAL_SUMMARY_USER_CANDIDATE_PROMPT(index, candidate_name, reason, explanation) + messages.append({"role": "user", "content": candidate_prompt}) + messages, system_fingerprint = send_message(messages) + json.dump(messages, open(survey_path / SUMMARY_MESSAGE_FILE_NAME, "w"), indent=4) + parsed = parse_message(messages[-1]["content"]) + return parsed["Summary"] + + +def dict2list(d): + result = [] + def _dict2list(d): + for k, v in d.items(): + if isinstance(v, dict): + _dict2list(v) + elif isinstance(v, list): + result.extend(v) + else: + raise ValueError("Value in dict should be either dict or list") + + _dict2list(d) + return result + + +def generate(report_name, report_info, key_api_effects): + survey_path = FINAL_REPORT_PATH(report_name) + global_summary_path = survey_path / SUMMARY_FILE_NAME + if global_summary_path.exists(): + logger.info(f"Global summary for {report_name} already exists, skipping") + return + + if len(report_info["candidates"]) == 1: + logger.info(f"Only one candidate in {report_name}, skipping") + return + + attempt = 0 + MAX_ATTEMPT = 3 + success = False + logger.info(f"Generating global summary for {report_name}") + while attempt < MAX_ATTEMPT: + try: + global_summary = generate_global_summary(report_info, key_api_effects) + success = True + break + except KeyError: + logger.error(f"Error in generating global summary for {report_name}, retrying") + attempt += 1 + if not 
success: + logger.error(f"Failed to generate global summary for {report_name}") + return + logger.info(f"Generated global summary for {report_name}, {len(report_info['candidates'])} candidates") + with open(survey_path / SUMMARY_FILE_NAME, mode='w') as f: + f.write(global_summary) + +def main(report_name): + # with open(PENDING_SURVEY_PATH, mode='r') as f: + # pending_report = json.load(f) + # pending_list = dict2list(pending_report) + + # logger.info(f"Generating global summary for {len(pending_list)} reports") + + # for report_name in pending_list: + # for report_dir in FINAL_REPORT_DIRECTORY.iterdir(): + # report_name = report_dir.name + result_path = Path(EXPLANATION_RESULT_PATH(report_name)) + report_info_path = result_path / "report_info.json" + report_info = json.load(report_info_path.open()) + key_api_effects_path = result_path / "explanation" / "key_api_effects.json" + key_api_effects = json.load(key_api_effects_path.open()) + generate(report_name, report_info, key_api_effects) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ExplanationGenerator/utils/__init__.py b/ExplanationGenerator/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ExplanationGenerator/utils/cg.py b/ExplanationGenerator/utils/cg.py new file mode 100644 index 00000000..8482d055 --- /dev/null +++ b/ExplanationGenerator/utils/cg.py @@ -0,0 +1,130 @@ +def get_cg_file_path(signature, apk_name, android_version): + from ExplanationGenerator.config import APK_CG_PATH, ANDROID_CG_PATH + from ExplanationGenerator.utils.helper import get_method_type, MethodType + + method_type = get_method_type(signature) + if method_type == MethodType.ANDROID: + file_path = ANDROID_CG_PATH(android_version) + elif method_type == MethodType.ANDROID_SUPPORT or method_type == MethodType.APPLICATION: + file_path = APK_CG_PATH(apk_name) + elif method_type == MethodType.JAVA: + raise ValueError("Java method signature is not supported") + else: + raise ValueError("Unknown method type") + + return file_path + + +def get_cache_cg_file_path(signature, apk_name, android_version, called): + from ExplanationGenerator.config import ANDROID_CG_CALLED_CACHE_PATH, ANDROID_CG_CALLER_CACHE_PATH, APK_CG_CALLED_CACHE_PATH, APK_CG_CALLER_CACHE_PATH + from ExplanationGenerator.utils.helper import get_method_type, MethodType + import hashlib + + hashed_signature = hashlib.sha256(signature.encode()).hexdigest() + method_type = get_method_type(signature) + if method_type == MethodType.ANDROID: + if android_version == "support": + raise ValueError("Android support version is not supported") + if called: + file_path = ANDROID_CG_CALLED_CACHE_PATH(android_version, hashed_signature) + else: + file_path = ANDROID_CG_CALLER_CACHE_PATH(android_version, hashed_signature) + elif method_type == MethodType.ANDROID_SUPPORT or method_type == MethodType.APPLICATION: + if called: + file_path = APK_CG_CALLED_CACHE_PATH(apk_name, hashed_signature) + else: + file_path = APK_CG_CALLER_CACHE_PATH(apk_name, hashed_signature) + elif method_type == MethodType.JAVA: + raise ValueError("Java method signature is not supported") + else: + raise ValueError("Unknown method type") + + return file_path + + +def get_called_methods(unsafe_signature, apk_name, android_version): + from ExplanationGenerator.utils.parser import is_same_signature + import json + import os + + signature = unsafe_signature.strip("<>") + try: + cache_file_path = get_cache_cg_file_path(signature, apk_name, android_version, True) + except ValueError: + return 
set() + if os.path.exists(cache_file_path): + with open(cache_file_path, "r") as f: + cache_file = json.load(f) + if signature in cache_file: + return set(cache_file[signature]) + + try: + file_path = get_cg_file_path(signature, apk_name, android_version) + except ValueError: + return set() + called_signature_set = set() + with open(file_path, "r") as lines: + for line in lines: + caller, callee = line.split("->") + if is_same_signature(caller, signature): + called_signature_set.add(callee.strip().strip("<>")) + + os.makedirs(os.path.dirname(cache_file_path), exist_ok=True) + if os.path.exists(cache_file_path): + with open(cache_file_path, "r") as f: + cache_file = json.load(f) + else: + cache_file = {} + + # Update the cached data + cache_file[signature] = list(called_signature_set) + + # Write the updated data back to the cache file + with open(cache_file_path, "w") as f: + json.dump(cache_file, f, indent=4) + + return called_signature_set + + +def get_callers_method(unsafe_signature, apk_name, android_version): + from ExplanationGenerator.utils.parser import is_same_signature + import json + import os + + signature = unsafe_signature.strip("<>") + try: + cache_file_path = get_cache_cg_file_path(signature, apk_name, android_version, False) + except ValueError: + return set() + if os.path.exists(cache_file_path): + with open(cache_file_path, "r") as f: + cache_file = json.load(f) + if signature in cache_file: + return set(cache_file[signature]) + + try: + file_path = get_cg_file_path(signature, apk_name, android_version) + except ValueError: + return set() + caller_signature_set = set() + with open(file_path, "r") as lines: + for line in lines: + caller, callee = line.split("->") + if is_same_signature(callee, signature): + caller_signature_set.add(caller.strip().strip("<>")) + + os.makedirs(os.path.dirname(cache_file_path), exist_ok=True) + if os.path.exists(cache_file_path): + with open(cache_file_path, "r") as f: + cache_file = json.load(f) + else: + cache_file = {} + + # Update the cached data + cache_file[signature] = list(caller_signature_set) + + # Write the updated data back to the cache file + with open(cache_file_path, "w") as f: + json.dump(cache_file, f, indent=4) + + return caller_signature_set \ No newline at end of file diff --git a/ExplanationGenerator/utils/constraint_checker.py b/ExplanationGenerator/utils/constraint_checker.py new file mode 100644 index 00000000..5f5bdd34 --- /dev/null +++ b/ExplanationGenerator/utils/constraint_checker.py @@ -0,0 +1,157 @@ +import re +from ExplanationGenerator.config import logger +""" +Constraint: +[checkStartActivityResult]: : The intent parameter should not be null and should have a valid component that can be resolved to an existing activity in the AndroidManifest.xml file. +""" + + +def parse_constraint(full_constraint, findall=False): + """ + Example constraint: + + 1. [foo1]: <= 0 + 2.
[foo2]: == null + """ + pattern = r"\[(.*?)\]: (.*)" + if not findall: + match = re.match(pattern, full_constraint) + if match is None: + raise ValueError(f"Invalid constraint: {full_constraint}") + method_name, constraint = match.groups() + else: + match = re.findall(pattern, full_constraint) + if len(match) == 0: + raise ValueError(f"Invalid constraint: {full_constraint}") + method_name, constraint = match[0] + + item_pattern = r"<(Parameter|Field) (\w+): (\w+) (\w+)>" + items = re.findall(item_pattern, constraint) + # if len(items) == 0: + # logger.info(f"No items found in constraint: {full_constraint}") + + for item in items: + item_type, item_index, item_type_name, item_name = item + logger.debug( + f"Parsed Constraint Item: {item_type}, {item_index}, {item_type_name}, {item_name}" + ) + if item_type == "Parameter": + if not item_index.isdigit(): + raise ValueError(f"Invalid parameter index: {item_index}") + return method_name, items + + +def fill_full_signature(method_signature, constraint): + pattern = r"\[(.*?)\]: (.*)" + new_constraint = re.sub(pattern, f"[{method_signature}]: \\2", constraint) + + return new_constraint + + +def constraint_basic_check(constraint, method_signature, android_version, apk_name, allow_no_item=True, findall=False): + """ + Only do some basic check, include: + + 1. Check the format of the constraint + 2. Check constraint items(Parameters and Fields) are valid(same as the method snippet) + """ + from ExplanationGenerator.utils.parser import ( + parse_signature, + parse_code, + parse_code_from_signature, + get_node, + method_node_filter_func_generator, + ) + from ExplanationGenerator.exceptions import ConstraintBasicCheckError + _, _, _, _, signature_method_name, _ = parse_signature(method_signature) + code_lines, original_node = parse_code_from_signature(method_signature, apk_name, android_version) + target_node = get_node( + original_node, [method_node_filter_func_generator(method_signature)], code_lines + ) + parameters = [] + for parameter in target_node.parameters: + parameters.append((parameter.type.name, parameter.name)) + + try: + constraint_method_name, constraint_items = parse_constraint(constraint, findall=findall) + except ValueError as e: + raise ConstraintBasicCheckError(f"Your constraint format is wrong. Please check your constraint follow this format: [method_name]: detailed constraint. example: [foo1]: <= 0") + if constraint_method_name != signature_method_name: + raise ConstraintBasicCheckError( + f"The method name in constraint is mismatch: {constraint_method_name} != {signature_method_name}. Please check your constraint." + ) + + if not allow_no_item: + if len(constraint_items) == 0: + raise ConstraintBasicCheckError("Your constraint does not include any parameters with specified formats, but after being checked by static analysis tools, the constraint is related to some parameters. Please check your constraint follow this format: [method_name]: detailed constraint. example:[foo1]: <= 0") + + for item in constraint_items: + item_type, item_index, item_type_name, item_name = item + if item_type == "Parameter": + idx = int(item_index) + if idx >= len(parameters): + raise ConstraintBasicCheckError( + f"Invalid parameter index: {idx}. The method signature only has {len(parameters)} parameters." + ) + if item_type_name != parameters[idx][0]: + raise ConstraintBasicCheckError( + f"The type of the parameter {idx} does not match. The type in the constraint is {item_type_name}, but it is {parameters[idx][0]} in the method signature." 
+ ) + if item_name != parameters[idx][1]: + raise ConstraintBasicCheckError( + f"The name of the parameter {idx} does not match. The name in the constraint is {item_name}, but it is {parameters[idx][1]} in the method signature." + ) + elif item_type == "Field": + pass + else: + # raise ConstraintBasicCheckError(f"Invalid item type: {constraint}") + pass + + +def constraint_static_analysis_check(constraint, method_signature, android_version, pass_chain_indexes, framework_reference_fields, field_check=True): + """ + Check constraint using the result provided by static analysis. + """ + from ExplanationGenerator.utils.parser import ( + parse_field_signature, + ) + from ExplanationGenerator.exceptions import ConstraintStaticAnalysisCheckError + + if field_check: + field_check_map = {} + for field in framework_reference_fields: + _, field_class_name, field_inner_class, field_type_name, field_name = parse_field_signature(field) + field_check_map[field_name] = { + "field_class_name": field_class_name.split(".")[-1] if field_inner_class is None else field_inner_class.split(".")[-1], + "field_type_name": field_type_name.split(".")[-1], + } + + _, constraint_items = parse_constraint(constraint, findall=True) + for item in constraint_items: + item_type, item_index, item_type_name, item_name = item + if item_type == "Parameter": + idx = int(item_index) + if pass_chain_indexes is not None and idx not in pass_chain_indexes: + raise ConstraintStaticAnalysisCheckError(f"Our static analysis tool identified that parameter {idx} is not related to this constraint, but your constraint include the parameter.") + elif item_type == "Field": + field_class_name = item_index + field_type_name = item_type_name + field_name = item_name + if field_check: + if field_name not in field_check_map: + raise ConstraintStaticAnalysisCheckError(f"Field {field_name} not in field check map") + if item_type_name != field_check_map[field_name]["field_type_name"]: + raise ConstraintStaticAnalysisCheckError( + f"Field type mismatch: {field_name} != {field_check_map[field_name]['field_type_name']}" + ) + if field_class_name != field_check_map[field_name]["field_class_name"]: + raise ConstraintStaticAnalysisCheckError( + f"Field class mismatch: {field_name} != {field_check_map[field_name]['field_class_name']}" + ) + + +if __name__ == "__main__": + constraint = "[foo1]: <= 0" + method_signature = "android.app.ContextImpl.startActivity" + android_version = "8.0" + constraint_basic_check(constraint, method_signature, android_version) diff --git a/ExplanationGenerator/utils/helper.py b/ExplanationGenerator/utils/helper.py new file mode 100644 index 00000000..52288e94 --- /dev/null +++ b/ExplanationGenerator/utils/helper.py @@ -0,0 +1,36 @@ +from ExplanationGenerator.config import ANDROID_CODE_PATH +from enum import Enum + + +class MethodType(Enum): + JAVA = "java" + ANDROID = "android" + APPLICATION = "application" + ANDROID_SUPPORT = "android_support" + + +def get_method_type(method_signature): + from .parser import parse_signature + + package_name, _, _, _, _, _ = parse_signature(method_signature) + if package_name.startswith("java"): + return MethodType.JAVA + elif package_name.startswith("android.support"): + return MethodType.ANDROID_SUPPORT + elif package_name.startswith("android") or package_name.startswith("com.android"): + return MethodType.ANDROID + else: + return MethodType.APPLICATION + + +def method_signature_into_path(method_signature): + from .parser import parse_signature + + package_name, class_name, _, _, _, _ = 
parse_signature(method_signature) + path = ( + package_name.replace(".", "/") + + "/" + + class_name + + ".java" + ) + return path diff --git a/ExplanationGenerator/utils/llm.py b/ExplanationGenerator/utils/llm.py new file mode 100644 index 00000000..80c81e79 --- /dev/null +++ b/ExplanationGenerator/utils/llm.py @@ -0,0 +1,100 @@ +import openai +from openai import OpenAI +from openai._types import NOT_GIVEN +from ExplanationGenerator.config import logger, GPT_MODEL, TEMPERATURE, SEED +from termcolor import colored +import os +import json + +from ExplanationGenerator.config import OPENAI_API_KEY, OPENAI_BASH_URL +client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASH_URL) + + +def send_message(messages, tools=None, tool_choice=None, json_mode=False): + try: + conversation = messages.copy() + logger.info(f"Sending message, waiting for response...") + response = client.chat.completions.create( + model=GPT_MODEL, + messages=messages, + temperature=TEMPERATURE, + seed=SEED, + tools=tools if tools is not None else NOT_GIVEN, + tool_choice=tool_choice if tool_choice is not None else NOT_GIVEN, + response_format={"type": "json_object"} if json_mode else NOT_GIVEN, + timeout=240, + ) + logger.info(f"Received response!") + response_message = json.loads(response.model_dump_json())["choices"][0][ + "message" + ] + # delete all attribute but 'role' and 'content' + for key in list(response_message.keys()): + if key not in ["role", "content"]: + del response_message[key] + conversation.append(response_message) + return conversation, response.system_fingerprint + except openai.APIConnectionError as e: + logger.error(f"Failed to connect to OpenAI API: {e}") + exit() + except openai.APIError as e: + logger.error(f"OpenAI API returned an API Error: {e}") + exit() + + +def message_writer(message, colorful: bool, write_function): + role_to_color = { + "system": "red", + "user": "green", + "assistant": "blue", + "function": "magenta", + } + color = role_to_color[message["role"]] if colorful else None + + def colorize(text, color): + return colored(text, color) if color else text + + if message["role"] == "system": + write_function(colorize(f"system:\n{message['content']}\n", color)) + elif message["role"] == "user": + write_function(colorize(f"user:\n{message['content']}\n", color)) + elif message["role"] == "assistant" and message.get("function_call"): + write_function( + colorize( + f"assistant function_call:\n{message['function_call']['name']}\n{message['function_call']['arguments']}\n", + color, + ) + ) + elif message["role"] == "assistant" and not message.get("function_call"): + write_function(colorize(f"assistant:\n{message['content']}\n", color)) + elif message["role"] == "function": + write_function( + colorize(f"function ({message['name']}): {message['content']}\n", color) + ) + + +def conversation_writer(messages, colorful: bool, write_function): + for message in messages: + message_writer(message, colorful=colorful, write_function=write_function) + write_function("-----------------------------------") + + +def pretty_print_conversation(messages): + conversation_writer(messages, colorful=True, write_function=print) + + +def write_conversation(messages, file_name): + with open(file_name, "w") as f: + conversation_writer( + messages, colorful=False, write_function=lambda x: f.write(x + "\n") + ) + + +def pretty_log_conversation(messages): + conversation_writer(messages, colorful=False, write_function=logger.debug) + + +def save_messages(message, directory, file_name): + with 
open(f"{directory}/{file_name}.json", "w") as f: + json.dump(message, f, indent=4) + write_conversation(message, f"{directory}/{file_name}.txt") diff --git a/ExplanationGenerator/utils/parser.py b/ExplanationGenerator/utils/parser.py new file mode 100644 index 00000000..ba1a0094 --- /dev/null +++ b/ExplanationGenerator/utils/parser.py @@ -0,0 +1,504 @@ +import re +import javalang +from ExplanationGenerator.config import logger +from ExplanationGenerator.utils.helper import get_method_type, MethodType + +class NodeNotFoundException(Exception): + pass + +class InvalidSignatureException(Exception): + pass + +class MultipleNodeException(Exception): + pass + + +def parse_signature(method_signature): + """ + Example Method Signature: + 1. android.view.ViewRoot: void checkThread() + 2. android.view.ViewRoot.checkThread + 3. android.view.ViewRoot: android.view.ViewParent invalidateChildInParent(int[],android.graphics.Rect) + + Counter Example: + 1. ; ; + """ + method_signature = method_signature.strip().strip("<>") + pattern1 = r"^(\S+)\.(\w+)(\$\S+)?: (\S+) ([\w$]+|)(\([^()]*?\))?$" + pattern2 = r"^(\S+)\.(\w+)(\$\S+)?\.(\S+)$" + match1 = re.match(pattern1, method_signature) + match2 = re.match(pattern2, method_signature) + if match1: + ( + package_name, + class_name, + inner_class, + return_type, + method_name, + parameter_list, + ) = match1.groups() + if inner_class: + inner_class = inner_class.strip("$") + if parameter_list: + parameters = [ + param.strip() for param in parameter_list.strip("()").split(",") + ] + # remove empty string + parameters = list(filter(None, parameters)) + else: + parameters = None + + return ( + package_name, + class_name, + inner_class, + return_type, + method_name, + parameters, + ) + elif match2: + package_name, class_name, inner_class, method_name = match2.groups() + if inner_class: + inner_class = inner_class.strip("$") + return package_name, class_name, inner_class, None, method_name, None + else: + raise InvalidSignatureException(f"Invalid signature: { method_signature }") + + +def is_same_signature(signature1, signature2): + if signature1 == signature2: + return True + if signature1.strip().strip("<>") == signature2.strip().strip("<>"): + return True + return False + + +def parse_field_signature(field_signature): + """ + Example Field Signature: + + 1. android.view.ViewRoot: java.lang.Thread mThread + 1. 
android.view.ViewRoot$InnerClass: java.lang.Thread mThread + """ + field_signature = field_signature.strip().strip("<>") + pattern = r"(\S+)\.(\w+)(\$\S+)?: (\S+) (\w+)" + match = re.match(pattern, field_signature) + if match: + package_name, class_name, inner_class, type_name, field_name = match.groups() + if inner_class: + inner_class = inner_class.strip("$") + return package_name, class_name, inner_class, type_name, field_name + else: + raise Exception(f"Invalid signature: { field_signature }") + + +def get_method_start_end(method_node, tree): + startpos = None + endpos = None + startline = None + endline = None + for path, node in tree: + if startpos is not None and method_node not in path: + endpos = node.position + endline = node.position.line if node.position is not None else None + break + if startpos is None and node == method_node: + startpos = node.position + startline = node.position.line if node.position is not None else None + return startpos, endpos, startline, endline + + +def get_method_text( + startpos, endpos, startline, endline, last_endline_index, codelines +): + if startpos is None: + return "", None, None, None + else: + startline_index = startline - 1 + endline_index = endline - 1 if endpos is not None else None + + # 1. check for and fetch annotations + if last_endline_index is not None: + for line in codelines[(last_endline_index + 1) : (startline_index)]: + if "@" in line: + startline_index = startline_index - 1 + meth_text = "".join(codelines[startline_index:endline_index]) + meth_text = meth_text[: meth_text.rfind("}") + 1] + + # 2. remove trailing rbrace for last methods & any external content/comments + if not abs(meth_text.count("}") - meth_text.count("{")) == 0: + # imbalanced braces + brace_diff = abs(meth_text.count("}") - meth_text.count("{")) + + for _ in range(brace_diff): + meth_text = meth_text[: meth_text.rfind("}")] + meth_text = meth_text[: meth_text.rfind("}") + 1] + + meth_lines = meth_text.split("") + meth_text = "".join(meth_lines) + last_endline_index = startline_index + (len(meth_lines) - 1) + + return ( + meth_text, + (startline_index + 1), + (last_endline_index + 1), + last_endline_index, + ) + + +def parse_code(path): + with open(path, "r") as f: + code_lines = f.readlines() + code_text = "".join(code_lines) + + original_node = javalang.parse.parse(code_text) + + return code_lines, original_node + + +def parse_code_from_signature(method_signature, apk_name, android_version): + from ExplanationGenerator.config import ANDROID_CODE_PATH, APK_CODE_PATH + from ExplanationGenerator.utils.helper import method_signature_into_path + + if get_method_type(method_signature) == MethodType.ANDROID: + path = f'{ANDROID_CODE_PATH(android_version)}/{method_signature_into_path(method_signature)}' + elif get_method_type(method_signature) == MethodType.APPLICATION: + path = f'{APK_CODE_PATH(apk_name)}/{method_signature_into_path(method_signature)}' + elif get_method_type(method_signature) == MethodType.ANDROID_SUPPORT: + path = f'{APK_CODE_PATH(apk_name)}/{method_signature_into_path(method_signature)}' + else: + raise Exception(f"I don't know where to find the stupid code for {method_signature}") + + return parse_code(path) + + +def get_node(original_node, node_filters, code_lines): + target_node = original_node + for node_filter_func in node_filters: + target_node = node_filter_func(target_node, code_lines) + + return target_node + + +def first_brace_end_index(code_lines, startline, endline = None): + stack = [] + for line in range(startline, 
len(code_lines)): + str = code_lines[line] + for char in str: + if char == '{': + stack.append(char) + elif char == '}': + if len(stack) == 1: + if endline is not None and line > endline: + raise Exception(f"Longer than expected!") + return line + elif len(stack) == 0: + logger.error(f"snippet:\n{''.join(code_lines[startline: line + 1])}") + raise Exception(f"Brach match error!") + else: + stack.pop() + raise Exception(f"Brach match error!") + + + +def reverse_first_brace_end_index(code_lines, endline, startline = None): + stack = [] + for line in range(endline, -1, -1): + str = code_lines[line] + for char in str[::-1]: + if char == '}': + stack.append(char) + elif char == '{': + if len(stack) > 0: + stack.pop() + else: + return line + raise Exception(f"Brach match error!") + + +def get_node_snippet(path, node_filters): + code_lines, original_node = parse_code(path) + target_node = get_node(original_node, node_filters, code_lines) + if target_node is None: + raise NodeNotFoundException(f"Node not found: {node_filters}") + + lex = None + method_snippet = "" + if isinstance(target_node, javalang.tree.MethodDeclaration): + startline = target_node.position.line - 1 + endline = first_brace_end_index(code_lines, startline) + method_text = "".join(code_lines[startline: endline + 1]) + # startpos, endpos, startline, endline = get_method_start_end( + # target_node, original_node + # ) + # method_text, startline, endline, lex = get_method_text( + # startpos, endpos, startline, endline, lex, code_lines + # ) + elif isinstance(target_node, javalang.tree.ClassDeclaration): + startline = target_node.position.line - 1 + endline = None + for node in target_node.fields + target_node.methods: + if endline is None or node.position.line < endline: + endline = node.position.line - 1 + for line in range(startline, endline): + if '{' in code_lines[line]: + endline = line + 1 + method_text = "".join(code_lines[startline: endline]) + elif isinstance(target_node, javalang.tree.FieldDeclaration): + startline = target_node.position.line + endline = startline + 1 + method_text = "".join(code_lines[startline - 1: endline - 1]) + else: + raise Exception(f"Unsupported node type: {type(target_node)}") + method_snippet += method_text + + if method_snippet == "": + return None + return method_snippet + + +def method_node_filter_func_generator(method_signature): + package_name, class_name, inner_class, _, method_name, parameters = parse_signature( + method_signature + ) + if inner_class is None: + innermost_class = class_name + elif '$' not in inner_class: + innermost_class = inner_class + else: + innermost_class = inner_class.split('$')[-1] + + def node_filter_func(original_tree, code_lines): + target_class_tree = original_tree + if not innermost_class.isdigit(): + # Not is a anonymous inner class + tree_candidate = [ + node + for _, node in target_class_tree.filter( + javalang.tree.ClassDeclaration + ) + if node.name == innermost_class + ] + else: + # Anonymous inner class + pending_tree_candidate = [ + node + for path, node in target_class_tree.filter( + javalang.tree.ClassCreator + ) + if node.body is not None and len(node.body) > 0 and all(isinstance(item, javalang.tree.MethodDeclaration) for item in node.body) and any(item.name == method_name for item in node.body) + ] + tree_candidate = [] + for class_creator_node in pending_tree_candidate: + min_line = class_creator_node.body[0].position.line + for method_node in class_creator_node.body: + if method_node.position.line < min_line: + min_line = 
method_node.position.line + + # why -2: min_line is real line number, but code_lines is 0-based, so -1; the line before the class line is the line of class declaration so -1 again + class_line = reverse_first_brace_end_index(code_lines, min_line - 2) + labeled_signature = '.'.join([package_name, class_name, inner_class.replace('$', '.')]) + if labeled_signature in code_lines[class_line]: + tree_candidate.append(class_creator_node) + + if len(tree_candidate) == 0: + return None + if len(tree_candidate) == 1: + target_class_tree = tree_candidate[0] + else: + raise MultipleNodeException(f"multiple class node: {method_signature}") + + method_nodes = [ + node + for _, node in target_class_tree.filter(javalang.tree.MethodDeclaration) + ] + constructor_nodes = [ + node + for _, node in target_class_tree.filter( + javalang.tree.ConstructorDeclaration + ) + ] + nodes = method_nodes + constructor_nodes + target_nodes = [] + for method_node in nodes: + if method_node.name != method_name: + continue + if parameters is not None: + if len(method_node.parameters) != len(parameters): + continue + + unmatched_parameters = False + for index, value in enumerate(method_node.parameters): + if value.type.name != parameters[index].split(".")[-1]: + unmatched_parameters = True + if unmatched_parameters: + continue + target_nodes.append(method_node) + + if len(target_nodes) == 0: + return None + elif len(target_nodes) == 1: + return target_nodes[0] + else: + from ExplanationGenerator.config import bypass_signature + + if method_signature in bypass_signature.values(): + return target_nodes[0] + else: + raise MultipleNodeException(f"multiple method node: {method_signature}") + + return node_filter_func + + +def class_node_filter_func_generator(method_signature): + _, class_name, inner_class, _, _, _ = parse_signature( + method_signature + ) + target_class = class_name if inner_class is None else inner_class + + def node_filter_func(original_tree, _): + tree_candidate = [ + node + for _, node in original_tree.filter(javalang.tree.ClassDeclaration) + if node.name == target_class + ] + if len(tree_candidate) == 0: + return None + if len(tree_candidate) == 1: + return tree_candidate[0] + else: + raise Exception(f"multiple class node: {method_signature}") + + return node_filter_func + + +def field_node_filter_func_generator(field_name): + def node_filter_func(original_tree, _): + if isinstance(original_tree, javalang.tree.ClassDeclaration): + tree_candidate = [ + node + for node in original_tree.fields + if node.declarators[0].name == field_name + ] + else: + tree_candidate = [ + node + for _, node in original_tree.filter(javalang.tree.FieldDeclaration) + if node.declarators[0].name == field_name + ] + if len(tree_candidate) == 0: + return None + if len(tree_candidate) == 1: + return tree_candidate[0] + else: + raise Exception(f"multiple field node: {field_name}") + + return node_filter_func + + +def get_method_snippet(method_signature, apk_name, android_version, framework_reference_fields=[]): + if get_method_type(method_signature) == MethodType.ANDROID: + return get_framework_method_snippet(method_signature, android_version, framework_reference_fields) + elif get_method_type(method_signature) == MethodType.APPLICATION: + return get_application_method_snippet(method_signature, apk_name) + elif get_method_type(method_signature) == MethodType.ANDROID_SUPPORT: + return get_application_method_snippet(method_signature, apk_name) + else: + raise Exception(f"Unsupported method type: {method_signature}") + + +def 
get_framework_method_snippet(method_signature, android_version, framework_reference_fields=[]): + from ExplanationGenerator.config import ANDROID_CODE_PATH + from ExplanationGenerator.utils.helper import method_signature_into_path + + path = f'{ANDROID_CODE_PATH(android_version)}/{method_signature_into_path(method_signature)}' + class_snippet = get_node_snippet( + path, + [class_node_filter_func_generator(method_signature)], + ) + method_snippet = get_node_snippet( + path, + [method_node_filter_func_generator(method_signature)], + ) + + reference_fields = [] + method_package_name, method_class_name, method_inner_class, _, _, _ = parse_signature(method_signature) + # Find matched field + for field in framework_reference_fields: + field_package_name, field_class_name, field_inner_class, field_type_name, field_name = parse_field_signature(field) + if method_package_name == field_package_name and method_class_name == field_class_name and method_inner_class == field_inner_class: + reference_fields.append(field_name) + if len(reference_fields) > 0: + for field_name in reference_fields: + field_snippet = get_node_snippet( + path, + [class_node_filter_func_generator(method_signature), field_node_filter_func_generator(field_name)], + ) + class_snippet = f"{class_snippet}\n{field_snippet}" + + snippet = f"{class_snippet}\n{method_snippet}\n\n}}" + logger.debug(f"Snippet:\n{snippet}") + return snippet + +def get_application_method_snippet(method_signature, apk_name): + from ExplanationGenerator.config import APK_CODE_PATH + from ExplanationGenerator.utils.helper import method_signature_into_path + + path = f'{APK_CODE_PATH(apk_name)}/{method_signature_into_path(method_signature)}' + return get_node_snippet( + path, + [method_node_filter_func_generator(method_signature)], + ) + + +def get_throw_snippet(method_signature, android_version, exception_name): + def node_filter_func(original_tree, _): + candidates = [ + node for _, node in original_tree.filter(javalang.tree.ThrowStatement) + ] + throw_nodes = [] + for node in candidates: + if node.expression.type.name == exception_name: + throw_nodes.append(node) + + if len(throw_nodes) == 0: + return None + elif len(throw_nodes) == 1: + return throw_nodes[0] + else: + raise Exception(f"multiple throw node: {method_signature}") + + from ExplanationGenerator.config import ANDROID_CODE_PATH + from ExplanationGenerator.utils.helper import method_signature_into_path + + path = f'{ANDROID_CODE_PATH(android_version)}/{method_signature_into_path(method_signature)}' + return get_node_snippet( + path, + [method_node_filter_func_generator(method_signature), node_filter_func], + ) + + +def parse_message(message): + pattern = r"(\w+):\s```\s(.*?)\s```" + # Use re.DOTALL so that '.' also matches newline characters + matches = re.findall(pattern, message, re.DOTALL) + + extracted = {} + for label, content in matches: + extracted[label] = content.strip() + + return extracted + + +if __name__ == "__main__": + print( + parse_signature( + "android.app.ActivityThread$ApplicationThread$3: void schedulePauseActivity(android.os.IBinder,boolean,boolean,int,boolean)" + ) + ) + print( + parse_signature( + "android.app.ActivityThread$ApplicationThread$3: void schedulePauseActivity" + ) + ) + print(parse_signature("android.app.ContextImpl.startActivity"))
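
Reviewer note (not part of the patch): `parse_message` is the seam through which every LLM reply in this patch is routed (the explainer, extractor, and global-summary flows all parse their responses with it), so a small standalone sketch of the expected reply shape may help. The reply text below is invented purely for illustration, and the regex is copied from `ExplanationGenerator/utils/parser.py` so the sketch runs on its own.

```python
# Standalone sketch: splitting an LLM reply into labeled, fenced sections,
# mirroring parse_message from ExplanationGenerator/utils/parser.py.
import re

def parse_message(message):
    # A "Label:" followed by a ```-fenced block; re.DOTALL lets '.' match
    # the newlines inside the fenced block.
    pattern = r"(\w+):\s```\s(.*?)\s```"
    matches = re.findall(pattern, message, re.DOTALL)
    return {label: content.strip() for label, content in matches}

# Invented example reply in the format the prompts above ask the model to use.
reply = """
Analysis: ```
The candidate passes an Intent whose component cannot be resolved.
```

Explanation: ```
The target Activity is not declared in the manifest, so checkStartActivityResult throws.
```
"""

sections = parse_message(reply)
print(sections["Analysis"])      # first labeled section
print(sections["Explanation"])   # second labeled section
```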