From de086751bf76dbbfbb90b999a3eaf7adfa675c47 Mon Sep 17 00:00:00 2001 From: Sean Hammond Date: Tue, 10 Dec 2024 16:53:24 +0000 Subject: [PATCH] Allow users to call the add-membership API Allow users (not just authclients) to call the add-membership API. --- h/security/permission_map.py | 5 +- h/security/predicates.py | 36 ++- h/views/api/group_members.py | 13 +- tests/functional/api/groups/members_test.py | 249 ++++++++++++++++++- tests/unit/h/security/predicates_test.py | 95 +++++++ tests/unit/h/views/api/group_members_test.py | 12 +- 6 files changed, 387 insertions(+), 23 deletions(-) diff --git a/h/security/permission_map.py b/h/security/permission_map.py index 186b36ecaa2..42920ba2c3d 100644 --- a/h/security/permission_map.py +++ b/h/security/permission_map.py @@ -66,7 +66,10 @@ [p.group_has_user_as_admin], [p.group_has_user_as_moderator], ], - Permission.Group.MEMBER_ADD: [[p.group_matches_authenticated_client_authority]], + Permission.Group.MEMBER_ADD: [ + [p.group_member_add], + [p.group_matches_authenticated_client_authority], + ], Permission.Group.MEMBER_REMOVE: [[p.group_member_remove]], Permission.Group.MEMBER_EDIT: [[p.group_member_edit]], Permission.Group.MODERATE: GROUP_MODERATE_PREDICATES.values(), diff --git a/h/security/predicates.py b/h/security/predicates.py index 093e536db44..0bfceb11967 100644 --- a/h/security/predicates.py +++ b/h/security/predicates.py @@ -13,7 +13,11 @@ from itertools import chain from h.models.group import GroupMembershipRoles, JoinableBy, ReadableBy, WriteableBy -from h.traversal import EditGroupMembershipContext, GroupMembershipContext +from h.traversal import ( + AddGroupMembershipContext, + EditGroupMembershipContext, + GroupMembershipContext, +) def requires(*parent_predicates): @@ -217,6 +221,36 @@ def get_authenticated_users_membership(): ) +@requires(authenticated_user, group_found) +def group_member_add(identity, context: AddGroupMembershipContext): + assert ( + context.new_roles is not None + ), "new_roles must be set before checking permissions" + + def get_authenticated_users_roles(): + """Return the authenticated users roles in the target group.""" + for membership in identity.user.memberships: + if membership.group.id == context.group.id: + return membership.roles + + return [] + + authenticated_users_roles = get_authenticated_users_roles() + + if GroupMembershipRoles.OWNER in authenticated_users_roles: + # Owners can add members with any roles. + return True + + if GroupMembershipRoles.ADMIN in authenticated_users_roles: + # Admins can add members with any role below admin. + return ( + GroupMembershipRoles.OWNER not in context.new_roles + and GroupMembershipRoles.ADMIN not in context.new_roles + ) + + return False + + @requires(authenticated_user, group_found) def group_member_edit( identity, context: EditGroupMembershipContext diff --git a/h/views/api/group_members.py b/h/views/api/group_members.py index 07f6208ff41..3eeac9a48b6 100644 --- a/h/views/api/group_members.py +++ b/h/views/api/group_members.py @@ -3,6 +3,7 @@ from pyramid.config import not_ from pyramid.httpexceptions import HTTPConflict, HTTPNoContent, HTTPNotFound +from h.models import GroupMembershipRoles from h.presenters import GroupMembershipJSONPresenter from h.schemas.api.group_membership import EditGroupMembershipAPISchema from h.schemas.pagination import PaginationQueryParamsSchema @@ -99,7 +100,6 @@ def remove_member(context: GroupMembershipContext, request): request_method="POST", link_name="group.member.add", description="Add a user to a group", - permission=Permission.Group.MEMBER_ADD, ) def add_member(context: AddGroupMembershipContext, request): if context.user.authority != context.group.authority: @@ -107,17 +107,18 @@ def add_member(context: AddGroupMembershipContext, request): if request.body: appstruct = EditGroupMembershipAPISchema().validate(json_payload(request)) - roles = appstruct["roles"] + context.new_roles = appstruct["roles"] else: - # This doesn't mean the membership will be created with no roles: - # default roles will be applied by the service. - roles = None + context.new_roles = [GroupMembershipRoles.MEMBER] + + if not request.has_permission(Permission.Group.MEMBER_ADD, context): + raise HTTPNotFound() group_members_service = request.find_service(name="group_members") try: membership = group_members_service.member_join( - context.group, context.user.userid, roles + context.group, context.user.userid, context.new_roles ) except ConflictError as err: raise HTTPConflict(str(err)) from err diff --git a/tests/functional/api/groups/members_test.py b/tests/functional/api/groups/members_test.py index fe365a5c637..71319e94ea3 100644 --- a/tests/functional/api/groups/members_test.py +++ b/tests/functional/api/groups/members_test.py @@ -261,6 +261,242 @@ def test_it_returns_an_error_if_offset_and_limit_are_invalid( class TestAddMember: + @pytest.mark.parametrize( + "authenticated_users_roles,json,expected_roles", + [ + ( + [GroupMembershipRoles.OWNER], + {"roles": [GroupMembershipRoles.OWNER]}, + [GroupMembershipRoles.OWNER], + ), + ( + [GroupMembershipRoles.OWNER], + {"roles": [GroupMembershipRoles.ADMIN]}, + [GroupMembershipRoles.ADMIN], + ), + ( + [GroupMembershipRoles.OWNER], + {"roles": [GroupMembershipRoles.MODERATOR]}, + [GroupMembershipRoles.MODERATOR], + ), + ( + [GroupMembershipRoles.OWNER], + {"roles": [GroupMembershipRoles.MEMBER]}, + [GroupMembershipRoles.MEMBER], + ), + ( + [GroupMembershipRoles.OWNER], + None, + [GroupMembershipRoles.MEMBER], + ), + ( + [GroupMembershipRoles.ADMIN], + {"roles": [GroupMembershipRoles.OWNER]}, + None, + ), + ( + [GroupMembershipRoles.ADMIN], + {"roles": [GroupMembershipRoles.ADMIN]}, + None, + ), + ( + [GroupMembershipRoles.ADMIN], + {"roles": [GroupMembershipRoles.MODERATOR]}, + [GroupMembershipRoles.MODERATOR], + ), + ( + [GroupMembershipRoles.ADMIN], + {"roles": [GroupMembershipRoles.MEMBER]}, + [GroupMembershipRoles.MEMBER], + ), + ( + [GroupMembershipRoles.ADMIN], + None, + [GroupMembershipRoles.MEMBER], + ), + ( + [GroupMembershipRoles.MODERATOR], + {"roles": [GroupMembershipRoles.OWNER]}, + None, + ), + ( + [GroupMembershipRoles.MODERATOR], + {"roles": [GroupMembershipRoles.ADMIN]}, + None, + ), + ( + [GroupMembershipRoles.MODERATOR], + {"roles": [GroupMembershipRoles.MODERATOR]}, + None, + ), + ( + [GroupMembershipRoles.MODERATOR], + {"roles": [GroupMembershipRoles.MEMBER]}, + None, + ), + ( + [GroupMembershipRoles.MODERATOR], + None, + None, + ), + ( + [GroupMembershipRoles.MEMBER], + {"roles": [GroupMembershipRoles.OWNER]}, + None, + ), + ( + [GroupMembershipRoles.MEMBER], + {"roles": [GroupMembershipRoles.ADMIN]}, + None, + ), + ( + [GroupMembershipRoles.MEMBER], + {"roles": [GroupMembershipRoles.MODERATOR]}, + None, + ), + ( + [GroupMembershipRoles.MEMBER], + {"roles": [GroupMembershipRoles.MEMBER]}, + None, + ), + ( + [GroupMembershipRoles.MEMBER], + None, + None, + ), + ], + ) + def test_it( + self, + do_request, + group, + user, + authenticated_users_membership, + authenticated_users_roles, + json, + expected_roles, + ): + authenticated_users_membership.roles = authenticated_users_roles + + do_request(json=json, status=200 if expected_roles else 404) + + roles = None + for membership in group.memberships: + if membership.user == user: + roles = membership.roles + break + assert roles == expected_roles + + @pytest.mark.parametrize( + "roles,status", + [ + ([GroupMembershipRoles.OWNER], 200), + ([GroupMembershipRoles.MODERATOR], 409), + ], + ) + def test_me_alias(self, roles, status, do_request, group, authenticated_user): + do_request(userid="me", json={"roles": roles}, status=status) + + roles = None + for membership in group.memberships: + if membership.user == authenticated_user: + roles = membership.roles + assert roles == [GroupMembershipRoles.OWNER] + + def test_it_does_nothing_if_the_user_is_already_a_member_of_the_group( + self, do_request, group, user + ): + group.memberships.append(GroupMembership(user=user)) + + do_request() + + assert user in group.members + + def test_it_when_a_conflicting_membership_already_exists( + self, do_request, group, user + ): + group.memberships.append( + GroupMembership(user=user, roles=[GroupMembershipRoles.MEMBER]) + ) + + response = do_request( + json={"roles": [GroupMembershipRoles.MODERATOR]}, status=409 + ) + + assert ( + response.json["reason"] + == "The user is already a member of the group, with conflicting membership attributes" + ) + + def test_it_errors_if_the_pubid_is_unknown(self, do_request): + do_request(pubid="UNKNOWN_PUBID", status=404) + + def test_it_errors_if_the_userid_is_unknown(self, do_request, group): + do_request(userid="acct:UNKOWN_USERNAME@{group.authority}", status=404) + + def test_it_errors_if_the_userid_is_invalid(self, do_request): + do_request(userid="INVALID_USERID", status=404) + + def test_it_errors_if_the_request_isnt_authenticated(self, do_request, headers): + del headers["Authorization"] + + do_request(status=404) + + def test_it_errors_if_the_authenticated_user_isnt_a_member_of_the_group( + self, do_request, factories, headers + ): + headers.update( + token_authorization_header(factories.DeveloperToken(user=factories.User())) + ) + + do_request(status=404) + + @pytest.fixture(autouse=True) + def group(self, factories): + return factories.Group() + + @pytest.fixture(autouse=True) + def user(self, factories, group): + return factories.User(authority=group.authority) + + @pytest.fixture(autouse=True) + def authenticated_user(self, db_session, factories, group): + return factories.User(authority=group.authority) + + @pytest.fixture(autouse=True) + def authenticated_users_membership(self, db_session, authenticated_user, group): + membership = GroupMembership( + group=group, user=authenticated_user, roles=[GroupMembershipRoles.OWNER] + ) + db_session.add(membership) + return membership + + @pytest.fixture + def headers(self, factories, authenticated_user): + return token_authorization_header( + factories.DeveloperToken(user=authenticated_user) + ) + + @pytest.fixture + def do_request(self, app, db_session, group, user, headers): + def do_request( + pubid=group.pubid, + userid=user.userid, + json={"roles": ["member"]}, + headers=headers, + status=200, + ): + db_session.commit() + path = f"/api/groups/{pubid}/members/{userid}" + if json is None: + return app.post(path, headers=headers, status=status) + + return app.post_json(path, json, headers=headers, status=status) + + return do_request + + +class TestAddMemberWithAuthclientAuthentication: @pytest.mark.parametrize( "json,expected_roles", [ @@ -312,19 +548,6 @@ def test_it_errors_if_the_userid_is_unknown(self, do_request, authclient): def test_it_errors_if_the_userid_is_invalid(self, do_request): do_request(userid="INVALID_USERID", status=404) - def test_it_errors_if_the_request_isnt_authenticated(self, do_request, headers): - del headers["Authorization"] - - do_request(status=404) - - def test_it_errors_if_the_request_has_token_authentication( - self, do_request, factories, user, headers - ): - token = factories.DeveloperToken(user=user) - headers.update(token_authorization_header(token)) - - do_request(status=404) - def test_it_errors_if_the_groups_authority_doesnt_match( self, do_request, factories ): diff --git a/tests/unit/h/security/predicates_test.py b/tests/unit/h/security/predicates_test.py index b931b340e76..a93598885d6 100644 --- a/tests/unit/h/security/predicates_test.py +++ b/tests/unit/h/security/predicates_test.py @@ -13,6 +13,7 @@ from h.security import Identity, predicates from h.security.identity import LongLivedGroup, LongLivedMembership from h.traversal import ( + AddGroupMembershipContext, AnnotationContext, EditGroupMembershipContext, GroupMembershipContext, @@ -535,6 +536,100 @@ def context(self, group, user): ) +class TestGroupMemberAdd: + @pytest.mark.parametrize( + "authenticated_users_roles,target_users_roles,expected_result", + [ + ([GroupMembershipRoles.OWNER], [GroupMembershipRoles.OWNER], True), + ([GroupMembershipRoles.OWNER], [GroupMembershipRoles.ADMIN], True), + ([GroupMembershipRoles.OWNER], [GroupMembershipRoles.MODERATOR], True), + ([GroupMembershipRoles.OWNER], [GroupMembershipRoles.MEMBER], True), + ([GroupMembershipRoles.ADMIN], [GroupMembershipRoles.OWNER], False), + ([GroupMembershipRoles.ADMIN], [GroupMembershipRoles.ADMIN], False), + ([GroupMembershipRoles.ADMIN], [GroupMembershipRoles.MODERATOR], True), + ([GroupMembershipRoles.ADMIN], [GroupMembershipRoles.MEMBER], True), + ([GroupMembershipRoles.MODERATOR], [GroupMembershipRoles.OWNER], False), + ([GroupMembershipRoles.MODERATOR], [GroupMembershipRoles.ADMIN], False), + ([GroupMembershipRoles.MODERATOR], [GroupMembershipRoles.MODERATOR], False), + ([GroupMembershipRoles.MODERATOR], [GroupMembershipRoles.MEMBER], False), + ([GroupMembershipRoles.MEMBER], [GroupMembershipRoles.OWNER], False), + ([GroupMembershipRoles.MEMBER], [GroupMembershipRoles.ADMIN], False), + ([GroupMembershipRoles.MEMBER], [GroupMembershipRoles.MODERATOR], False), + ([GroupMembershipRoles.MEMBER], [GroupMembershipRoles.MEMBER], False), + (None, [GroupMembershipRoles.OWNER], False), + (None, [GroupMembershipRoles.ADMIN], False), + (None, [GroupMembershipRoles.MODERATOR], False), + (None, [GroupMembershipRoles.MEMBER], False), + ], + ) + def test_it( + self, + db_session, + factories, + identity, + group, + authenticated_users_roles, + target_users_roles, + expected_result, + ): + target_user = factories.User() + if authenticated_users_roles: + identity.user.memberships.append( + LongLivedMembership( + group=LongLivedGroup.from_model(group), + user=identity.user, + roles=authenticated_users_roles, + ) + ) + context = AddGroupMembershipContext( + group=group, user=target_user, new_roles=target_users_roles + ) + db_session.commit() + + assert predicates.group_member_add(identity, context) == expected_result + + def test_it_crashes_if_new_roles_is_not_set(self, identity): + context = AddGroupMembershipContext( + group=sentinel.group, user=sentinel.user, new_roles=None + ) + + with pytest.raises( + AssertionError, + match="^new_roles must be set before checking permissions$", + ): + predicates.group_member_add(identity, context) + + @pytest.fixture + def authenticated_user(self, db_session, authenticated_user, factories): + # Make the authenticated user a member of a *different* group, + # to make sure that unrelated memberships don't accidentally allow or + # deny permissions. + db_session.add( + GroupMembership( + user=authenticated_user, + group=factories.Group(), + roles=[GroupMembershipRoles.OWNER], + ) + ) + + return authenticated_user + + @pytest.fixture + def group(self, db_session, factories): + group = factories.Group() + + # Make a *different* user a member of the target group + # to make sure that unrelated memberships don't accidentally allow or + # deny permissions. + db_session.add( + GroupMembership( + group=group, user=factories.User(), roles=[GroupMembershipRoles.OWNER] + ) + ) + + return group + + class TestGroupMemberEdit: @pytest.mark.parametrize( "authenticated_users_roles,old_roles,new_roles,expected_result", diff --git a/tests/unit/h/views/api/group_members_test.py b/tests/unit/h/views/api/group_members_test.py index bae12093f09..d527441d764 100644 --- a/tests/unit/h/views/api/group_members_test.py +++ b/tests/unit/h/views/api/group_members_test.py @@ -6,7 +6,7 @@ import h.views.api.group_members as views from h import presenters -from h.models import GroupMembership +from h.models import GroupMembership, GroupMembershipRoles from h.schemas.base import ValidationError from h.security import Permission from h.security.identity import Identity, LongLivedGroup, LongLivedMembership @@ -171,6 +171,14 @@ def test_it( GroupMembershipJSONPresenter.return_value.asdict.assert_called_once_with() assert response == GroupMembershipJSONPresenter.return_value.asdict.return_value + def test_it_errors_if_the_user_doesnt_have_permission( + self, context, pyramid_request, pyramid_config + ): + pyramid_config.testing_securitypolicy(permissive=False) + + with pytest.raises(HTTPNotFound): + views.add_member(context, pyramid_request) + def test_it_with_no_request_body( self, pyramid_request, @@ -184,7 +192,7 @@ def test_it_with_no_request_body( EditGroupMembershipAPISchema.assert_not_called() group_members_service.member_join.assert_called_once_with( - context.group, context.user.userid, roles=None + context.group, context.user.userid, roles=[GroupMembershipRoles.MEMBER] ) def test_it_when_a_conflicting_membership_already_exists(