Skip to content

Commit 1847d4b

Browse files
authored
Patch20250220: logic optimization (#203)
* testing * updating pipeline * update logging level * optimize token logic * remove the thread for token refresh * replace hardcoded thread with dynamic setup which will always be number of thread +1 * optimize the long pulling when checking chunk uploading * fixup the test cases * remove testing code * remove testing code * move the user_config init in each class into init function instead of directly under the class
1 parent 06e3662 commit 1847d4b

File tree

17 files changed

+102
-129
lines changed

17 files changed

+102
-129
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ Command line tool that allows the user to execute data operations on the platfor
4747
4848
Linux example for each environment:
4949
50-
pyinstaller -F --distpath ./app/bundled_app/linux --specpath ./app/build/linux --workpath ./app/build/linux --paths=./.venv/lib/python3.8/site-packages ./app/pilotcli.py -n <app-name>
50+
pyinstaller -F --distpath ./app/bundled_app/linux --specpath ./app/build/linux --workpath ./app/build/linux --paths=./.venv/lib/python3.10/site-packages ./app/pilotcli.py -n <app-name>
5151
5252
Note: Building for ARM Mac may require a newer version of `pyinstaller`.
5353

app/configs/app_config.py

-4
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ class Env:
2828
greenroom_bucket_prefix = 'gr'
2929
# the number of items to active interative mode
3030
interative_threshold = 10
31-
# set hard limit for pending jobs, otherwise cli will consume all memory
32-
# to cache jobs. If later on the speed of chunk deliver become faster, we
33-
# can increase the concurrency number.
34-
num_of_jobs = ConfigClass.concurrent_job_limit
3531

3632
github_url = 'PilotDataPlatform/cli'
3733

app/configs/config.py

-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ class Settings(BaseSettings):
3737
apikey_endpoint: str = 'api-key'
3838

3939
upload_batch_size: int = 100
40-
concurrent_job_limit: int = 10
4140
upload_chunk_size: int = 1024 * 1024 * 20 # 20MB
4241

4342
def __init__(self, **data):

app/configs/user_config.py

+18-13
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ class UserConfig(metaclass=Singleton):
4242
This user config is global.
4343
"""
4444

45+
_api_key: str
46+
_access_token: str
47+
_refresh_token: str
48+
_username: str
49+
4550
def __init__(
4651
self,
4752
config_path: Union[str, Path, None] = None,
@@ -53,7 +58,6 @@ def __init__(
5358
5459
This adjustment is made to prevent complications with mounted NFS volumes where all files have root ownership.
5560
"""
56-
5761
if config_path is None:
5862
config_path = ConfigClass.config_path
5963
if config_filename is None:
@@ -106,6 +110,12 @@ def __init__(
106110
}
107111
self.save()
108112

113+
# load all config into memory
114+
self._api_key = decryption(self.config['USER']['api_key'], self.secret)
115+
self._access_token = decryption(self.config['USER']['access_token'], self.secret)
116+
self._refresh_token = decryption(self.config['USER']['refresh_token'], self.secret)
117+
self._username = decryption(self.config['USER']['username'], self.secret)
118+
109119
def _check_user_permissions(self, path: Path, expected_bits: Iterable[int]) -> Union[str, None]:
110120
"""Check if file or folder is owned by the user and has proper access mode."""
111121

@@ -147,42 +157,37 @@ def is_access_token_exists(self) -> bool:
147157

148158
@property
149159
def username(self):
150-
return decryption(self.config['USER']['username'], self.secret)
160+
return self._username
151161

152162
@username.setter
153163
def username(self, val):
154164
self.config['USER']['username'] = encryption(val, self.secret)
155165

156-
@property
157-
def password(self):
158-
return decryption(self.config['USER']['password'], self.secret)
159-
160-
@password.setter
161-
def password(self, val):
162-
self.config['USER']['password'] = encryption(val, self.secret)
163-
164166
@property
165167
def api_key(self):
166-
return decryption(self.config['USER']['api_key'], self.secret)
168+
return self._api_key
167169

168170
@api_key.setter
169171
def api_key(self, val):
172+
self._api_key = val
170173
self.config['USER']['api_key'] = encryption(val, self.secret)
171174

172175
@property
173176
def access_token(self):
174-
return decryption(self.config['USER'].get('access_token', ''), self.secret)
177+
return self._access_token
175178

176179
@access_token.setter
177180
def access_token(self, val):
181+
self._access_token = val
178182
self.config['USER']['access_token'] = encryption(val, self.secret)
179183

180184
@property
181185
def refresh_token(self):
182-
return decryption(self.config['USER']['refresh_token'], self.secret)
186+
return self._refresh_token
183187

184188
@refresh_token.setter
185189
def refresh_token(self, val):
190+
self._refresh_token = val
186191
self.config['USER']['refresh_token'] = encryption(val, self.secret)
187192

188193
@property

app/services/clients/base_auth_client.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
class BaseAuthClient(BaseClient):
1818

1919
token_manager: SrvTokenManager
20-
user = UserConfig()
20+
user: UserConfig
2121

2222
def __init__(self, endpoint: str, timeout: int = 10) -> None:
2323
super().__init__(endpoint, timeout)
2424

25+
self.user = UserConfig()
2526
self.token_manager = SrvTokenManager()
2627
self.headers = {
2728
'Authorization': 'Bearer ' + self.user.access_token,

app/services/clients/base_client.py

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def _request(
4444
if response.status_code not in self.retry_status:
4545
response.raise_for_status()
4646
return response
47+
4748
time.sleep(self.retry_interval)
4849

4950
logger.debug(f'failed with over {self.retry_count} retries.')

app/services/file_manager/file_manifests.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ def dupe_checking_hook(pairs):
2828

2929
class SrvFileManifests(BaseAuthClient, metaclass=MetaService):
3030
app_config = AppConfig()
31-
user = UserConfig()
31+
user: UserConfig
3232

3333
def __init__(self, interactive=True):
3434
super().__init__(self.app_config.Connections.url_bff)
3535

36+
self.user = UserConfig()
37+
3638
self.interactive = interactive
3739
self.endpoint = self.app_config.Connections.url_bff + '/v1'
3840

app/services/file_manager/file_tag.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
class SrvFileTag(metaclass=MetaService):
1919
appconfig = AppConfig()
20-
user = UserConfig()
20+
user: UserConfig
2121

2222
def __init__(self, interactive=True):
2323
self.interactive = interactive
24+
self.user = UserConfig()
2425

2526
@staticmethod
2627
def validate_tag(tag):

app/services/file_manager/file_upload/file_upload.py

+2-11
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,7 @@ def simple_upload( # noqa: C901
233233

234234
# now loop over each file under the folder and start
235235
# the chunk upload
236-
237-
# thread number +1 reserve one thread to refresh token
238-
# and remove the token decorator in functions
239-
240-
pool = ThreadPool(num_of_thread + 1)
241-
pool.apply_async(upload_client.upload_token_refresh)
236+
pool = ThreadPool(num_of_thread)
242237
on_success_res = []
243238

244239
file_object: FileObject
@@ -338,11 +333,7 @@ def resume_upload(
338333
mhandler.SrvOutPutHandler.resume_check_success()
339334

340335
# lastly, start resumable upload for the rest of the chunks
341-
# thread number +1 reserve one thread to refresh token
342-
# and remove the token decorator in functions
343-
344-
pool = ThreadPool(num_of_thread + 1)
345-
pool.apply_async(upload_client.upload_token_refresh)
336+
pool = ThreadPool(num_of_thread)
346337
on_success_res = []
347338
for file_object in unfinished_items:
348339
upload_client.stream_upload(file_object, pool)

app/services/file_manager/file_upload/upload_client.py

+10-20
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import math
99
import os
1010
import threading
11-
import time
1211
from logging import getLogger
1312
from multiprocessing.pool import ThreadPool
1413
from typing import Any
@@ -22,7 +21,6 @@
2221

2322
import app.services.output_manager.message_handler as mhandler
2423
from app.configs.app_config import AppConfig
25-
from app.configs.config import ConfigClass
2624
from app.configs.user_config import UserConfig
2725
from app.models.upload_form import generate_on_success_form
2826
from app.services.clients.base_auth_client import BaseAuthClient
@@ -31,7 +29,6 @@
3129
from app.services.output_manager.error_handler import ECustomizedError
3230
from app.services.output_manager.error_handler import SrvErrorHandler
3331
from app.services.user_authentication.decorator import require_valid_token
34-
from app.services.user_authentication.token_manager import SrvTokenManager
3532
from app.utils.aggregated import get_file_info_by_geid
3633

3734
from .exception import INVALID_CHUNK_ETAG
@@ -92,7 +89,6 @@ def __init__(
9289
# for tracking the multi-threading chunk upload
9390
self.active_jobs = 0
9491
self.lock = threading.Lock()
95-
self.chunk_upload_done = threading.Event()
9692

9793
def generate_meta(self, local_path: str) -> Tuple[int, int]:
9894
"""
@@ -309,14 +305,15 @@ def stream_upload(self, file_object: FileObject, pool: ThreadPool) -> None:
309305
been uploaded.
310306
"""
311307
count = 0
312-
semaphore = threading.Semaphore(AppConfig.Env.num_of_jobs)
308+
semaphore = threading.Semaphore(pool._processes + 1)
309+
chunk_upload_done = threading.Event()
313310

314311
def on_complete(result):
315312
semaphore.release()
316313
with self.lock:
317314
self.active_jobs -= 1
318315
if self.active_jobs == 0:
319-
self.chunk_upload_done.set()
316+
chunk_upload_done.set()
320317

321318
# process on the file content
322319
f = open(file_object.local_path, 'rb')
@@ -355,8 +352,14 @@ def on_complete(result):
355352

356353
count += 1
357354

355+
# for resumable check ONLY if user resume the upload at 100%
356+
# just check if there is any active job, if not, set the event
357+
while not chunk_upload_done.wait(timeout=60):
358+
if self.active_jobs == 0:
359+
chunk_upload_done.set()
360+
logger.warning('Waiting for all the chunks to be uploaded, remaining jobs: %s', file_object.progress)
361+
358362
f.close()
359-
self.chunk_upload_done.wait()
360363

361364
def upload_chunk(self, file_object: FileObject, chunk_number: int, chunk: str, etag: str, chunk_size: int) -> None:
362365
"""
@@ -466,16 +469,3 @@ def check_status(self, file_object: FileObject) -> bool:
466469

467470
def set_finish_upload(self):
468471
self.finish_upload = True
469-
470-
def upload_token_refresh(self, azp: str = ConfigClass.keycloak_device_client_id):
471-
token_manager = SrvTokenManager()
472-
DEFAULT_INTERVAL = 2 # seconds to check if the upload is finished
473-
total_count = 0 # when total_count equals token_refresh_interval, refresh token
474-
while self.finish_upload is not True:
475-
if total_count >= AppConfig.Env.token_refresh_interval:
476-
token_manager.refresh(azp)
477-
total_count = 0
478-
479-
# if not then sleep for DEFAULT_INTERVAL seconds
480-
time.sleep(DEFAULT_INTERVAL)
481-
total_count = total_count + DEFAULT_INTERVAL

app/services/user_authentication/token_manager.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@
2020
class SrvTokenManager(BaseClient, metaclass=MetaService):
2121
def __init__(self):
2222
super().__init__(AppConfig.Connections.url_keycloak_token, 10)
23-
user_config = UserConfig()
24-
if user_config.is_logged_in():
25-
self.config = user_config
26-
else:
27-
raise Exception('Login session not found, please login first.')
23+
self.config = UserConfig()
2824

2925
def update_token(self, access_token, refresh_token):
3026
self.config.access_token = access_token

app/utils/aggregated.py

+14-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from app.models.item import ItemStatus
2121
from app.models.item import ItemType
2222
from app.services.clients.base_auth_client import BaseAuthClient
23+
from app.services.clients.base_auth_client import BaseClient
2324
from app.services.logger_services.debugging_log import debug_logger
2425
from app.services.output_manager.error_handler import ECustomizedError
2526
from app.services.output_manager.error_handler import SrvErrorHandler
@@ -233,13 +234,24 @@ def remove_the_output_file(filepath: str) -> None:
233234

234235

235236
def get_latest_cli_version() -> Tuple[Version, str]:
237+
import logging
238+
import time
239+
236240
try:
237-
httpx_client = BaseAuthClient(AppConfig.Connections.url_fileops_greenroom)
241+
start_time = time.time()
242+
httpx_client = BaseClient(AppConfig.Connections.url_fileops_greenroom)
243+
logging.critical(f'http client init time: {time.time() - start_time}')
238244
user_config = UserConfig()
245+
logging.critical(f'user config init time: {time.time() - start_time}')
246+
t1 = time.time()
239247
if not user_config.is_access_token_exists():
240248
return Version('0.0.0')
249+
logging.critical(f'Check token time: {time.time() - t1}')
250+
t2 = time.time()
241251

242-
response = httpx_client._get('v1/download/cli/presigned')
252+
headers = {'Authorization': 'Bearer'}
253+
response = httpx_client._get('v1/download/cli/presigned', headers=headers)
254+
logging.critical(f'Get latest version time: {time.time() - t2}')
243255
result = response.json().get('result', {})
244256
latest_version = result.get('linux', {}).get('version', '0.0.0')
245257
download_url = result.get('linux', {}).get('download_url', '')

0 commit comments

Comments
 (0)