Skip to content

Commit

Permalink
Merge pull request #4191 from hove-io/opg_cached
Browse files Browse the repository at this point in the history
Opg config cached
  • Loading branch information
azime authored Jan 15, 2024
2 parents 4aa4b2a + df538d6 commit 28f5205
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 102 deletions.
2 changes: 2 additions & 0 deletions source/jormungandr/jormungandr/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
'TIMEOUT_TIMEO': 60,
'TIMEOUT_SYNTHESE': 30,
'TIMEOUT_KRAKEN_COVERAGES': 60,
'FETCH_S3_DATA_TIMEOUT': 24 * 60,
}

CACHE_CONFIGURATION = json.loads(os.getenv('JORMUNGANDR_CACHE_CONFIGURATION', '{}')) or default_cache
Expand All @@ -163,6 +164,7 @@
'CACHE_TYPE': 'null', # by default cache is not activated
'TIMEOUT_AUTHENTICATION': 30,
'TIMEOUT_PARAMS': 30,
"FETCH_S3_DATA_TIMEOUT": 2 * 60,
}

MEMORY_CACHE_CONFIGURATION = (
Expand Down
6 changes: 3 additions & 3 deletions source/jormungandr/jormungandr/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ def __init__(
self.best_boarding_positions = read_best_boarding_positions(file_path)

# load stop_point attractivities, the feature is only available when loki is selected as pt_planner
self.olympic_site_params_manager = OlympicSiteParamsManager(self)
if self.olympics_forbidden_uris:
self.olympic_site_params_manager.fill_olympic_site_params_from_s3()
self.olympic_site_params_manager = OlympicSiteParamsManager(
self, app.config.get(str('OLYMPIC_SITE_PARAMS_BUCKET'), {})
)

# TODO: use db
self._pt_journey_fare_backend_manager = PtJourneyFareBackendManager(
Expand Down
2 changes: 1 addition & 1 deletion source/jormungandr/jormungandr/interfaces/v1/opg_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ def __init__(self, *args, **kwargs):
def get(self, region=None, lon=None, lat=None):
region_str = i_manager.get_region(region, lon, lat)
instance = i_manager.instances[region_str]
return instance.olympic_site_params_manager.olympic_site_params, 200
return instance.olympic_site_params_manager.opg_params, 200
89 changes: 68 additions & 21 deletions source/jormungandr/jormungandr/olympic_site_params_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,43 @@
from jormungandr import app
from navitiacommon import type_pb2
from botocore.client import Config
from jormungandr import app, memory_cache, cache


AttractivityVirtualFallback = namedtuple("AttractivityVirtualFallback", "attractivity, virtual_duration")


class ResourceS3Object:
def __init__(self, s3_object, instance_name):
self.s3_object = s3_object
self.instance_name = instance_name

def __repr__(self):
return "{}-{}-{}".format(self.instance_name, self.s3_object.key, self.s3_object.e_tag)


class OlympicSiteParamsManager:
def __init__(self, instance):
def __init__(self, instance, config):
self.olympic_site_params = dict()
self.map_filename_last_modified = dict()
self.instance = instance
self.bucket_name = config.get("name")
self.folder = config.get("folder", "olympic_site_params")
self.args = config.get("args", {"connect_timeout": 2, "read_timeout": 2, "retries": {'max_attempts': 0}})

self.check_conf()

def check_conf(self):
logger = logging.getLogger(__name__)
if not self.bucket_name:
logger.warning(
"Reading stop points attractivities, undefined bucket_name for instance {}".format(
self.instance.name
)
)

def __repr__(self):
return "opg-{}".format(self.instance.name)

def build_olympic_site_params(self, scenario, data):
if not scenario:
Expand Down Expand Up @@ -117,32 +145,49 @@ def get_json_content(self, s3_object):
logger.exception('Error while loading file: {}'.format(s3_object.key))
return {}

def fill_olympic_site_params_from_s3(self):
@cache.memoize(app.config[str('CACHE_CONFIGURATION')].get(str('FETCH_S3_DATA_TIMEOUT'), 24 * 60))
def load_data(self, resource_s3_object):
"""
the POI conf is hidden in REDIS by the instance name, file name and Etag of the S3 object
"""
json_content = self.get_json_content(resource_s3_object.s3_object)
self.str_datetime_time_stamp(json_content)
return json_content

@memory_cache.memoize(
app.config[str('MEMORY_CACHE_CONFIGURATION')].get(str('FETCH_S3_DATA_TIMEOUT'), 2 * 60)
)
def fetch_and_get_data(self, instance_name, bucket_name, folder, **kwargs):
result = dict()
logger = logging.getLogger(__name__)
bucket_params = app.config.get('OLYMPIC_SITE_PARAMS_BUCKET', {})
if not bucket_params:
logger.debug("Reading stop points attractivities, undefined bucket_params")
return
bucket_name = bucket_params.get("name")
if not bucket_name:
logger.debug("Reading stop points attractivities, undefined bucket_name")
return

args = bucket_params.get(
"args", {"connect_timeout": 2, "read_timeout": 2, "retries": {'max_attempts': 0}}
)
s3_resource = boto3.resource('s3', config=Config(**args))

folder = app.config.get('OLYMPIC_SITE_PARAMS_BUCKET', {}).get("folder", "olympic_site_params")
s3_resource = boto3.resource('s3', config=Config(**kwargs))
try:
my_bucket = s3_resource.Bucket(bucket_name)
for obj in my_bucket.objects.filter(Prefix="{}/{}/".format(folder, self.instance.name)):
for obj in my_bucket.objects.filter(Prefix="{}/{}/".format(folder, instance_name)):
if obj.key.endswith('.json'):
json_content = self.get_json_content(obj)
self.str_datetime_time_stamp(json_content)
self.olympic_site_params.update(json_content)
resource_s3_object = ResourceS3Object(obj, instance_name)
json_content = self.load_data(resource_s3_object)
result.update(json_content)
except Exception:
logger.exception("Error on OlympicSiteParamsManager")
return result

@property
def opg_params(self):
self.fill_olympic_site_params_from_s3()
return self.olympic_site_params

def fill_olympic_site_params_from_s3(self):
if not self.instance.olympics_forbidden_uris:
return
logger = logging.getLogger(__name__)
if not self.bucket_name:
logger.debug("Reading stop points attractivities, undefined bucket_name")
return

self.olympic_site_params = self.fetch_and_get_data(
instance_name=self.instance.name, bucket_name=self.bucket_name, folder=self.folder, **self.args
)

def build(self, pt_object_origin, pt_object_destination, api_request):
# Warning, the order of functions is important
Expand Down Expand Up @@ -214,6 +259,8 @@ def get_olympic_site_params(self, pt_origin_detail, pt_destination_detail, api_r
if not origin_olympic_site and not destination_olympic_site:
return {}

self.fill_olympic_site_params_from_s3()

if origin_olympic_site and destination_olympic_site:
origin_olympic_site = None

Expand Down
Loading

0 comments on commit 28f5205

Please sign in to comment.