Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/binprotocol #92

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2929669
Implement more methods and fix security hotspot
tomgross Jun 24, 2023
ceb9ff5
readd
tomgross Jun 24, 2023
0c9f4e3
Added more test methods and update test deps
tomgross Dec 22, 2023
503e49e
Choose compatible plyawright version
tomgross Dec 22, 2023
74bd97c
foo
tomgross Dec 22, 2023
a2eb6f1
fix-oauth2-bug
masrlinu Aug 1, 2023
75c7272
Fix Playwright locators in test
tomgross Dec 22, 2023
204cf46
PyFS
tomgross Dec 26, 2023
e9937a2
foo
tomgross Jan 5, 2024
0f986d1
foo
tomgross Jan 5, 2024
538a784
20 failures
tomgross Jan 5, 2024
1cd926e
2 failures
tomgross Jan 6, 2024
298e8b0
1 failure
tomgross Jan 6, 2024
8dc881d
black
tomgross Jan 6, 2024
b01fb15
foo
tomgross Jan 20, 2024
b5afe46
use other implementation
tomgross Feb 5, 2024
a3ac07b
Simplify code
tomgross Feb 9, 2024
ada2543
All tests pass
tomgross Feb 9, 2024
9907272
Error handling
tomgross Feb 10, 2024
54ae08e
Error handling
tomgross Feb 11, 2024
75ab098
More Error handling
tomgross Feb 11, 2024
6c168fe
Lock on getinfo
tomgross Feb 11, 2024
60bb12d
Don't stream session
tomgross Feb 11, 2024
4854018
Try to stablize pyfs tests
tomgross Feb 11, 2024
821e1bf
add binary protocol
tomgross Feb 28, 2024
d8fcb73
PyFS
tomgross Mar 1, 2024
cd8945f
Update setuptools requirement
tomgross Jul 17, 2024
86e68aa
Fix import error
tomgross Jul 17, 2024
b988535
Files from master
tomgross Dec 29, 2024
ae69dcf
rename
tomgross Dec 29, 2024
ac0f2a6
foo
tomgross Dec 29, 2024
7306e2f
foo
tomgross Dec 29, 2024
7c1c82d
Merge branch 'master' into feature/binprotocol
tomgross Dec 29, 2024
f51fbc9
Merge branch 'master' into feature/binprotocol
tomgross Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 65 additions & 77 deletions src/pcloud/api.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,20 @@
import os
import requests
import zipfile

from hashlib import sha1
from io import BytesIO

from pcloud.jsonprotocol import PCloudJSONConnection
from pcloud.oauth2 import TokenHandler
from pcloud.utils import log
from pcloud.utils import to_api_datetime
from pcloud.validate import MODE_AND
from pcloud.validate import RequiredParameterCheck
from requests_toolbelt.multipart.encoder import MultipartEncoder

from urllib.parse import urlparse
from urllib.parse import urlunsplit

import datetime
import logging
import os.path
import requests
import sys
import zipfile


log = logging.getLogger("pcloud")
log.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stderr)
handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
log.addHandler(handler)

# File open flags https://docs.pcloud.com/methods/fileops/file_open.html
O_WRITE = int("0x0002", 16)
Expand All @@ -47,40 +39,47 @@ class InvalidFileModeError(Exception):
"""File mode not supported"""


# Helpers
def to_api_datetime(dt):
"""Converter to a datetime structure the pCloud API understands

See https://docs.pcloud.com/structures/datetime.html
"""
if isinstance(dt, datetime.datetime):
return dt.isoformat()
return dt


class PyCloud(object):
endpoints = {
"api": "https://api.pcloud.com/",
"eapi": "https://eapi.pcloud.com/",
"test": "http://localhost:5023/",
"test": "localhost:5023",
"binapi": "https://binapi.pcloud.com",
"bineapi": "https://bineapi.pcloud.com",
"nearest": "",
}

def __init__(
self, username, password, endpoint="api", token_expire=31536000, oauth2=False
self,
username,
password,
endpoint="api",
token_expire=31536000,
oauth2=False,
connection=PCloudJSONConnection,
):
self.session = requests.Session()
conn = connection(self)
self.connection = conn.connect()
if endpoint not in self.endpoints:
log.error(
"Endpoint (%s) not found. Use one of: %s",
endpoint,
",".join(self.endpoints.keys()),
", ".join(self.endpoints.keys()),
)
return
elif endpoint == "nearest":
self.endpoint = self.getnearestendpoint()
elif endpoint not in connection.allowed_endpoints:
log.error(
"Endpoint (%s) not in allowed list of '%s'. Use one of: %s",
endpoint,
connection.__name__,
", ".join(connection.allowed_endpoints),
)
return
else:
self.endpoint = self.endpoints.get(endpoint)

