Skip to content

Commit

Permalink
Add authenticate_with_magic_auth
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatascastro12 committed Aug 21, 2023
1 parent e6db8d4 commit b8aa973
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 0 deletions.
45 changes: 45 additions & 0 deletions tests/test_users.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pytest

import workos
from tests.utils.fixtures.mock_session import MockSession
from tests.utils.fixtures.mock_user import MockUser
from workos.users import Users

Expand Down Expand Up @@ -99,6 +101,16 @@ def mock_users_pagination_response(self):
},
}

@pytest.fixture
def mock_auth_response(self):
user = MockUser("user_01H7ZGXFP5C6BBQY6Z7277ZCT0").to_dict()
session = MockSession("session_01E4ZCR3C56J083X43JQXF3JK5").to_dict()

return {
"user": user,
"session": session,
}

def test_create_user(self, mock_user, mock_request_method):
mock_request_method("post", mock_user, 201)

Expand Down Expand Up @@ -176,3 +188,36 @@ def test_remove_user_from_organization(self, capture_and_mock_request, mock_user

assert url[0].endswith("users/user_123/organization/org_123")
assert user["id"] == "user_01H7ZGXFP5C6BBQY6Z7277ZCT0"

def test_authenticate_with_magic_auth(
self, capture_and_mock_request, mock_auth_response
):
code = "test_auth"
expires_in = 3600
magic_auth_challenge_id = "magic_auth_challenge_id"
user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36"
ip_address = "192.0.0.1"

url, request = capture_and_mock_request("post", mock_auth_response, 200)

response = self.users.authenticate_with_magic_auth(
code=code,
expires_in=expires_in,
magic_auth_challenge_id=magic_auth_challenge_id,
user_agent=user_agent,
ip_address=ip_address,
)

assert url[0].endswith("users/session/token")
assert response["user"]["id"] == "user_01H7ZGXFP5C6BBQY6Z7277ZCT0"
assert response["session"]["id"] == "session_01E4ZCR3C56J083X43JQXF3JK5"
assert request["json"]["code"] == code
assert request["json"]["user_agent"] == user_agent
assert request["json"]["expires_in"] == expires_in
assert request["json"]["ip_address"] == ip_address
assert request["json"]["client_id"] == "client_b27needthisforssotemxo"
assert request["json"]["client_secret"] == "sk_abdsomecharactersm284"
assert (
request["json"]["grant_type"]
== "urn:workos:oauth:grant-type:magic-auth:code"
)
41 changes: 41 additions & 0 deletions tests/utils/fixtures/mock_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import datetime
from workos.resources.base import WorkOSBaseResource


class MockSession(WorkOSBaseResource):
def __init__(self, id):
self.id = id
self.token = "session_token_123abc"
self.authorized_organizations = [
{
"organization": {
"id": "org_01E4ZCR3C56J083X43JQXF3JK5",
"name": "Foo Corp",
}
}
]
self.unauthorized_organizations = [
{
"organization": {
"id": "org_01H7BA9A1YY5RGBTP1HYKVJPNC",
"name": "Bar Corp",
},
"reasons": [
{
"type": "authentication_method_required",
"allowed_authentication_methods": ["GoogleOauth"],
}
],
}
]
self.created_at = datetime.datetime.now()
self.updated_at = datetime.datetime.now()

OBJECT_FIELDS = [
"id",
"token",
"authorized_organizations",
"unauthorized_organizations",
"created_at",
"updated_at",
]
34 changes: 34 additions & 0 deletions workos/resources/authentication_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from workos.resources.base import WorkOSBaseResource
from workos.resources.session import WorkOSSession
from workos.resources.users import WorkOSUser


class WorkOSAuthenticationResponse(WorkOSBaseResource):
"""Representation of a User and Session response as returned by WorkOS through User Management features."""

@classmethod
def construct_from_response(cls, response):
authentication_response = super(
WorkOSAuthenticationResponse, cls
).construct_from_response(response)

user = WorkOSUser.construct_from_response(response["user"])
authentication_response.user = user

session = WorkOSSession.construct_from_response(response["session"])
authentication_response.session = session

return authentication_response

def to_dict(self):
authentication_response_dict = super(
WorkOSAuthenticationResponse, self
).to_dict()

user_dict = self.user.to_dict()
authentication_response_dict["user"] = user_dict

session_dict = self.session.to_dict()
authentication_response_dict["session"] = session_dict

return authentication_response_dict
159 changes: 159 additions & 0 deletions workos/resources/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from workos.resources.base import WorkOSBaseResource


class WorkOSUserOrganization(WorkOSBaseResource):
"""Contains the id and name of the associated Organization.
Attributes:
OBJECT_FIELDS (list): List of fields a WorkOSUserOrganization comprises.
"""

OBJECT_FIELDS = [
"id",
"name",
]


class WorkOSUnauthorizedOrganizationReason(WorkOSBaseResource):
"""Contains the id and name of the associated Organization.
Attributes:
OBJECT_FIELDS (list): List of fields a WorkOSUnauthorizedOrganizationReason comprises.
"""

OBJECT_FIELDS = [
"type",
"allowed_authentication_methods",
]


class WorkOSAuthorizedOrganization(WorkOSBaseResource):
"""Contains the id and name of the associated Organization.
Attributes:
OBJECT_FIELDS (list): List of fields a WorkOSUserOrganization comprises.
"""

@classmethod
def construct_from_response(cls, response):
authorized_organization = super(
WorkOSAuthorizedOrganization, cls
).construct_from_response(response)

organization = WorkOSUserOrganization.construct_from_response(
response["organization"]
)
authorized_organization.organization = organization

return authorized_organization

def to_dict(self):
authorized_organizations_dict = super(
WorkOSAuthorizedOrganization, self
).to_dict()

organization_dict = self.organization.to_dict()
authorized_organizations_dict["organization"] = organization_dict

return authorized_organizations_dict


class WorkOSUnauthorizedOrganization(WorkOSBaseResource):
"""Contains the unauthorized organization and reasons for the non authorization.
Attributes:
OBJECT_FIELDS (list): List of fields a WorkOSUserOrganization comprises.
"""

@classmethod
def construct_from_response(cls, response):
unauthorized_organization = super(
WorkOSUnauthorizedOrganization, cls
).construct_from_response(response)

organization = WorkOSUserOrganization.construct_from_response(
response["organization"]
)
unauthorized_organization.organization = organization

unauthorized_organization.reason = []

for reason in response["reasons"]:
reason = WorkOSUnauthorizedOrganizationReason.construct_from_response(
reason
)
unauthorized_organization.reason.append(reason)

return unauthorized_organization

def to_dict(self):
unauthorized_organizations_dict = super(
WorkOSUnauthorizedOrganization, self
).to_dict()

organization_dict = self.organization.to_dict()
unauthorized_organizations_dict["organization"] = organization_dict

unauthorized_organizations_dict["reasons"] = []

for reason in self.reason:
reason_dict = reason.to_dict()
unauthorized_organizations_dict["reasons"].append(reason_dict)

return unauthorized_organizations_dict


class WorkOSSession(WorkOSBaseResource):
"""Representation of a Session response as returned by WorkOS through User Management features.
Attributes:
OBJECT_FIELDS (list): List of fields a class WorkOSSession comprises.
"""

OBJECT_FIELDS = [
"id",
"token",
"created_at",
"updated_at",
]

@classmethod
def construct_from_response(cls, response):
session = super(WorkOSSession, cls).construct_from_response(response)

session.authorized_organizations = []
session.unauthorized_organizations = []

for authorized_organization in response["authorized_organizations"]:
authorized_organization = (
WorkOSAuthorizedOrganization.construct_from_response(
authorized_organization
)
)
session.authorized_organizations.append(authorized_organization)

for unauthorized_organization in response["unauthorized_organizations"]:
unauthorized_organization = (
WorkOSUnauthorizedOrganization.construct_from_response(
unauthorized_organization
)
)
session.unauthorized_organizations.append(unauthorized_organization)

return session

def to_dict(self):
session_dict = super(WorkOSSession, self).to_dict()

session_dict["authorized_organizations"] = []
session_dict["unauthorized_organizations"] = []

for organization in self.authorized_organizations:
organization_dict = organization.to_dict()
session_dict["authorized_organizations"].append(organization_dict)

for organization in self.unauthorized_organizations:
organization_dict = organization.to_dict()
session_dict["unauthorized_organizations"].append(organization_dict)

return session_dict
53 changes: 53 additions & 0 deletions workos/users.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import workos
from workos.resources.authentication_response import WorkOSAuthenticationResponse
from workos.resources.list import WorkOSListResource
from workos.resources.users import (
WorkOSUser,
Expand All @@ -15,6 +16,7 @@
USER_PATH = "users"
USER_DETAIL_PATH = "users/{0}"
USER_ORGANIZATION_PATH = "users/{0}/organization/{1}"
USER_SESSION_TOKEN = "users/session/token"

RESPONSE_LIMIT = 10

Expand Down Expand Up @@ -185,3 +187,54 @@ def remove_user_from_organization(self, user, organization):
)

return WorkOSUser.construct_from_response(response).to_dict()

def authenticate_with_magic_auth(
self,
code,
magic_auth_challenge_id,
expires_in=None,
ip_address=None,
user_agent=None,
):
"""Authenticates a user by verifying a one-time code sent to the user's email address by the Magic Auth Send Code endpoint.
Kwargs:
code (str):
magic_auth_challenge_id (str)
expires_in (int) (Optional)
ip_address (str) (Optional)
user_agent (str) (Optional)
Returns:
(dict): Authentication response from WorkOS.
[user] (dict): User response from WorkOS
[session] (dict): Session response from WorkOS
"""

headers = {}

payload = {
"client_id": workos.client_id,
"client_secret": workos.api_key,
"code": code,
"magic_auth_challenge_id": magic_auth_challenge_id,
"grant_type": "urn:workos:oauth:grant-type:magic-auth:code",
}

if expires_in:
payload["expires_in"] = expires_in

if ip_address:
payload["ip_address"] = ip_address

if user_agent:
payload["user_agent"] = user_agent

response = self.request_helper.request(
USER_SESSION_TOKEN,
method=REQUEST_METHOD_POST,
headers=headers,
params=payload,
)

return WorkOSAuthenticationResponse.construct_from_response(response).to_dict()

0 comments on commit b8aa973

Please sign in to comment.