Merge pull request #33 from CMCC-Foundation/feature_18
- Merge features from the Sebastien project (CSV, GeoJSON, and raster map APIs)
- New Zarr feature
- Temporal Resample on the fly
- Spatial regrid on the fly
gtramonte authored Jan 27, 2025
2 parents 0c1742d + 628a233 commit 5ddfb23
Showing 18 changed files with 679 additions and 61 deletions.
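
The commit message above mentions new output formats and on-the-fly processing; the diff below also adds a synchronous execute endpoint (POST /datasets/{dataset_id}/{product_id}/execute, handled by sync_query). A minimal client-side sketch of calling that endpoint follows; the host, the example identifiers, and the JSON payload fields (including the output-format key) are illustrative assumptions, not part of this diff:

# Hypothetical client call to the new synchronous execute endpoint.
# Only the route shape comes from the sync_query docstring in this PR;
# the base URL, example identifiers, and JSON body fields are assumptions.
import requests

BASE_URL = "http://localhost:8080"                 # assumed API host
DATASET_ID, PRODUCT_ID = "sentinel-2", "default"   # example identifiers

resp = requests.post(
    f"{BASE_URL}/datasets/{DATASET_ID}/{PRODUCT_ID}/execute",
    json={
        "variable": ["B04"],      # hypothetical GeoQuery field
        "format": "geojson",      # hypothetical output-format field
    },
    timeout=600,                  # the call blocks until the result file is ready
)
resp.raise_for_status()
with open("result.geojson", "wb") as out:
    out.write(resp.content)       # sync_query returns the file via FileResponse
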
README.md (2 changes: 1 addition & 1 deletion)
@@ -10,7 +10,7 @@ The system has been designed using a cloud-native architecture, based on contain

It uses [geokube](https://github.com/CMCC-Foundation/geokube) as an Analytics Engine to perform geospatial operations.

## Developer Team
## Developers Team

- [Valentina Scardigno](https://github.com/vale95-eng)
- [Gabriele Tramonte](https://github.com/gtramonte)
api/app/endpoint_handlers/dataset.py (103 changes: 87 additions & 16 deletions)
@@ -3,7 +3,9 @@
import pika
from typing import Optional

from dbmanager.dbmanager import DBManager
from fastapi.responses import FileResponse

from dbmanager.dbmanager import DBManager, RequestStatus
from geoquery.geoquery import GeoQuery
from geoquery.task import TaskList
from datastore.datastore import Datastore, DEFAULT_MAX_REQUEST_SIZE_GB
@@ -18,12 +20,18 @@
from api_utils import make_bytes_readable_dict
from validation import assert_product_exists

from . import request

log = get_dds_logger(__name__)
data_store = Datastore()

MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"]

def _is_etimate_enabled(dataset_id, product_id):
if dataset_id in ("sentinel-2",):
return False
return True


@log_execution_time(log)
def get_datasets(user_roles_names: list[str]) -> list[dict]:
@@ -213,7 +221,7 @@ def estimate(

@log_execution_time(log)
@assert_product_exists
def query(
def async_query(
user_id: str,
dataset_id: str,
product_id: str,
@@ -250,21 +258,22 @@ def query(
"""
log.debug("geoquery: %s", query)
estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
allowed_size = data_store.product_metadata(dataset_id, product_id).get(
"maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
)
if estimated_size > allowed_size:
raise exc.MaximumAllowedSizeExceededError(
dataset_id=dataset_id,
product_id=product_id,
estimated_size_gb=estimated_size,
allowed_size_gb=allowed_size,
)
if estimated_size == 0.0:
raise exc.EmptyDatasetError(
dataset_id=dataset_id, product_id=product_id
if _is_etimate_enabled(dataset_id, product_id):
estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
allowed_size = data_store.product_metadata(dataset_id, product_id).get(
"maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
)
if estimated_size > allowed_size:
raise exc.MaximumAllowedSizeExceededError(
dataset_id=dataset_id,
product_id=product_id,
estimated_size_gb=estimated_size,
allowed_size_gb=allowed_size,
)
if estimated_size == 0.0:
raise exc.EmptyDatasetError(
dataset_id=dataset_id, product_id=product_id
)
broker_conn = pika.BlockingConnection(
pika.ConnectionParameters(
host=os.getenv("BROKER_SERVICE_HOST", "broker")
@@ -295,6 +304,68 @@ def query(
broker_conn.close()
return request_id

@log_execution_time(log)
@assert_product_exists
def sync_query(
user_id: str,
dataset_id: str,
product_id: str,
query: GeoQuery,
):
"""Realize the logic for the endpoint:
`POST /datasets/{dataset_id}/{product_id}/execute`
Query the data and return the result of the request.
Parameters
----------
user_id : str
ID of the user executing the query
dataset_id : str
ID of the dataset
product_id : str
ID of the product
query : GeoQuery
Query to perform
Returns
-------
request_id : int
ID of the request
Raises
-------
MaximumAllowedSizeExceededError
if the allowed size is below the estimated one
EmptyDatasetError
if estimated size is zero
"""

import time
request_id = async_query(user_id, dataset_id, product_id, query)
status, _ = DBManager().get_request_status_and_reason(request_id)
log.debug("sync query: status: %s", status)
while status in (RequestStatus.RUNNING, RequestStatus.QUEUED,
RequestStatus.PENDING):
time.sleep(1)
status, _ = DBManager().get_request_status_and_reason(request_id)
log.debug("sync query: status: %s", status)

if status is RequestStatus.DONE:
download_details = DBManager().get_download_details_for_request_id(
request_id
)
return FileResponse(
path=download_details.location_path,
filename=download_details.location_path.split(os.sep)[-1],
)
raise exc.ProductRetrievingError(
dataset_id=dataset_id,
product_id=product_id,
status=status.name)


@log_execution_time(log)
def run_workflow(
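
For orientation, below is a sketch of how the new sync_query handler might be wired into a FastAPI route. Only the endpoint path from the handler's docstring and the GeoQuery import are taken from the diff; the router object, module paths, and the way user_id is resolved are assumptions.

# Sketch only: possible FastAPI wiring for the new synchronous handler.
# Router, module paths, and user_id resolution are assumptions.
from fastapi import APIRouter, Header

from geoquery.geoquery import GeoQuery                     # as imported in dataset.py
from endpoint_handlers import dataset as dataset_handler   # module path assumed

router = APIRouter()

@router.post("/datasets/{dataset_id}/{product_id}/execute")
def execute(
    dataset_id: str,
    product_id: str,
    query: GeoQuery,
    user_id: str = Header(default="anonymous"),  # auth resolution simplified
):
    # Delegates to the blocking handler, which polls the request status
    # and returns a FileResponse once the result file is ready.
    return dataset_handler.sync_query(
        user_id=user_id,
        dataset_id=dataset_id,
        product_id=product_id,
        query=query,
    )
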
api/app/endpoint_handlers/request.py (6 changes: 5 additions & 1 deletion)
@@ -86,7 +86,11 @@ def get_request_resulting_size(request_id: int):
If the request was not found
"""
if request := DBManager().get_request_details(request_id):
return request.download.size_bytes
size = request.download.size_bytes
if not size or size == 0:
raise exc.EmptyDatasetError(dataset_id=request.dataset,
product_id=request.product)
return size
log.info(
"request with id '%s' could not be found",
request_id,
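
A short sketch of how a caller might consume the updated size lookup, now that an empty result raises EmptyDatasetError instead of returning 0 bytes; the import paths are assumptions based on the file layout shown in this PR.

# Sketch: handling the new empty-result behaviour of get_request_resulting_size.
# Import paths are assumed from the file layout in this PR.
import exceptions as exc                                  # api/app/exceptions.py
from endpoint_handlers.request import get_request_resulting_size

def describe_result_size(request_id: int) -> str:
    try:
        size_bytes = get_request_resulting_size(request_id)
    except exc.EmptyDatasetError as err:
        # Raised when the stored download has no bytes (size missing or 0).
        return f"request {request_id}: {err}"
    return f"request {request_id}: {size_bytes} bytes available"
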
api/app/exceptions.py (13 changes: 13 additions & 0 deletions)
@@ -180,3 +180,16 @@ def __init__(self, dataset_id, product_id):
product_id=product_id,
)
super().__init__(self.msg)

class ProductRetrievingError(BaseDDSException):
"""Retrieving of the product failed."""

msg: str = "Retrieving of the product '{dataset_id}.{product_id}' failed with the status {status}"

def __init__(self, dataset_id, product_id, status):
self.msg = self.msg.format(
dataset_id=dataset_id,
product_id=product_id,
status=status
)
super().__init__(self.msg)
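
A small usage sketch of the new exception; the import path and identifiers are example assumptions, and the printed message assumes BaseDDSException forwards the formatted message to Exception.

# Sketch: message formatting of the new ProductRetrievingError.
# Import path and identifiers are assumptions; the printed output assumes
# BaseDDSException passes the formatted message through to Exception.
from exceptions import ProductRetrievingError   # api/app/exceptions.py

err = ProductRetrievingError(
    dataset_id="sentinel-2", product_id="default", status="FAILED"
)
print(err)
# Retrieving of the product 'sentinel-2.default' failed with the status FAILED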