Skip to content

Commit

Permalink
Fix L3-iGrant/api#651: Ensure full support of IETF SD-JWT VC
Browse files Browse the repository at this point in the history
  • Loading branch information
albinpa authored and georgepadayatti committed Oct 17, 2024
1 parent 6d7d2ef commit bb130ac
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 15 deletions.
32 changes: 19 additions & 13 deletions sdjwt/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class PresentationDefinition(BaseModel):
"type": "object",
"additionalProperties": False,
"patternProperties": {
"^(jwt|jwt_vc|jwt_vp|vp\+sd-jwt|vc\+sd-jwt|sd-jwt)$": {
"^(jwt|jwt_vc|jwt_vc_json|jwt_vp|vp\+sd-jwt|vc\+sd-jwt|sd-jwt)$": {
"type": "object",
"additionalProperties": False,
"properties": {
Expand Down Expand Up @@ -229,6 +229,7 @@ class PresentationDefinitionValidationError(Exception):
"vp+sd-jwt",
"sd-jwt",
"mso_mdoc",
"jwt_vc_json",
],
},
},
Expand Down Expand Up @@ -524,7 +525,7 @@ def find_all_sd_values(data):
# Function to extract relevant disclosure values
def extract_disclosure_values(input_descriptor, credential, disclosure):
fields = input_descriptor["constraints"]["fields"]
sd_values = find_all_sd_values(credential["credentialSubject"])
sd_values = find_all_sd_values(credential)

matching_disclosures = []
for field in fields:
Expand Down Expand Up @@ -628,13 +629,11 @@ def match_credentials_for_sd_jwt(
credential_subject, key_mapping = (
decode_credential_sd_to_credential_subject_with_key_mapping(
disclosure_mapping=disclosure_mapping,
credential_subject=credential_decoded.get("vc").get(
"credentialSubject"
),
credential_subject=credential_decoded
)
)
credential = credential_decoded.get("vc")
credential["credentialSubject"] = credential_subject
credential = credential_decoded
credential = credential_subject
credential = json.dumps(credential)

# Iterate through fields specified in the constraints
Expand Down Expand Up @@ -768,10 +767,10 @@ def validate_vp_token(
disclosure_mapping = get_all_disclosures_with_sd_from_token(vc_token)

credential_subject = create_credential_subject_for_sdjwt(
credential_subject=vc_claims.get("vc").get("credentialSubject"),
credential_subject=vc_claims,
disclosure_mapping=disclosure_mapping,
)
vc_claims["vc"]["credentialSubject"] = credential_subject
vc_claims = credential_subject
elif vc_claims and format == "jwt_vc":
pass

Expand All @@ -780,10 +779,17 @@ def validate_vp_token(
)
for input_descriptor in input_descriptors:
if input_descriptor.get("id") == id:
matches = match_credentials(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims["vc"])],
)
limit_disclosure = input_descriptor.get("constraints").get("limit_disclosure",None)
if limit_disclosure and limit_disclosure == "required":
matches = match_credentials(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims)],
)
else:
matches = match_credentials(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims["vc"])],
)
if not matches or not matches[0]:
return False
else:
Expand Down
88 changes: 86 additions & 2 deletions sdjwt/sdjwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,90 @@ def _create_disclosure_mapping_from_credential_definition(data):

def create_disclosure_mapping_from_credential_definition(credential_definition):
data = credential_definition["properties"]
disclosure_mapping = {}
disclosure_mapping["credentialSubject"] = _create_disclosure_mapping_from_credential_definition(data)
disclosure_mapping = _create_disclosure_mapping_from_credential_definition(data)
return disclosure_mapping


def create_vc_sd_jwt(
jti: str,
iss: str,
sub: str,
kid: str,
key: jwk.JWK,
vct: str,
credential_subject: dict,
disclosure_mapping: typing.Optional[dict] = None,
expiry_in_seconds: typing.Optional[int] = None,
) -> str:
if not expiry_in_seconds:
expiry_in_seconds = 2592000
issuance_epoch, issuance_8601 = (
get_current_datetime_in_epoch_seconds_and_iso8601_format()
)
expiration_epoch, expiration_8601 = (
get_current_datetime_in_epoch_seconds_and_iso8601_format(expiry_in_seconds)
)
_credentialSubject = {**credential_subject}
if disclosure_mapping:
disclosures = []

def calculate_sd(name, value):
_sd = []
disclosure_base64 = create_disclosure_base64(
create_random_salt(32), key=name, value=value
)
sd = create_sd_from_disclosure_base64(disclosure_base64)
disclosures.append(disclosure_base64)
_sd.append(sd)
return _sd

def update_value(obj, path):
# Construct json path dot notation
dot_notation_path = ".".join(path)

# Find matches for the json path
jp = parse(dot_notation_path)
matches = jp.find(obj)

# Iterate through the matches and calculated sd
for match in matches:
sd = calculate_sd(str(match.path), match.value)
if isinstance(match.context.value, dict):
if not match.context.value.get("_sd"):
match.context.value.setdefault("_sd", sd)
del match.context.value[str(match.path)]
else:
match.context.value["_sd"].extend(sd)
del match.context.value[str(match.path)]

def iterate_mapping(obj, path):
for key, value in obj.items():
if isinstance(value, dict):
new_path = path + [f"'{key}'"]
# Check if limitDisclosure is present or not
if "limitDisclosure" in value and value["limitDisclosure"]:
update_value(_credentialSubject, new_path)
iterate_mapping(value, new_path)

# Iterate through disclosure mapping
# and add sd to the corresponding field in the
# credential subject
iterate_mapping(disclosure_mapping, [])


jwt_credential = create_jwt(
jti=jti,
sub=sub,
iss=iss,
kid=kid,
key=key,
iat=issuance_epoch,
exp=expiration_epoch,
vct=vct,
**_credentialSubject,
)
sd_disclosures = ""
if disclosure_mapping:
sd_disclosures = "~" + "~".join(disclosures)

return jwt_credential + sd_disclosures

0 comments on commit bb130ac

Please sign in to comment.