-
Notifications
You must be signed in to change notification settings - Fork 16
/
roslaunch-to-dot.py
executable file
·1443 lines (1163 loc) · 53.9 KB
/
roslaunch-to-dot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
'''This script takes a ROS launch file as input and generates a dot graph
file based on the tree of nodes and launch files that will be launched
based on the input launch file.
usage: roslaunch-to-dot.py [-h] [--landscape] [--aspect-ratio ASPECTRATIO]
[--png] [--svg] [--pdf] [--disable-groups] [--show-node-type]
[--show-rosparam-nodes]
launchFile outputFile [arg [arg ...]]
Create a dot graph file from a ROS launch file.
positional arguments:
launchFile path to the desired launch file
outputFile the output dot file to save
arg override an arg specified anywhere in the launch
file tree
optional arguments:
-h, --help show this help message and exit
--landscape display the nodes from left to right instead of top
to bottom
--aspect-ratio ASPECTRATIO
the approximate aspect ratio desired (default =
8.5/11)
--png automatically convert the dot file to a PNG
--svg automatically convert the dot file to a SVG
--pdf automatically convert the dot file to a PDF
--disable-groups don't group nodes/launch files based on their
package
--show-node-type label ROS nodes with their type in addition to
their name
--show-rosparam-nodes
display nodes and connections for all rosparam
files used
'''
import re
import traceback
from sys import argv
from copy import deepcopy
from random import randint
from datetime import datetime
from os import system, environ
from argparse import ArgumentParser
from collections import namedtuple
import xml.etree.ElementTree as ET
from os.path import abspath, exists, basename, splitext, sep, dirname
import roslib
import rospkg
from roslaunch import substitution_args
try:
import pygraphviz as gv
except ImportError:
raise ImportError("Please run 'sudo apt-get install python3-pygraphviz'")
# List of file extensions that are recognized/supported as ROS launch files
LAUNCH_FILE_TYPES = [".launch", ".test", ".xml"]
# Keep track of a global set of launch files that have already been
# visited so that we can protect ourselves from entering into an
# infinite loop of launch files if there happens to be a recursive
# cycle in the graph. LaunchFile.__init__ both reads and updates this set.
VISITED_LAUNCH_FILES = set()
# Create a named tuple to store attributes pertaining to a node
Node = namedtuple("Node", [
    "launchFile", # The LaunchFile object that contains this node
    "package", # The name of the ROS package containing this node
    "nodeType", # The type of ROS node this is
    "name", # The name of the ROS node
    "dotNodeName", # The name for the corresponding dot node
    "argSubs", # Dictionary of arg name/value pairs required to select this node
    "isTestNode"]) # True if this is a test node, False otherwise
