From ccecd3b8f50dad1cccb221a8e2ed479c2cf16593 Mon Sep 17 00:00:00 2001 From: Tom Gross Date: Wed, 19 Feb 2020 22:05:31 +0100 Subject: [PATCH] Python version and flake8 fixes --- .travis.yml | 2 +- setup.cfg | 3 + setup.py | 2 +- src/pcloud/__init__.py | 5 +- src/pcloud/api.py | 195 +++++++++++++++--------------- src/pcloud/pcloudfs.py | 78 ++++++------ src/pcloud/tests/server.py | 36 +++--- src/pcloud/tests/test_api.py | 43 +++---- src/pcloud/tests/test_validate.py | 17 ++- src/pcloud/validate.py | 5 +- 10 files changed, 194 insertions(+), 192 deletions(-) diff --git a/.travis.yml b/.travis.yml index 50db278..4c6aad2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,9 +4,9 @@ language: python sudo: required dist: xenial python: - - 3.5 - 3.6 - 3.7 + - 3.8 install: - pip install -r test_requirements.txt script: diff --git a/setup.cfg b/setup.cfg index 65ed48f..aa7f8eb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -17,3 +17,6 @@ not_skip = __init__.py [aliass] test=tox + +[flake8] +max-line-length = 100 diff --git a/setup.py b/setup.py index bd8158f..fed2752 100644 --- a/setup.py +++ b/setup.py @@ -23,9 +23,9 @@ "Intended Audience :: Developers", "Environment :: Web Environment", "Programming Language :: Python", - "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "Operating System :: OS Independent", "License :: OSI Approved :: MIT License", ], diff --git a/src/pcloud/__init__.py b/src/pcloud/__init__.py index b7f655b..bbfe097 100644 --- a/src/pcloud/__init__.py +++ b/src/pcloud/__init__.py @@ -1,5 +1,6 @@ # from pcloud.api import PyCloud -PyCloud # pyflakes -__all__ = ['PyCloud'] +PyCloud # pyflakes + +__all__ = ["PyCloud"] diff --git a/src/pcloud/api.py b/src/pcloud/api.py index 7c81c2b..ae0042e 100644 --- a/src/pcloud/api.py +++ b/src/pcloud/api.py @@ -8,15 +8,15 @@ import requests -log = logging.getLogger('pycloud') +log = logging.getLogger("pycloud") # File open flags https://docs.pcloud.com/methods/fileops/file_open.html -O_WRITE = int('0x0002', 16) -O_CREAT = int('0x0040', 16) -O_EXCL = int('0x0080', 16) -O_TRUNC = int('0x0200', 16) -O_APPEND = int('0x0400', 16) +O_WRITE = int("0x0002", 16) +O_CREAT = int("0x0040", 16) +O_EXCL = int("0x0080", 16) +O_TRUNC = int("0x0200", 16) +O_APPEND = int("0x0400", 16) # Exceptions @@ -25,13 +25,13 @@ class AuthenticationError(Exception): def main(): - parser = argparse.ArgumentParser(description='pCloud command line client') + parser = argparse.ArgumentParser(description="pCloud command line client") parser.add_argument( - 'username', - help='The username for login into your pCloud account') + "username", help="The username for login into your pCloud account" + ) parser.add_argument( - 'password', - help='The password for login into your pCloud account') + "password", help="The password for login into your pCloud account" + ) args = parser.parse_args() pyc = PyCloud(args.username, args.password) print(pyc) @@ -39,22 +39,22 @@ def main(): class PyCloud(object): - endpoint = 'https://api.pcloud.com/' + endpoint = "https://api.pcloud.com/" def __init__(self, username, password): - self.username = username.lower().encode('utf-8') - self.password = password.encode('utf-8') + self.username = username.lower().encode("utf-8") + self.password = password.encode("utf-8") self.session = requests.Session() self.auth_token = self.get_auth_token() def _do_request(self, method, authenticate=True, json=True, **kw): if authenticate: - params = {'auth': self.auth_token} + params = {"auth": self.auth_token} else: params = {} params.update(kw) - log.debug('Doing request to %s%s', self.endpoint, method) - log.debug('Params: %s', params) + log.debug("Doing request to %s%s", self.endpoint, method) + log.debug("Params: %s", params) resp = self.session.get(self.endpoint + method, params=params) if json: return resp.json() @@ -63,56 +63,53 @@ def _do_request(self, method, authenticate=True, json=True, **kw): # Authentication def getdigest(self): - resp = self._do_request('getdigest', authenticate=False) - return bytes(resp['digest'], 'utf-8') + resp = self._do_request("getdigest", authenticate=False) + return bytes(resp["digest"], "utf-8") def get_auth_token(self): digest = self.getdigest() passworddigest = sha1( - self.password + - bytes(sha1(self.username).hexdigest(), 'utf-8') + - digest) + self.password + bytes(sha1(self.username).hexdigest(), "utf-8") + digest + ) params = { - 'getauth': 1, - 'logout': 1, - 'username': self.username.decode('utf-8'), - 'digest': digest.decode('utf-8'), - 'passworddigest': passworddigest.hexdigest()} - resp = self._do_request('userinfo', authenticate=False, **params) - if 'auth' not in resp: - raise(AuthenticationError) - return resp['auth'] + "getauth": 1, + "logout": 1, + "username": self.username.decode("utf-8"), + "digest": digest.decode("utf-8"), + "passworddigest": passworddigest.hexdigest(), + } + resp = self._do_request("userinfo", authenticate=False, **params) + if "auth" not in resp: + raise (AuthenticationError) + return resp["auth"] # Folders - @RequiredParameterCheck(('path', 'folderid')) + @RequiredParameterCheck(("path", "folderid")) def createfolder(self, **kwargs): - return self._do_request('createfolder', **kwargs) + return self._do_request("createfolder", **kwargs) - @RequiredParameterCheck(('path', 'folderid')) + @RequiredParameterCheck(("path", "folderid")) def listfolder(self, **kwargs): - return self._do_request('listfolder', **kwargs) + return self._do_request("listfolder", **kwargs) - @RequiredParameterCheck(('path', 'folderid')) + @RequiredParameterCheck(("path", "folderid")) def renamefolder(self, **kwargs): - return self._do_request('renamefolder', **kwargs) + return self._do_request("renamefolder", **kwargs) - @RequiredParameterCheck(('path', 'folderid')) + @RequiredParameterCheck(("path", "folderid")) def deletefolder(self, **kwargs): - return self._do_request('deletefolder', **kwargs) + return self._do_request("deletefolder", **kwargs) - @RequiredParameterCheck(('path', 'folderid')) + @RequiredParameterCheck(("path", "folderid")) def deletefolderrecursive(self, **kwargs): - return self._do_request('deletefolderrecursive', **kwargs) + return self._do_request("deletefolderrecursive", **kwargs) def _upload(self, method, files, **kwargs): - kwargs['auth'] = self.auth_token - resp = self.session.post( - self.endpoint + method, - files=files, - data=kwargs) + kwargs["auth"] = self.auth_token + resp = self.session.post(self.endpoint + method, files=files, data=kwargs) return resp.json() - @RequiredParameterCheck(('files', 'data')) + @RequiredParameterCheck(("files", "data")) def uploadfile(self, **kwargs): """ upload a file to pCloud @@ -123,123 +120,123 @@ def uploadfile(self, **kwargs): need to specify the filename too data='Hello pCloud', filename='foo.txt' """ - if 'files' in kwargs: + if "files" in kwargs: files = {} - upload_files = kwargs.pop('files') + upload_files = kwargs.pop("files") for f in upload_files: filename = basename(f) - files = {filename: open(f, 'rb')} - kwargs['filename'] = filename + files = {filename: open(f, "rb")} + # kwargs['filename'] = filename else: # 'data' in kwargs: - files = {'f': (kwargs.pop('filename'), kwargs.pop('data'))} - return self._upload('uploadfile', files, **kwargs) + files = {"f": (kwargs.pop("filename"), kwargs.pop("data"))} + return self._upload("uploadfile", files, **kwargs) - @RequiredParameterCheck(('progresshash',)) + @RequiredParameterCheck(("progresshash",)) def uploadprogress(self, **kwargs): - return self._do_request('uploadprogress', **kwargs) + return self._do_request("uploadprogress", **kwargs) - @RequiredParameterCheck(('links',)) + @RequiredParameterCheck(("links",)) def downloadfile(self, **kwargs): - return self._do_request('downloadfile', **kwargs) + return self._do_request("downloadfile", **kwargs) def copyfile(self, **kwargs): pass - @RequiredParameterCheck(('path', 'fileid')) + @RequiredParameterCheck(("path", "fileid")) def checksumfile(self, **kwargs): - return self._do_request('checksumfile', **kwargs) + return self._do_request("checksumfile", **kwargs) - @RequiredParameterCheck(('path', 'fileid')) + @RequiredParameterCheck(("path", "fileid")) def deletefile(self, **kwargs): - return self._do_request('deletefile', **kwargs) + return self._do_request("deletefile", **kwargs) def renamefile(self, **kwargs): - return self._do_request('renamefile', **kwargs) + return self._do_request("renamefile", **kwargs) # Auth API methods def sendverificationemail(self, **kwargs): - return self._do_request('sendverificationemail', **kwargs) + return self._do_request("sendverificationemail", **kwargs) def verifyemail(self, **kwargs): - return self._do_request('verifyemail', **kwargs) + return self._do_request("verifyemail", **kwargs) def changepassword(self, **kwargs): - return self._do_request('changepassword', **kwargs) + return self._do_request("changepassword", **kwargs) def lostpassword(self, **kwargs): - return self._do_request('lostpassword', **kwargs) + return self._do_request("lostpassword", **kwargs) def resetpassword(self, **kwargs): - return self._do_request('resetpassword', **kwargs) + return self._do_request("resetpassword", **kwargs) def register(self, **kwargs): - return self._do_request('register', **kwargs) + return self._do_request("register", **kwargs) def invite(self, **kwargs): - return self._do_request('invite', **kwargs) + return self._do_request("invite", **kwargs) def userinvites(self, **kwargs): - return self._do_request('userinvites', **kwargs) + return self._do_request("userinvites", **kwargs) def logout(self, **kwargs): - return self._do_request('logout', **kwargs) + return self._do_request("logout", **kwargs) def listtokens(self, **kwargs): - return self._do_request('listtokens', **kwargs) + return self._do_request("listtokens", **kwargs) def deletetoken(self, **kwargs): - return self._do_request('deletetoken', **kwargs) + return self._do_request("deletetoken", **kwargs) # File API methods - @RequiredParameterCheck(('flags',)) + @RequiredParameterCheck(("flags",)) def file_open(self, **kwargs): - return self._do_request('file_open', **kwargs) + return self._do_request("file_open", **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_read(self, **kwargs): - return self._do_request('file_read', json=False, **kwargs) + return self._do_request("file_read", json=False, **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_pread(self, **kwargs): - return self._do_request('file_pread', json=False, **kwargs) + return self._do_request("file_pread", json=False, **kwargs) - @RequiredParameterCheck(('fd', 'data')) + @RequiredParameterCheck(("fd", "data")) def file_pread_ifmod(self, **kwargs): - return self._do_request('file_pread_ifmod', json=False, **kwargs) + return self._do_request("file_pread_ifmod", json=False, **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_size(self, **kwargs): - return self._do_request('file_size', **kwargs) + return self._do_request("file_size", **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_truncate(self, **kwargs): - return self._do_request('file_truncate', **kwargs) + return self._do_request("file_truncate", **kwargs) - @RequiredParameterCheck(('fd', 'data')) + @RequiredParameterCheck(("fd", "data")) def file_write(self, **kwargs): - files = {'filename': BytesIO(kwargs['data'])} - return self._upload('file_write', files, **kwargs) + files = {"filename": BytesIO(kwargs["data"])} + return self._upload("file_write", files, **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_pwrite(self, **kwargs): - return self._do_request('file_pwrite', **kwargs) + return self._do_request("file_pwrite", **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_checksum(self, **kwargs): - return self._do_request('file_checksum', **kwargs) + return self._do_request("file_checksum", **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_seek(self, **kwargs): - return self._do_request('file_seek', **kwargs) + return self._do_request("file_seek", **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_close(self, **kwargs): - return self._do_request('file_close', **kwargs) + return self._do_request("file_close", **kwargs) - @RequiredParameterCheck(('fd',)) + @RequiredParameterCheck(("fd",)) def file_lock(self, **kwargs): - return self._do_request('file_lock', **kwargs) + return self._do_request("file_lock", **kwargs) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/pcloud/pcloudfs.py b/src/pcloud/pcloudfs.py index 4aefe21..0345980 100644 --- a/src/pcloud/pcloudfs.py +++ b/src/pcloud/pcloudfs.py @@ -12,6 +12,7 @@ class PCloudFile(BytesIO): """ A file representation for pCloud files """ + def __init__(self, pcloud, path, mode): self.pcloud = pcloud self.path = path @@ -19,11 +20,12 @@ def __init__(self, pcloud, path, mode): # TODO: dependency mode and flags? flags = O_CREAT resp = self.pcloud.file_open(path=self.path, flags=flags) - if resp.get('result') == 0: - self.fd = resp['fd'] + if resp.get("result") == 0: + self.fd = resp["fd"] else: - raise OSError('pCloud error occured ({0}) - {1}'.format( - resp['result'], resp['error'])) + raise OSError( + "pCloud error occured ({0}) - {1}".format(resp["result"], resp["error"]) + ) def close(self): self.pcloud.file_close(fd=self.fd) @@ -54,9 +56,12 @@ def write(self, b): class PCloudFS(FS): """ A Python virtual filesystem representation for pCloud """ + # make alternative implementations possible (i.e. for testing) + factory = PyCloud + def __init__(self, username, password): super().__init__() - self.pcloud = PyCloud(username, password) + self.pcloud = self.factory(username, password) self._meta = { "case_insensitive": False, "invalid_path_chars": ":", # not sure what else @@ -64,7 +69,7 @@ def __init__(self, username, password): "max_sys_path_length": None, # there's no syspath "network": True, "read_only": False, - "supports_rename": False # since we don't have a syspath... + "supports_rename": False, # since we don't have a syspath... } def __repr__(self): @@ -72,23 +77,23 @@ def __repr__(self): def _info_from_metadata(self, metadata, namespaces): info = { - 'basic': { - 'is_dir': metadata.get('isfolder', False), - 'name': metadata.get('name') + "basic": { + "is_dir": metadata.get("isfolder", False), + "name": metadata.get("name"), } } - if 'details' in namespaces: - info['details'] = { - 'type': 1 if metadata.get('isfolder') else 2, - 'accessed': None, - 'modified': metadata.get('modified'), - 'created': metadata.get('created'), - 'metadata_changed': metadata.get('modified'), - 'size': metadata.get('size', 0) + if "details" in namespaces: + info["details"] = { + "type": 1 if metadata.get("isfolder") else 2, + "accessed": None, + "modified": metadata.get("modified"), + "created": metadata.get("created"), + "metadata_changed": metadata.get("modified"), + "size": metadata.get("size", 0), } - if 'link' in namespaces: + if "link" in namespaces: pass - if 'access' in namespaces: + if "access" in namespaces: pass return Info(info) @@ -101,19 +106,19 @@ def getinfo(self, path, namespaces=None): # provides no consistent way of geting the metadata # for both folders and files we extract it from the # folder listing - if path == '/': - parent_path = '/' + if path == "/": + parent_path = "/" else: - parent_path = '/'.join(_path.split('/')[:-1]) - parent_path = parent_path if parent_path else '/' + parent_path = "/".join(_path.split("/")[:-1]) + parent_path = parent_path if parent_path else "/" folder_list = self.pcloud.listfolder(path=parent_path) metadata = None - if 'metadata' in folder_list: - if _path == '/': - metadata = folder_list['metadata'] + if "metadata" in folder_list: + if _path == "/": + metadata = folder_list["metadata"] else: - for item in folder_list['metadata']['contents']: - if item['path'] == _path: + for item in folder_list["metadata"]["contents"]: + if item["path"] == _path: metadata = item break if metadata is None: @@ -131,17 +136,18 @@ def listdir(self, path): if _type is not ResourceType.directory: raise errors.DirectoryExpected(path) result = self.pcloud.listfolder(path=_path) - return [item['name'] for item in result['metadata']['contents']] + return [item["name"] for item in result["metadata"]["contents"]] def makedir(self, path, permissions=None, recreate=False): self.check() result = self.pcloud.createfolder(path=path) - if result['result'] == 2004: + if result["result"] == 2004: raise errors.DirectoryExists('Directory "{0}" already exists') - elif result['result'] != 0: + elif result["result"] != 0: raise errors.CreateFailed( 'Create of directory "{0}" failed with "{1}"'.format( - path, result['error']) + path, result["error"] + ) ) else: # everything is OK return self.opendir(path) @@ -165,14 +171,12 @@ class PCloudOpener(Opener): @staticmethod def open_fs(fs_url, parse_result, writeable, create, cwd): - _, _, directory = parse_result.resource.partition('/') - fs = PCloudFS( - username=parse_result.username, - password=parse_result.password - ) + _, _, directory = parse_result.resource.partition("/") + fs = PCloudFS(username=parse_result.username, password=parse_result.password) if directory: return fs.opendir(directory) else: return fs + # EOF diff --git a/src/pcloud/tests/server.py b/src/pcloud/tests/server.py index 1387781..51ac43d 100644 --- a/src/pcloud/tests/server.py +++ b/src/pcloud/tests/server.py @@ -11,32 +11,36 @@ class MockHandler(BaseHTTPRequestHandler): # Handler for GET requests def do_GET(self): self.send_response(200) - self.send_header('Content-type', 'applicaton/json') + self.send_header("Content-type", "applicaton/json") self.end_headers() # Send the json message - path = self.path[1:].split('?') - with open(join(dirname( - __file__), 'data', path[0] + '.json')) as f: + path = self.path[1:].split("?") + with open(join(dirname(__file__), "data", path[0] + ".json")) as f: data = f.read() - self.wfile.write(bytes(data, 'utf-8')) + self.wfile.write(bytes(data, "utf-8")) # Handler for POST requests def do_POST(self): form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={ - 'REQUEST_METHOD': 'POST', - 'CONTENT_TYPE': self.headers['Content-Type'], - }) - - size = len(form.getvalue('upload.txt')) + fp=self.rfile, + headers=self.headers, + environ={ + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": self.headers["Content-Type"], + }, + ) + print(form) + if "data" in form: + size = len(form.getvalue("data")) + else: + size = len(form.getvalue("upload.txt")) self.send_response(200) - self.send_header('Content-type', 'applicaton/json') + self.send_header("Content-type", "applicaton/json") self.end_headers() # Send the json message - self.wfile.write(bytes( - '{ "result": 0, "metadata": {"size": %s} }' % size, 'utf-8')) + self.wfile.write( + bytes('{ "result": 0, "metadata": {"size": %s} }' % size, "utf-8") + ) class MockServer(socketserver.TCPServer): diff --git a/src/pcloud/tests/test_api.py b/src/pcloud/tests/test_api.py index b616528..aadf766 100644 --- a/src/pcloud/tests/test_api.py +++ b/src/pcloud/tests/test_api.py @@ -1,38 +1,29 @@ # from pcloud import api -from pcloud.tests.server import MockHandler -from pcloud.tests.server import MockServer import os.path -import threading - - -PORT = 5000 - - -httpd = MockServer(("", PORT), MockHandler) -httpd_thread = threading.Thread(target=httpd.serve_forever) -httpd_thread.setDaemon(True) -httpd_thread.start() +import pytest class DummyPyCloud(api.PyCloud): - endpoint = 'http://localhost:{0}/'.format(PORT) - - -def test_getdigest(): - api = DummyPyCloud('foo', 'bar') - assert api.getdigest() == b'YGtAxbUpI85Zvs7lC7Z62rBwv907TBXhV2L867Hkh' + endpoint = "http://localhost:{0}/".format(5000) -def test_get_auth_token(): - api = DummyPyCloud('foo', 'bar') - assert api.get_auth_token() == 'TOKEN' +@pytest.mark.usefixtures("start_mock_server") +class TestPcloudApi(object): + def test_getdigest(self): + api = DummyPyCloud("foo", "bar") + assert api.getdigest() == b"YGtAxbUpI85Zvs7lC7Z62rBwv907TBXhV2L867Hkh" + def test_get_auth_token(self): + api = DummyPyCloud("foo", "bar") + assert api.get_auth_token() == "TOKEN" -def test_upload_files(): - api = DummyPyCloud('foo', 'bar') - testfile = os.path.join(os.path.dirname(__file__), 'data', 'upload.txt') - assert api.uploadfile(files=[testfile]) == {"result": 0, - "metadata": {"size": 14}} + def test_upload_files(self): + api = DummyPyCloud("foo", "bar") + testfile = os.path.join(os.path.dirname(__file__), "data", "upload.txt") + assert api.uploadfile(files=[testfile]) == { + "result": 0, + "metadata": {"size": 14}, + } diff --git a/src/pcloud/tests/test_validate.py b/src/pcloud/tests/test_validate.py index ef08a16..116ac5a 100644 --- a/src/pcloud/tests/test_validate.py +++ b/src/pcloud/tests/test_validate.py @@ -3,35 +3,34 @@ from pcloud.validate import MODE_AND -@RequiredParameterCheck(('path', 'folderid')) +@RequiredParameterCheck(("path", "folderid")) def foo(path=None, folderid=None, bar=None): return path, folderid, bar -@RequiredParameterCheck(('path', 'folderid'), mode=MODE_AND) +@RequiredParameterCheck(("path", "folderid"), mode=MODE_AND) def foo_all(path=None, folderid=None, bar=None): return path, folderid, bar class TestPathIdentifier(object): - def test_validiate_path(self): - assert foo(path='/', bar='x') == ('/', None, 'x') + assert foo(path="/", bar="x") == ("/", None, "x") def test_validiate_folderid(self): - assert foo(folderid='0') == (None, '0', None) + assert foo(folderid="0") == (None, "0", None) def test_validiate_nothing(self): with pytest.raises(ValueError): - foo(bar='something') + foo(bar="something") def test_validiate_all_path(self): with pytest.raises(ValueError): - foo_all(path='/', bar='x') + foo_all(path="/", bar="x") def test_validiate_all_folderid(self): with pytest.raises(ValueError): - foo_all(folderid='0') == (None, '0', None) + foo_all(folderid="0") == (None, "0", None) def test_validiate_all(self): - foo_all(folderid='0', path='/') == ('/', '0', None) + foo_all(folderid="0", path="/") == ("/", "0", None) diff --git a/src/pcloud/validate.py b/src/pcloud/validate.py index c09ad72..1378f74 100644 --- a/src/pcloud/validate.py +++ b/src/pcloud/validate.py @@ -21,7 +21,10 @@ def wrapper(*args, **kwargs): elif self.mode == MODE_AND and found_paramater == self.required: return func(*args, **kwargs) else: - raise ValueError(f"One required parameter `{ ', '.join(self.required)}` is missing: {found_paramater}") + raise ValueError( + f"Required parameter `{', '.join(self.required)}` is missing." + ) + wrapper.__name__ = func.__name__ wrapper.__dict__.update(func.__dict__) wrapper.__doc__ = func.__doc__