6
6
import logging
7
7
import os
8
8
import tempfile
9
+ from dataclasses import dataclass
9
10
from functools import partial
10
11
11
12
from packageurl import PackageURL
30
31
logger : logging .Logger = logging .getLogger (__name__ )
31
32
32
33
34
+ @dataclass (frozen = True )
35
+ class ProvenanceAsset :
36
+ """This class exists to hold a provenance payload with the original asset's name and URL."""
37
+
38
+ payload : InTotoPayload
39
+ name : str
40
+ url : str
41
+
42
+
33
43
class ProvenanceFinder :
34
44
"""This class is used to find and retrieve provenance files from supported registries."""
35
45
@@ -44,7 +54,7 @@ def __init__(self) -> None:
44
54
elif isinstance (registry , JFrogMavenRegistry ):
45
55
self .jfrog_registry = registry
46
56
47
- def find_provenance (self , purl : PackageURL ) -> list [InTotoPayload ]:
57
+ def find_provenance (self , purl : PackageURL ) -> list [ProvenanceAsset ]:
48
58
"""Find the provenance file(s) of the passed PURL.
49
59
50
60
Parameters
@@ -54,8 +64,8 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
54
64
55
65
Returns
56
66
-------
57
- list[InTotoPayload ]
58
- The provenance payload , or an empty list if not found.
67
+ list[ProvenanceAsset ]
68
+ The provenance asset , or an empty list if not found.
59
69
"""
60
70
logger .debug ("Seeking provenance of: %s" , purl )
61
71
@@ -88,7 +98,7 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
88
98
logger .debug ("Provenance finding not supported for PURL type: %s" , purl .type )
89
99
return []
90
100
91
- def _find_provenance (self , discovery_functions : list [partial [list [InTotoPayload ]]]) -> list [InTotoPayload ]:
101
+ def _find_provenance (self , discovery_functions : list [partial [list [ProvenanceAsset ]]]) -> list [ProvenanceAsset ]:
92
102
"""Find the provenance file(s) using the passed discovery functions.
93
103
94
104
Parameters
@@ -99,7 +109,7 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
99
109
Returns
100
110
-------
101
111
list[InTotoPayload]
102
- The provenance payload (s) from the first successful function, or an empty list if none were.
112
+ The provenance asset (s) from the first successful function, or an empty list if none were.
103
113
"""
104
114
if not discovery_functions :
105
115
return []
@@ -114,7 +124,7 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
114
124
return []
115
125
116
126
117
- def find_npm_provenance (purl : PackageURL , registry : NPMRegistry ) -> list [InTotoPayload ]:
127
+ def find_npm_provenance (purl : PackageURL , registry : NPMRegistry ) -> list [ProvenanceAsset ]:
118
128
"""Find and download the NPM based provenance for the passed PURL.
119
129
120
130
Two kinds of attestation can be retrieved from npm: "Provenance" and "Publish". The "Provenance" attestation
@@ -131,8 +141,8 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP
131
141
132
142
Returns
133
143
-------
134
- list[InTotoPayload ]
135
- The provenance payload (s), or an empty list if not found.
144
+ list[ProvenanceAsset ]
145
+ The provenance asset (s), or an empty list if not found.
136
146
"""
137
147
if not registry .enabled :
138
148
logger .debug ("The npm registry is not enabled." )
@@ -178,16 +188,19 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP
178
188
publish_payload = load_provenance_payload (signed_download_path )
179
189
except LoadIntotoAttestationError as error :
180
190
logger .error ("Error while loading publish attestation: %s" , error )
181
- return [provenance_payload ]
191
+ return [ProvenanceAsset ( provenance_payload , npm_provenance_asset . name , npm_provenance_asset . url ) ]
182
192
183
- return [provenance_payload , publish_payload ]
193
+ return [
194
+ ProvenanceAsset (provenance_payload , npm_provenance_asset .name , npm_provenance_asset .url ),
195
+ ProvenanceAsset (publish_payload , npm_provenance_asset .name , npm_provenance_asset .url ),
196
+ ]
184
197
185
198
except OSError as error :
186
199
logger .error ("Error while storing provenance in the temporary directory: %s" , error )
187
200
return []
188
201
189
202
190
- def find_gav_provenance (purl : PackageURL , registry : JFrogMavenRegistry ) -> list [InTotoPayload ]:
203
+ def find_gav_provenance (purl : PackageURL , registry : JFrogMavenRegistry ) -> list [ProvenanceAsset ]:
191
204
"""Find and download the GAV based provenance for the passed PURL.
192
205
193
206
Parameters
@@ -199,8 +212,8 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
199
212
200
213
Returns
201
214
-------
202
- list[InTotoPayload ] | None
203
- The provenance payload if found, or an empty list otherwise.
215
+ list[ProvenanceAsset ] | None
216
+ The provenance asset if found, or an empty list otherwise.
204
217
205
218
Raises
206
219
------
@@ -269,7 +282,7 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
269
282
if not is_witness_provenance_payload (provenance_payload , witness_verifier_config .predicate_types ):
270
283
continue
271
284
272
- provenances .append (provenance_payload )
285
+ provenances .append (ProvenanceAsset ( provenance_payload , provenance_asset . name , provenance_asset . url ) )
273
286
except OSError as error :
274
287
logger .error ("Error while storing provenance in the temporary directory: %s" , error )
275
288
@@ -281,7 +294,7 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
281
294
return provenances [:1 ]
282
295
283
296
284
- def find_pypi_provenance (purl : PackageURL ) -> list [InTotoPayload ]:
297
+ def find_pypi_provenance (purl : PackageURL ) -> list [ProvenanceAsset ]:
285
298
"""Find and download the PyPI based provenance for the passed PURL.
286
299
287
300
Parameters
@@ -291,11 +304,11 @@ def find_pypi_provenance(purl: PackageURL) -> list[InTotoPayload]:
291
304
292
305
Returns
293
306
-------
294
- list[InTotoPayload] | None
295
- The provenance payload if found, or an empty list otherwise.
307
+ list[ProvenanceAsset]
308
+ The provenance assets found, or an empty list otherwise.
296
309
"""
297
- attestation , verified = DepsDevRepoFinder .get_attestation (purl )
298
- if not attestation :
310
+ attestation , url , verified = DepsDevRepoFinder .get_attestation (purl )
311
+ if not ( attestation and url ) :
299
312
return []
300
313
301
314
with tempfile .TemporaryDirectory () as temp_dir :
@@ -306,15 +319,15 @@ def find_pypi_provenance(purl: PackageURL) -> list[InTotoPayload]:
306
319
try :
307
320
payload = load_provenance_payload (file_name )
308
321
payload .verified = verified
309
- return [payload ]
322
+ return [ProvenanceAsset ( payload , purl . name , url ) ]
310
323
except LoadIntotoAttestationError as load_error :
311
324
logger .error ("Error while loading provenance: %s" , load_error )
312
325
return []
313
326
314
327
315
328
def find_provenance_from_ci (
316
329
analyze_ctx : AnalyzeContext , git_obj : Git | None , download_path : str
317
- ) -> InTotoPayload | None :
330
+ ) -> ProvenanceAsset | None :
318
331
"""Try to find provenance from CI services of the repository.
319
332
320
333
Note that we stop going through the CI services once we encounter a CI service
@@ -409,7 +422,10 @@ def find_provenance_from_ci(
409
422
download_provenances_from_ci_service (ci_info , download_path )
410
423
411
424
# TODO consider how to handle multiple payloads here.
412
- return ci_info ["provenances" ][0 ].payload if ci_info ["provenances" ] else None
425
+ if ci_info ["provenances" ]:
426
+ provenance = ci_info ["provenances" ][0 ]
427
+ return ProvenanceAsset (provenance .payload , provenance .asset .name , provenance .asset .url )
428
+ return None
413
429
414
430
else :
415
431
logger .debug ("CI service not supported for provenance finding: %s" , ci_service .name )
0 commit comments