# Create a named tuple to store attributes pertaining to a rosparam file
# (note that the tuple's type name is "RosParamFile")
RosParam = namedtuple("RosParamFile", [
    "filename", # The resolved filename for the rosparam file
    "name", # The base name for the rosparam file
    "dotNodeName", # The name for the corresponding dot node
    "package", # The package that contains the rosparam file
    "argSubs", # The dictionary of argument substitutions needed for the file
])
# Create a named tuple to store items contained in a single ROS package
PackageItems = namedtuple("PackageItems", [
    "launchFiles", # The list of launch files contained in this package
    "nodes", # The list of nodes contained in this package
    "rosParamFiles", # The list of rosparam files contained in this package
])
class LaunchFile:
'''The LaunchFile class encapsulates a single ROS launch file. This
class is responsible for parsing the launch file XML and keeping track
of nodes and launch files that are included within the launch file. It
is also responsible for properly resolving any ROS launch substitution
arguments that may exist within strings that it uses within the launch
file.
In addition to this, this class is capable of producing the text to
export this launch file (and all of the nodes and other launch files
it connects to) into a dot graph.
'''
# Identifiers used as elements within a launch file
ArgTag = "arg"
GroupTag = "group"
IncludeTag = "include"
NodeTag = "node"
RosParamTag = "rosparam"
TestTag = "test"
# Identifiers used as element attribute names within a launch file
CommandAttribute = "command"
DefaultAttribute = "default"
FileAttribute = "file"
IfAttribute = "if"
NameAttribute = "name"
PkgAttribute = "pkg"
TestNameAttribute = "test-name"
TypeAttribute = "type"
UnlessAttribute = "unless"
ValueAttribute = "value"
PassAllArgsAttribute = "pass_all_args"
# Identifiers used as substitution arguments within a launch
# file, e.g., $(find package)
AnonSubstitutionArg = "anon"
ArgSubstitutionArg = "arg"
EnvSubstitutionArg = "env"
FindSubstitutionArg = "find"
OptEnvSubstitutionArg = "optenv"
DirnameSubstitutionArg = "dirname"
# Identifiers for various rosparam commands
DumpCommand = "dump"
LoadCommand = "load"
# Colors used within the dot graph
ConditionalLineColor = "#ff8c00"
CycleLineColor = "red"
DuplicateNodeColor = "red"
LaunchFileColor = "#d3d3d3"
LineColor = "black"
MissingFileColor = "#cc0000"
NodeColor = "#6495ed"
TestNodeColor = "#009900"
# Other commonly used attributes
LinePenWidth = "3"
SubgraphPenWidth = "3"
# The regular expressions used to match substitution arguments. These are
# raw strings so that the backslashes reach the regex engine untouched;
# in a normal string "\$" and "\(" are invalid escape sequences, which
# newer Python versions warn about.
# Matches "$(command argument text)"; group 1 is the command name and
# group 2 is the (possibly empty) argument text
SubArgsPattern = r"\$\(([a-zA-Z_]+) ?([a-zA-Z0-9_! ]*)\)"
# Matches "$(arg name)"; group 1 is the arg name
FindArgsPattern = r"\$\(arg ([a-zA-Z0-9_]+)\)"
def __init__(self,
             args,
             filename,
             includeArgs=None,
             overrideArgs=None,
             ancestors=None):
    '''Create the launch file object and parse the file, provided that it
    exists and has not already been visited.

    * args -- command line arguments
    * filename -- the ROS launch file
    * includeArgs -- dictionary of arg substitution name value pairs
      that were used to resolve the name of this launch file
    * overrideArgs -- dictionary of arguments that override any
      arguments specified in this launch file
    * ancestors -- List of launch files which are ancestors of
      this launch file

    '''
    self.__inputArgs = args
    self.__filename = filename

    # Work on a copy of the ancestor list and add ourself to it, since a
    # reference back to this file would also constitute a cycle
    if ancestors is None:
        self.__ancestors = []
    else:
        self.__ancestors = deepcopy(ancestors)
    self.__ancestors.append(filename)

    # The dictionaries default to None in the signature because a
    # dictionary used as a default argument would be shared between calls
    if includeArgs is None:
        includeArgs = {}
    if overrideArgs is None:
        overrideArgs = {}
    self.__includeArgs = includeArgs
    self.__overrideArgs = overrideArgs

    # Remember whether this file was seen before registering it as visited
    alreadyVisited = filename in VISITED_LAUNCH_FILES
    VISITED_LAUNCH_FILES.add(filename)

    # True when the launch file cannot be found on disk
    self.__missing = not exists(self.__filename)

    # Maps the name of each arg defined in the launch file to its
    # resolved value
    self.__args = {}

    # Maps each kind of substitution argument (e.g, 'find', 'arg') to the
    # function that resolves it
    self.__substitutionArgFnMap = {
        self.FindSubstitutionArg: self.__onFindSubstitutionArg,
        self.ArgSubstitutionArg: self.__onArgSubstitutionArg,
        self.EnvSubstitutionArg: self.__onEnvSubstitutionArg,
        self.OptEnvSubstitutionArg: self.__onEnvSubstitutionArg,
        self.AnonSubstitutionArg: self.__onAnonSubstitutionArg,
        self.DirnameSubstitutionArg: self.__onDirnameSubstitutionArg,
    }

    # LaunchFile objects included by this launch file
    self.__includes = []
    # Names of included launch files that cycle back to an ancestor, kept
    # so that those cycles can be drawn differently in the graph
    self.__cycles = []
    # Node namedtuple objects associated with this launch file
    self.__nodes = []
    # RosParam namedtuple objects included by this launch file
    self.__rosParamFiles = []

    # A file that was already visited is never parsed a second time,
    # which protects against cycles in the launch files
    if alreadyVisited:
        return

    if self.__missing:
        print("WARNING: Could not locate launch " \
              "file: %s" % self.__filename)
    else:
        # Only parse if the file exists
        self.__parseLaunchFile(filename)
#### Getter functions
def getFilename(self):
    '''Return the filename that this launch file object was
    constructed with.

    '''
    return self.__filename
