@@ -14,6 +14,7 @@
 import re
 import json
 import shutil
+from datetime import datetime, timezone
 from contextlib import contextmanager
 from abc import ABCMeta, abstractmethod, abstractproperty
 
|
@@ -187,15 +188,8 @@ def materialize(
         # merge kwargs
         adapter_kwargs = {**adapter_model.arguments, **(adapter_kwargs or {})}
 
-        # determine the basename of the cache file (if existing)
-        h = (
-            os.path.realpath(self.data_location),
-            adapter.get_cache_key(**adapter_kwargs),
-        )
-        cache_name = f"{create_hash(h)}.json"
-
         # when cached, read the cached object instead
-        readable_path, writable_path, cached = self.check_cache(cache_name)
+        readable_path, writable_path, cached = self.check_cache(adapter, adapter_kwargs)
         if cached:
             yield self.read_cache(readable_path)
             return
@@ -225,15 +219,64 @@ def materialize(
         if writable_path:
             self.write_cache(writable_path, materialized)
 
-    def check_cache(self, cache_name: str) -> [str, str, bool]:
-        # check the writable (default) cache directory
-        writable_path = os.path.join(self.cache_directory, cache_name)
-        if os.path.exists(writable_path):
+    def check_cache(
+        self,
+        adapter: Adapter,
+        adapter_kwargs: dict[str, Any],
+        lifetime: int = 86400,  # TODO: let adapter or main settings control this
+    ) -> tuple[str, str, bool]:
+        # create a unique hash
+        h = create_hash((
+            os.path.realpath(self.data_location),
+            adapter.get_cache_key(**adapter_kwargs),
+        ))
+
+        # helper that finds the cached file with the longest remaining lifetime in a
+        # directory and optionally invalidates stale ones
+        cre = re.compile(rf"^{h}(|_\d+)\.json$")
+
+        def find(directory: str, ts: int, invalidate: bool) -> str | None:
+            files = {
+                int(m.group(1)[1:] or 0): os.path.join(directory, elem)
+                for elem in os.listdir(directory)
+                if (m := cre.match(elem))
+            }
+
+            # return None when no cached files were found
+            if not files:
+                return None
+
+            # prefer a non-expiring file (timestamp 0), else the latest still-valid expiry
+            best_ts = 0 if 0 in files else max((_ts for _ts in files if _ts >= ts), default=-1)
+
+            # invalidate all other files
+            if invalidate:
+                for _ts, path in files.items():
+                    if _ts != best_ts:
+                        try:
+                            os.remove(path)
+                            print("invalidated", path)
+                        except OSError:
+                            pass
+
+            return files[best_ts] if best_ts >= 0 else None
+
+        # get the current utc timestamp
+        ts = round(datetime.now(timezone.utc).timestamp())
+
+        # check the writable (default) cache directory first
+        writable_path = find(self.cache_directory, ts, True)
+        if writable_path:
             return writable_path, writable_path, True
 
+        # create a writable path in the default cache directory
+        ts_postfix = "" if lifetime <= 0 else f"_{ts + lifetime}"
+        writable_path = os.path.join(self.cache_directory, f"{h}{ts_postfix}.json")
+
+        # check the read-only cache directories
         for readonly_cache_directory in self.readonly_cache_directories:
-            readable_path = os.path.join(readonly_cache_directory, cache_name)
-            if os.path.exists(readable_path):
+            readable_path = find(readonly_cache_directory, ts, False)
+            if readable_path:
                 return readable_path, writable_path, True
 
         return writable_path, writable_path, False
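Note on the naming scheme this introduces: cache files are stored as `{hash}.json` when they never expire (`lifetime <= 0`) or as `{hash}_{expiry}.json` where `expiry` is a unix timestamp, and `find()` prefers a non-expiring file, falling back to the still-valid file with the latest expiry. A minimal standalone sketch of that selection logic (the hash value and file names below are hypothetical, and `pick()` is an illustrative stand-in for the inner `find()` helper, minus the filesystem access):

```python
import re
import time

# hypothetical hash of (data_location, adapter cache key);
# the real code derives it via create_hash()
h = "3f2a9c"

# same pattern as in check_cache: "{h}.json" (no expiry) or "{h}_{expiry}.json"
cre = re.compile(rf"^{h}(|_\d+)\.json$")

def pick(names: list[str], now: int) -> str | None:
    # map expiry timestamp -> file name; a missing suffix counts as 0 (never expires)
    files = {
        int(m.group(1)[1:] or 0): name
        for name in names
        if (m := cre.match(name))
    }
    if not files:
        return None
    # prefer the non-expiring entry, else the still-valid entry with the latest expiry
    best = 0 if 0 in files else max((t for t in files if t >= now), default=-1)
    return files[best] if best >= 0 else None

now = int(time.time())
names = [f"{h}_{now - 10}.json", f"{h}_{now + 3600}.json", "unrelated.json"]
print(pick(names, now))  # prints the "{h}_{now + 3600}.json" name; the expired one is skipped
```

With `invalidate=True`, the real helper would additionally remove the expired entry from disk.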