distributed_agent.py
import airsim
from rl_model import RlModel
import time
import numpy as np
import json
import os
import sys
import requests
import copy
import datetime
import msgpackrpc
import math
import errno
from airsim_client import AirSimClientBase, AirSimImageType, ImageRequest, Vector3r, Pose
# A class that represents the agent that will drive the vehicle, train the model, and send the gradient updates to the trainer.
class DistributedAgent:
    def __init__(self, parameters):
        required_parameters = ['data_dir', 'max_epoch_runtime_sec', 'replay_memory_size', 'batch_size',
                               'min_epsilon', 'per_iter_epsilon_reduction', 'experiment_name', 'train_conv_layers']
        for required_parameter in required_parameters:
            if required_parameter not in parameters:
                raise ValueError('Missing required parameter {0}'.format(required_parameter))

        parameters['role_type'] = 'agent'
        print('Starting time: {0}'.format(datetime.datetime.utcnow()), file=sys.stderr)

        self.__model_buffer = None
        self.__model = None
        self.__airsim_started = False
        self.__data_dir = parameters['data_dir']
        self.__per_iter_epsilon_reduction = float(parameters['per_iter_epsilon_reduction'])
        self.__min_epsilon = float(parameters['min_epsilon'])
        self.__max_epoch_runtime_sec = float(parameters['max_epoch_runtime_sec'])
        self.__replay_memory_size = int(parameters['replay_memory_size'])
        self.__batch_size = int(parameters['batch_size'])
        self.__experiment_name = parameters['experiment_name']
        self.__train_conv_layers = bool(parameters['train_conv_layers'].lower().strip() == 'true')
        self.__epsilon = 1
        self.__num_batches_run = 0
        self.__last_checkpoint_batch_count = 0

        if 'batch_update_frequency' in parameters:
            self.__batch_update_frequency = int(parameters['batch_update_frequency'])

        if 'weights_path' in parameters:
            self.__weights_path = parameters['weights_path']
        else:
            self.__weights_path = None

        if 'airsim_path' in parameters:
            self.__airsim_path = parameters['airsim_path']
        else:
            self.__airsim_path = None

        self.__local_run = 'local_run' in parameters

        self.__car_client = None
        self.__car_controls = None

        self.__minibatch_dir = os.path.join(self.__data_dir, 'minibatches')
        self.__output_model_dir = os.path.join(self.__data_dir, 'models')
        self.__make_dir_if_not_exist(self.__minibatch_dir)
        self.__make_dir_if_not_exist(self.__output_model_dir)
        self.__last_model_file = ''

        self.__possible_ip_addresses = []
        self.__trainer_ip_address = None

        self.__experiences = {}

        self.__init_road_points()
        self.__init_reward_points()

    # Starts the agent
    def start(self):
        self.__run_function()
    # The function that will be run during training.
    # It will initialize the connection to the trainer, start AirSim, and continuously run training iterations.
    def __run_function(self):
        print('Starting run function')

        # Once the trainer is online, it will write its IP to a file in (data_dir)\trainer_ip\trainer_ip.txt.
        # Wait for that file to exist.
        if not self.__local_run:
            print('Waiting for trainer to come online')
            while True:
                trainer_ip_dir = os.path.join(os.path.join(self.__data_dir, 'trainer_ip'), self.__experiment_name)
                print('Checking {0}...'.format(trainer_ip_dir))
                if os.path.isdir(trainer_ip_dir):
                    with open(os.path.join(trainer_ip_dir, 'trainer_ip.txt'), 'r') as f:
                        self.__possible_ip_addresses.append(f.read().replace('\n', ''))
                    break
                print('Not online yet. Sleeping...')
                time.sleep(5)

            # We now have the IP address for the trainer. Attempt to ping the trainer.
            ping_idx = -1
            while True:
                ping_idx += 1
                print('Attempting to ping trainer...')
                try:
                    print('\tPinging {0}...'.format(
                        self.__possible_ip_addresses[ping_idx % len(self.__possible_ip_addresses)]))
                    response = requests.get('http://{0}:80/ping'.format(
                        self.__possible_ip_addresses[ping_idx % len(self.__possible_ip_addresses)])).json()
                    if response['message'] != 'pong':
                        raise ValueError('Received unexpected message: {0}'.format(response))
                    print('Success!')
                    self.__trainer_ip_address = self.__possible_ip_addresses[
                        ping_idx % len(self.__possible_ip_addresses)]
                    break
                except Exception as e:
                    print('Could not get response. Message is {0}'.format(e))
                    if ping_idx % len(self.__possible_ip_addresses) == 0:
                        print('Waiting 5 seconds and trying again...')
                        time.sleep(5)

            # Get the latest model from the trainer
            print('Getting model from the trainer')
            sys.stdout.flush()
            self.__model = RlModel(self.__weights_path, self.__train_conv_layers)
            self.__get_latest_model()
        else:
            print('Run is local. Skipping connection to trainer.')
            self.__model = RlModel(self.__weights_path, self.__train_conv_layers)

        # Connect to the AirSim exe
        self.__connect_to_airsim()

        # Fill the replay memory by driving randomly.
        print('Filling replay memory...')
        while True:
            print('Running Airsim Epoch.')
            try:
                self.__run_airsim_epoch(True)
                percent_full = 100.0 * len(self.__experiences['actions']) / self.__replay_memory_size
                print('Replay memory now contains {0} members. ({1}% full)'.format(
                    len(self.__experiences['actions']), percent_full))
                if percent_full >= 100.0:
                    break
            except msgpackrpc.error.TimeoutError:
                print('Lost connection to AirSim while filling replay memory. Attempting to reconnect.')
                self.__connect_to_airsim()

        # Get the latest model. Other agents may have finished before us.
        print('Replay memory filled. Starting main loop...')
        if not self.__local_run:
            self.__get_latest_model()

        while True:
            try:
                if self.__model is not None:
                    # Generate a series of training examples by driving the vehicle in AirSim
                    print('Running Airsim Epoch.')
                    experiences, frame_count = self.__run_airsim_epoch(False)

                    # If we didn't immediately crash, train on the gathered experiences
                    if frame_count > 0:
                        print('Generating {0} minibatches...'.format(frame_count))
                        print('Sampling Experiences.')
                        # Sample experiences from the replay memory
                        sampled_experiences = self.__sample_experiences(experiences, frame_count, True)
                        self.__num_batches_run += frame_count

                        # If we successfully sampled, train on the collected minibatches and send the gradients to the trainer node
                        if len(sampled_experiences) > 0:
                            print('Publishing AirSim Epoch.')
                            self.__publish_batch_and_update_model(sampled_experiences, frame_count)

            # Occasionally, the AirSim exe will stop working.
            # For example, if a user connects to the node to visualize progress.
            # In that case, attempt to reconnect.
            except msgpackrpc.error.TimeoutError:
                print('Lost connection to AirSim. Attempting to reconnect.')
                self.__connect_to_airsim()
    # Connects to the AirSim exe.
    # Assumes that it is already running. After 10 consecutive failed attempts, tries to restart the executable.
    def __connect_to_airsim(self):
        attempt_count = 0
        while True:
            try:
                print('Attempting to connect to AirSim (attempt {0})'.format(attempt_count))
                self.__car_client = airsim.CarClient()
                self.__car_client.confirmConnection()
                self.__car_client.enableApiControl(True)
                self.__car_controls = airsim.CarControls()
                print('Connected!')
                return
            except:
                print('Failed to connect.')
                attempt_count += 1
                if attempt_count % 10 == 0:
                    print('10 consecutive failures to connect. Attempting to start AirSim on my own.')
                    if self.__local_run:
                        os.system('START "" powershell.exe {0}'.format(
                            os.path.join(self.__airsim_path, 'AD_Cookbook_Start_AirSim.ps1 neighborhood -windowed')))
                    else:
                        os.system(
                            'START "" powershell.exe D:\\AD_Cookbook_AirSim\\Scripts\\DistributedRL\\restart_airsim_if_agent.ps1')
                print('Waiting a few seconds.')
                time.sleep(10)
    # Appends a sample to a ring buffer.
    # If the appended example takes the size of the buffer over buffer_size, the example at the front will be removed.
    def __append_to_ring_buffer(self, item, buffer, buffer_size):
        if len(buffer) >= buffer_size:
            buffer = buffer[1:]
        buffer.append(item)
        return buffer
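
    # Example: with buffer_size=4, appending f5 onto [f1, f2, f3, f4] yields [f2, f3, f4, f5],
    # so the state buffer always holds the four most recent camera frames.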
    # Runs an iteration of data generation from AirSim.
    # Data will be saved in the replay memory.
    def __run_airsim_epoch(self, always_random):
        print('Running AirSim epoch.')

        # Pick a random starting point on the roads
        starting_points, starting_direction = self.__get_next_starting_point()

        # Initialize the state buffer.
        # For now, save 4 images at 0.01 second intervals.
        state_buffer_len = 4
        state_buffer = []
        wait_delta_sec = 0.01

        print('Getting Pose')
        self.__car_client.simSetPose(Pose(Vector3r(starting_points[0], starting_points[1], starting_points[2]),
                                          AirSimClientBase.toQuaternion(starting_direction[0], starting_direction[1],
                                                                        starting_direction[2])), True)

        # Currently, simSetPose does not allow us to set the velocity.
        # So, if we crash and call simSetPose, the car will still be moving at its previous velocity.
        # We need the car to stop moving, so push the brake and wait for a few seconds.
        print('Waiting for momentum to die out')
        self.__car_controls.steering = 0
        self.__car_controls.throttle = 0
        self.__car_controls.brake = 1
        self.__car_client.setCarControls(self.__car_controls)
        time.sleep(4)

        print('Resetting')
        self.__car_client.simSetPose(Pose(Vector3r(starting_points[0], starting_points[1], starting_points[2]),
                                          AirSimClientBase.toQuaternion(starting_direction[0], starting_direction[1],
                                                                        starting_direction[2])), True)

        # Start the car rolling so it doesn't get stuck
        print('Running car for a few seconds...')
        self.__car_controls.steering = 0
        self.__car_controls.throttle = 1
        self.__car_controls.brake = 0
        self.__car_client.setCarControls(self.__car_controls)

        # While the car is rolling, start initializing the state buffer
        stop_run_time = datetime.datetime.now() + datetime.timedelta(seconds=2)
        while datetime.datetime.now() < stop_run_time:
            time.sleep(wait_delta_sec)
            state_buffer = self.__append_to_ring_buffer(self.__get_image(), state_buffer, state_buffer_len)

        done = False
        actions = []  # records the action (state) selected at each step
        pre_states = []
        post_states = []
        rewards = []
        predicted_rewards = []
        car_state = self.__car_client.getCarState()

        start_time = datetime.datetime.utcnow()
        end_time = start_time + datetime.timedelta(seconds=self.__max_epoch_runtime_sec)

        num_random = 0
        far_off = False

        # Main data collection loop
        while not done:
            collision_info = self.__car_client.getCollisionInfo()
            utc_now = datetime.datetime.utcnow()

            # Check for terminal conditions:
            # 1) The car has collided
            # 2) The car is stopped
            # 3) The run has lasted longer than max_epoch_runtime_sec.
            #    This constraint is so the model doesn't end up having to churn through huge chunks of data, slowing down training.
            # 4) The car has run too far off the road
            if (collision_info.has_collided or car_state.speed < 2 or utc_now > end_time or far_off):
                print('Start time: {0}, end time: {1}'.format(start_time, utc_now), file=sys.stderr)
                if (utc_now > end_time):
                    print('timed out.')
                    print('Full autonomous run finished at {0}'.format(utc_now), file=sys.stderr)
                done = True
                sys.stderr.flush()
            else:
                # The agent should occasionally pick a random action instead of the best action
                do_greedy = np.random.random_sample()
                pre_state = copy.deepcopy(state_buffer)
                if (do_greedy < self.__epsilon or always_random):
                    num_random += 1
                    next_state = self.__model.get_random_state()
                    predicted_reward = 0
                else:
                    next_state, predicted_reward = self.__model.predict_state(pre_state)
                    print('Model predicts {0}'.format(next_state))

                # Convert the selected state to a control signal
                next_control_signals = self.__model.state_to_control_signals(next_state,
                                                                             self.__car_client.getCarState())

                # Take the action
                self.__car_controls.steering = next_control_signals[0]
                self.__car_controls.throttle = next_control_signals[1]
                self.__car_controls.brake = next_control_signals[2]
                self.__car_client.setCarControls(self.__car_controls)

                # Wait for a short period of time to see the outcome
                time.sleep(wait_delta_sec)

                # Observe the outcome and compute the reward for the action
                state_buffer = self.__append_to_ring_buffer(self.__get_image(), state_buffer, state_buffer_len)
                car_state = self.__car_client.getCarState()
                collision_info = self.__car_client.getCollisionInfo()
                reward, far_off = self.__compute_reward(collision_info, car_state)

                # Add the experience to the set of examples from this iteration
                pre_states.append(pre_state)
                post_states.append(state_buffer)
                rewards.append(reward)
                predicted_rewards.append(predicted_reward)
                actions.append(next_state)

        # Only the last state is a terminal state.
        is_not_terminal = [1 for i in range(0, len(actions) - 1, 1)]
        is_not_terminal.append(0)
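        # This mask lets the training code treat the final transition differently: non-terminal
        # transitions can include a discounted estimate of future reward in their target, while the
        # terminal one contributes only its immediate reward (the exact use lives in RlModel, not shown here).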
        # Add all of the states from this iteration to the replay memory
        self.__add_to_replay_memory('pre_states', pre_states)
        self.__add_to_replay_memory('post_states', post_states)
        self.__add_to_replay_memory('actions', actions)
        self.__add_to_replay_memory('rewards', rewards)
        self.__add_to_replay_memory('predicted_rewards', predicted_rewards)
        self.__add_to_replay_memory('is_not_terminal', is_not_terminal)

        print('Percent random actions: {0}'.format(num_random / max(1, len(actions))))
        print('Num total actions: {0}'.format(len(actions)))

        # If we are in the main loop, reduce the epsilon parameter so that the model will be called more often.
        # Note: this will be overwritten by the trainer's epsilon if running in distributed mode.
        if not always_random:
            self.__epsilon -= self.__per_iter_epsilon_reduction
            self.__epsilon = max(self.__epsilon, self.__min_epsilon)
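            # Illustrative decay with hypothetical parameter values: per_iter_epsilon_reduction=0.003 and
            # min_epsilon=0.1 would take epsilon linearly from 1.0 down to its floor after about 300 epochs.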

        return self.__experiences, len(actions)
    # Adds a set of examples to the replay memory
    def __add_to_replay_memory(self, field_name, data):
        if field_name not in self.__experiences:
            self.__experiences[field_name] = data
        else:
            self.__experiences[field_name] += data
            start_index = max(0, len(self.__experiences[field_name]) - self.__replay_memory_size)
            self.__experiences[field_name] = self.__experiences[field_name][start_index:]
    # Sample experiences from the replay memory
    def __sample_experiences(self, experiences, frame_count, sample_randomly):
        sampled_experiences = {}
        sampled_experiences['pre_states'] = []
        sampled_experiences['post_states'] = []
        sampled_experiences['actions'] = []
        sampled_experiences['rewards'] = []
        sampled_experiences['predicted_rewards'] = []
        sampled_experiences['is_not_terminal'] = []

        # Compute the surprise factor, which is the difference between the predicted and the actual Q value for each state.
        # We can use it to weight examples so that we are more likely to train on examples that the model got wrong.
        suprise_factor = np.abs(np.array(experiences['rewards'], dtype=np.dtype(float))
                                - np.array(experiences['predicted_rewards'], dtype=np.dtype(float)))
        suprise_factor_normalizer = np.sum(suprise_factor)
        suprise_factor /= float(suprise_factor_normalizer)
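        # After normalization, suprise_factor is a probability distribution over the replay memory
        # (it sums to 1), so it can be passed as the `p` argument to np.random.choice below.
        # For example, rewards [1.0, 0.4] against predictions [0.6, 0.2] give |differences| [0.4, 0.2],
        # which normalize to sampling weights [2/3, 1/3] in the weighted branch.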
        # Generate one minibatch for each frame of the run
        for _ in range(0, frame_count, 1):
            if sample_randomly:
                idx_set = set(np.random.choice(list(range(0, suprise_factor.shape[0], 1)),
                                               size=(self.__batch_size), replace=False))
            else:
                idx_set = set(np.random.choice(list(range(0, suprise_factor.shape[0], 1)),
                                               size=(self.__batch_size), replace=False, p=suprise_factor))

            sampled_experiences['pre_states'] += [experiences['pre_states'][i] for i in idx_set]
            sampled_experiences['post_states'] += [experiences['post_states'][i] for i in idx_set]
            sampled_experiences['actions'] += [experiences['actions'][i] for i in idx_set]
            sampled_experiences['rewards'] += [experiences['rewards'][i] for i in idx_set]
            sampled_experiences['predicted_rewards'] += [experiences['predicted_rewards'][i] for i in idx_set]
            sampled_experiences['is_not_terminal'] += [experiences['is_not_terminal'][i] for i in idx_set]

        return sampled_experiences
    # Train the model on minibatches and post the result to the trainer node.
    # The trainer node will respond with the latest version of the model, which will be used in further data generation iterations.
    def __publish_batch_and_update_model(self, batches, batches_count):
        # Train and get the gradients
        print('Publishing epoch data and getting latest model from parameter server...')
        gradients = self.__model.get_gradient_update_from_batches(batches)

        # Post the data to the trainer node
        if not self.__local_run:
            post_data = {}
            post_data['gradients'] = gradients
            post_data['batch_count'] = batches_count

            response = requests.post('http://{0}:80/gradient_update'.format(self.__trainer_ip_address), json=post_data)
            print('Response:')
            print(response)

            new_model_parameters = response.json()

            # Update the existing model with the new parameters
            self.__model.from_packet(new_model_parameters)

            # If the trainer sends us an epsilon, allow it to override our local value
            if ('epsilon' in new_model_parameters):
                new_epsilon = float(new_model_parameters['epsilon'])
                print('Overriding local epsilon with {0}, which was sent from trainer'.format(new_epsilon))
                self.__epsilon = new_epsilon
        else:
            if (self.__num_batches_run > self.__batch_update_frequency + self.__last_checkpoint_batch_count):
                self.__model.update_critic()

                checkpoint = {}
                checkpoint['model'] = self.__model.to_packet(get_target=True)
                checkpoint['batch_count'] = batches_count
                checkpoint_str = json.dumps(checkpoint)

                checkpoint_dir = os.path.join(os.path.join(self.__data_dir, 'checkpoint'), self.__experiment_name)

                if not os.path.isdir(checkpoint_dir):
                    try:
                        os.makedirs(checkpoint_dir)
                    except OSError as e:
                        if e.errno != errno.EEXIST:
                            raise

                file_name = os.path.join(checkpoint_dir, '{0}.json'.format(self.__num_batches_run))
                with open(file_name, 'w') as f:
                    print('Checkpointing to {0}'.format(file_name))
                    f.write(checkpoint_str)

                self.__last_checkpoint_batch_count = self.__num_batches_run
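
    # For reference, each local checkpoint written above is a JSON document of the form
    # {"model": <packet produced by RlModel.to_packet(get_target=True)>, "batch_count": <int>},
    # saved as <num_batches_run>.json under (data_dir)/checkpoint/(experiment_name).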
    # Gets the latest model from the trainer node
    def __get_latest_model(self):
        print('Getting latest model from parameter server...')
        response = requests.get('http://{0}:80/latest'.format(self.__trainer_ip_address)).json()
        self.__model.from_packet(response)
    # Gets an image from AirSim
    def __get_image(self):
        image_response = self.__car_client.simGetImages([ImageRequest(0, AirSimImageType.Scene, False, False)])[0]
        image1d = np.fromstring(image_response.image_data_uint8, dtype=np.uint8)
        image_rgba = image1d.reshape(image_response.height, image_response.width, 4)
        return image_rgba[76:135, 0:255, 0:3].astype(float)
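
    # The slice above keeps a 59x255 band (rows 76-134, roughly the road ahead of the hood) and drops the
    # alpha channel, so the model sees a 59x255x3 float image. Note: np.fromstring is deprecated in newer
    # NumPy releases; np.frombuffer is the drop-in replacement if this warns on your installation.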
    # Computes the reward function based on the car's position.
    def __compute_reward(self, collision_info, car_state):
        # Define some constant parameters for the reward function
        THRESH_DIST = 3.5                # The maximum distance from the center of the road to compute the reward function
        DISTANCE_DECAY_RATE = 1.2        # The rate at which the reward decays with distance from the center of the road
        CENTER_SPEED_MULTIPLIER = 2.0    # The ratio at which we prefer the distance reward to the speed reward

        # If the car has collided, the reward is always zero
        if (collision_info.has_collided):
            return 0.0, True

        # If the car is stopped, the reward is always zero
        speed = car_state.speed
        if (speed < 2):
            return 0.0, True

        # Get the car position
        position_key = bytes('position', encoding='utf8')
        x_val_key = bytes('x_val', encoding='utf8')
        y_val_key = bytes('y_val', encoding='utf8')
        car_point = np.array([car_state.kinematics_true[position_key][x_val_key],
                              car_state.kinematics_true[position_key][y_val_key],
                              0])

        # Distance component is exponential distance to nearest line
        distance = 999

        # Compute the distance to the nearest center line
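        # Each entry in self.__reward_points is a segment (A, B) of a road center line. The loop below
        # computes the point-to-segment distance: t = clamp(dot(P - A, B - A) / |B - A|^2, 0, 1) projects
        # the car position P onto the segment, and the distance is |A + t*(B - A) - P|.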
        for line in self.__reward_points:
            local_distance = 0
            length_squared = ((line[0][0] - line[1][0]) ** 2) + ((line[0][1] - line[1][1]) ** 2)
            if length_squared != 0:
                t = max(0, min(1, np.dot(car_point - line[0], line[1] - line[0]) / length_squared))
                proj = line[0] + (t * (line[1] - line[0]))
                local_distance = np.linalg.norm(proj - car_point)
            distance = min(local_distance, distance)

        distance_reward = math.exp(-(distance * DISTANCE_DECAY_RATE))
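        # Worked example of the decay: at distance 0 the reward is exp(0) = 1.0; at 1 m it is
        # exp(-1.2) ~= 0.30; beyond THRESH_DIST (3.5 m) the second return value marks the car as far off
        # the road, which terminates the episode on the next loop iteration.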

        return distance_reward, distance > THRESH_DIST
    # Initializes the points used for determining the starting position of the vehicle
    def __init_road_points(self):
        self.__road_points = []
        car_start_coords = [12961.722656, 6660.329102, 0]
        with open(os.path.join(os.path.join(self.__data_dir, 'data'), 'road_lines.txt'), 'r') as f:
            for line in f:
                points = line.split('\t')
                first_point = np.array([float(p) for p in points[0].split(',')] + [0])
                second_point = np.array([float(p) for p in points[1].split(',')] + [0])
                self.__road_points.append(tuple((first_point, second_point)))

        # Points in road_lines.txt are in Unreal coordinates, but the car start coordinates are not the Unreal origin.
        # Shift each point so it is relative to the car start, and convert from centimeters to meters.
        for point_pair in self.__road_points:
            for point in point_pair:
                point[0] -= car_start_coords[0]
                point[1] -= car_start_coords[1]
                point[0] /= 100
                point[1] /= 100
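        # Example of the conversion: an Unreal point at (13061.722656, 6760.329102) ends up at (1.0, 1.0),
        # i.e. 1 m along x and 1 m along y from the car's starting position.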
    # Initializes the center-line points used as the optimal vehicle position in the reward computation
    def __init_reward_points(self):
        self.__reward_points = []
        with open(os.path.join(os.path.join(self.__data_dir, 'data'), 'reward_points.txt'), 'r') as f:
            for line in f:
                point_values = line.split('\t')
                first_point = np.array([float(point_values[0]), float(point_values[1]), 0])
                second_point = np.array([float(point_values[2]), float(point_values[3]), 0])
                self.__reward_points.append(tuple((first_point, second_point)))
    # Randomly selects a starting point on the road.
    # Used for initializing an iteration of data generation from AirSim.
    def __get_next_starting_point(self):
        # Get the current state of the vehicle
        car_state = self.__car_client.getCarState()

        # Pick a random road.
        random_line_index = np.random.randint(0, high=len(self.__road_points))

        # Pick a random position on the road.
        # Do not start too close to either end, as the car may crash during the initial run.
        random_interp = (np.random.random_sample() * 0.4) + 0.3

        # Pick a random direction to face
        random_direction_interp = np.random.random_sample()

        # Compute the starting point of the car
        random_line = self.__road_points[random_line_index]
        random_start_point = list(random_line[0])
        random_start_point[0] += (random_line[1][0] - random_line[0][0]) * random_interp
        random_start_point[1] += (random_line[1][1] - random_line[0][1]) * random_interp

        # Compute the direction that the vehicle will face
        # Vertical line
        if (np.isclose(random_line[0][1], random_line[1][1])):
            if random_direction_interp > 0.5:
                random_direction = (0, 0, 0)
            else:
                random_direction = (0, 0, math.pi)
        # Horizontal line
        elif (np.isclose(random_line[0][0], random_line[1][0])):
            if random_direction_interp > 0.5:
                random_direction = (0, 0, math.pi / 2)
            else:
                random_direction = (0, 0, -1.0 * math.pi / 2)
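        # The tuples above are (pitch, roll, yaw) angles in radians for AirSimClientBase.toQuaternion; only
        # yaw is varied, so the car faces along the road in one of its two directions. This assumes every
        # segment in road_lines.txt is axis-aligned; a diagonal segment would leave random_direction unset.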

        # The z coordinate is always zero
        random_start_point[2] = -0
        return random_start_point, random_direction
    # A helper function to make a directory if it does not exist
    def __make_dir_if_not_exist(self, directory):
        if not (os.path.exists(directory)):
            try:
                os.makedirs(directory)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise

# Sets up the logging framework.
# This allows us to log using simple print() statements.
# The output is redirected to a unique file on the file share.
def setup_logs(parameters):
    output_dir = 'Z:\\logs\\{0}\\agent'.format(parameters['experiment_name'])
    if not os.path.isdir(output_dir):
        try:
            os.makedirs(output_dir)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
    sys.stdout = open(os.path.join(output_dir, '{0}.stdout.txt'.format(os.environ['AZ_BATCH_NODE_ID'])), 'w')
    sys.stderr = open(os.path.join(output_dir, '{0}.stderr.txt'.format(os.environ['AZ_BATCH_NODE_ID'])), 'w')

# Parse the command line parameters
parameters = {}
for arg in sys.argv:
    if '=' in arg:
        args = arg.split('=')
        print('0: {0}, 1: {1}'.format(args[0], args[1]))
        parameters[args[0].replace('--', '')] = args[1]
    if arg.replace('-', '') == 'local_run':
        parameters['local_run'] = True
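
# For example (hypothetical invocation), running
#   python distributed_agent.py --data_dir=Z:\data --experiment_name=exp1 local_run
# yields parameters == {'data_dir': 'Z:\\data', 'experiment_name': 'exp1', 'local_run': True}.
# Note that values containing '=' would be truncated by the simple split above.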

# Make the debug statements easier to read
np.set_printoptions(threshold=np.nan, suppress=True)

# Check the additional parameters needed for a local run
if 'local_run' in parameters:
    if 'airsim_path' not in parameters:
        print('ERROR: for a local run, airsim_path must be defined.')
        print('Please provide the path to AirSim in a parameter like "airsim_path=<path_to_airsim>"')
        print('It should point to the folder containing AD_Cookbook_Start_AirSim.ps1')
        sys.exit()
    if 'batch_update_frequency' not in parameters:
        print('ERROR: for a local run, batch_update_frequency must be defined.')
        print('Please provide a value in a parameter like "batch_update_frequency=<int>"')
        sys.exit()

# Set up the logging to the file share if not running locally.
if 'local_run' not in parameters:
    setup_logs(parameters)

print('------------STARTING AGENT----------------')
print(parameters)
print('***')
print(os.environ)
print('***')

# Identify the node as an agent and start AirSim
if 'local_run' not in parameters:
    os.system('echo 1 >> D:\\agent.agent')
    os.system('START "" powershell.exe D:\\AD_Cookbook_AirSim\\Scripts\\DistributedRL\\restart_airsim_if_agent.ps1')
else:
    os.system('START "" powershell.exe {0}'.format(
        os.path.join(parameters['airsim_path'], 'AD_Cookbook_Start_AirSim.ps1 neighborhood -windowed')))

# Start the training
agent = DistributedAgent(parameters)
agent.start()