From 98dc4fcfccf5977066dc8092d2b78907998d37d3 Mon Sep 17 00:00:00 2001 From: Hourunze1997 <1043170898@qq.com> Date: Fri, 18 Apr 2025 15:30:58 +0800 Subject: [PATCH] fix codecheck problem --- app/api/endpoints/webhook.py | 36 ++++++++++++++++++++++++++-------- app/config.py | 7 +++++-- app/main.py | 17 +++++++++++----- app/utils/euler_api.py | 3 +++ app/utils/euler_maker_api.py | 12 ++++++++---- app/utils/git_api.py | 12 ++++++++---- app/utils/gitee_tool.py | 20 ++++++++++++------- app/utils/processor.py | 7 ++++--- tests/test_processor.py | 38 +++++++++++++++++++----------------- 9 files changed, 101 insertions(+), 51 deletions(-) diff --git a/app/api/endpoints/webhook.py b/app/api/endpoints/webhook.py index 0126347..3702659 100644 --- a/app/api/endpoints/webhook.py +++ b/app/api/endpoints/webhook.py @@ -1,22 +1,26 @@ +# 标准库 import asyncio import re import time -import traceback +import hmac +import hashlib +import logging +# 第三方库 from fastapi import APIRouter, Request, HTTPException, Header, status, BackgroundTasks + +# 应用程序自定义模块 from app.config import settings, init_db_pool from app.utils import git_api, gitee_tool from app.utils.client import silicon_client from app.utils import euler_maker_api as maker -import hmac -import hashlib -import logging router = APIRouter() logger = logging.getLogger(__name__) MAX_RETRIES = 0 db_pool = init_db_pool() + def verify_signature(body: bytes, signature: str) -> bool: """Verify HMAC signature of webhook payload.""" try: @@ -88,8 +92,16 @@ async def handle_webhook( conn = db_pool.get_connection() cursor = conn.cursor() cursor.execute( - "INSERT INTO pending_requests (repo_url,source_url,pr_number,repo_name,pr_url,spec_content) VALUES (%s,%s,%s,%s,%s,%s)", - (pr_data["repo_url"],pr_data["source_url"],pr_data["pr_number"],pr_data["repo_name"],pr_data["pr_url"],spec_content) + "INSERT INTO pending_requests (repo_url, source_url, pr_number, repo_name, pr_url, spec_content) " + "VALUES (%s, %s, %s, %s, %s, %s)", + ( + pr_data["repo_url"], + pr_data["source_url"], + pr_data["pr_number"], + pr_data["repo_name"], + pr_data["pr_url"], + spec_content + ) ) conn.commit() return {"status": "处理已启动"} @@ -157,7 +169,12 @@ async def handle_build_retries(pr_data: dict, current_spec: str, srcDir: str, bu ) repair_job_id = maker.get_job_id(settings.os_repair_project, pr_data["repo_name"]) commit_url = f"{fork_url}/commit/{commit_sha}" - maker_url = f"https://eulermaker.compass-ci.openeuler.openatom.cn/package/build-record?osProject={settings.os_repair_project}&packageName={pr_data['repo_name']}&jobId={repair_job_id}" + maker_url = ( + f"https://eulermaker.compass-ci.openeuler.openatom.cn/package/build-record?" + f"osProject={settings.os_repair_project}&" + f"packageName={pr_data['repo_name']}&" + f"jobId={repair_job_id}" + ) # 递归处理 await handle_build_retries(pr_data, new_spec, srcDir, new_build_id, retry_count + 1, commit_url, maker_url) @@ -228,7 +245,10 @@ async def process_initial_repair(pr_data: dict, original_spec: str): repair_job_id = maker.get_job_id(settings.os_repair_project, pr_data["repo_name"]) commit_url = f"{fork_url}/commit/{commit_sha}" - maker_url = f"https://eulermaker.compass-ci.openeuler.openatom.cn/package/build-record?osProject={settings.os_repair_project}&packageName={pr_data['repo_name']}&jobId={repair_job_id}" + maker_url = (f"https://eulermaker.compass-ci.openeuler.openatom.cn/package/build-record?" 
+ f"osProject={settings.os_repair_project}&" + f"packageName={pr_data['repo_name']}&" + f"jobId={repair_job_id}") await handle_build_retries(pr_data, fixed_spec, srcDir, repair_build_id, 0, commit_url, maker_url) except Exception as e: diff --git a/app/config.py b/app/config.py index 1d6604e..02e8c71 100644 --- a/app/config.py +++ b/app/config.py @@ -3,6 +3,7 @@ import yaml from mysql.connector import pooling + class Settings: def __init__(self): config_path_env = os.getenv("CONFIG_PATH") @@ -44,7 +45,7 @@ def __init__(self): self.password: str = config.get("PASSWORD") self.database: str = config.get("DATABASE") self.pool_size: int = config.get("POOL_SIZE") - #os.remove(config_path_env) + # os.remove(config_path_env) settings = Settings() @@ -57,9 +58,11 @@ def __init__(self): "database": settings.database, 'charset': 'utf8mb4' } + + def init_db_pool(): return pooling.MySQLConnectionPool( pool_name="request_pool", pool_size=settings.pool_size, **db_config - ) \ No newline at end of file + ) diff --git a/app/main.py b/app/main.py index 9daf539..d70e0e9 100644 --- a/app/main.py +++ b/app/main.py @@ -1,10 +1,15 @@ +# 标准库 import logging +import asyncio + +# 第三方库 from fastapi import FastAPI, HTTPException, Request, status + +# 应用程序自定义模块 from app.api.endpoints import webhook from app.config import settings from app.utils.processor import RequestProcessor -import asyncio -import time + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -13,7 +18,8 @@ app.include_router(webhook.router, prefix="/api/v1", tags=["webhooks"]) processor = RequestProcessor() -@app.on_event ("startup") + +@app.on_event("startup") def startup_event(): """ 在应用程序启动时执行的事件处理函数。 @@ -22,8 +28,9 @@ def startup_event(): """ # 延迟5秒后执行后台任务 loop = asyncio.get_running_loop() - loop.call_later(5, lambda: asyncio.create_task(processor.start())) - + loop.call_later(5, lambda: asyncio.create_task(processor.start())) + + @app.middleware("http") async def log_requests(request: Request, call_next): # 记录请求的基本信息 diff --git a/app/utils/euler_api.py b/app/utils/euler_api.py index 6391e27..d3ec2db 100644 --- a/app/utils/euler_api.py +++ b/app/utils/euler_api.py @@ -69,6 +69,7 @@ def get_privacy_version(self) -> str: except (RequestException, json.JSONDecodeError) as e: self._handle_request_exception(e, "获取隐私协议版本") + return "" def authenticate(self, version: str) -> None: """进行用户认证并维护会话状态""" @@ -166,6 +167,7 @@ def get_auth_code(self) -> str: except (RequestException, IndexError) as e: self._handle_request_exception(e, "获取授权码") + return "" def get_access_token(self, code: str) -> str: """使用授权码获取访问令牌""" @@ -184,6 +186,7 @@ def get_access_token(self, code: str) -> str: except RequestException as e: self._handle_request_exception(e, "获取访问令牌") + return "" def execute_flow(self) -> str: """执行完整的OAuth流程""" diff --git a/app/utils/euler_maker_api.py b/app/utils/euler_maker_api.py index cde138b..4dec616 100644 --- a/app/utils/euler_maker_api.py +++ b/app/utils/euler_maker_api.py @@ -1,12 +1,16 @@ +# 标准库 import asyncio import logging import time import uuid +# 第三方库 import httpx import requests from typing import Any, Dict, List, Optional from requests.exceptions import RequestException + +# 应用程序自定义模块 from . 
import euler_api
 
 # 常量定义
@@ -40,7 +44,7 @@ def _request_wrapper(
             time.sleep(RETRY_DELAY * (attempt + 1))
         else:
             logger.info(f"最终请求失败: {url}")
-            return None
+    return None
 
 
 def get_request_headers() -> Dict[str, str]:
@@ -267,6 +271,7 @@ def get_build_log(url: str) -> Optional[str]:
         logger.error(f"Log fetch error: {str(e)}")
         return None
 
+
 async def get_build_status(
     build_id: str,
     max_retries: int = 10,
@@ -283,7 +288,7 @@ async def get_build_status(
 
     for attempt in range(1, max_retries + 1):
         try:
-            # 添加请求追踪ID 
+            # 添加请求追踪ID
             headers = get_request_headers() | {"X-Request-ID": uuid.uuid4().hex}
 
             response = requests.post(
@@ -309,9 +314,8 @@ async def get_build_status(
         except httpx.HTTPStatusError as e:
             logger.error(f"HTTP error {e.response.status_code}: {e.response.text}")
             if e.response.status_code == 404:
-                break # 立即终止不存在的请求
+                break  # 立即终止不存在的请求
         except Exception as e:
             logger.error(f"Unexpected error: {str(e)}")
     return None
 
-
diff --git a/app/utils/git_api.py b/app/utils/git_api.py
index 67c18cd..56e7f0b 100644
--- a/app/utils/git_api.py
+++ b/app/utils/git_api.py
@@ -1,19 +1,22 @@
+# 标准库
 import os
 import shutil
 import stat
 import subprocess
+import logging
 from abc import ABC, abstractmethod
 from base64 import b64encode
 from urllib.parse import urlparse
 
+# 第三方库
 import httpx
-
-from app.config import settings
 import requests
-import logging
 
+# 应用程序自定义模块
+from app.config import settings
 from app.utils.client import api_client
 
+
 logger = logging.getLogger("git_api")
 
 
@@ -336,6 +339,7 @@ def update_spec_file(service, owner, repo, file_content, branch):
         return clone_url, sha, branch
     except Exception as e:
         logger.info(f"提交失败: {str(e)}")
+        return "", "", ""
 
 
 def comment_on_pr(repo_url, pr_num, comment):
@@ -405,7 +409,7 @@ def check_and_push(repo_url, new_content, pr_num):
             ["git", "rev-parse", "HEAD"], cwd=temp_dir, text=True).strip()
         if os.path.exists(temp_dir):
             shutil.rmtree(temp_dir, onerror=force_remove_readonly)
-        return f'{repo_url}.git', commit_sha, branch
+    return f'{repo_url}.git', commit_sha, branch
 
 
 def force_remove_readonly(func, path, _):
diff --git a/app/utils/gitee_tool.py b/app/utils/gitee_tool.py
index ae24f95..1bddb53 100644
--- a/app/utils/gitee_tool.py
+++ b/app/utils/gitee_tool.py
@@ -1,10 +1,14 @@
+# 标准库
 import json
 import logging
 import os
 import tarfile
-import requests
 import tempfile
-from urllib.parse import urlparse, urlencode
+from urllib.parse import urlparse
+
+# 第三方库
+import requests
+
 
 GITEE_API = "https://gitee.com/api/v5"
 logger = logging.getLogger(__name__)
@@ -69,11 +73,14 @@ def download_gitee_file(raw_url, token=None):
 
     _, ext = os.path.splitext(raw_url)
     fd, path = tempfile.mkstemp(suffix=ext)
-    with os.fdopen(fd, 'wb') as f:
-        for chunk in response.iter_content(chunk_size=8192):
-            if chunk:
-                f.write(chunk)
-
+    try:
+        with os.fdopen(fd, 'wb') as f:
+            for chunk in response.iter_content(chunk_size=8192):
+                if chunk:
+                    f.write(chunk)
+    except Exception:
+        os.remove(path)  # 写入失败时清理临时文件,避免返回已删除的路径
+        raise
     return path
 
 
diff --git a/app/utils/processor.py b/app/utils/processor.py
index 1b417a7..4566c79 100644
--- a/app/utils/processor.py
+++ b/app/utils/processor.py
@@ -10,12 +10,11 @@
 # PURPOSE.
 # See the Mulan PSL v2 for more details.
# ******************************************************************************/ +import logging +import asyncio -import time from app.config import settings, init_db_pool from app.api.endpoints.webhook import process_initial_repair -import logging -import asyncio REPAIR_STATUS_COMPLETED = "completed" @@ -24,6 +23,8 @@ REPAIR_STATUS_PENDING = "pending" logger = logging.getLogger(__name__) + + class RequestProcessor: def __init__(self): """ diff --git a/tests/test_processor.py b/tests/test_processor.py index 0bc2b0e..5392c8e 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -20,17 +20,18 @@ settings.check_interval = 1 # 缩短检查间隔便于测试 settings.thread_pool_size = 2 + class TestRequestProcessor(unittest.IsolatedAsyncioTestCase): def setUp(self): # 模拟数据库连接池 self.mock_db_pool = MagicMock() self.mock_conn = MagicMock() self.mock_cursor = MagicMock() - + # 设置模拟对象行为 self.mock_db_pool.get_connection.return_value = self.mock_conn self.mock_conn.cursor.return_value = self.mock_cursor - + # 初始化被测对象 self.processor = RequestProcessor() self.processor.db_pool = self.mock_db_pool @@ -40,7 +41,7 @@ async def test_initialization(self): """测试初始化是否正确创建线程池""" # 验证线程池创建 self.assertEqual(len(asyncio.all_tasks()), settings.thread_pool_size) - + # 验证数据库连接池初始化 self.mock_db_pool.get_connection.assert_called_once() @@ -52,25 +53,25 @@ async def test_process_pending_requests_normal_flow(self): (2, 'repo2', 'source2', 456, 'repo_name2', 'pr_url2', 'spec2') ] self.mock_cursor.fetchall.return_value = mock_requests - + # 执行测试方法 await self.processor.process_pending_requests() - + # 验证状态更新 self.mock_cursor.execute.assert_any_call( "UPDATE pending_requests SET status = 'processing' WHERE id IN (%s,%s)", ['1', '2'] ) - + # 验证任务提交 self.assertEqual(self.processor.task_queue.qsize(), 2) async def test_process_pending_requests_no_requests(self): """测试无挂起请求时的处理""" self.mock_cursor.fetchall.return_value = [] - + await self.processor.process_pending_requests() - + # 验证没有执行更新操作 self.mock_cursor.execute.assert_not_called() @@ -79,7 +80,7 @@ async def test_process_request_success(self): # 模拟修复处理成功 with patch('your_module.process_initial_repair', return_value=None) as mock_repair: await self.processor.process_request(1, {'spec_content': 'valid_spec'}) - + # 验证状态更新 self.mock_cursor.execute.assert_any_call( "UPDATE pending_requests SET status = 'completed' WHERE id = %s", @@ -92,7 +93,7 @@ async def test_process_request_failure(self): # 模拟修复处理抛出异常 with patch('your_module.process_initial_repair', side_effect=Exception("Test error")): await self.processor.process_request(1, {'spec_content': 'invalid_spec'}) - + # 验证状态更新为failed self.mock_cursor.execute.assert_any_call( "UPDATE pending_requests SET status = 'failed' WHERE id = %s", @@ -116,10 +117,10 @@ async def test_concurrent_processing(self): # 添加多个任务到队列 for i in range(5): await self.processor.task_queue.put((i, {})) - + # 等待所有worker处理完成 await asyncio.sleep(0.1) # 等待事件循环处理 - + # 验证队列清空 self.assertEqual(self.processor.task_queue.qsize(), 0) self.assertTrue(self.processor.task_queue.empty()) @@ -128,25 +129,26 @@ async def test_database_rollback_on_error(self): """测试数据库事务回滚""" # 强制触发数据库异常 self.mock_cursor.execute.side_effect = Exception("Database error") - + with self.assertLogs() as captured: await self.processor.process_pending_requests() - + # 验证回滚和日志记录 self.assertIn("Database error", captured.output[0]) self.mock_conn.rollback.assert_called_once() async def test_task_queue_error_handling(self): """测试任务队列异常处理""" + # 创建会抛出异常的任务 async def failing_task(): raise 
ValueError("Task failed") - + # 提交异常任务 await self.processor.task_queue.put(failing_task()) - + # 等待worker处理 await asyncio.sleep(0.1) - + # 验证任务完成(即使失败) - self.assertTrue(self.processor.task_queue.empty()) \ No newline at end of file + self.assertTrue(self.processor.task_queue.empty())
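
For reference, the webhook handler's verify_signature(body, signature) helper (its body falls outside these hunks) relies on the hmac and hashlib imports regrouped above. A minimal sketch of that kind of check follows; the secret source (settings.webhook_secret), the SHA-256 digest, and the hex encoding are assumptions for illustration, not the confirmed implementation:

import hashlib
import hmac


def verify_signature_sketch(body: bytes, signature: str, secret: str) -> bool:
    # 假设:签名为请求体的 HMAC-SHA256 十六进制摘要,密钥来自配置(具体方案以实际实现为准)
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # 使用常量时间比较,避免时序攻击
    return hmac.compare_digest(expected, signature)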