def isMissing(self):
    '''Return True when the launch file could not be found on disk at
    construction time, False otherwise.

    '''
    return self.__missing
def getNodes(self):
    '''Return the list of nodes declared directly by this launch file
    (nodes of included launch files are not part of this list).

    '''
    return self.__nodes
def getCycles(self):
    '''Return the names of the launch files that this launch file
    includes but which cycle back to previously visited launch files.

    '''
    return self.__cycles
def getCleanName(self):
    '''Return the base name of this launch file without its extension
    and with every remaining period replaced by an underscore.

    '''
    stem, _ = splitext(basename(self.__filename))
    return stem.replace(".", "_")
def getDotNodeName(self):
    '''Return the name of the dot node that represents this launch file,
    built from its package name and its clean name.

    '''
    cleanName = self.getCleanName()
    return "launch_%s_%s" % (self.getPackageName(), cleanName)
def getPackageName(self):
    '''Return the name of the package that contains this launch file.

    Raises an Exception when rospkg does not report a package name for
    the launch file.

    '''
    packageName = rospkg.get_package_name(self.__filename)
    if packageName:
        return packageName

    raise Exception("Failed to get package name for: %s" %
                    self.__filename)
def getAllLaunchFiles(self):
    '''Return a list holding this launch file followed by every launch
    file that is included, directly or indirectly, because of it.

    '''
    # Each child reports itself along with all of its own descendants
    descendants = [
        launchFile
        for child in self.__includes
        for launchFile in child.getAllLaunchFiles()]
    return [self] + descendants
def getAllNodes(self):
    '''Return a new list of every node launched because of this launch
    file: its own nodes first, then the nodes of each launch file it
    includes.

    '''
    # Start from a copy so that our own node list is left untouched
    allNodes = list(self.__nodes)

    # Each include recursively reports the nodes of its own subtree
    for include in self.__includes:
        allNodes += include.getAllNodes()

    return allNodes
def getAllRosParamFiles(self):
    '''Return a new list of every rosparam file used because of this
    launch file: its own rosparam files first, then those of each launch
    file it includes.

    '''
    # Start from a copy so that our own list is left untouched
    rosParamFiles = list(self.__rosParamFiles)

    # Each include recursively reports the rosparam files of its subtree
    rosParamFiles.extend(
        rosParam
        for launchFile in self.__includes
        for rosParam in launchFile.getAllRosParamFiles())

    return rosParamFiles
def getIncludeMap(self):
    '''Return a dictionary mapping the filename of every launch file in
    this tree to the list of launch files that it includes.

    '''
    # Begin with the mapping for my own included launch files
    includeMap = {self.__filename: self.__includes}

    # Merge in the mappings of each child; an entry reported by a child
    # replaces an existing entry with the same filename
    for childLaunch in self.__includes:
        includeMap.update(childLaunch.getIncludeMap())

    return includeMap
def getPackageMap(self):
    '''Return a dictionary mapping the name of each ROS package to a
    PackageItems tuple holding the launch files, the nodes, and the
    rosparam files of this launch tree that belong to that package.

    '''
    packageMap = {}

    # File every launch file of the tree under its package, creating the
    # entry for the package the first time it is encountered
    for launchFile in self.getAllLaunchFiles():
        packageName = launchFile.getPackageName()
        entry = packageMap.setdefault(
            packageName, PackageItems([], [], []))
        entry.launchFiles.append(launchFile)

    # File every node under its package
    for node in self.getAllNodes():
        entry = packageMap.setdefault(
            node.package, PackageItems([], [], []))
        entry.nodes.append(node)

    # File every rosparam file under its package
    for rosParam in self.getAllRosParamFiles():
        entry = packageMap.setdefault(
            rosParam.package, PackageItems([], [], []))
        entry.rosParamFiles.append(rosParam)

    return packageMap
