From 6ec27b91a0a0c19c38ce296a0d41fb82295d54fb Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 7 Apr 2024 17:15:08 +0400 Subject: [PATCH] fix error with poll (#22) * fix error with poll * Update src/data_models/Game.py counter init ref Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update src/data_models/Game.py better error message while rules are broken Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * fix bug with saving results and drawing * ref role determine --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- src/data_models/Game.py | 109 ++++++++++++++++++++++++++++++++------ src/data_models/Record.py | 8 +-- src/handlers/save.py | 5 +- 3 files changed, 99 insertions(+), 23 deletions(-) diff --git a/src/data_models/Game.py b/src/data_models/Game.py index ce011d3..18905dc 100644 --- a/src/data_models/Game.py +++ b/src/data_models/Game.py @@ -1,8 +1,7 @@ -from typing import Literal +from typing import Literal, Tuple from pydantic import BaseModel, field_validator - -from src import config +from collections import Counter from src.data_models.PollResult import PollResult @@ -14,28 +13,106 @@ class Game(BaseModel): ] # Literal["Hitler Chancellor", "Fascist Law", "Hitler Death", "Liberal Law"] creator_id: int + @staticmethod + def extract_player_outcomes(results: Tuple[PollResult, ...]) -> Counter: + return Counter(outcome.get_answer_as_text() for outcome in results) + + @staticmethod + def remove_spectators(outcomes_counter: Counter) -> None: + outcomes_counter.pop("👀 SPECTATOR | NOT A PLAYER 👀", None) + + @staticmethod + def count_player_roles(outcomes_counter: Counter) -> tuple[int, int, int]: + total_hitlers = sum( + outcomes_counter[role] + for role in [ + "I'm Chancellor Hitler", + "I'm Dead Hitler", + "I'm Hitler Loser", + "I'm Hitler Winner", + ] + ) + total_liberals = ( + outcomes_counter["I'm Liberal Winner"] + + outcomes_counter["I'm Liberal Loser"] + ) + total_fascists = ( + outcomes_counter["I'm Fascistic Winner"] + + outcomes_counter["I'm Fascistic Loser"] + + total_hitlers + ) + return total_hitlers, total_liberals, total_fascists + + @staticmethod + def validate_player_distribution( + total_hitlers: int, + total_liberals: int, + total_fascists: int, + max_hitlers: int = 1, + max_liberals: int = 6, + max_fascists: int = 4, + ) -> None: + if ( + total_hitlers > max_hitlers + or total_liberals > max_liberals + or total_fascists > max_fascists + ): + raise ValueError( + f"Invalid player distribution: {total_hitlers} Hitlers, {total_liberals} Liberals, {total_fascists} Fascists. Max allowed - Hitlers: {max_hitlers}, Liberals: {max_liberals}, Fascists: {max_fascists}." + ) + + @staticmethod + def check_mutually_exclusive_victory_conditions(outcomes_counter: Counter) -> None: + liberal_win = ( + outcomes_counter["I'm Liberal Winner"] > 0 + or outcomes_counter["I'm Dead Hitler"] > 0 + or outcomes_counter["I'm Fascistic Loser"] > 0 + or outcomes_counter["I'm Hitler Loser"] > 0 + ) + + fascist_win = ( + outcomes_counter["I'm Fascistic Winner"] > 0 + or outcomes_counter["I'm Hitler Winner"] > 0 + or outcomes_counter["I'm Chancellor Hitler"] > 0 + or outcomes_counter["I'm Liberal Loser"] > 0 + ) + if liberal_win and fascist_win: + raise ValueError("Invalid results: Winners from both teams cannot exist.") + @field_validator("results", mode="after") - @classmethod def validate_results( cls, results: tuple[PollResult] ) -> Literal["CH", "DH", "FW", "LW"]: - outcomes = set(outcome.get_answer_as_text() for outcome in results) - if "I'm Chancellor Hitler" in outcomes: + outcomes_counter = cls.extract_player_outcomes(results=results) + cls.remove_spectators(outcomes_counter=outcomes_counter) + total_hitlers, total_liberals, total_fascists = cls.count_player_roles( + outcomes_counter=outcomes_counter + ) + cls.validate_player_distribution( + total_hitlers=total_hitlers, + total_liberals=total_liberals, + total_fascists=total_fascists, + ) + cls.check_mutually_exclusive_victory_conditions( + outcomes_counter=outcomes_counter + ) + + # Determine outcome based on specific conditions + if outcomes_counter["I'm Chancellor Hitler"] > 0: return "CH" - if "I'm Dead Hitler" in outcomes: + if outcomes_counter["I'm Dead Hitler"] > 0: return "DH" if ( - "I'm Liberal Winner" - or "I'm Hitler Looser" - or "I'm Fascistic Looser" in outcomes + outcomes_counter["I'm Liberal Winner"] > 0 + or outcomes_counter["I'm Hitler Loser"] > 0 + or outcomes_counter["I'm Fascistic Loser"] > 0 ): return "LW" if ( - "I'm Fascistic Winner" - or "I'm Hitler Winner" - or "I'm Liberal Looser" in outcomes + outcomes_counter["I'm Fascistic Winner"] > 0 + or outcomes_counter["I'm Hitler Winner"] > 0 + or outcomes_counter["I'm Liberal Loser"] > 0 ): return "FW" - raise ValueError( - f"Invalid results '{results}' for Game. Results must be one of {config.GAME_POLL_OUTCOMES}" - ) + + raise ValueError("Invalid game results: No clear win condition met.") diff --git a/src/data_models/Record.py b/src/data_models/Record.py index 0403619..8144a54 100644 --- a/src/data_models/Record.py +++ b/src/data_models/Record.py @@ -42,9 +42,11 @@ def shorten_role( ) def get_team(self) -> Optional[Literal["Fascist", "Liberal"]]: - if self.role in {"CH", "DH", "FW", "FL", "HL"}: + FASCIST_ROLES = {"CH", "DH", "HL", "HW", "FW", "FL"} + LIBERAL_ROLES = {"LW", "LL"} + if self.role in FASCIST_ROLES: return "Fascist" - elif self.role in {"LW", "LL"}: + elif self.role in LIBERAL_ROLES: return "Liberal" else: - return None + raise ValueError(f"Invalid role '{self.role}' for Record.") diff --git a/src/handlers/save.py b/src/handlers/save.py index 34d1236..199c071 100644 --- a/src/handlers/save.py +++ b/src/handlers/save.py @@ -86,12 +86,10 @@ async def save( ) -> None: msg_with_poll = update.effective_message.reply_to_message if await _pass_checks(msg_with_poll=msg_with_poll, update=update, context=context): - await context.bot.stop_poll(update.effective_chat.id, msg_with_poll.message_id) poll_id = int(msg_with_poll.poll.id) poll_data, poll_results = await asyncio.gather( fetch_poll_data(poll_id), fetch_poll_results(poll_id) ) - records = [ Record( creator_id=poll_data.creator_id, @@ -102,14 +100,13 @@ async def save( ) for results in poll_results ] - game = Game( poll_id=poll_data.message_id, chat_id=poll_data.chat_id, creator_id=poll_data.creator_id, results=poll_results, ) - + await context.bot.stop_poll(update.effective_chat.id, msg_with_poll.message_id) # Execute post-game tasks await asyncio.gather( save_game(game),