Skip to content

Commit

Permalink
Downloader: re-ordering proxy translation from RapiDAST format to Req… (
Browse files Browse the repository at this point in the history
#185)

* Downloader: re-ordering proxy translation from RapiDAST format to Requests format to avoid double-translation

* Add pytests for downloaders
  • Loading branch information
cedricbu authored Apr 15, 2024
1 parent 9a44070 commit c700680
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 5 deletions.
10 changes: 5 additions & 5 deletions scanners/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,16 @@ def authenticated_download_with_rtoken(url, dest, auth, proxy=None):
session = requests.Session()

# get a token
token = oauth2_get_token_from_rtoken(auth, proxy, session)
if not token:
return False
authenticated_headers = {"Authorization": f"Bearer {token}"}

if proxy:
proxy = {
"https": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}",
"http": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}",
}
token = oauth2_get_token_from_rtoken(auth, proxy, session)
if not token:
return False

authenticated_headers = {"Authorization": f"Bearer {token}"}

resp = session.get(url, proxies=proxy, headers=authenticated_headers)

Expand Down
79 changes: 79 additions & 0 deletions tests/scanners/test_downloaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from unittest.mock import Mock
from unittest.mock import MagicMock
from unittest.mock import patch

import pytest

from scanners import downloaders

from collections import namedtuple

@pytest.fixture(scope="function")
def my_auth():
return {
"url": "auth_url",
"client_id": "auth_client_id",
"rtoken": "aut_rtoken",
}

@pytest.fixture(scope="function")
def my_proxy():
proxy = {
"proxyHost": "proxyHost",
"proxyPort": "proxyPort",
}

@patch("scanners.downloaders.requests.get")
def test_anonymous_download(mock_get, my_proxy):
def request_get(url, allow_redirects=True, proxies=None):
Response = namedtuple("Response", ["status_code", "content"])
return Response(status_code=200, content="content")


mock_get.side_effect = request_get

ret = downloaders.anonymous_download("url", dest=None, proxy=my_proxy)

assert ret == "content"




@patch("scanners.downloaders.requests.Session")
def test_oauth2_get_token_from_rtoken(mock_session, my_auth, my_proxy):
def fake_Session():
def fake_post(url, **kwargs):
Post = namedtuple("Post", ["raise_for_status", "text"])
return Post(raise_for_status=lambda: None, text=b"{'access_token':123}")

Session = namedtuple("Session", ["post"])
return Session(post=fake_post)

mock_session.side_effect = fake_Session

rtoken = downloaders.oauth2_get_token_from_rtoken(auth=my_auth, proxy=my_proxy, session=None)

assert rtoken == 123

@patch("scanners.downloaders.requests.Session")
@patch("scanners.downloaders.oauth2_get_token_from_rtoken")
@patch("builtins.open")
def test_authenticated_download_with_rtoken(mock_open, mock_get_rtoken, mock_session, my_auth, my_proxy):
def fake_Session():
def fake_post(url, **kwargs):
Post = namedtuple("Post", ["raise_for_status", "text"])
return Post(raise_for_status=lambda: None, text=b"{'access_token':123}")
def fake_get(url, **kwargs):
Get = namedtuple("Get", ["status_code", "text"])
return Get(status_code=200, text="text")

Session = namedtuple("Session", ["post", "get"])
return Session(post=fake_post, get=fake_get)

mock_session.side_effect = fake_Session
mock_get_rtoken.return_value = "123"
mock_open.return_value = MagicMock()

res = downloaders.authenticated_download_with_rtoken("url", "Nowhere", auth=my_auth, proxy=my_proxy)
assert res == True

0 comments on commit c700680

Please sign in to comment.