#### Dot graph functions
def toDot(self):
    '''Build and return the graph that represents this launch file tree.

    The graph holds the items of every package (added through
    __createPackageSubgraph), an optional node listing the command line
    arguments, and edges from launch files to the launch files, nodes
    and (when enabled) rosparam files that they use.

    '''
    def describeArgs(argSubs):
        # Convert all arg name value pairs into a single string, e.g.,
        # given {"one": "two", "three": "four"} the result is
        # "one:=two\nthree:=four", so that each arg pair is on its own
        # line for improved readability
        return '\n'.join(
            [":=".join(map(str, pair)) for pair in argSubs.items()])

    # Grab the map of all packages, nodes, and include files
    # used by this launch tree
    packageMap = self.getPackageMap()

    graph = gv.AGraph(
        name=self.getCleanName(),
        strict=False,
        directed=True,
        fontsize=35,
        ranksep=2,
        nodesep=2,
        ratio=self.__inputArgs.aspectRatio,
        compound=True)

    # Lay the graph out from left to right when landscape mode is on
    if self.__inputArgs.landscapeMode:
        graph.graph_attr.update(rankdir="LR")

    # Default attributes for nodes and edges
    graph.node_attr.update(fontsize="35")
    graph.edge_attr.update(fontsize="35")

    #### Add the items of every known package
    self.__clusterNum = 0
    allNodeNames = set()  # Node names seen so far, to spot duplicates
    for packageName, packageItems in packageMap.items():
        self.__createPackageSubgraph(
            graph, packageName, packageItems, allNodeNames)

    #### Add a node describing the command line arguments, if any
    if self.__overrideArgs:
        # The label has a title followed by one line per arg
        claLines = ["Command Line Arguments:"]
        claLines.extend(
            "%s:=%s" % (argName, argValue)
            for argName, argValue in self.__overrideArgs.items())

        claNodeName = "command_line_args"  # Unique name for the node
        graph.add_node(
            claNodeName,
            label='\n'.join(claLines),
            shape="rectangle",
            style="dashed")

        # Connect the command line argument node to the main launch file
        graph.add_edge(
            claNodeName,
            self.getDotNodeName(),
            penwidth=self.LinePenWidth,
            color=self.LineColor)

    #### Connect every launch file to the launch files it includes
    for packageItems in packageMap.values():
        for launchFile in packageItems.launchFiles:
            parentNodeName = launchFile.getDotNodeName()
            cycles = launchFile.getCycles()

            for include in launchFile.__includes:
                includeFilename = include.getFilename()
                includeNodeName = include.getDotNodeName()

                # The arg substitutions used to conditionally include
                # the launch file decide the label and style of the edge
                argSubs = include.__includeArgs
                if argSubs:
                    # The include required arg substitutions
                    color = self.ConditionalLineColor
                    label = describeArgs(argSubs)
                elif includeFilename in cycles:
                    # The include cycles back to a visited launch file
                    color = self.CycleLineColor
                    label = ""
                else:
                    # Standard connection between launch files
                    color = self.LineColor
                    label = ""

                graph.add_edge(
                    parentNodeName,
                    includeNodeName,
                    label=label,
                    penwidth=self.LinePenWidth,
                    color=color)

    #### Connect every launch file to its nodes
    for packageItems in packageMap.values():
        for node in packageItems.nodes:
            # The dot node of the launch file that declares this node
            launchNodeName = node.launchFile.getDotNodeName()

            # Label the edge with the args (and their values) that had
            # to be evaluated in order to include this ROS node
            if node.argSubs:
                color = self.ConditionalLineColor
                label = describeArgs(node.argSubs)
            else:
                color = self.LineColor
                label = ""

            graph.add_edge(
                launchNodeName,
                node.dotNodeName,
                label=label,
                penwidth=self.LinePenWidth,
                color=color)

    #### Connect every launch file to its rosparam files
    if self.__inputArgs.showRosParamNodes:
        for packageItems in packageMap.values():
            for launchFile in packageItems.launchFiles:
                launchNodeName = launchFile.getDotNodeName()

                for rosParam in launchFile.__rosParamFiles:
                    # Label the edge with the arg substitutions that
                    # were used to select the rosparam file
                    argSubs = rosParam.argSubs
                    if argSubs:
                        color = self.ConditionalLineColor
                        label = describeArgs(argSubs)
                    else:
                        color = self.LineColor
                        label = ""

                    graph.add_edge(
                        launchNodeName,
                        rosParam.dotNodeName,
                        label=label,
                        penwidth=self.LinePenWidth,
                        color=color)

    return graph
