-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstvcontest.py
413 lines (330 loc) · 17.3 KB
/
stvcontest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
import csv
import itertools
import math
import random
from contest import Contest
from entry import Entry
from voter import Voter
class STVContest(Contest):
    """
    An STVContest contains Voters who have assigned rankings (numbers) to various Entries.
    It also has a desired number of winners.

    get_winners is the only public entry point; every other method is a helper that runs or
    reports on a single round of the count.

    This class reads self.entries, self.voters and self.verbose and calls
    self._print_round_name(); none of those are defined here, so they are presumably provided
    by Contest (TODO confirm against contest.py).
    """

    # how wide in characters the vote bars printed by _print_chart_to_console should be
    # if the bar represents 100%
    NUM_CHARS_IN_FULL_VOTE_BAR = 100

    # title of the spreadsheet column containing Voters who didn't cast any valid votes;
    # also used when printing instant runoff results to console
    INVALID_VOTER_COLUMN_NAME = "voters who didn't cast any valid votes"

    # title of the spreadsheet column containing Voters whose preferred entries can't receive
    # votes; also used when printing instant runoff results to console
    ELIMINATED_VOTER_COLUMN_NAME = "voters who stopped as all their remaining picks left the race"

    def _write_current_round_to_spreadsheet(self, output_file_name_prefix):
        """
        Write out the contest's current status to a spreadsheet at the path
        {output_file_name_prefix}-round{self._round_number}.csv

        The first column contains the names of those Voters who did not provide any valid votes.
        The second column contains the names of those Voters who provided valid votes, but only
        for entries that can no longer receive them, along with the round in which they were
        last moved.
        Each of the remaining columns represents one Entry in self.entries (every Entry gets a
        column, whether or not it is still in the race).
        Each cell in those columns represents a Voter currently counted for that column's Entry.
        Those cells contain the Voter name, the round during which the Voter was last moved, and
        the rank they gave the Entry.
        """
        output_file_name = f"{output_file_name_prefix}-round{self._round_number}.csv"

        if self.verbose:
            # no newline yet: " done." is appended to the same console line afterwards
            print(f"Writing round {self._round_number} vote data to {output_file_name}...",
                  end="", flush=True)

        # newline="" lets the csv module control line endings itself.
        # NOTE(review): no encoding is passed, so the file is written in the platform's default
        # text encoding; names outside that encoding would fail to write. Left unchanged because
        # whatever opens these files may rely on the current encoding -- confirm before changing.
        with open(output_file_name, "w", newline="") as spreadsheet:
            writer = csv.writer(spreadsheet, delimiter=",")

            header = [
                STVContest.INVALID_VOTER_COLUMN_NAME,
                STVContest.ELIMINATED_VOTER_COLUMN_NAME,
                *[entry.name for entry in self.entries]
            ]
            writer.writerow(header)

            # construct one column per entry; each cell describes a Voter counted for that
            # Entry and the rank that Voter gave it
            entry_columns = []
            for entry in self.entries:
                entry_columns.append([
                    f"{voter.name}: round {voter.round_when_last_moved}, "
                    f"rank {voter.get_ranking_of_entry(entry)}"
                    for voter in entry.instant_runoff_voters
                ])

            # rearrange the body of the spreadsheet into rows (each list passed in as an
            # argument becomes a column in the final body); shorter columns are padded with ""
            body = itertools.zip_longest(
                [voter.name for voter in self._voters_with_no_valid_votes],
                [
                    f"{voter.name}: round {voter.round_when_last_moved}"
                    for voter in self._voters_with_no_remaining_valid_votes
                ],
                *entry_columns,
                fillvalue=""
            )
            writer.writerows(body)

        if self.verbose:
            print(" done.")

    def _print_chart_to_console(self):
        """
        Output a chart of the current votes to the console.

        One line is printed per Entry in self.entries: its WON/LOST status (blank if neither),
        its name, a bar whose length is proportional to its share of the valid voters, that share
        as a percentage and as a fraction, and how its count changed this round. Two summary
        lines about Voters who are not counted for any Entry follow.
        """
        # default=0 keeps this safe even if there are no entries to chart
        longest_entry_name_length = max((len(entry.name) for entry in self.entries), default=0)
        num_valid_voters = self._num_valid_voters

        print()
        for entry in self.entries:
            # An eliminated entry has had all of its Voters reallocated, and a winner has handed
            # off any surplus it reallocated, so the count shown is whatever the entry currently
            # holds rather than its peak.
            if entry.has_won:
                status_indicator = "WON"
            elif entry.has_lost:
                status_indicator = "LOST"
            else:
                status_indicator = ""

            num_voters = len(entry.instant_runoff_voters)
            # if nobody cast a valid vote there is nothing to divide by; show an empty bar
            # instead of raising ZeroDivisionError
            vote_fraction = num_voters / num_valid_voters if num_valid_voters else 0.0

            # bar in chart showing vote count
            num_chars_in_vote_bar = round(STVContest.NUM_CHARS_IN_FULL_VOTE_BAR * vote_fraction)
            vote_bar = "\u25A0" * num_chars_in_vote_bar

            # text showing percentage of voters voting for this entry
            percentage_text = f"{round(100 * vote_fraction, 1)}%"

            # text showing fraction of voters voting for this entry
            fraction_text = f"{num_voters}/{num_valid_voters}"

            # text showing how much the count changed this round; negative numbers already
            # carry their own sign, so only non-negative ones get an explicit "+"
            num_gained = entry.num_voters_gained_in_current_instant_runoff_round
            change_text_sign = "+" if num_gained >= 0 else ""
            change_text = f"{change_text_sign}{num_gained} this round"

            entry_output = status_indicator.ljust(6)
            entry_output += entry.name.ljust(longest_entry_name_length + 2)
            entry_output += f"{vote_bar} {percentage_text} ({fraction_text}; {change_text})"
            print(entry_output)
        print()

        # print info about unused voters
        print(f"{STVContest.INVALID_VOTER_COLUMN_NAME}: {len(self._voters_with_no_valid_votes)}")
        print(
            f"{STVContest.ELIMINATED_VOTER_COLUMN_NAME}: "
            f"{len(self._voters_with_no_remaining_valid_votes)}"
            f" (+{self._num_voters_exhausted_in_current_round} this round)"
        )

    def _begin_stv_round(self):
        """
        Do the bookkeeping shared by the start of every round: announce the round when verbose,
        then zero the per-round "gained" counter of every Entry and the per-round count of
        exhausted Voters.
        """
        if self.verbose:
            self._print_round_name()

        # entries have not yet gained any votes this round
        for entry in self.entries:
            entry.num_voters_gained_in_current_instant_runoff_round = 0
        self._num_voters_exhausted_in_current_round = 0

    def _allocate_voters(self, voters_to_allocate):
        """
        Allocate the given Voters to their first-choice entry.
        If they don't have a first choice, add them to self._voters_with_no_valid_votes.

        Every Voter processed has round_when_last_moved set to the current round number.
        """
        for voter in voters_to_allocate:
            # prepare voter to iterate through their valid votes
            iter(voter)

            # this code relies on next(voter) returning None, rather than raising
            # StopIteration, when the Voter has no valid vote (Voter is defined elsewhere)
            favorite_entry = next(voter)
            if favorite_entry is None:
                # the voter cast no valid votes
                self._voters_with_no_valid_votes.append(voter)
            else:
                favorite_entry.instant_runoff_voters.append(voter)
                favorite_entry.num_voters_gained_in_current_instant_runoff_round += 1

            voter.round_when_last_moved = self._round_number

    def _reallocate_voters(self, current_entry, voters_to_reallocate):
        """
        Reallocate the given Voters to their next-choice entry.
        If they don't have a next choice, add them to self._voters_with_no_remaining_valid_votes.

        current_entry is the Entry the Voters are leaving; they are removed from its
        instant_runoff_voters and its per-round "gained" counter is reduced accordingly.
        voters_to_reallocate may be current_entry.instant_runoff_voters itself.
        """
        for voter in voters_to_reallocate:
            next_favorite_entry = next(voter)
            if next_favorite_entry is None:
                # the voter has no remaining valid votes: their ballot is exhausted
                self._voters_with_no_remaining_valid_votes.append(voter)
                self._num_voters_exhausted_in_current_round += 1
            else:
                next_favorite_entry.instant_runoff_voters.append(voter)
                next_favorite_entry.num_voters_gained_in_current_instant_runoff_round += 1

            # recorded for exhausted voters too: the spreadsheet shows this round for them
            voter.round_when_last_moved = self._round_number

        # for bookkeeping purposes, remove all the Voters from the old Entry
        # NOTE not the most efficient but doesn't really need to be
        # A new list is bound to the attribute rather than mutating in place, so when
        # voters_to_reallocate is the Entry's own list it still holds the moved Voters and the
        # len() below stays correct.
        current_entry.instant_runoff_voters = [
            voter for voter in current_entry.instant_runoff_voters
            if voter not in voters_to_reallocate
        ]
        current_entry.num_voters_gained_in_current_instant_runoff_round -= len(voters_to_reallocate)

    def _run_first_round(self):
        """
        Run the first round of the contest. In this round, everybody votes for their first choice.
        """
        self._begin_stv_round()

        self._allocate_voters(self.voters)

        # all Entries are still in the race; even if an Entry got no votes, we say it is still in,
        # and we'll just remove it in a later round
        self._entries_still_in_race.extend(self.entries)

        if self.verbose:
            print("* all voters voted for their top choice")

    def _run_winner_declaration_round(self, undeclared_winners):
        """
        Run a "winner declaration" round of the contest. In this round, every Entry in
        undeclared_winners is taken out of the race, added to self._winners and marked as
        having won. No votes move in this round.
        """
        self._begin_stv_round()

        for winner in undeclared_winners:
            self._entries_still_in_race.remove(winner)
            self._winners.append(winner)
            winner.has_won = True

            if self.verbose:
                print(
                    f"* {winner.name} won"
                    f" (earned {len(winner.instant_runoff_voters)} votes,"
                    f" needed {self._min_num_voters_to_win})"
                )

    def _run_winner_reallocation_round(self, declared_winners_still_with_surplus):
        """
        Run a "winner reallocation" round of the contest. In this round, one of the given winning
        Entries is selected at random, and a random sample of its Voters, as many as it holds
        beyond the winning threshold, is reallocated to those Voters' next choices.
        """
        self._begin_stv_round()

        winner = random.choice(declared_winners_still_with_surplus)
        # the winner keeps exactly the threshold; only the excess moves on
        num_surplus_voters = len(winner.instant_runoff_voters) - self._min_num_voters_to_win
        surplus_voters = random.sample(winner.instant_runoff_voters, k=num_surplus_voters)
        self._reallocate_voters(winner, surplus_voters)

        if self.verbose:
            print(
                f"* only {len(self._winners)}/{self._num_winners} winners have been found,"
                f" so winner {winner.name} reallocated its {num_surplus_voters} surplus votes"
            )

    def _run_elimination_round(self):
        """
        Run an elimination round of the STVContest. In this round, a least popular Entry is removed
        from the race, and its Voters vote for their next choice.
        Ties for last place are broken at random.
        """
        self._begin_stv_round()

        # pick a loser at random out of all the bottom vote-getters
        num_voters_for_bottom_entry_still_in_race = min(
            len(entry.instant_runoff_voters) for entry in self._entries_still_in_race
        )
        bottom_entries_still_in_race = [
            entry for entry in self._entries_still_in_race
            if len(entry.instant_runoff_voters) == num_voters_for_bottom_entry_still_in_race
        ]
        loser = random.choice(bottom_entries_still_in_race)

        if self.verbose:
            print(
                f"* {loser.name} was eliminated"
                f" as it had the fewest votes ({num_voters_for_bottom_entry_still_in_race})"
            )

        # the loser is taken out of the race before its Voters pick their next choice
        self._entries_still_in_race.remove(loser)
        loser.has_lost = True
        self._reallocate_voters(loser, loser.instant_runoff_voters)

        if self.verbose:
            print(
                f"* {loser.name} reallocated its"
                f" {num_voters_for_bottom_entry_still_in_race} votes"
            )

    def get_winners(self, num_winners, output_file_name_prefix):
        """
        Run the contest using multi-winner instant-runoff voting using the Droop quota and
        random surplus allocation.

        For every round of voting, output a spreadsheet whose columns are entries, with the rows
        populated by users who voted for those entries. The spreadsheets are written to
        {output_file_name_prefix}-round{N}.csv. When self.verbose is set, progress and a vote
        chart are also printed to the console after every round.

        The contest terminates once num_winners winners have won.

        Return the list of Entry objects representing the winners.

        Raise ValueError if num_winners is negative, or if it is not smaller than the number of
        entries.
        """
        if num_winners < 0:
            # rejected up front: -1 would otherwise divide by zero in the quota calculation
            # after the first round had already run
            raise ValueError(
                f"A STVContest cannot have a negative number of winners (got {num_winners})."
            )
        if num_winners >= len(self.entries):
            raise ValueError(
                "A STVContest must have fewer winners then entries."
                f" This STVContest seeks to produce {num_winners} winners"
                f" but has only {len(self.entries)} entries."
            )

        # NOTE(review): only the contest-level bookkeeping below is reset here. Per-Entry state
        # (instant_runoff_voters, has_won, has_lost) is not reset anywhere in this class, so
        # running get_winners twice on the same Entry objects presumably needs fresh Entries --
        # confirm against entry.py.
        self._num_winners = num_winners
        # users who cast no valid votes
        self._voters_with_no_valid_votes = []
        # users who cast valid votes, but only for Entries that have been eliminated already
        self._voters_with_no_remaining_valid_votes = []
        # the amount of voters who have had all of their entries eliminated during this round
        self._num_voters_exhausted_in_current_round = 0
        self._entries_still_in_race = []
        self._winners = []
        self._round_number = 1

        # round 1: everyone votes for their top pick
        self._run_first_round()

        # Use the "Droop Quota" as the minimum vote threshold.
        # Say there are v Voters and w winners. An Entry wins once it has so many votes that it's
        # impossible for w other Entries to perform as well as the winner.
        # If an Entry gets exactly v/(w+1) votes, then it is still possible for w other Entries to
        # also get exactly v/(w+1) votes, in which case there would be a (w+1)-way tie.
        # If an Entry gets more than v/(w+1) votes, so at least (v/(w+1)) + 1 votes, then there's
        # no way for w other Entries to perform equally well, as that would imply at least
        # (w+1) * ((v/(w+1)) + 1) = v + w + 1 votes were cast, and that's more than v votes.
        # Only Voters with at least one valid vote count towards v; that is only known once the
        # first round has sorted out the Voters with no valid votes.
        self._num_valid_voters = len(self.voters) - len(self._voters_with_no_valid_votes)
        self._min_num_voters_to_win = (
            math.floor(self._num_valid_voters / (self._num_winners + 1)) + 1
        )

        self._write_current_round_to_spreadsheet(output_file_name_prefix)
        if self.verbose:
            self._print_chart_to_console()

        while len(self._winners) < self._num_winners:
            self._round_number += 1

            undeclared_winners = [
                entry for entry in self._entries_still_in_race
                if len(entry.instant_runoff_voters) >= self._min_num_voters_to_win
                and not entry.has_won
            ]
            declared_winners_still_with_surplus = [
                winner for winner in self._winners
                if len(winner.instant_runoff_voters) > self._min_num_voters_to_win
            ]

            if undeclared_winners:
                # declare all new winners
                self._run_winner_declaration_round(undeclared_winners)
            elif declared_winners_still_with_surplus:
                # There are no new winners, but some existing winners' Voters have not been
                # redistributed yet. Move one of those winner's surplus votes.
                # Conducting the reallocations after marking all winners reduces the amount of
                # wasted votes. If, say, entry 1 and entry 2 both won in the same round, then the
                # surplus votes of entry 1 will not move to entry 2 (since entry 2 was removed from
                # the race already).
                self._run_winner_reallocation_round(declared_winners_still_with_surplus)
            else:
                # there are no winners in the race, and all winners votes have been reallocated;
                # remove one of the last-place entries from the race
                self._run_elimination_round()

            self._write_current_round_to_spreadsheet(output_file_name_prefix)
            if self.verbose:
                self._print_chart_to_console()

            # special case: if we still have to crown, say, 10 winners and there are only
            # 10 Entries left, then those 10 Entries are the winners
            # (situations like this can happen if lots of Voters have exhausted their ballots)
            if len(self._entries_still_in_race) + len(self._winners) == self._num_winners:
                if self.verbose:
                    print(
                        f"* because there are {len(self._entries_still_in_race)} entries left"
                        f" in the race and there are still {self._num_winners - len(self._winners)}"
                        " winners left to find,\nthe remaining entries are crowned as winners"
                    )

                for entry_still_in_race in self._entries_still_in_race:
                    self._winners.append(entry_still_in_race)
                    entry_still_in_race.has_won = True
                break

        if self.verbose:
            print()
            print(
                f"* all {len(self._winners)}/{self._num_winners} winners have been found,"
                f" so the contest is over"
            )
            print(f"WINNERS: {[winner.name for winner in self._winners]}")

        return self._winners