Skip to content

Commit

Permalink
Fix L3-iGrant/api#514: Filter presentation for sd-jwt credential is m…
Browse files Browse the repository at this point in the history
…atching the inner elements of an sd-jwt field
  • Loading branch information
albinpa authored and georgepadayatti committed Oct 22, 2024
1 parent e96429f commit 5ba002b
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 4 deletions.
157 changes: 157 additions & 0 deletions sdjwt/pex.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from jsonschema import exceptions, validate, ValidationError
import json
import uuid
import base64
from typing import List, Dict, Union, Optional, Tuple, Any
from pydantic import BaseModel
from sdjwt.didkey import DIDKey
from sdjwt.sdjwt import get_all_disclosures_with_sd_from_token, decode_disclosure_base64
from jwcrypto import jwk, jwt
from jsonpath_ng import jsonpath, parse
from dataclasses import dataclass
Expand Down Expand Up @@ -487,3 +489,158 @@ def update_disclosures_in_token(token: str, disclosures: list) -> str:

sd_jwt = jwt_token + sd_string
return sd_jwt


def decode_credential_sd_to_credential_subject_with_key_mapping(
disclosure_mapping: dict, credential_subject: dict
) -> dict:
credential_subject = {"credentialSubject": credential_subject}
_credentialSubject = {**credential_subject}
key_mapping = {}

def replace_sd_with_credential_subject_attributes(sds: list, disclosure: dict):
credential_attribute = {}
for sd in sds:
disclosure_base64 = disclosure.get(sd)
key, value = decode_disclosure_base64(disclosure_base64=disclosure_base64)
id = str(uuid.uuid4())
key_mapping[id] = value
credential_attribute[key] = id
return credential_attribute, key_mapping

def update_value(obj, path, credential_attribute):
# Construct json path dot notation
dot_notation_path = ".".join(path)

# Find matches for the json path
jp = parse(dot_notation_path)
matches = jp.find(obj)

# Iterate through the matches
for match in matches:
if isinstance(match.context.value, dict):
match.context.value[str(match.path)].pop("_sd")
for key, value in credential_attribute.items():
match.context.value[str(match.path)][key] = value

def iterate_mapping(obj, path):
for key, value in obj.items():

if isinstance(value, dict):
new_path = path + [f"'{key}'"]
# Check if sd is present or not
if "_sd" in value and value["_sd"]:
credential_attribute, key_mapping = (
replace_sd_with_credential_subject_attributes(
value["_sd"], disclosure=disclosure_mapping
)
)
update_value(_credentialSubject, new_path, credential_attribute)
iterate_mapping(value, new_path)

iterate_mapping(credential_subject, [])
return credential_subject["credentialSubject"], key_mapping


def match_credentials_for_sd_jwt(
input_descriptor_json,
credentials,
) -> Tuple[List[MatchedCredential], Optional[Exception]]:
# Deserialise input descriptor json string
try:
descriptor = json.loads(input_descriptor_json)
except json.JSONDecodeError as e:
return [], e

# To store the matched credentials
matches = []

# Iterate through each credential
for item in credentials:
for credential_id, credential_token in item.items():

# Assume credential matches until proven otherwise
credential_matched = True
matched_fields = []
disclosure_mapping = get_all_disclosures_with_sd_from_token(
token=credential_token
)
_, credential_decoded = decode_header_and_claims_in_jwt(credential_token)
credential_subject, key_mapping = (
decode_credential_sd_to_credential_subject_with_key_mapping(
disclosure_mapping=disclosure_mapping,
credential_subject=credential_decoded.get("vc").get(
"credentialSubject"
),
)
)
credential = credential_decoded.get("vc")
credential["credentialSubject"] = credential_subject
credential = json.dumps(credential)

# Iterate through fields specified in the constraints
for field_index, field in enumerate(descriptor["constraints"]["fields"]):

# Assume field matches until proven otherwise
field_matched = False

# Iterate through JSON paths for the current field
for path_index, path in enumerate(field["path"]):

# Apply JSON path on the credential
path_matches, err = apply_json_path(credential, path)

if len(path_matches) > 0 and err is None:
if "filter" in field:
try:
filter_bytes = json.dumps(field["filter"])
except (TypeError, ValueError) as e:
# Continue to next path, since filter has failed to serialise
continue

# Validate the matched JSON against the field's filter
if (
validate_json_schema(path_matches[0], filter_bytes)
is not None
):
# Field doesn't match since validation failed
field_matched = False
break

# Add the matched field to the list
field_matched = True
value = key_mapping.get(str(path_matches[0]))
if value:

matched_fields.append(
MatchedField(
index=field_index,
path=MatchedPath(
path=path, index=path_index, value=value
),
)
)
else:
matched_fields.append(
MatchedField(
index=field_index,
path=MatchedPath(
path=path,
index=path_index,
value=path_matches[0],
),
)
)

if not field_matched:
# If any one field didn't match then move to next credential
credential_matched = False
break

if credential_matched:
# All fields matched, then credential is matched
matches.append(
MatchedCredential(index=credential_id, fields=matched_fields)
)

return matches, None
Loading

0 comments on commit 5ba002b

Please sign in to comment.