def __createPackageSubgraph(self,
                            graph,
                            packageName,
                            packageItems,
                            allNodeNames):
    '''Add the dot nodes for a single ROS package to the graph and,
    unless groups are disabled, group them into one subgraph.

    * graph -- the graph that the nodes and the subgraph are added to
    * packageName -- the name of the ROS package
    * packageItems -- The PackageItems object
    * allNodeNames -- the set of node names that have already been found so
      that duplicate nodes can be highlighted; names found here are
      added to it

    '''
    options = self.__inputArgs
    groupsDisabled = options.disableGroups

    # Suffix naming the package, used in labels when groups are disabled
    # since there is then no subgraph to show the package
    packageSuffix = "\npkg: " + packageName

    # Names of the dot nodes that belong in this package's subgraph
    members = []

    #### One dot node per launch file contained within this package
    for launchFile in packageItems.launchFiles:
        label = basename(launchFile.getFilename())
        launchNodeName = launchFile.getDotNodeName()

        # Missing launch files get their own fill color
        if launchFile.isMissing():
            fill = self.MissingFileColor
        else:
            fill = self.LaunchFileColor

        if groupsDisabled:
            label = label + packageSuffix

        graph.add_node(
            launchNodeName,
            label=label,
            shape="rectangle",
            style="filled",
            fillcolor=fill)

        if not groupsDisabled:
            members.append(launchNodeName)

    #### One dot node per ROS node contained within this package
    for node in packageItems.nodes:
        # Test nodes are colored differently from regular nodes
        if node.isTestNode:
            fill = self.TestNodeColor
        else:
            fill = self.NodeColor

        # ROS nodes must have unique names, thus alert the user (and
        # recolor the node) if this name has been seen before
        if node.name in allNodeNames:
            print("WARNING: There are two nodes in the launch tree " \
                  "that have the same name: %s" % node.name)
            fill = self.DuplicateNodeColor
        allNodeNames.add(node.name)

        label = node.name
        if groupsDisabled:
            label = label + packageSuffix
        if options.showNodeType:
            # Include the node type in addition to its name
            label = label + "\ntype: " + node.nodeType

        graph.add_node(
            node.dotNodeName,
            label=label,
            shape="rectangle",
            style="filled",
            fillcolor=fill)

        if not groupsDisabled:
            members.append(node.dotNodeName)

    #### One dot node per rosparam file contained in this package
    if options.showRosParamNodes:
        for rosParam in packageItems.rosParamFiles:
            label = rosParam.name
            if groupsDisabled:
                label = label + "\npkg: " + rosParam.package

            # Only a rosparam file that is not found gets styled
            if exists(rosParam.filename):
                style = ''
                color = ''
            else:
                style = "filled"
                color = self.MissingFileColor

            graph.add_node(
                rosParam.dotNodeName,
                label=label,
                style=style,
                color=color)

            if not groupsDisabled:
                members.append(rosParam.dotNodeName)

    #### Group everything that was collected into a single subgraph
    if not groupsDisabled:
        graph.add_subgraph(
            nbunch=members,
            name="cluster_" + packageName,
            label=packageName,
            penwidth=self.SubgraphPenWidth)
##### Launch file XML parsing functions
#
def __parseLaunchFile(self, filename):
    '''Read the XML of a single launch file and parse its elements.

    * filename -- the launch file

    Raises an Exception naming the file when its XML cannot be parsed.

    '''
    try:
        root = ET.parse(filename).getroot()
    except Exception as e:
        raise Exception(
            "Error while parsing launch file: %s: %s" % (filename, e))

    # The elements are handled serially, in document order, so an
    # argument has to be defined before the elements that use it
    self.__parseLaunchElements(root)
def __parseLaunchElements(self, root):
    '''Parse all of the launch file elements to find other launch files as
    well as ROS nodes contained within the launch file.

    The children of the element are handled one at a time, in document
    order. An error raised while handling an include, group, node,
    rosparam or test element is printed and that element is skipped; an
    error raised while handling an arg element is not caught here.

    * root -- the launch file XML element

    '''
    for child in root:
        # Handle all types of tags
        if child.tag == self.ArgTag:
            # Parse the argument
            self.__parseArgTag(child)
        elif child.tag == self.IncludeTag:
            # Only ordinary errors are caught so that KeyboardInterrupt
            # and SystemExit can still stop the program
            try:
                launchFile = self.__parseIncludeTag(child)
            except Exception:
                traceback.print_exc()
                continue  # Ignore error

            # None means that the include is disabled
            if launchFile is not None:
                self.__includes.append(launchFile)
        elif child.tag == self.GroupTag:
            try:
                self.__parseGroupTag(child)
            except Exception:
                traceback.print_exc()
                continue  # Ignore error
        elif child.tag == self.NodeTag:
            try:
                node = self.__parseNodeTag(child)
            except Exception:
                traceback.print_exc()
                continue  # Ignore error

            # None means that the node is disabled (i.e., if=false,
            # or unless=true), so only keep enabled nodes
            if node is not None:
                self.__nodes.append(node)
        elif child.tag == self.RosParamTag:
            try:
                self.__parseRosParam(child)
            except Exception:
                traceback.print_exc()
                continue  # Ignore error
        elif child.tag == self.TestTag:
            try:
                testNode = self.__parseTestNodeTag(child)
            except Exception:
                traceback.print_exc()
                continue  # Ignore error

            # None means that the test node is disabled (i.e., if=false,
            # or unless=true), so only keep enabled test nodes
            if testNode is not None:
                self.__nodes.append(testNode)