log.info(f"Using pCloud API endpoint: {self.endpoint}")
self.username = username.lower().encode("utf-8")
self.password = password.encode("utf-8")
Expand Down Expand Up @@ -122,24 +121,9 @@ def oauth2_authorize(
return cls("", access_token, endpoint, token_expire, oauth2=True)

def _do_request(self, method, authenticate=True, json=True, endpoint=None, **kw):
if authenticate and self.auth_token: # Password authentication
params = {"auth": self.auth_token}
elif authenticate and self.access_token: # OAuth2 authentication
params = {"access_token": self.access_token}
else:
params = {}
if endpoint is None:
endpoint = self.endpoint
params.update(kw)
log.debug("Doing request to %s%s", endpoint, method)
log.debug("Params: %s", params)
resp = self.session.get(endpoint + method, params=params)
if json:
result = resp.json()
else:
result = resp.content
log.debug("Response: %s", result)
return result
return self.connection.do_get_request(
method, authenticate, json, endpoint, **kw
)

# Authentication
def getdigest(self):
Expand Down Expand Up @@ -176,6 +160,7 @@ def getnearestendpoint(self):
resp = self._do_request(
"getapiserver", authenticate=False, endpoint=default_api
)

api = resp.get("api")
if len(api):
return urlunsplit(["https", api[0], "/", "", ""])
Expand Down Expand Up @@ -234,24 +219,11 @@ def copyfolder(self, **kwargs):
raise NotImplementedError

# File
def _upload(self, method, files, **kwargs):
if self.auth_token: # Password authentication
kwargs["auth"] = self.auth_token
elif self.access_token: # OAuth2 authentication
kwargs["access_token"] = self.access_token
fields = list(kwargs.items())
fields.extend(files)
m = MultipartEncoder(fields=fields)
resp = requests.post(
self.endpoint + method, data=m, headers={"Content-Type": m.content_type}
)
return resp.json()

@RequiredParameterCheck(("files", "data"))
def uploadfile(self, **kwargs):
"""upload a file to pCloud

1) You can specify a list of filenames to read
1) You can specify a list of filenames to upload
files=['/home/pcloud/foo.txt', '/home/pcloud/bar.txt']

2) you can specify binary data via the data parameter and
Expand All @@ -276,7 +248,7 @@ def uploadfile(self, **kwargs):
if "folderid" in kwargs:
# cast folderid to string, since API allows this but requests not
kwargs["folderid"] = str(kwargs["folderid"])
return self._upload("uploadfile", files, **kwargs)
return self.connection.upload("uploadfile", files, **kwargs)

@RequiredParameterCheck(("progresshash",))
def uploadprogress(self, **kwargs):
Expand Down Expand Up @@ -365,53 +337,55 @@ def gettextfile(self, **kwargs):
# File API methods
@RequiredParameterCheck(("flags",))
def file_open(self, **kwargs):
return self._do_request("file_open", **kwargs)
return self._do_request("file_open", use_session=True, **kwargs)

@RequiredParameterCheck(("fd", "count"))
def file_read(self, **kwargs):
return self._do_request("file_read", json=False, **kwargs)
return self._do_request("file_read", json=False, use_session=True, **kwargs)

@RequiredParameterCheck(("fd",))
def file_pread(self, **kwargs):
return self._do_request("file_pread", json=False, **kwargs)
return self._do_request("file_pread", json=False, use_session=True, **kwargs)

@RequiredParameterCheck(("fd", "data"))
def file_pread_ifmod(self, **kwargs):
return self._do_request("file_pread_ifmod", json=False, **kwargs)
return self._do_request(
"file_pread_ifmod", json=False, use_session=True, **kwargs
)

@RequiredParameterCheck(("fd",))
def file_size(self, **kwargs):
return self._do_request("file_size", **kwargs)
return self._do_request("file_size", use_session=True, **kwargs)

@RequiredParameterCheck(("fd",))
def file_truncate(self, **kwargs):
return self._do_request("file_truncate", **kwargs)
return self._do_request("file_truncate", use_session=True, **kwargs)

@RequiredParameterCheck(("fd", "data"))
def file_write(self, **kwargs):
files = [("file", ("upload-file.io", BytesIO(kwargs.pop("data"))))]
kwargs["fd"] = str(kwargs["fd"])
return self._upload("file_write", files, **kwargs)
return self.connection.upload("file_write", files, **kwargs)

@RequiredParameterCheck(("fd",))
def file_pwrite(self, **kwargs):
return self._do_request("file_pwrite", **kwargs)
return self._do_request("file_pwrite", use_session=True, **kwargs)

@RequiredParameterCheck(("fd",))
def file_checksum(self, **kwargs):
return self._do_request("file_checksum", **kwargs)
return self._do_request("file_checksum", use_session=True, **kwargs)

@RequiredParameterCheck(("fd",))
def file_seek(self, **kwargs):
return self._do_request("file_seek", **kwargs)
return self._do_request("file_seek", use_session=True, **kwargs)

@RequiredParameterCheck(("fd",))
def file_close(self, **kwargs):
return self._do_request("file_close", **kwargs)
return self._do_request("file_close", use_session=True, **kwargs)

@RequiredParameterCheck(("fd",))
def file_lock(self, **kwargs):
return self._do_request("file_lock", **kwargs)
return self._do_request("file_lock", use_session=True, **kwargs)

# Archiving
@RequiredParameterCheck(("path", "fileid"))
Expand Down Expand Up @@ -529,5 +503,19 @@ def trash_restorepath(self, **kwargs):
def trash_restore(self, **kwargs):
raise NotImplementedError

# convenience methods
@RequiredParameterCheck(("path",))
def file_exists(self, **kwargs):
path = kwargs["path"]
resp = self.file_open(path=path, flags=O_APPEND)
result = resp.get("result")
if result == 0:
self.file_close(fd=resp["fd"])
return True
elif result == 2009:
return False
else:
raise OSError(f"pCloud error occured ({result}) - {resp['error']}: {path}")


# EOF
Loading
Loading