Skip to content

Commit

Permalink
✨(api) enrich statements on post and put
Browse files Browse the repository at this point in the history
The xAPI specification indicates to infer the fields `authority`, `stored`,
`timestamp` and `id` (discouraging use of `version`), when recieving statements.
This commit implements this requirement, thus paving the way to proper
permissions management (through `authority`).
  • Loading branch information
Leobouloc committed Aug 28, 2023
1 parent 560b43d commit f1cdd67
Show file tree
Hide file tree
Showing 8 changed files with 519 additions and 38 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ have an authority field matching that of the user
- API: Forwarding PUT now uses PUT (instead of POST)
- Models: The xAPI `context.contextActivities.category` field is now mandatory
in the video and virtual classroom profiles. [BC]
- Backends: `LRSHTTP` methods must not be used in `asyncio` events loop (BC)
- Backends: `LRSHTTP` methods must not be used in `asyncio` events loop (BC)
- API: Incoming statements are enriched with `id`, `timestamp`, `stored`
and `authority`

## [3.9.0] - 2023-07-21

Expand Down
93 changes: 72 additions & 21 deletions src/ralph/api/routers/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ralph.api.auth import get_authenticated_user
from ralph.api.auth.user import AuthenticatedUser
from ralph.api.forwarding import forward_xapi_statements, get_active_xapi_forwardings
from ralph.api.models import ErrorDetail, LaxStatement
from ralph.backends.database.base import (
AgentParameters,
BaseDatabase,
Expand All @@ -40,8 +41,7 @@
BaseXapiAgentWithOpenId,
)
from ralph.models.xapi.base.common import IRI

from ..models import ErrorDetail, LaxStatement
from ralph.utils import now, statements_are_equivalent

logger = logging.getLogger(__name__)

Expand All @@ -67,6 +67,49 @@
}


def _enrich_statement_with_id(statement: dict):
# id: UUID
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#24-statement-properties
statement["id"] = str(statement.get("id", uuid4()))
return statement["id"]


def _enrich_statement_with_stored(statement: dict, value=None):
# stored: The time at which a Statement is stored by the LRS
# https://github.com/adlnet/xAPI-Spec/blob/1.0.3/xAPI-Data.md#248-stored
if value is None:
statement["stored"] = now()
else:
statement["stored"] = value
return statement["stored"]


def _enrich_statement_with_timestamp(statement: dict):
# timestamp: If not provided, same value as stored
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#247-timestamp
statement["timestamp"] = statement.get("timestamp", statement.get("stored", now()))
return statement["timestamp"]


def _enrich_statement_with_authority(statement: dict, current_user: AuthenticatedUser):
# authority: Information about whom or what has asserted that this statement is true
# https://github.com/adlnet/xAPI-Spec/blob/master/xAPI-Data.md#249-authority
authority = current_user.agent
if "authority" in statement and statement["authority"] != authority:
logger.error(
"Failed to index submitted statements. Submitted authority does not match."
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=(
"Stated `authority` does not match credentials. Change or remove"
"`authority` field from incoming statement.",
),
)
statement["authority"] = authority
return authority


