-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #124 from lsst/tickets/SP-1789
tickets/SP-1789: plot EFD logevents on an event timeline plot
- Loading branch information
Showing
12 changed files
with
1,651 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
import asyncio | ||
import os | ||
import threading | ||
from collections import defaultdict | ||
from collections.abc import Iterable | ||
from functools import partial | ||
from warnings import warn | ||
|
||
import pandas as pd | ||
from lsst_efd_client import EfdClient | ||
|
||
from schedview.dayobs import DayObs | ||
|
||
DEFAULT_EFD = ( | ||
"summit_efd" if os.getenv("EXTERNAL_INSTANCE_URL", "") == "https://summit-lsp.lsst.codes" else "usdf_efd" | ||
) | ||
SAL_INDEX_GUESSES = defaultdict(partial([[]].__getitem__, 0), {"lsstcomcam": [1, 3], "latiss": [2]}) | ||
|
||
|
||
def _get_efd_client(efd: EfdClient | str | None) -> EfdClient: | ||
match efd: | ||
case EfdClient(): | ||
efd_client = efd | ||
case str(): | ||
efd_client = EfdClient(efd) | ||
case None: | ||
try: | ||
from lsst.summit.utils.efdUtils import makeEfdClient | ||
|
||
efd_client = makeEfdClient() | ||
except ModuleNotFoundError: | ||
warn( | ||
"lsst.summit.utils not installed, " | ||
f"falling back on guessing the EFD client: {DEFAULT_EFD}" | ||
) | ||
efd_client = EfdClient(DEFAULT_EFD) | ||
except RuntimeError: | ||
warn( | ||
"lsst.summit.utils cannot automatically determine which EFD to use on this host, " | ||
f"falling back on guessing the EFD client: {DEFAULT_EFD}" | ||
) | ||
efd_client = EfdClient(DEFAULT_EFD) | ||
case _: | ||
raise ValueError(f"Cannot translate a {type(efd)} to an EfdClient.") | ||
|
||
return efd_client | ||
|
||
|
||
async def _get_efd_fields_for_topic(efd_client, topic, public_only=True): | ||
fields = await efd_client.get_fields(topic) | ||
if public_only: | ||
fields = [f for f in fields if "private" not in f] | ||
|
||
return fields | ||
|
||
|
||
async def query_efd_topic_for_night( | ||
topic: str, | ||
day_obs: DayObs | str | int, | ||
sal_indexes: tuple[int, ...] = (1, 2, 3), | ||
efd: EfdClient | str | None = None, | ||
fields: list[str] | None = None, | ||
) -> pd.DataFrame: | ||
"""Query and EFD topic for all entries on a night. | ||
Parameters | ||
---------- | ||
topic : `str` | ||
The topic to query | ||
day_obs : `DayObs` or `str` or `int` | ||
The date of the start of the night requested. | ||
sal_indexes : `tuple[int, ...]`, optional | ||
Which SAL indexes to query, by default (1, 2, 3). | ||
Can be guessed by instrument with ``SAL_INDEX_GUESSES[instrument]`` | ||
efd : `EfdClient` or `str` `None`, optional | ||
The EFD client to use, by default None, which creates a new one | ||
based on the environment. | ||
fields : `list[str]` or `None`, optional | ||
Fields to query from the topic, by default None, which queries all | ||
fields. | ||
Returns | ||
------- | ||
result : `pd.DataFrame` | ||
The result of the query | ||
""" | ||
|
||
day_obs = day_obs if isinstance(day_obs, DayObs) else DayObs.from_date(day_obs) | ||
efd_client = _get_efd_client(efd) | ||
|
||
if fields is None: | ||
fields = await _get_efd_fields_for_topic(efd_client, topic) | ||
|
||
if not isinstance(sal_indexes, Iterable): | ||
sal_indexes = [sal_indexes] | ||
|
||
results = [] | ||
for sal_index in sal_indexes: | ||
result = await efd_client.select_time_series( | ||
topic, fields, day_obs.start, day_obs.end, index=sal_index | ||
) | ||
if isinstance(result, pd.DataFrame) and len(result) > 0: | ||
results.append(result) | ||
|
||
result = pd.concat(results) if len(results) > 0 else pd.DataFrame() | ||
result.index.name = "time" | ||
|
||
return result | ||
|
||
|
||
async def query_latest_in_efd_topic( | ||
topic: str, | ||
num_records: int = 6, | ||
sal_indexes: tuple[int, ...] = (1, 2, 3), | ||
efd: EfdClient | str | None = None, | ||
fields: list[str] | None = None, | ||
) -> pd.DataFrame: | ||
"""Query and EFD topic for all entries on a night. | ||
Parameters | ||
---------- | ||
topic : `str` | ||
The topic to query | ||
num_records : `int` | ||
The number of records to return. | ||
sal_indexes : `tuple[int, ...]`, optional | ||
Which SAL indexes to query, by default (1, 2, 3). | ||
Can be guessed by instrument with ``SAL_INDEX_GUESSES[instrument]`` | ||
efd : `EfdClient` or `str` `None`, optional | ||
The EFD client to use, by default None, which creates a new one | ||
based on the environment. | ||
fields : `list[str]` or `None`, optional | ||
Fields to query from the topic, by default None, which queries all | ||
fields. | ||
Returns | ||
------- | ||
result : `pd.DataFrame` | ||
The result of the query | ||
""" | ||
|
||
efd_client = _get_efd_client(efd) | ||
|
||
if fields is None: | ||
fields = await _get_efd_fields_for_topic(efd_client, topic) | ||
|
||
if not isinstance(sal_indexes, Iterable): | ||
sal_indexes = [sal_indexes] | ||
|
||
results = [] | ||
for sal_index in sal_indexes: | ||
result = await efd_client.select_top_n(topic, fields, num_records, index=sal_index) | ||
if isinstance(result, pd.DataFrame) and len(result) > 0: | ||
results.append(result) | ||
|
||
result = pd.concat(results) if len(results) > 0 else pd.DataFrame() | ||
|
||
return result | ||
|
||
|
||
def sync_query_efd_topic_for_night(*args, **kwargs): | ||
"""Just like query_efd_topic_for_night, but run in a separate thread | ||
and block for results, so it can be run within a separate event loop. | ||
""" | ||
# Inspired by https://stackoverflow.com/questions/74703727 | ||
# Works even in a panel event loop | ||
io_loop = asyncio.new_event_loop() | ||
io_thread = threading.Thread(target=io_loop.run_forever, name="EFD query thread", daemon=True) | ||
|
||
def run_async(coro): | ||
if not io_thread.is_alive(): | ||
io_thread.start() | ||
future = asyncio.run_coroutine_threadsafe(coro, io_loop) | ||
return future.result() | ||
|
||
result = run_async(query_efd_topic_for_night(*args, **kwargs)) | ||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
from typing import Literal | ||
|
||
from schedview.collect.efd import query_efd_topic_for_night | ||
from schedview.collect.nightreport import get_night_narrative | ||
from schedview.collect.visits import NIGHT_STACKERS, read_visits | ||
from schedview.dayobs import DayObs | ||
|
||
EFD_TOPIC_FOR_KEY = { | ||
"scheduler_dependencies": "lsst.sal.Scheduler.logevent_dependenciesVersions", | ||
"scheduler_configuration": "lsst.sal.Scheduler.logevent_configurationApplied", | ||
"block_status": "lsst.sal.Scheduler.logevent_blockStatus", | ||
"scheduler_snapshots": "lsst.sal.Scheduler.logevent_largeFileObjectAvailable", | ||
} | ||
|
||
|
||
async def collect_timeline_data( | ||
day_obs: str | int | DayObs, | ||
sal_indexes: tuple[int, ...] = (1, 2, 3), | ||
telescope: Literal["AuxTel", "Simonyi"] = "Simonyi", | ||
visit_origin: str = "lsstcomcam", | ||
**kwargs, | ||
) -> dict: | ||
"""Create a dictionary with data to put on a timeline, compatible with | ||
`schedview.plot.timeline.make_multitimeline`. | ||
Parameters | ||
---------- | ||
day_obs : `str` or `int` or `DayObs` | ||
Night for which to retrieve data | ||
sal_indexes : `tuple[int, ...]`, optional | ||
sal indexes from which to query EFD data , by default (1, 2, 3) | ||
telescope : `str`, optional | ||
"AuxTel" or "Simonyi", by default "Simonyi" | ||
visit_origin : `str`, optional | ||
Source of visit data, by default "lsstcomcam" | ||
Returns | ||
------- | ||
timeline_data: `dict` | ||
Data for a timeline plot. | ||
""" | ||
|
||
day_obs = DayObs.from_date(day_obs) | ||
|
||
data = {} | ||
requested_keys = [k for k in kwargs if kwargs[k]] | ||
|
||
for key in requested_keys: | ||
if key == "visits": | ||
if "visit_timeline" in data: | ||
data[key] = data["visit_timeline"] | ||
else: | ||
data[key] = read_visits(day_obs, visit_origin, stackers=NIGHT_STACKERS) | ||
elif key == "visit_timeline": | ||
if "visits" in data: | ||
data[key] = data["visits"] | ||
else: | ||
data[key] = read_visits(day_obs, visit_origin, stackers=NIGHT_STACKERS) | ||
elif key == "log_messages": | ||
data[key] = get_night_narrative(day_obs, telescope) | ||
elif key in EFD_TOPIC_FOR_KEY: | ||
data[key] = await query_efd_topic_for_night(EFD_TOPIC_FOR_KEY[key], day_obs, sal_indexes) | ||
else: | ||
ValueError("Unrecognized data key: {key}") | ||
|
||
return data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import pandas as pd | ||
from astropy.time import Time | ||
from lsst.ts.xml.enums.Script import ScriptState | ||
|
||
from schedview.dayobs import DayObs | ||
|
||
|
||
def collapse_script_logevents(script_logevents: pd.DataFrame | list) -> pd.DataFrame: | ||
script_logevents = pd.DataFrame(script_logevents) | ||
|
||
script_logevents["scriptStateName"] = script_logevents["scriptState"].map( | ||
{k: ScriptState(k).name for k in ScriptState} | ||
) | ||
|
||
for col in [c for c in script_logevents.columns if c.startswith("timestamp")]: | ||
script_logevents[col.removeprefix("timestamp")] = Time( | ||
script_logevents[col], format="unix_tai" | ||
).datetime64 | ||
|
||
state_times = ( | ||
script_logevents.reset_index() | ||
.groupby(["scriptSalIndex", "scriptStateName"]) | ||
.agg("last") | ||
.reset_index(["scriptStateName"]) | ||
.pivot(columns=["scriptStateName"], values=["time"]) | ||
.droplevel(0, axis="columns") | ||
) | ||
last_values = script_logevents.groupby("scriptSalIndex").agg("last") | ||
first_times = script_logevents.reset_index().groupby("scriptSalIndex").agg({"time": "min"}) | ||
last_times = script_logevents.reset_index().groupby("scriptSalIndex").agg({"time": "max"}) | ||
collapsed_script = state_times.join(last_values) | ||
collapsed_script.insert(0, "last_logevent_time", last_times) | ||
collapsed_script.insert(0, "first_logevent_time", first_times) | ||
return collapsed_script | ||
|
||
|
||
def find_script_stage_spans(scripts: pd.DataFrame | list, day_obs: DayObs) -> pd.DataFrame: | ||
"""Create a table of script events with start and end times for the | ||
configure and process stages. | ||
Parameters | ||
---------- | ||
scripts : `pd.DataFrame` or `list` | ||
Data as returned from a query to | ||
the lsst.sal.ScriptQueue.logevent_script channel of the EFD. | ||
Returns | ||
------- | ||
pd.DataFrame | ||
The DataFrame with added `stage`, `start_time`, and `end_time` | ||
columns. | ||
""" | ||
scripts = collapse_script_logevents(scripts) | ||
|
||
stage_spans = [] | ||
for stage in ("Configure", "Process"): | ||
these_spans = scripts.copy() | ||
these_spans.insert(0, "end_time", scripts[f"{stage}End"]) | ||
these_spans.insert(0, "start_time", scripts[f"{stage}Start"]) | ||
these_spans.insert(0, "stage", stage) | ||
|
||
stage_spans.append(these_spans) | ||
|
||
script_spans = pd.concat(stage_spans) | ||
script_spans = script_spans.loc[ | ||
(script_spans["start_time"] < day_obs.end.datetime64) | ||
& (script_spans["start_time"] > day_obs.start.datetime64) | ||
& (script_spans["end_time"] < day_obs.end.datetime64) | ||
& (script_spans["end_time"] > day_obs.start.datetime64), | ||
:, | ||
] | ||
|
||
return script_spans |
Oops, something went wrong.