diff --git a/src/model_signing/signing/in_toto.py b/src/model_signing/signing/in_toto.py index c797e639..90e4e042 100644 --- a/src/model_signing/signing/in_toto.py +++ b/src/model_signing/signing/in_toto.py @@ -75,7 +75,7 @@ def manifest_from_payload( if predicate_type == subcls.predicate_type: return subcls.manifest_from_payload(payload) - raise ValueError("Unknown in-toto predicate type {predicate_type}") + raise ValueError(f"Unknown in-toto predicate type {predicate_type}") class SingleDigestIntotoPayload(IntotoPayload): @@ -182,10 +182,10 @@ def manifest_from_payload( predicate = payload["predicate"] if len(subjects) != 1: - raise ValueError("Expected one single subject, got {subjects}") + raise ValueError(f"Expected one single subject, got {subjects}") algorithm = predicate["actual_hash_algorithm"] - digest_value = subjects[0]["digest"]["sha256"] + digest_value = bytes.fromhex(subjects[0]["digest"]["sha256"]) digest = hashing.Digest(algorithm, digest_value) return manifest_module.DigestManifest(digest) @@ -343,7 +343,7 @@ def manifest_from_payload( predicate = payload["predicate"] if len(subjects) != 1: - raise ValueError("Expected one single subject, got {subjects}") + raise ValueError(f"Expected one single subject, got {subjects}") hasher = memory.SHA256() items = [] @@ -360,7 +360,7 @@ def manifest_from_payload( obtained_digest = hasher.compute().digest_hex if obtained_digest != expected_digest: raise ValueError( - f"Verification failed. " + "Verification failed. " f"Expected {expected_digest}, got {obtained_digest}" ) @@ -486,7 +486,7 @@ def manifest_from_payload( predicate = payload["predicate"] if len(subjects) != 1: - raise ValueError("Expected one single subject, got {subjects}") + raise ValueError(f"Expected one single subject, got {subjects}") hasher = memory.SHA256() items = [] @@ -505,7 +505,7 @@ def manifest_from_payload( obtained_digest = hasher.compute().digest_hex if obtained_digest != expected_digest: raise ValueError( - f"Verification failed. " + "Verification failed. " f"Expected {expected_digest}, got {obtained_digest}" ) diff --git a/src/model_signing/signing/sigstore.py b/src/model_signing/signing/sigstore.py index 10da0271..ff8dd465 100644 --- a/src/model_signing/signing/sigstore.py +++ b/src/model_signing/signing/sigstore.py @@ -337,8 +337,8 @@ def verify(self, signature: signing.Signature) -> manifest.Manifest: if payload_type != _IN_TOTO_JSON_PAYLOAD_TYPE: raise ValueError( - f"Only {_IN_TOTO_JSON_PAYLOAD_TYPE} DSSE payload acceped, " - f"got {payload_type}" + f"Expected DSSE payload {_IN_TOTO_JSON_PAYLOAD_TYPE}, " + f"but got {payload_type}" ) payload = json.loads(payload) diff --git a/tests/signing/sigstore_test.py b/tests/signing/sigstore_test.py new file mode 100644 index 00000000..8a459302 --- /dev/null +++ b/tests/signing/sigstore_test.py @@ -0,0 +1,651 @@ +# Copyright 2024 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for signing and verification with Sigstore.""" + +import json +import pathlib +from typing import Self +from unittest import mock + +import pytest + +from model_signing.hashing import file +from model_signing.hashing import memory +from model_signing.serialization import serialize_by_file +from model_signing.serialization import serialize_by_file_shard +from model_signing.signing import as_bytes +from model_signing.signing import empty_signing +from model_signing.signing import in_toto +from model_signing.signing import sigstore + + +class MockedSigstoreBundle: + """Mocked SigstoreBundle that just records the signed payload.""" + + def __init__(self, data: object): + self._data = data + + def to_json(self) -> str: + """Convert the bundle to json for saving. + + Since we just store the signed payload, we need to differentiate between + the case when this is already an in-toto statement or just the bytes + (differentiate between `sign_dsse`/`sign_artifact` results). For bytes, + we create a fake json object. + """ + if hasattr(self._data, "_contents"): + return self._data._contents.decode("utf-8") + + return json.dumps({"test_payload": self._data.hex()}) + + @classmethod + def from_json(cls, data) -> Self: + """Reads a bundle from json. + + Assumptions in `to_json` must be maintained here, specifically for the + fake json object created for the bytes payload. + """ + json_data = json.loads(data) + if "test_payload" in json_data: + return cls(json_data["test_payload"]) + return cls(json_data) + + +def _mocked_verify_dsse( + bundle, policy, json_type=sigstore._IN_TOTO_JSON_PAYLOAD_TYPE +): + """Mocked replacement for `sigstore.Verifier.verify_dsse`.""" + return json_type, json.dumps(bundle._data) + + +@pytest.fixture +def mocked_oidc_provider(): + with mock.patch.object( + sigstore.sigstore_oidc, "Issuer", autospec=True + ) as mocked_issuer: + mocked_issuer.return_value.identity_token.return_value = "fake_token" + yield mocked_issuer + + +@pytest.fixture +def mocked_oidc_ambient(): + with mock.patch.multiple( + sigstore.sigstore_oidc, + detect_credential=mock.DEFAULT, + IdentityToken=mock.DEFAULT, + autospec=True, + ) as mocked_objects: + mocked_detect_credential = mocked_objects["detect_credential"] + mocked_detect_credential.return_value = "fake_token" + + mocked_identity_token = mocked_objects["IdentityToken"] + mocked_identity_token.return_value = "fake_token" + + yield mocked_objects + + +@pytest.fixture +def mocked_sigstore_models(): + with mock.patch.object( + sigstore.sigstore_models, "Bundle", autospec=True + ) as mocked_bundle: + mocked_bundle.from_json = MockedSigstoreBundle.from_json + yield mocked_bundle + + +@pytest.fixture +def mocked_sigstore_signer(): + with mock.patch.object( + sigstore.sigstore_signer, "Signer", autospec=True + ) as mocked_signer: + mocked_signer.return_value.sign_artifact = MockedSigstoreBundle + mocked_signer.return_value.sign_dsse = MockedSigstoreBundle + yield mocked_signer + + +@pytest.fixture +def mocked_sigstore_verifier(): + with mock.patch.object( + sigstore.sigstore_verifier, "Verifier", autospec=True + ) as mocked_verifier: + mocked_verifier.verify_dsse = _mocked_verify_dsse + mocked_verifier.staging = lambda: mocked_verifier + mocked_verifier.production = lambda: mocked_verifier + yield mocked_verifier + + +@pytest.fixture +def mocked_sigstore_verifier_bad_payload(): + def _verify_dsse(bundle, policy): + return _mocked_verify_dsse(bundle, policy, "not DDSE") + + with mock.patch.object( + sigstore.sigstore_verifier, "Verifier", autospec=True + ) as mocked_verifier: + mocked_verifier.verify_dsse = _verify_dsse + mocked_verifier.staging = lambda: mocked_verifier + yield mocked_verifier + + +@pytest.fixture +def mocked_sigstore( + mocked_oidc_provider, + mocked_sigstore_models, + mocked_sigstore_signer, + mocked_sigstore_verifier, +): + """Collect all sigstore mocking fixtures in just one.""" + return True # keep in scope + + +class TestSigstoreSigning: + def _file_hasher_factory(self, path: pathlib.Path) -> file.FileHasher: + return file.SimpleFileHasher(path, memory.SHA256()) + + def _shard_hasher_factory( + self, path: pathlib.Path, start: int, end: int + ) -> file.ShardedFileHasher: + return file.ShardedFileHasher( + path, memory.SHA256(), start=start, end=end + ) + + def _sign_manifest( + self, + manifest, + signature_path, + payload_type, + signer_type, + use_staging=True, + oidc_issuer=None, + ): + payload = payload_type.from_manifest(manifest) + signer = signer_type(use_staging=use_staging, oidc_issuer=oidc_issuer) + signature = signer.sign(payload) + signature.write(signature_path) + + def _verify_artifact_signature( + self, manifest, signature_path, use_staging=True + ): + signature = sigstore.SigstoreSignature.read(signature_path) + verifier = sigstore.SigstoreArtifactVerifier( + expected_digest=manifest.digest.digest_value, + identity="test", + oidc_issuer="test", + use_staging=use_staging, + ) + return verifier.verify(signature) + + def _verify_dsse_signature(self, signature_path, use_staging=True): + signature = sigstore.SigstoreSignature.read(signature_path) + verifier = sigstore.SigstoreDSSEVerifier( + identity="test", oidc_issuer="test", use_staging=use_staging + ) + return verifier.verify(signature) + + def test_sign_verify_artifacts( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + file_hasher = file.SimpleFileHasher( + pathlib.Path("unused"), memory.SHA256() + ) + serializer = serialize_by_file.DigestSerializer( + file_hasher, memory.SHA256, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + as_bytes.BytesPayload, + sigstore.SigstoreArtifactSigner, + ) + + # Read signature and check against expected serialization + local_manifest = serializer.serialize(sample_model_folder) + expected_manifest = self._verify_artifact_signature( + local_manifest, signature_path + ) + assert expected_manifest == manifest + + def test_sign_verify_dsse_single_digest( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + file_hasher = file.SimpleFileHasher( + pathlib.Path("unused"), memory.SHA256() + ) + serializer = serialize_by_file.DigestSerializer( + file_hasher, memory.SHA256, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.SingleDigestIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature(signature_path) + assert expected_manifest == manifest + + def test_sign_verify_dsse_digest_of_digests( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestOfDigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature(signature_path) + assert expected_manifest == manifest + + def test_sign_verify_dsse_digests( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature(signature_path) + assert expected_manifest == manifest + + def test_sign_verify_dsse_digest_of_digests_sharded( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + serializer = serialize_by_file_shard.ManifestSerializer( + self._shard_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestOfShardDigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature(signature_path) + assert expected_manifest == manifest + + def test_sign_verify_dsse_digests_sharded( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + serializer = serialize_by_file_shard.ManifestSerializer( + self._shard_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.ShardDigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature(signature_path) + assert expected_manifest == manifest + + def test_sign_verify_mocked_prod( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + use_staging=False, + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature( + signature_path, use_staging=False + ) + assert expected_manifest == manifest + + def test_sign_verify_mocked_prod_oidc( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + # Serialize and sign model + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + use_staging=False, + oidc_issuer="test", + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature( + signature_path, use_staging=False + ) + assert expected_manifest == manifest + + def test_sign_verify_mocked_ambient( + self, + sample_model_folder, + mocked_sigstore, + mocked_oidc_ambient, + tmp_path, + ): + # Serialize and sign model + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + # Read signature and check against expected serialization + expected_manifest = self._verify_dsse_signature(signature_path) + assert expected_manifest == manifest + + def test_sign_digest_as_bytes( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + + with pytest.raises( + TypeError, match="Only `BytesPayload` payloads are supported" + ): + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreArtifactSigner, + ) + + def test_sign_bytes_as_digest( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + file_hasher = file.SimpleFileHasher( + pathlib.Path("unused"), memory.SHA256() + ) + serializer = serialize_by_file.DigestSerializer( + file_hasher, memory.SHA256, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + + with pytest.raises( + TypeError, match="Only `IntotoPayload` payloads are supported" + ): + self._sign_manifest( + manifest, + signature_path, + as_bytes.BytesPayload, + sigstore.SigstoreDSSESigner, + ) + + def test_verify_artifact_signature_not_sigstore(self, mocked_sigstore): + signature = empty_signing.EmptySignature() + verifier = sigstore.SigstoreArtifactVerifier( + expected_digest="", identity="", oidc_issuer="" + ) + + with pytest.raises( + TypeError, match="Only `SigstoreSignature` signatures are supported" + ): + verifier.verify(signature) + + def test_verify_dsse_signature_not_sigstore(self, mocked_sigstore): + signature = empty_signing.EmptySignature() + verifier = sigstore.SigstoreDSSEVerifier(identity="", oidc_issuer="") + + with pytest.raises( + TypeError, match="Only `SigstoreSignature` signatures are supported" + ): + verifier.verify(signature) + + def test_verify_not_into_json_payload( + self, + sample_model_folder, + mocked_oidc_provider, + mocked_sigstore_signer, + mocked_sigstore_models, + mocked_sigstore_verifier_bad_payload, + tmp_path, + ): + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + with pytest.raises(ValueError, match="Expected DSSE payload"): + self._verify_dsse_signature(signature_path) + + def test_verify_not_intoto_statement( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + correct_signature = signature_path.read_text() + json_signature = json.loads(correct_signature) + json_signature["_type"] = "Not in-toto" + invalid_signature = json.dumps(json_signature) + signature_path.write_text(invalid_signature) + + with pytest.raises(ValueError, match="Expected in-toto .* payload"): + self._verify_dsse_signature(signature_path) + + def test_verify_intoto_predicate_not_matched( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + correct_signature = signature_path.read_text() + json_signature = json.loads(correct_signature) + json_signature["predicateType"] = "Invalid" + invalid_signature = json.dumps(json_signature) + signature_path.write_text(invalid_signature) + + with pytest.raises(ValueError, match="Unknown in-toto predicate type"): + self._verify_dsse_signature(signature_path) + + def test_verify_intoto_single_digest_more_than_one_digests( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + file_hasher = file.SimpleFileHasher( + pathlib.Path("unused"), memory.SHA256() + ) + serializer = serialize_by_file.DigestSerializer( + file_hasher, memory.SHA256, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.SingleDigestIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + correct_signature = signature_path.read_text() + json_signature = json.loads(correct_signature) + json_signature["subject"].extend(json_signature["subject"]) + invalid_signature = json.dumps(json_signature) + signature_path.write_text(invalid_signature) + + with pytest.raises(ValueError, match="Expected one single subject"): + self._verify_dsse_signature(signature_path) + + def test_verify_intoto_digest_of_digests_more_than_one_digests( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestOfDigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + correct_signature = signature_path.read_text() + json_signature = json.loads(correct_signature) + json_signature["subject"].extend(json_signature["subject"]) + invalid_signature = json.dumps(json_signature) + signature_path.write_text(invalid_signature) + + with pytest.raises(ValueError, match="Expected one single subject"): + self._verify_dsse_signature(signature_path) + + def test_verify_intoto_digest_of_digests_invalid_root_digest( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + serializer = serialize_by_file.ManifestSerializer( + self._file_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestOfDigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + correct_signature = signature_path.read_text() + json_signature = json.loads(correct_signature) + json_signature["subject"][0]["digest"]["sha256"] = "invalid" + invalid_signature = json.dumps(json_signature) + signature_path.write_text(invalid_signature) + + with pytest.raises(ValueError, match="Verification failed"): + self._verify_dsse_signature(signature_path) + + def test_verify_intoto_digest_of_shard_digests_more_than_one_digests( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + serializer = serialize_by_file_shard.ManifestSerializer( + self._shard_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestOfShardDigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + correct_signature = signature_path.read_text() + json_signature = json.loads(correct_signature) + json_signature["subject"].extend(json_signature["subject"]) + invalid_signature = json.dumps(json_signature) + signature_path.write_text(invalid_signature) + + with pytest.raises(ValueError, match="Expected one single subject"): + self._verify_dsse_signature(signature_path) + + def test_verify_intoto_digest_of_shard_digests_invalid_root_digest( + self, sample_model_folder, mocked_sigstore, tmp_path + ): + serializer = serialize_by_file_shard.ManifestSerializer( + self._shard_hasher_factory, allow_symlinks=True + ) + manifest = serializer.serialize(sample_model_folder) + signature_path = tmp_path / "model.sig" + self._sign_manifest( + manifest, + signature_path, + in_toto.DigestOfShardDigestsIntotoPayload, + sigstore.SigstoreDSSESigner, + ) + + correct_signature = signature_path.read_text() + json_signature = json.loads(correct_signature) + json_signature["subject"][0]["digest"]["sha256"] = "invalid" + invalid_signature = json.dumps(json_signature) + signature_path.write_text(invalid_signature) + + with pytest.raises(ValueError, match="Verification failed"): + self._verify_dsse_signature(signature_path)