diff --git a/espn_api/football/helper.py b/espn_api/football/helper.py new file mode 100644 index 00000000..7f8b1a48 --- /dev/null +++ b/espn_api/football/helper.py @@ -0,0 +1,211 @@ +import random +from typing import Callable, Dict, List, Tuple + + +def build_division_record_dict(team_data_list: List[Dict]) -> Dict: + """Create a DataFrame with each team's divisional record.""" + # Create a dictionary with each team's divisional record + div_outcomes = { + team_data["team_id"]: {"wins": 0, "divisional_games": 0} + for team_data in team_data_list + } + + # Loop through each team's schedule and outcomes and build the dictionary + for team_data in team_data_list: + team = team_data["team"] + for opp, outcome in zip(team_data["schedule"], team_data["outcomes"]): + if team_data["division_id"] == opp.division_id: + if outcome == "W": + div_outcomes[team_data["team_id"]]["wins"] += 1 + if outcome == "T": + div_outcomes[team_data["team_id"]]["wins"] += 0.5 + + div_outcomes[team_data["team_id"]]["divisional_games"] += 1 + + # Calculate the divisional record + div_record = { + team_data["team_id"]: ( + div_outcomes[team_data["team_id"]]["wins"] + / max(div_outcomes[team_data["team_id"]]["divisional_games"], 1) + ) + for team_data in team_data_list + } + + return div_record + + +def build_h2h_dict(team_data_list: List[Dict]) -> Dict: + """Create a dictionary with each team's divisional record.""" + # Create a dictionary with each team's head to head record + h2h_outcomes = { + team_data["team_id"]: { + opp["team_id"]: {"h2h_wins": 0, "h2h_games": 0} + for opp in team_data_list + if opp["team_id"] != team_data["team_id"] + } + for team_data in team_data_list + } + + # Loop through each team's schedule and outcomes and build the dictionary + for team_data in team_data_list: + team = team_data["team"] + for opp, outcome in zip(team_data["schedule"], team_data["outcomes"]): + # Ignore teams that are not part of this tiebreaker + if opp.team_id not in h2h_outcomes[team.team_id].keys(): + continue + + # Add the outcome to the dictionary + if outcome == "W": + h2h_outcomes[team.team_id][opp.team_id]["h2h_wins"] += 1 + if outcome == "T": + h2h_outcomes[team.team_id][opp.team_id]["h2h_wins"] += 0.5 + + h2h_outcomes[team.team_id][opp.team_id]["h2h_games"] += 1 + + # # Calculate the head to head record + # for team_data in team_data_list: + # for opp_data in team_data_list: + # h2h_outcomes[team_data["team_id"]][opp_data["team_id"]]["h2h_record"] = ( + # h2h_outcomes[team_data["team_id"]][opp_data["team_id"]]["h2h_wins"] + # / max( + # h2h_outcomes[team_data["team_id"]][opp_data["team_id"]][ + # "h2h_games" + # ], + # 1, + # ) + # ) + + return h2h_outcomes + + +def sort_by_win_pct(team_data_list: List[Dict]) -> List[Dict]: + """Take a list of team standings data and sort it using the TOTAL_POINTS_SCORED tiebreaker""" + return sorted(team_data_list, key=lambda x: x["win_pct"], reverse=True) + + +def sort_by_points_for(team_data_list: List[Dict]) -> List[Dict]: + """Take a list of team standings data and sort it using the TOTAL_POINTS_SCORED tiebreaker""" + return sorted(team_data_list, key=lambda x: x["points_for"], reverse=True) + + +def sort_by_division_record(team_data_list: List[Dict]) -> List[Dict]: + """Take a list of team standings data and sort it using the 3rd level tiebreaker""" + division_records = build_division_record_dict(team_data_list) + for team_data in team_data_list: + team_data["division_record"] = division_records[team_data["team_id"]] + return sorted(team_data_list, key=lambda x: x["division_record"], reverse=True) + + +def sort_by_points_against(team_data_list: List[Dict]) -> List[Dict]: + """Take a list of team standings data and sort it using the 4th level tiebreaker""" + return sorted(team_data_list, key=lambda x: x["points_against"], reverse=True) + + +def sort_by_coin_flip(team_data_list: List[Dict]) -> List[Dict]: + """Take a list of team standings data and sort it using the 5th level tiebreaker""" + for team_data in team_data_list: + team_data["coin_flip"] = random.random() + return sorted(team_data_list, key=lambda x: x["coin_flip"], reverse=True) + + +def sort_by_head_to_head( + team_data_list: List[Dict], +) -> List[Dict]: + """Take a list of team standings data and sort it using the H2H_RECORD tiebreaker""" + # Create a dictionary with each team's head to head record + h2h_dict = build_h2h_dict(team_data_list) + + # If there is only one team, return the dataframe as-is + if len(team_data_list) < 2: + return team_data_list + + # If there are only two teams, sort descending by H2H wins + elif len(h2h_dict) == 2: + # Filter the H2H DataFrame to only include the teams in question + h2h_dict = build_h2h_dict(team_data_list) + + # Sum the H2H wins against all tied opponents + for team_data in team_data_list: + team_data["h2h_wins"] = sum( + h2h_dict[team_data["team_id"]][opp_id]["h2h_wins"] + for opp_id in h2h_dict[team_data["team_id"]].keys() + ) + return sorted(team_data_list, key=lambda x: x["h2h_wins"], reverse=True) + + # If there are more than two teams... + else: + # Filter the H2H DataFrame to only include the teams in question + h2h_dict = build_h2h_dict(team_data_list) + + # Check if the teams have all played each other an equal number of times + matchup_counts = [ + h2h_dict[team_id][opp_id]["h2h_games"] + for team_id in h2h_dict.keys() + for opp_id in h2h_dict[team_id].keys() + ] + if len(set(matchup_counts)) == 1: + # All teams have played each other an equal number of times + # Sort the teams by total H2H wins against each other + for team_data in team_data_list: + team_data["h2h_wins"] = sum( + h2h_dict[team_data["team_id"]][opp_id]["h2h_wins"] + for opp_id in h2h_dict[team_data["team_id"]].keys() + ) + return sorted(team_data_list, key=lambda x: x["h2h_wins"], reverse=True) + else: + # All teams have not played each other an equal number of times + # This tiebreaker is invalid + for team_data in team_data_list: + team_data["h2h_wins"] = 0 + return team_data_list + + +def sort_team_data_list( + team_data_list: List[Dict], + tiebreaker_hierarchy: List[Tuple[Callable, str]], +) -> List[Dict]: + """This recursive function sorts a list of team standings data by the tiebreaker hierarchy. + It iterates through each tiebreaker, sorting any remaning ties by the next tiebreaker. + + Args: + team_data_list (List[Dict]): List of team data dictionaries + tiebreaker_hierarchy (List[Tuple[Callable, str]]): List of tiebreaker functions and columns to sort by + + Returns: + List[Dict]: Sorted list of team data dictionaries + """ + # If there are no more tiebreakers, return the standings list as-is + if not tiebreaker_hierarchy: + return team_data_list + + # If there is only one team to sort, return the standings list as-is + if len(team_data_list) == 1: + return team_data_list + + # Get the tiebreaker function and column name to group by + tiebreaker_function = tiebreaker_hierarchy[0][0] + tiebreaker_col = tiebreaker_hierarchy[0][1] + + # Apply the tiebreaker function to the standings list + team_data_list = tiebreaker_function(team_data_list) + + # Loop through each remaining unique tiebreaker value to see if ties remain + sorted_team_data_list = [] + for val in sorted( + set([team_data[tiebreaker_col] for team_data in team_data_list]), + reverse=True, + ): + # Filter the standings list to only include the teams with the current tiebreaker value + team_data_subset = [ + team_data + for team_data in team_data_list + if team_data[tiebreaker_col] == val + ] + + # Append the sorted subset to the final sorted standings list + sorted_team_data_list = sorted_team_data_list + sort_team_data_list( + team_data_subset, + tiebreaker_hierarchy[1:], + ) + + return sorted_team_data_list diff --git a/espn_api/football/league.py b/espn_api/football/league.py index a07a8166..96f9deb1 100644 --- a/espn_api/football/league.py +++ b/espn_api/football/league.py @@ -1,7 +1,6 @@ -import datetime -import time import json -from typing import List, Tuple, Union +import random +from typing import Callable, Dict, List, Tuple, Union from ..base_league import BaseLeague from .team import Team @@ -14,6 +13,16 @@ from .settings import Settings from .utils import power_points, two_step_dominance from .constant import POSITION_MAP, ACTIVITY_MAP +from .helper import ( + sort_by_coin_flip, + sort_by_division_record, + sort_by_head_to_head, + sort_by_points_against, + sort_by_points_for, + sort_by_win_pct, + sort_team_data_list, +) + class League(BaseLeague): '''Creates a League instance for Public/Private ESPN league''' @@ -116,6 +125,102 @@ def standings(self) -> List[Team]: standings = sorted(self.teams, key=lambda x: x.final_standing if x.final_standing != 0 else x.standing, reverse=False) return standings + def standings_weekly(self, week: int) -> List[Team]: + """This is the main function to get the standings for a given week. + + It controls the tiebreaker hierarchy and calls the recursive League()._sort_team_data_list function. + First, the division winners must be determined. Then, the rest of the teams are sorted. + + The standard tiebreaker hierarchy is: + 1. Head-to-head record among the tied teams + 2. Total points scored for the season + 3. Division record (if all tied teams are in the same division) + 4. Total points scored against for the season + 5. Coin flip + + Args: + week (int): Week to get the standings for + + Returns: + List[Dict]: Sorted standings list + """ + # Get standings data for each team up to the given week + list_of_team_data = [] + for team in self.teams: + team_data = { + "team": team, + "team_id": team.team_id, + "division_id": team.division_id, + "wins": sum([1 for outcome in team.outcomes[:week] if outcome == "W"]), + "ties": sum([1 for outcome in team.outcomes[:week] if outcome == "T"]), + "losses": sum( + [1 for outcome in team.outcomes[:week] if outcome == "L"] + ), + "points_for": sum(team.scores[:week]), + "points_against": sum( + [team.schedule[w].scores[w] for w in range(week)] + ), + "schedule": team.schedule[:week], + "outcomes": team.outcomes[:week], + } + team_data["win_pct"] = (team_data["wins"] + team_data["ties"] / 2) / sum( + [1 for outcome in team.outcomes[:week] if outcome in ["W", "T", "L"]] + ) + list_of_team_data.append(team_data) + + # Identify the proper tiebreaker hierarchy + if self.settings.playoff_seed_tie_rule == "TOTAL_POINTS_SCORED": + tiebreaker_hierarchy = [ + (sort_by_win_pct, "win_pct"), + (sort_by_points_for, "points_for"), + (sort_by_head_to_head, "h2h_wins"), + (sort_by_division_record, "division_record"), + (sort_by_points_against, "points_against"), + (sort_by_coin_flip, "coin_flip"), + ] + elif self.settings.playoff_seed_tie_rule == "H2H_RECORD": + tiebreaker_hierarchy = [ + (sort_by_win_pct, "win_pct"), + (sort_by_head_to_head, "h2h_wins"), + (sort_by_points_for, "points_for"), + (sort_by_division_record, "division_record"), + (sort_by_points_against, "points_against"), + (sort_by_coin_flip, "coin_flip"), + ] + else: + raise ValueError( + "Unkown tiebreaker_method: Must be either 'TOTAL_POINTS_SCORED' or 'H2H_RECORD'" + ) + + # First assign the division winners + division_winners = [] + for division_id in list(self.settings.division_map.keys()): + division_teams = [ + team_data + for team_data in list_of_team_data + if team_data["division_id"] == division_id + ] + division_winner = sort_team_data_list(division_teams, tiebreaker_hierarchy)[ + 0 + ] + division_winners.append(division_winner) + list_of_team_data.remove(division_winner) + + # Sort the division winners + sorted_division_winners = sort_team_data_list( + division_winners, tiebreaker_hierarchy + ) + + # Then sort the rest of the teams + sorted_rest_of_field = sort_team_data_list( + list_of_team_data, tiebreaker_hierarchy + ) + + # Combine all teams + sorted_team_data = sorted_division_winners + sorted_rest_of_field + + return [team_data["team"] for team_data in sorted_team_data] + def top_scorer(self) -> Team: most_pf = sorted(self.teams, key=lambda x: x.points_for, reverse=True) return most_pf[0] @@ -308,4 +413,3 @@ def message_board(self, msg_types: List[str] = None): for msg in msgs: messages.append(msg) return messages - diff --git a/tests/football/unit/test_league.py b/tests/football/unit/test_league.py index f3563bf8..e4b589d7 100644 --- a/tests/football/unit/test_league.py +++ b/tests/football/unit/test_league.py @@ -1,9 +1,20 @@ from unittest import mock, TestCase from espn_api.football import League, BoxPlayer +from espn_api.football.helper import ( + build_division_record_dict, + build_h2h_dict, + sort_by_coin_flip, + sort_by_division_record, + sort_by_head_to_head, + sort_by_points_against, + sort_by_points_for, + sort_by_win_pct, +) import requests_mock import json import io + class LeagueTest(TestCase): def setUp(self): self.league_id = 123 @@ -81,7 +92,224 @@ def test_league_standings(self, m): standings = league.standings() self.assertEqual(standings[0].final_standing, 1) - @requests_mock.Mocker() + @requests_mock.Mocker() + def test_standings_weekly(self, m): + self.mock_setUp(m) + + league = League(self.league_id, self.season) + + # Test various weeks + week1_standings = [team.team_id for team in league.standings_weekly(1)] + self.assertEqual(week1_standings, [3, 11, 2, 10, 7, 8, 4, 5, 9, 1]) + + week4_standings = [team.team_id for team in league.standings_weekly(4)] + self.assertEqual(week4_standings, [2, 7, 11, 4, 3, 9, 1, 8, 5, 10]) + + # # Does not work with the playoffs + # week13_standings = [team.team_id for team in league.standings_weekly(13)] + # final_standings = [team.team_id for team in league.standings()] + # self.assertEqual(week13_standings, final_standings) + + # Test invalid playoff seeding rule + with self.assertRaises(Exception): + league.settings.playoff_seed_tie_rule = "NOT_A_REAL_RULE" + league.standings(week=1) + + def get_list_of_team_data(self, league: League, week: int): + list_of_team_data = [] + for team in league.teams: + team_data = { + "team": team, + "team_id": team.team_id, + "division_id": team.division_id, + "wins": sum([1 for outcome in team.outcomes[:week] if outcome == "W"]), + "ties": sum([1 for outcome in team.outcomes[:week] if outcome == "T"]), + "losses": sum( + [1 for outcome in team.outcomes[:week] if outcome == "L"] + ), + "points_for": sum(team.scores[:week]), + "points_against": sum( + [team.schedule[w].scores[w] for w in range(week)] + ), + "schedule": team.schedule[:week], + "outcomes": team.outcomes[:week], + } + team_data["win_pct"] = (team_data["wins"] + team_data["ties"] / 2) / sum( + [1 for outcome in team.outcomes[:week] if outcome in ["W", "T", "L"]] + ) + list_of_team_data.append(team_data) + return list_of_team_data + + @requests_mock.Mocker() + def test_build_h2h_dict(self, m): + self.mock_setUp(m) + + league = League(self.league_id, self.season) + + # Test build_h2h_dict and build_division_record_dict + # Week 1 + ## Get data for teams 1 and 7 + week1_teams_data = self.get_list_of_team_data(league, 1) + list_of_team_data = [ + team for team in week1_teams_data if team["team_id"] in (1, 7) + ] + h2h_dict = build_h2h_dict(list_of_team_data) + + self.assertEqual(h2h_dict[1][7]["h2h_wins"], 0) # Team 1 is 0/1 vs Team 7 + self.assertEqual(h2h_dict[7][1]["h2h_wins"], 1) # Team 7 is 1/1 vs Team 1 + self.assertEqual(h2h_dict[1][7]["h2h_games"], 1) # Team 1 is 0/1 vs Team 7 + self.assertEqual(h2h_dict[7][1]["h2h_games"], 1) # Team 7 is 1/1 vs Team 1 + + ## Test 3 teams head-to-head + list_of_team_data = [ + team for team in week1_teams_data if team["team_id"] in (1, 2, 3) + ] + h2h_dict = build_h2h_dict(list_of_team_data) + self.assertEqual(h2h_dict[1][2]["h2h_games"], 0) # Teams have not played + self.assertEqual(h2h_dict[1][3]["h2h_games"], 0) # Teams have not played + self.assertEqual(h2h_dict[2][3]["h2h_games"], 0) # Teams have not played + + # Week 10 + ## Get data for teams 1 and 7 + week10_teams_data = self.get_list_of_team_data(league, 10) + list_of_team_data = [ + team for team in week10_teams_data if team["team_id"] in (1, 7) + ] + h2h_dict = build_h2h_dict(list_of_team_data) + + self.assertEqual(h2h_dict[1][7]["h2h_wins"], 1) # Team 1 is 1/2 vs Team 7 + self.assertEqual(h2h_dict[7][1]["h2h_wins"], 1) # Team 7 is 1/2 vs Team 1 + self.assertEqual(h2h_dict[1][7]["h2h_games"], 2) # Team 1 is 0/1 vs Team 7 + self.assertEqual(h2h_dict[7][1]["h2h_games"], 2) # Team 7 is 1/1 vs Team 1 + + # Test 3 teams head-to-head + list_of_team_data = [ + team for team in week10_teams_data if team["team_id"] in (1, 2, 3) + ] + h2h_dict = build_h2h_dict(list_of_team_data) + self.assertEqual(h2h_dict[1][2]["h2h_games"], 1) # Teams have played 1x + self.assertEqual(h2h_dict[1][3]["h2h_games"], 1) # Teams have played 1x + self.assertEqual(h2h_dict[2][3]["h2h_games"], 1) # Teams have played 1x + + @requests_mock.Mocker() + def test_build_division_records_dict(self, m): + self.mock_setUp(m) + + league = League(self.league_id, self.season) + + # Test build_h2h_dict and build_division_record_dict + # Week 1 - get data for teams 1 and 7 + week1_teams_data = self.get_list_of_team_data(league, 1) + list_of_team_data = [ + team for team in week1_teams_data if team["team_id"] in (1, 7) + ] + division_record_dict = build_division_record_dict(list_of_team_data) + + self.assertEqual(division_record_dict[1], 0) + self.assertEqual( + division_record_dict[7], + [ + team_data["win_pct"] + for team_data in list_of_team_data + if team_data["team_id"] == 7 + ][0], + ) + + # Week 10 - get data for teams 1 and 7 + week10_teams_data = self.get_list_of_team_data(league, 10) + list_of_team_data = [ + team for team in week10_teams_data if team["team_id"] in (1, 7) + ] + division_record_dict = build_division_record_dict(week10_teams_data) + + self.assertEqual(division_record_dict[1], 0.6) + self.assertEqual( + division_record_dict[7], + [ + team_data["win_pct"] + for team_data in list_of_team_data + if team_data["team_id"] == 7 + ][0], + ) + + @requests_mock.Mocker() + def test_sort_functions(self, m): + self.mock_setUp(m) + + league = League(self.league_id, self.season) + + week1_teams_data = self.get_list_of_team_data(league, 1) + week10_teams_data = self.get_list_of_team_data(league, 10) + division_record_dict = build_division_record_dict(week10_teams_data) + + # Assert that sort_by_win_pct is correct + sorted_list_of_team_data = sort_by_win_pct(week10_teams_data) + for i in range(len(sorted_list_of_team_data) - 1): + self.assertGreaterEqual( + sorted_list_of_team_data[i]["win_pct"], + sorted_list_of_team_data[i + 1]["win_pct"], + ) + + # Assert that sort_by_points_for is correct + sorted_list_of_team_data = sort_by_points_for(week10_teams_data) + for i in range(len(sorted_list_of_team_data) - 1): + self.assertGreaterEqual( + sorted_list_of_team_data[i]["points_for"], + sorted_list_of_team_data[i + 1]["points_for"], + ) + + # Assert that sort_by_head_to_head is correct - 1 team + sorted_list_of_team_data = sort_by_head_to_head(week10_teams_data[:1].copy()) + self.assertEqual(sorted_list_of_team_data == week10_teams_data[:1], True) + + # Assert that sort_by_head_to_head is correct - 2 teams + sorted_list_of_team_data = sort_by_head_to_head( + [team for team in week10_teams_data if team["team_id"] in (1, 2)] + ) + self.assertEqual(sorted_list_of_team_data[0]["team_id"], 1) + + # Assert that sort_by_head_to_head is correct - 3 teams, valid + sorted_list_of_team_data = sort_by_head_to_head( + [team for team in week10_teams_data if team["team_id"] in (1, 2, 3)] + ) + self.assertEqual(sorted_list_of_team_data[0]["team_id"], 1) + self.assertEqual(sorted_list_of_team_data[1]["team_id"], 3) + self.assertEqual(sorted_list_of_team_data[2]["team_id"], 2) + + # Assert that sort_by_head_to_head is correct - 3 teams, invalid + sorted_list_of_team_data = sort_by_head_to_head( + [team for team in week1_teams_data if team["team_id"] in (1, 2, 3)] + ) + self.assertEqual(sorted_list_of_team_data[0]["h2h_wins"], 0) + self.assertEqual(sorted_list_of_team_data[1]["h2h_wins"], 0) + self.assertEqual(sorted_list_of_team_data[2]["h2h_wins"], 0) + + # Assert that sort_by_division_record is correct + sorted_list_of_team_data = sort_by_division_record(week10_teams_data) + for i in range(len(sorted_list_of_team_data) - 1): + self.assertGreaterEqual( + division_record_dict[sorted_list_of_team_data[i]["team_id"]], + division_record_dict[sorted_list_of_team_data[i + 1]["team_id"]], + ) + + # Assert that sort_by_points_against is correct + sorted_list_of_team_data = sort_by_points_against(week10_teams_data) + for i in range(len(sorted_list_of_team_data) - 1): + self.assertGreaterEqual( + sorted_list_of_team_data[i]["points_against"], + sorted_list_of_team_data[i + 1]["points_against"], + ) + + # Assert that sort_by_coin_flip is not deterministic + standings_list = [] + for i in range(5): + sorted_list_of_team_data = sort_by_coin_flip(week10_teams_data) + standings_list.append( + (team["team_id"] for team in sorted_list_of_team_data) + ) + self.assertGreater(len(set(standings_list)), 1) + + @requests_mock.Mocker() def test_top_scorer(self, m): self.mock_setUp(m)