# env.py
"""
Description:
    The environment consists of a grid of (size x size) positions. The agent
    starts at (row=0, col=0). The goal is always at (row=size-1, col=size-1).
Actions:
    Num   Action
    0     Move up
    1     Move right
    2     Move down
    3     Move left
Reward:
    The agent receives a reward of -1.0 until it reaches the goal.
Episode termination:
    The episode terminates when the agent reaches the goal state.
"""
from typing import Tuple, Dict, Optional, Iterable
import random

import numpy as np
import gym
from gym import spaces
import pygame
from pygame import gfxdraw
class Maze(gym.Env):
"""
@ param exploring_starts: A boolean indicating whether the agent should restart at a random location.
@ param shaped_rewards: A boolean indicating whether the environment should shape the rewards.
@ param size: An integer representing the size of the maze. It will be of shape (size x size).
@ effects: Initializes the maze environment
@ return: None
"""
def __init__(self, exploring_starts: bool = False, shaped_rewards: bool = False, size: int = 5) -> None:
super().__init__()
        self.exploring_starts = exploring_starts
        self.shaped_rewards = shaped_rewards
        self.size = size  # stored so helpers such as add_random_traps() can query the grid size
        # Start at the goal so that the first reset() with exploring_starts resamples a non-goal cell.
        self.state = (size - 1, size - 1)
        self.goal = (size - 1, size - 1)
        self.maze = self._create_maze(size=size)
self.distances = self._compute_distances(self.goal, self.maze)
self.action_space = spaces.Discrete(n=4)
        self.action_space.action_meanings = {0: 'UP', 1: 'RIGHT', 2: 'DOWN', 3: 'LEFT'}
self.observation_space = spaces.MultiDiscrete([size, size])
self.traps = []
self.screen = None
self.agent_transform = None
"""
@ param action: integer representing the action to take
@ effects: takes specified action in the environment
@ return: a tuple containing the updated state, reward, completion status, and additional information
"""
def step(self, action: int) -> Tuple[Tuple[int, int], float, bool, Dict]:
reward = self.compute_reward(self.state, action)
self.state = self._get_next_state(self.state, action)
done = self.state == self.goal
info = {}
return self.state, reward, done, info
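    # Example: from the start state (0, 0), env.step(1) moves right and returns
    # ((0, 1), -1.0, False, {}) under the default (unshaped) reward.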
"""
@ param: None
@ effects: resets environment
@ return: a tuple containing the initial position of the agent
"""
def reset(self) -> Tuple[int, int]:
if self.exploring_starts:
while self.state == self.goal:
self.state = tuple(self.observation_space.sample())
else:
self.state = (0, 0)
return self.state
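    # Example: with exploring_starts=False, reset() always returns (0, 0); with
    # exploring_starts=True it keeps resampling random cells while the current state
    # is the goal, so a finished episode restarts from a random non-goal cell.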
"""
@ param mode: string representing the mode to render the environment in
@ effects: renders the environment in rgb arrays via numpy
@ return: a numpy array or None
"""
def render(self, mode: str = 'human') -> Optional[np.ndarray]:
        screen_size = 600
        # Note: the renderer (like the hard-coded wall layout) assumes the default 5 x 5 maze.
        scale = screen_size / 5
if self.screen is None:
pygame.init()
self.screen = pygame.Surface((screen_size, screen_size))
surf = pygame.Surface((screen_size, screen_size))
surf.fill((22, 36, 71))
for row in range(5):
for col in range(5):
state = (row, col)
for next_state in [(row + 1, col), (row - 1, col), (row, col + 1), (row, col - 1)]:
if next_state not in self.maze[state]:
# Add the geometry of the edges and walls (i.e. the boundaries between
# adjacent squares that are not connected).
row_diff, col_diff = np.subtract(next_state, state)
left = (col + (col_diff > 0)) * scale - 2 * (col_diff != 0)
right = ((col + 1) - (col_diff < 0)) * scale + 2 * (col_diff != 0)
top = (5 - (row + (row_diff > 0))) * scale - 2 * (row_diff != 0)
bottom = (5 - ((row + 1) - (row_diff < 0))) * scale + 2 * (row_diff != 0)
gfxdraw.filled_polygon(surf, [(left, bottom), (left, top), (right, top), (right, bottom)], (255, 255, 255))
# Add the geometry of the goal square to the viewer.
left, right, top, bottom = scale * 4 + 10, scale * 5 - 10, scale - 10, 10
gfxdraw.filled_polygon(surf, [(left, bottom), (left, top), (right, top), (right, bottom)], (40, 199, 172))
# Add the geometry of the agent to the viewer.
agent_row = int(screen_size - scale * (self.state[0] + .5))
agent_col = int(scale * (self.state[1] + .5))
gfxdraw.filled_circle(surf, agent_col, agent_row, int(scale * .6 / 2), (228, 63, 90))
surf = pygame.transform.flip(surf, False, True)
self.screen.blit(surf, (0, 0))
return np.transpose(np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2))
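    # The array returned above is an RGB image of shape (600, 600, 3).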
"""
@ param: None
@ effects: closes the environment
@ return: None
"""
def close(self) -> None:
if self.screen is not None:
pygame.display.quit()
pygame.quit()
self.screen = None
"""
@ param state: the state of the agent prior to taking the action
@ param action: the action taken by the agent
@ effects: computes the reward attained by taking action 'a' at state 's'
@ return: a float representing the reward signal received by the agent
"""
def compute_reward(self, state: Tuple[int, int], action: int) -> float:
next_state = self._get_next_state(state, action)
if self.shaped_rewards:
return - (self.distances[next_state] / self.distances.max())
return - float(state != self.goal)
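    # Example: with shaped_rewards=False every step from a non-goal state costs -1.0;
    # with shaped_rewards=True the cost is -(distance of the next state to the goal) /
    # (maximum distance in the grid), shrinking towards 0.0 near the goal.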
"""
@ param state: state of the agent prior to taking the action
@ param action: the action to simulate the step with
@ effects: simulates taking an action in the environment
@ return: a tuple containing the next state, reward, completion status, and additional information
"""
def simulate_step(self, state: Tuple[int, int], action: int):
reward = self.compute_reward(state, action)
next_state = self._get_next_state(state, action)
done = next_state == self.goal
info = {}
return next_state, reward, done, info
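    # Note: unlike step(), simulate_step() does not modify self.state, so it can be
    # used to look ahead from arbitrary states (e.g. for planning or value iteration).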
"""
@ param state: state of the agent prior to taking the action
@ param action: move performed by the agent
@ effects: gets the next state after the agent performs action 'a' in state 's'
@ return: state instance representing the new state
"""
def _get_next_state(self, state: Tuple[int, int], action: int) -> Tuple[int, int]:
if action == 0:
next_state = (state[0] - 1, state[1])
elif action == 1:
next_state = (state[0], state[1] + 1)
elif action == 2:
next_state = (state[0] + 1, state[1])
elif action == 3:
next_state = (state[0], state[1] - 1)
else:
            raise ValueError(f"Action value not supported: {action}")
if next_state in self.maze[state]:
return next_state
return state
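    # Example: from (0, 0), action 0 (up) would give (-1, 0); that cell is not in
    # maze[(0, 0)], so the agent stays at (0, 0).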
"""
@ param size: number of elements of each side in the square grid
    @ effects: creates a representation of the maze as an adjacency dictionary whose keys are the states available to the agent and whose values are lists of adjacent, reachable states
    @ return: adjacency list dictionary.
"""
@staticmethod
def _create_maze(size: int) -> Dict[Tuple[int, int], Iterable[Tuple[int, int]]]:
maze = {(row, col): [(row - 1, col), (row + 1, col), (row, col - 1), (row, col + 1)]
for row in range(size) for col in range(size)}
left_edges = [[(row, 0), (row, -1)] for row in range(size)]
right_edges = [[(row, size - 1), (row, size)] for row in range(size)]
upper_edges = [[(0, col), (-1, col)] for col in range(size)]
lower_edges = [[(size - 1, col), (size, col)] for col in range(size)]
walls = [
[(1, 0), (1, 1)], [(2, 0), (2, 1)], [(3, 0), (3, 1)],
[(1, 1), (1, 2)], [(2, 1), (2, 2)], [(3, 1), (3, 2)],
[(3, 1), (4, 1)], [(0, 2), (1, 2)], [(1, 2), (1, 3)],
[(2, 2), (3, 2)], [(2, 3), (3, 3)], [(2, 4), (3, 4)],
[(4, 2), (4, 3)], [(1, 3), (1, 4)], [(2, 3), (2, 4)],
]
obstacles = upper_edges + lower_edges + left_edges + right_edges + walls
for src, dst in obstacles:
maze[src].remove(dst)
if dst in maze:
maze[dst].remove(src)
return maze
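    # Example: once the border edges are removed, maze[(0, 0)] == [(1, 0), (0, 1)],
    # i.e. the start cell only connects downwards and to the right.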
"""
@ param goal: tuple representing the location of the goal in a two-dimensional grid
@ param maze: dictionary holding the adjacency lists of all locations in the two-dimensional grid.
@ effects: computes the distance to the goal from all other positions in the maze using Dijkstra's algorithm.
@ return: A (H x W) numpy array holding the minimum number of moves for each position
to reach the goal.
"""
@staticmethod
def _compute_distances(goal: Tuple[int, int], maze: Dict[Tuple[int, int], Iterable[Tuple[int, int]]]) -> np.ndarray:
        # Note: like the renderer and the wall layout, this assumes the default 5 x 5 grid.
        distances = np.full((5, 5), np.inf)
visited = set()
distances[goal] = 0.
while visited != set(maze):
sorted_dst = [(v // 5, v % 5) for v in distances.argsort(axis=None)]
closest = next(x for x in sorted_dst if x not in visited)
visited.add(closest)
for neighbour in maze[closest]:
distances[neighbour] = min(distances[neighbour], distances[closest] + 1)
return distances
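    # Example: for the default layout, distances[(4, 4)] == 0 and the goal's two
    # open neighbours, (3, 4) and (4, 3), get a distance of 1.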
def add_random_traps(self, num_traps: int):
"""
Randomly add traps to the maze, ensuring the maze remains solvable.
@ param num_traps: The number of traps to add.
@ effects: Randomly places the specified number of traps within the maze.
@ return: None
"""
available_positions = [(i, j) for i in range(self.size) for j in range(self.size) if (i, j) not in [self.state, self.goal]]
for _ in range(num_traps):
random.shuffle(available_positions) # Randomly shuffle positions
for position in available_positions:
# Make a deep copy of the maze to test with the trap
temp_maze = {k: list(v) for k, v in self.maze.items()}
# Remove the chosen trap position from its neighbors' lists (simulate adding a trap)
for neighbour in temp_maze[position]:
temp_maze[neighbour].remove(position)
# Check if the maze with the trap added is still solvable
if is_solvable(temp_maze, self.state, self.goal):
self.maze = temp_maze # Update the main maze to include the trap
self.traps.append(position) # Record the trap
available_positions.remove(position) # Remove position from available positions
break
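    # Example (sketch): env.add_random_traps(2) seals off up to two randomly chosen cells
    # while keeping the goal reachable. Note that self.distances is not recomputed here,
    # so shaped rewards keep using the pre-trap distances.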
def is_solvable(maze: Dict[Tuple[int, int], Iterable[Tuple[int, int]]], start: Tuple[int, int], goal: Tuple[int, int]) -> bool:
"""
Check if the maze is solvable using Depth First Search.
@ param maze: The current maze representation.
@ param start: The starting point of the maze.
@ param goal: The goal point of the maze.
@ return: True if solvable, False otherwise.
"""
stack = [start]
visited = set()
while stack:
current = stack.pop()
if current == goal:
return True
if current in visited:
continue
visited.add(current)
for neighbour in maze[current]:
if neighbour not in visited:
stack.append(neighbour)
return False
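# Example: for the default wall layout, is_solvable(Maze._create_maze(size=5), (0, 0), (4, 4)) returns True.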
# Simple smoke test when the module is run directly.
if __name__ == "__main__":
    env = Maze()
    env.reset()
    env.render(mode='rgb_array')
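    # A minimal usage sketch (not part of the original demo): exercise exploring starts,
    # shaped rewards, and random traps with a short random rollout. The 100-step cap is
    # an arbitrary illustrative choice.
    env = Maze(exploring_starts=True, shaped_rewards=True)
    env.add_random_traps(num_traps=2)
    state = env.reset()
    total_reward = 0.0
    for _ in range(100):
        action = env.action_space.sample()        # random policy
        state, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            break
    print(f"finished at {state}, traps at {env.traps}, return {total_reward:.2f}")
    env.close()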