|
3 | 3 | import re
|
4 | 4 | import subprocess
|
5 | 5 | import sys
|
| 6 | +import sysconfig |
6 | 7 | import types
|
7 | 8 | import unittest
|
8 | 9 |
|
@@ -173,6 +174,87 @@ class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
|
173 | 174 | backend = SystemTapBackend()
|
174 | 175 | optimize_python = 2
|
175 | 176 |
|
| 177 | +class CheckDtraceProbes(unittest.TestCase): |
| 178 | + @classmethod |
| 179 | + def setUpClass(cls): |
| 180 | + if sysconfig.get_config_var('WITH_DTRACE'): |
| 181 | + readelf_major_version, readelf_minor_version = cls.get_readelf_version() |
| 182 | + if support.verbose: |
| 183 | + print(f"readelf version: {readelf_major_version}.{readelf_minor_version}") |
| 184 | + else: |
| 185 | + raise unittest.SkipTest("CPython must be configured with the --with-dtrace option.") |
| 186 | + |
| 187 | + |
| 188 | + @staticmethod |
| 189 | + def get_readelf_version(): |
| 190 | + try: |
| 191 | + cmd = ["readelf", "--version"] |
| 192 | + proc = subprocess.Popen( |
| 193 | + cmd, |
| 194 | + stdout=subprocess.PIPE, |
| 195 | + stderr=subprocess.PIPE, |
| 196 | + universal_newlines=True, |
| 197 | + ) |
| 198 | + with proc: |
| 199 | + version, stderr = proc.communicate() |
| 200 | + |
| 201 | + if proc.returncode: |
| 202 | + raise Exception( |
| 203 | + f"Command {' '.join(cmd)!r} failed " |
| 204 | + f"with exit code {proc.returncode}: " |
| 205 | + f"stdout={version!r} stderr={stderr!r}" |
| 206 | + ) |
| 207 | + except OSError: |
| 208 | + raise unittest.SkipTest("Couldn't find readelf on the path") |
| 209 | + |
| 210 | + # Regex to parse: |
| 211 | + # 'GNU readelf (GNU Binutils) 2.40.0\n' -> 2.40 |
| 212 | + match = re.search(r"^(?:GNU) readelf.*?\b(\d+)\.(\d+)", version) |
| 213 | + if match is None: |
| 214 | + raise unittest.SkipTest(f"Unable to parse readelf version: {version}") |
| 215 | + |
| 216 | + return int(match.group(1)), int(match.group(2)) |
| 217 | + |
| 218 | + def get_readelf_output(self): |
| 219 | + command = ["readelf", "-n", sys.executable] |
| 220 | + stdout, _ = subprocess.Popen( |
| 221 | + command, |
| 222 | + stdout=subprocess.PIPE, |
| 223 | + stderr=subprocess.STDOUT, |
| 224 | + universal_newlines=True, |
| 225 | + ).communicate() |
| 226 | + return stdout |
| 227 | + |
| 228 | + def test_check_probes(self): |
| 229 | + readelf_output = self.get_readelf_output() |
| 230 | + |
| 231 | + available_probe_names = [ |
| 232 | + "Name: import__find__load__done", |
| 233 | + "Name: import__find__load__start", |
| 234 | + "Name: audit", |
| 235 | + "Name: gc__start", |
| 236 | + "Name: gc__done", |
| 237 | + ] |
| 238 | + |
| 239 | + for probe_name in available_probe_names: |
| 240 | + with self.subTest(probe_name=probe_name): |
| 241 | + self.assertIn(probe_name, readelf_output) |
| 242 | + |
| 243 | + @unittest.expectedFailure |
| 244 | + def test_missing_probes(self): |
| 245 | + readelf_output = self.get_readelf_output() |
| 246 | + |
| 247 | + # Missing probes will be added in the future. |
| 248 | + missing_probe_names = [ |
| 249 | + "Name: function__entry", |
| 250 | + "Name: function__return", |
| 251 | + "Name: line", |
| 252 | + ] |
| 253 | + |
| 254 | + for probe_name in missing_probe_names: |
| 255 | + with self.subTest(probe_name=probe_name): |
| 256 | + self.assertIn(probe_name, readelf_output) |
| 257 | + |
176 | 258 |
|
177 | 259 | if __name__ == '__main__':
|
178 | 260 | unittest.main()
|
0 commit comments