def __parseArgTag(self, arg):
    '''Record the argument declared by an arg tag of the launch file.

    * arg -- the argument tag

    '''
    name, value = self.__parseArg(arg)

    # Only an argument that has a value is stored
    if value is not None:
        self.__args[name] = value

    # An argument declared with the 'value' attribute is not allowed to
    # be overridden, so any override given for it is dropped
    if self.ValueAttribute in arg.attrib and name in self.__overrideArgs:
        del self.__overrideArgs[name]
        print("WARNING: cannot override arg '%s', which has " \
              "already been set." % name)
def __parseIncludeTag(self, include):
    '''Parse the include tag from a launch file.

    * include -- the include tag

    Returns the LaunchFile object created for the included launch file,
    or None when the include is disabled. Raises an Exception when the
    include tag has no file attribute.

    '''
    # Make sure the include is enabled before continuing
    if not self.__isEnabled(include):
        return None  # Include is disabled

    filename = self.__getAttribute(include, self.FileAttribute)
    if filename is None:
        raise Exception(
            "Include tag missing %s attribute" % self.FileAttribute)

    #### We have another launch file to resolve and parse
    #
    # Resolve the full path to the include file
    resolved = self.__resolveText(filename)

    # If the filename contained any arg substitutions then we want to
    # label the edge indicating the arg and value that was used to
    # conditionally select this launch file
    argSubs = {}  # The file does not use any arg substitutions
    if resolved != filename:
        # Get the dictionary of arg substitutions that this
        # filename contains
        argSubs = self.__getSubstitutionArgs(filename)

    # If this include uses an if/unless conditional, then the args required
    # to satisfy that condition should be included
    ifArg = self.__getArgumentForConditional(include, self.IfAttribute)
    if ifArg is not None:
        # At this point the argument must be true
        argSubs[ifArg] = "true"

    unlessArg = self.__getArgumentForConditional(
        include, self.UnlessAttribute)
    if unlessArg is not None:
        # At this point the argument must be false
        argSubs[unlessArg] = "false"

    # Protect against cycles in the launch file graph which occurs when a
    # launch file includes a launch file directly in its own ancestor tree
    if resolved in self.__ancestors:
        print("ERROR: There is a cycle in the launch file " \
              "graph from: '%s' to '%s'" % (self.__filename, resolved))
        self.__cycles.append(resolved)  # Add the filename

    # Determine the args that the included launch file inherits.
    # NOTE(review): __getAttribute is not visible here; if it returns the
    # attribute text then pass_all_args="false" would still be truthy --
    # confirm
    passAllArgs = self.__getAttribute(
        include, self.PassAllArgsAttribute, False)
    if passAllArgs:
        # Pass a copy of our args, with our overrides applied on top.
        # The child must get its own dictionary: it stores the dictionary
        # as its override args and may delete entries from it, which
        # must not alter the args of this launch file
        inheritedArgs = dict(self.__args)
        inheritedArgs.update(self.__overrideArgs)
    else:
        # Only the args listed under the include tag are inherited
        inheritedArgs = {}
        for child in include:
            if child.tag == self.ArgTag:
                # Grab (and resolve) the name and value of the arg
                name, value = self.__parseArg(child)

                # Allow the child launch file to inherit this argument
                if value is not None:
                    inheritedArgs[name] = value

    # Check for rosparams specified under the include tag
    self.__findRosParams(include)

    # Create the new launch file and parse it -- pass the argument
    # overrides to the child launch file to override the argument anywhere
    # it is defined
    return LaunchFile(
        self.__inputArgs,
        resolved,
        includeArgs=argSubs,
        overrideArgs=inheritedArgs,
        ancestors=self.__ancestors)