def _parse_agent_parameters(agent_obj: dict):
"""Parse a dict and return an AgentParameters object to use in queries."""
# Transform agent to `dict` as FastAPI cannot parse JSON (seen as string)
Expand Down Expand Up @@ -356,8 +399,9 @@ async def get(
@router.put("", responses=POST_PUT_RESPONSES, status_code=status.HTTP_204_NO_CONTENT)
# pylint: disable=unused-argument
async def put(
current_user: Annotated[AuthenticatedUser, Depends(get_authenticated_user)],
# pylint: disable=invalid-name
statementId: str,
statementId: UUID,
statement: LaxStatement,
background_tasks: BackgroundTasks,
_=Depends(strict_query_params),
Expand All @@ -367,23 +411,27 @@ async def put(
LRS Specification:
https://github.com/adlnet/xAPI-Spec/blob/1.0.3/xAPI-Communication.md#211-put-statements
"""
statement_dict = {statementId: statement.dict(exclude_unset=True)}

# Force the UUID id in the statement to string, make sure it matches the
# statementId given in the URL.
statement_dict[statementId]["id"] = str(statement_dict[statementId]["id"])
statement_as_dict = statement.dict(exclude_unset=True)

if not statementId == statement_dict[statementId]["id"]:
statement_as_dict.update(id=str(statement_as_dict.get("id", statementId)))
if statementId != statement_as_dict["id"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="xAPI statement id does not match given statementId",
)

# Enrich statement before forwarding (NB: id is already set)
timestamp = _enrich_statement_with_timestamp(statement_as_dict)

if get_active_xapi_forwardings():
background_tasks.add_task(
forward_xapi_statements, statement_dict[statementId], method="put"
forward_xapi_statements, statement_as_dict, method="put"
)

# Finish enriching statements after forwarding
_enrich_statement_with_stored(statement_as_dict, timestamp)
_enrich_statement_with_authority(statement_as_dict, current_user)

try:
existing_statement = DATABASE_CLIENT.query_statements_by_ids([statementId])
except BackendException as error:
Expand All @@ -397,7 +445,7 @@ async def put(
# In the case that the current statement is not an exact duplicate of the one
# found in the database we return a 409, otherwise the usual 204.
for existing in existing_statement:
if statement_dict != existing["_source"]:
if not statements_are_equivalent(statement_as_dict, existing["_source"]):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A different statement already exists with the same ID",
Expand All @@ -406,9 +454,7 @@ async def put(

# For valid requests, perform the bulk indexing of all incoming statements
try:
success_count = DATABASE_CLIENT.put(
statement_dict.values(), ignore_errors=False
)
success_count = DATABASE_CLIENT.put([statement_as_dict], ignore_errors=False)
except (BackendException, BadFormatException) as exc:
logger.error("Failed to index submitted statement")
raise HTTPException(
Expand All @@ -422,6 +468,7 @@ async def put(
@router.post("/", responses=POST_PUT_RESPONSES)
@router.post("", responses=POST_PUT_RESPONSES)
async def post(
current_user: Annotated[AuthenticatedUser, Depends(get_authenticated_user)],
statements: Union[LaxStatement, List[LaxStatement]],
background_tasks: BackgroundTasks,
response: Response,
Expand All @@ -438,14 +485,11 @@ async def post(
if not isinstance(statements, list):
statements = [statements]

# The statements dict has multiple functions:
# - generate IDs for statements that are missing them;
# - use the list of keys to perform validations and as a final return value;
# - provide an iterable containing both the statements and generated IDs for bulk.
# Enrich statements before forwarding
statements_dict = {}
for statement in map(lambda x: x.dict(exclude_unset=True), statements):
statement_id = str(statement.get("id", uuid4()))
statement["id"] = statement_id
statement_id = _enrich_statement_with_id(statement)
timestamp = _enrich_statement_with_timestamp(statement)
statements_dict[statement_id] = statement

# Requests with duplicate statement IDs are considered invalid
Expand All @@ -462,6 +506,11 @@ async def post(
forward_xapi_statements, list(statements_dict.values()), method="post"
)

# Finish enriching statements after forwarding
for statement in statements_dict.values():
_enrich_statement_with_stored(statement, value=timestamp)
_enrich_statement_with_authority(statement, current_user)

try:
existing_statements = DATABASE_CLIENT.query_statements_by_ids(statements_ids)
except BackendException as error:
Expand All @@ -482,7 +531,9 @@ async def post(

# The LRS specification calls for deep comparison of duplicates. This
# is done here. If they are not exactly the same, we raise an error.
if statements_dict[existing["_id"]] != existing["_source"]:
if not statements_are_equivalent(
statements_dict[existing["_id"]], existing["_source"]
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Differing statements already exist with the same ID: "
Expand Down
22 changes: 22 additions & 0 deletions src/ralph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,25 @@ async def sem_task(task):
except Exception as exception:
group.cancel()
raise exception


def statements_are_equivalent(statement_1: dict, statement_2: dict):
"""Check if statements are equivalent.
To be equivalent, they must be identical on all fields not modified on input by the
LRS and identical on other fields, if these fields are present in both
statements. For example, if an "authority" field is present in only one statement,
they may still be equivalent.
"""
# Check that unmutable fields have the same values
fields = ["actor", "verb", "object", "id", "result", "context", "attachements"]

# Check that some fields enriched by the LRS are equal when in both statements
# The LRS specification excludes the fields below from equivalency. It was
# decided to include them anyway as their value is inherent to the statements.
other_fields = {"timestamp", "version"} # "authority" and "stored" remain ignored.
fields.extend(other_fields & statement_1.keys() & statement_2.keys())

if any(statement_1.get(field) != statement_2.get(field) for field in fields):
return False
return True
Loading

0 comments on commit f1cdd67

Please sign in to comment.