def __parseNodeTag(self, node):
    '''Create the Node tuple described by a node tag of the launch file.

    * node -- the node tag

    Returns the Node, or None when the node tag is disabled.

    '''
    # A disabled node is not part of the launch tree
    if not self.__isEnabled(node):
        return None

    # Grab the package, type and name attributes of the node, and then
    # resolve any substitution arguments that they may contain
    rawValues = [
        self.__getAttribute(node, attribute)
        for attribute in (
            self.PkgAttribute, self.TypeAttribute, self.NameAttribute)]
    pkg, nodeType, name = [
        self.__resolveText(rawValue) for rawValue in rawValues]

    # Check for rosparams specified under the node tag
    self.__findRosParams(node)

    # Collect the args (mapped to the value that they must have) that
    # are required to select this node
    argSubs = {}

    ifArg = self.__getArgumentForConditional(node, self.IfAttribute)
    if ifArg is not None:
        # The node is enabled, so the 'if' argument must be true
        argSubs[ifArg] = "true"

    unlessArg = self.__getArgumentForConditional(
        node, self.UnlessAttribute)
    if unlessArg is not None:
        # The node is enabled, so the 'unless' argument must be false
        argSubs[unlessArg] = "false"

    # The dot node name combines the package, node type, and node name
    # to make it fully unique
    dotNodeName = "node_%s_%s_%s" % (pkg, nodeType, name)

    return Node(
        self, pkg, nodeType, name, dotNodeName, argSubs,
        False)  # False means this is not a test node
def __findRosParams(self, element):
    '''Find any and all rosparam elements specified directly under the
    given element and parse each of them.

    An error raised while parsing a rosparam element is printed and that
    element is skipped.

    * element -- the XML element that may contain rosparam elements

    '''
    # Iterate over children looking for rosparams
    for child in element:
        if child.tag != self.RosParamTag:
            continue

        # Only ordinary errors are caught so that KeyboardInterrupt
        # and SystemExit can still stop the program
        try:
            self.__parseRosParam(child)
        except Exception:
            traceback.print_exc()
            print("WARNING: parsing rosparam")
def __parseRosParam(self, rosparam):
    '''Record the file referenced by a rosparam tag of the launch file.

    Nothing is recorded when the tag uses a command other than load or
    dump, when it has no file attribute, or when no package can be found
    for the file.

    * rosparam -- the rosparam tag

    '''
    attributes = rosparam.attrib

    # Load is the default command if it is not provided, and the file
    # attribute is only valid for the load and dump commands
    command = attributes.get(self.CommandAttribute, self.LoadCommand)
    if command not in [self.LoadCommand, self.DumpCommand]:
        return

    paramFile = attributes.get(self.FileAttribute, None)
    if paramFile is None:
        return

    # Resolve the path to the included file
    resolved = self.__resolveText(paramFile)

    # When the filename contained any arg substitutions, keep them so
    # that the edge can be labeled with the arg and value that was used
    # to conditionally select this rosparam file
    if resolved != paramFile:
        argSubs = self.__getSubstitutionArgs(paramFile)
    else:
        argSubs = {}  # The file does not use any arg substitutions

    # Determine what ROS package contains the rosparam file
    package = rospkg.get_package_name(resolved)
    if package is None:
        print("ERROR: Failed to find package for rosparam " \
              "file: %s" % resolved)
        return  # Skip the file

    # Create a unique name for the dot node from the package and the
    # base name of the file (with its periods replaced)
    name = basename(resolved)
    dotNodeName = "yaml_%s_%s" % (package, name.replace(".", "_"))

    self.__rosParamFiles.append(
        RosParam(resolved, name, dotNodeName, package, argSubs))
def __parseTestNodeTag(self, testNode):
'''Parse the test tag from a launch file.
* testNode -- the testNode tag
'''
# Make sure the test node is enabled before continuing
if not self.__isEnabled(testNode):
return None # Test node is disabled
# Grab all of the test node attributes
pkg = self.__getAttribute(testNode, self.PkgAttribute)
nodeType = self.__getAttribute(testNode, self.TypeAttribute)
name = self.__getAttribute(testNode, self.TestNameAttribute)
# Any of these attributes may have substitution arguments