New forced-photometry loaders #170

Merged · 43 commits · Nov 28, 2024

Commits
b536159
chore(deps): upgrade fastavro to <1.7 || >=1.9.2
jvansanten Dec 21, 2023
c5272b0
ci: switch to setup-cache-python-poetry action
jvansanten Dec 21, 2023
627b4ed
chore(deps): roll back lockfile
jvansanten Dec 21, 2023
b6bc272
chore(deps): upgrade fastavro to ^1.9.2
jvansanten Dec 21, 2023
1fb1795
mypy: HTTPError.response may be None
jvansanten Dec 19, 2023
5060a5e
mypy: use tuples in various matplotlib apis
jvansanten Dec 19, 2023
4b00aa3
mypy: fixup for fastavro 1.9
jvansanten Dec 19, 2023
a55a4be
chore(deps): exclude ampel >=0.9
jvansanten Dec 21, 2023
2fa2e3c
Make deltaT an explicit unit parameter.
wombaugh Jan 12, 2024
d670274
Merge pull request #131 from AmpelAstro/v0.8.10_tokengeneratorpatch
jvansanten Jan 12, 2024
aaaf8a3
Bump version
jvansanten Jan 12, 2024
f16b896
DecentFilter: check whether drb exists in alert (#132)
andi-matter Jan 17, 2024
5a2c2e0
Bump version to 0.8.12
jvansanten Jan 17, 2024
e434ca6
deps: bump requests-toolbelt to 1.0
jvansanten Jan 22, 2024
ae57331
ZTFAlertStreamController: shiny for mypy
jvansanten Dec 19, 2023
088100d
Merge pull request #134 from AmpelAstro/requests-toolbelt-1.0
jvansanten Jan 22, 2024
3c0a735
[no ci] configure renovate
jvansanten Jan 26, 2024
ee51ff0
chore: use centralized workflow
jvansanten Jan 26, 2024
95847fa
tests: remove stray fields from channel def
jvansanten Jan 26, 2024
74465a7
tests: pass ttl to compiler
jvansanten Jan 26, 2024
6f76456
chore(deps): update minor updates (stable/v0.8) (#138)
renovate[bot] Apr 25, 2024
93bd626
chore: disable lockfile maintenance
jvansanten Apr 25, 2024
ae03b23
chore(deps): pin ampelproject/ampel-interface action to e880b6e
renovate[bot] Apr 25, 2024
9cffa18
ZTFCutoutImages: correctly forward init args
jvansanten May 2, 2024
8f08d1a
Bump version to 0.8.14
jvansanten May 2, 2024
d68a332
request output parsing updated with new endpoint. (#148)
wombaugh May 3, 2024
f882e7b
Bump version to 0.8.15
jvansanten May 3, 2024
e4c16ea
Allow floating (JD) time offsets. (#150)
wombaugh May 14, 2024
cf6c7c1
chore(deps): update minor updates (#152)
renovate[bot] May 24, 2024
67b8495
SkyPortalPublisher: raise a more informative error when compiler_opts…
jvansanten May 29, 2024
2dc2367
SkyPortalPublisher: make cutout posting configurable
jvansanten Jun 1, 2024
f3999ee
Bump version to 0.8.16
jvansanten Jun 1, 2024
6005446
Units for supplying different kinds of forced photometry.
wombaugh Nov 27, 2024
20c395c
Units for supplying different kinds of forced photometry, cont
wombaugh Nov 27, 2024
6ef554e
Merge branch 'master' into newFPloaders
jvansanten Nov 27, 2024
6868654
ruff: pyupgrade
jvansanten Nov 27, 2024
dae8477
ruff: if-stmt-min-max
jvansanten Nov 27, 2024
8b66a60
ruff: simplify
jvansanten Nov 27, 2024
14c1672
ruff: avoid quadratic list summation
jvansanten Nov 27, 2024
4b6c9ee
Merge branch 'master' into newFPloaders
jvansanten Nov 27, 2024
333f501
refresh locks
jvansanten Nov 28, 2024
d2cc947
tests: close mongo clients
jvansanten Nov 28, 2024
f590e1f
Bump version to 0.10.3a0
jvansanten Nov 28, 2024
289 changes: 289 additions & 0 deletions ampel/ztf/alert/BTSForcedPhotometryAlertSupplier.py
@@ -0,0 +1,289 @@
#!/usr/bin/env python
# File: Ampel-ZTF/ampel/ztf/alert/BTSForcedPhotometryAlertSupplier.py
# License: BSD-3-Clause
# Author: valery brinnel
# Date: 25.10.2021
# Last Modified Date: 15.11.2024
# Last Modified By: jno

import sys
from hashlib import blake2b

import numpy as np
import pandas as pd
from astropy.coordinates import SkyCoord
from astropy.time import Time
from bson import encode

from ampel.alert.AmpelAlert import AmpelAlert
from ampel.alert.BaseAlertSupplier import BaseAlertSupplier
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol
from ampel.view.ReadOnlyDict import ReadOnlyDict
from ampel.ztf.alert.calibrate_fps_fork import search_peak
from ampel.ztf.util.ZTFIdMapper import to_ampel_id

dcast = {
"jd": float,
"fnu_microJy": float,
"fnu_microJy_unc": float,
"passband": str,
"programid": int,
"fcqfid": int,
"zpdiff": float,
"sys_unc_factor": float,
"flags": int,
}

ZTF_FILTER_MAP = {"ZTF_g": 1, "ZTF_r": 2, "ZTF_i": 3}


class BTSForcedPhotometryAlertSupplier(BaseAlertSupplier):
"""
Returns an AmpelAlert instance for each file path provided by the underlying alert loader.

    This unit assumes that input files are *baseline corrected* IPAC
    forced photometry files, of the kind produced by the BTS group.

These are assumed to be named as:
{ZTFID}_fnu.csv
with columns ['jd', 'fnu_microJy', 'fnu_microJy_unc', 'passband', 'fcqfid', 'zpdiff', 'sys_unc_factor', 'flags']


"""

# FP read and baseline correction
flux_key: str = "fnu_microJy"
flux_unc_key: str = "fnu_microJy_unc"
flux_unc_scale: dict[str, float] = {"ZTF_g": 1.0, "ZTF_r": 1.0, "ZTF_i": 1.0}
flux_unc_floor: float = 0.02
baseline_flag_cut: int = (
        512  # This also allows cases with underestimated scaled uncertainties
)
days_prepeak: None | float = (
        None  # Cut epochs earlier than this (days relative to peak). Assumes a peak can be found!
)
days_postpeak: None | float = (
        None  # Cut epochs later than this (days relative to peak). Assumes a peak can be found!
)
allow_iband_peak: bool = (
False # Include I-band in peak calculation (Warning: I-band incomplete)
)

# Candidate coordinates
# BTS IPAC FP files are named according to a transient, but do not contain coordinates.
# You can provide a file containing {ZTFid} to {ra,dec} maps, otherwise the file names will be used.
name_file: str | None = None
file_keys: dict[str, str] = {
"id": "ZTFID",
"ra": "RA",
"dec": "Dec",
"raunit": "hourangle",
"decunit": "deg",
    }  # Column names (and coordinate units) in the name file
name_coordinates = None # Loaded at first run
name_values = None # Loaded at first run

# Run mode (both can be used)
alert_history: bool = False # Return an "alert" for each significant detection
full_history: bool = True # Return the full lightcurve, after alerts.
detection_threshold: float = (
5.0 # S/N above this considered a significant detection
)

# Loaded transient datapoints (for alert mode)
    # Dynamically updated during processing
transient_name: str = ""
transient_tags: list = []
transient_pps: list = []
transient_baselineinfo: dict = {}
transient_hashid: list = []
alert_counter: int = 0

def __init__(self, **kwargs) -> None:
kwargs["deserialize"] = None
super().__init__(**kwargs)

def _load_pps(self, fpath: str) -> bool:
"""
Load BTS IPAC FP datapoints from input file.
Will set the transient id, tags and pps variables.
Return False in case nothing was found.
"""

        # post_init did not work for Supplier-type units; the name maps are loaded here instead.
if self.name_file and not self.name_coordinates:
# Move to init when configured correctly
df = pd.read_csv(self.name_file)
self.name_coordinates = SkyCoord(
ra=df[self.file_keys["ra"]],
dec=df[self.file_keys["dec"]],
unit=(self.file_keys["raunit"], self.file_keys["decunit"]),
)
self.name_values = df[self.file_keys["id"]]

# Assuming that filenames follow the BTS convention
ztfname = fpath.split("/")[-1].split("_")[0]

# Find coordinates - if name file exists
ra, dec = None, None # Allowing events with no coordinate info
if self.name_coordinates:
myc = self.name_coordinates[self.name_values == ztfname]
if len(myc) > 0:
ra = myc.ra.deg[0]
dec = myc.dec.deg[0]

# Read file
df = pd.read_csv(fpath)
# Reject data with flags above threshold
df = df[df["flags"] <= self.baseline_flag_cut]
tags: list[str] = []

# Search each unique filter/quadrant combo for a peak, in order to cut outliers
if self.days_prepeak or self.days_postpeak:
            # Reset index to allow median time-search
# obs_jd = Time(df.jd.values, format="jd")
df = df.set_index(pd.to_datetime(Time(df.jd.values, format="jd").datetime))
allpeaks = {}
for obs_group, df_group in df.groupby("fcqfid"):
# Skip secondary grid
if obs_group > 10000000: # type: ignore
continue
if not self.allow_iband_peak and str(obs_group)[-1] == "3":
continue
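                # window="14D" below is a pandas offset alias, i.e. search_peak
                # scans a 14-day (presumably rolling) window over the datetime
                # index set above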
allpeaks[str(obs_group)] = search_peak(
df_group.fnu_microJy,
df_group.fnu_microJy_unc,
df_group.jd,
window="14D",
)
peaktimes = [v["t_fcqfid_max"] for k, v in allpeaks.items() if v["det_sn"]]
if len(peaktimes) == 0:
print(
"No peaks found in lightcurve, but a cut around this was required - skip object."
)
return False
if np.std(peaktimes, ddof=1) > 50:
print("Warning! Large scatter in time of maximum")
df["flags"] += 1
t_peak = np.mean(peaktimes)
# Could alternatively have weighted with the detection S/N
# snrs = [v["det_snr"] for k, v in allpeaks.items() if v["det_sn"]]
# wt_peak = np.average(peaktimes, weights=snrs)
# print(f"weighted peak time {wt_peak}")
if self.days_prepeak:
df = df[(df["jd"] - t_peak) >= -self.days_prepeak]
if self.days_postpeak:
df = df[(df["jd"] - t_peak) <= self.days_postpeak]

pps = []
alert_ids: list[bytes] = []
for _, row in df.iterrows():
pp = {
str(k): dcast[str(k)](v) if (k in dcast and v is not None) else v
for k, v in row.items()
}

pp["fid"] = ZTF_FILTER_MAP[pp["passband"]]
pp["ra"] = ra
pp["dec"] = dec
pp["rcid"] = (
(int(str(pp["fcqfid"])[-3]) - 1) * 4 + int(str(pp["fcqfid"])[-2]) - 1
)
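            # rcid (ZTF readout channel, 0-63) reconstructed as (ccdid - 1) * 4 + (qid - 1),
            # reading the last ccd digit and the quadrant digit from the packed fcqfid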

# Some entries have flux values exactly at 0. Suspicious - we will skip these.
if pp[self.flux_key] == 0:
continue

            # Convert flux from microjansky
pp["flux"] = pp[self.flux_key] * 2.75406

            # Optionally scale uncertainties
pp["flux_unc"] = (
pp[self.flux_unc_key] * 2.75406 * self.flux_unc_scale[pp["passband"]]
)

# Enforce error floor
if pp["flux_unc"] / abs(pp["flux"]) < self.flux_unc_floor:
                tags.append("FLOOR")
                pp["flux_unc"] = abs(pp["flux"]) * self.flux_unc_floor

pp_hash = blake2b(encode(pp), digest_size=7).digest()
# pp["candid"] = int.from_bytes(pp_hash, byteorder=sys.byteorder)
pps.append(
ReadOnlyDict(
dict(
sorted(pp.items())
) # Ensure ordered keys for duplication search - necessary here?
)
)
# Create a running list of hash ids
if len(alert_ids) == 0:
alert_ids = [pp_hash]
else:
alert_ids.append(alert_ids[-1] + pp_hash)

self.transient_name = ztfname
self.transient_tags = tags
self.transient_pps = pps
self.transient_hashid = alert_ids
self.alert_counter = 0
return True

def _build_alert(self, datapoints: int) -> AmpelAlertProtocol:
return AmpelAlert(
id=int.from_bytes( # alert id
blake2b(self.transient_hashid[datapoints - 1], digest_size=7).digest(),
byteorder=sys.byteorder,
),
stock=to_ampel_id(self.transient_name), # internal ampel id
# stock=self.transient_name, # internal ampel id - for forced photometry we still have not associated to unique ZTF ... do we need to do that later?
# Ampel alert structure assumes most recent detection to come first
datapoints=tuple(
[self.transient_pps[datapoints - 1]]
+ self.transient_pps[0 : datapoints - 1]
),
extra=ReadOnlyDict(
{
"name": self.transient_name,
}
),
tag=self.transient_tags,
)

def __next__(self) -> AmpelAlertProtocol:
"""
:raises StopIteration: when alert_loader dries out.
:raises AttributeError: if alert_loader was not set properly before this method is called
"""

        # Load the next lightcurve if we either do not have one or have already generated an alert with the full history
while len(self.transient_pps) == 0 or self.alert_counter >= len(
self.transient_pps
):
            # Load the next lightcurve from file.
            # If no more lightcurves are found, the alert loader raises StopIteration.
            self._load_pps(
                str(next(self.alert_loader))
            )  # the supplier assumes each loader item is a path to a file

if self.alert_history and self.alert_counter < len(self.transient_pps):
            # Increase counter until a significant detection is found, then
            # return an alert containing the lightcurve up to this point.
            for dp in self.transient_pps[self.alert_counter :]:
                if abs(dp["flux"]) / dp["flux_unc"] > self.detection_threshold:
                    # Detection found: advance the counter past it and return
                    # an alert that includes this datapoint
                    self.alert_counter += 1
                    return self._build_alert(self.alert_counter)
                self.alert_counter += 1

# Here we either have not generated piecewise alerts at all, or want to make sure we get one final alert with all data
if self.full_history and self.alert_counter < len(self.transient_pps):
# Make sure we return full history
self.alert_counter = len(self.transient_pps)
return self._build_alert(self.alert_counter)

return self.__next__()
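The running alert id above is built by bson-encoding each datapoint, hashing it with blake2b (7-byte digest), and hashing the concatenation of all digests so far. A minimal standalone sketch of that scheme (the sample datapoints are made up; requires pymongo for bson):

import sys
from hashlib import blake2b

from bson import encode

# Two made-up datapoints standing in for forced-photometry rows
datapoints = [
    {"jd": 2460000.5, "flux": 12.3, "flux_unc": 1.1},
    {"jd": 2460001.5, "flux": 15.1, "flux_unc": 1.2},
]

running = b""
for dp in datapoints:
    # 7-byte blake2b digest of the bson-encoded datapoint, as in _load_pps
    running = running + blake2b(encode(dp), digest_size=7).digest()
    # alert id for the lightcurve up to this datapoint, as in _build_alert
    alert_id = int.from_bytes(
        blake2b(running, digest_size=7).digest(), byteorder=sys.byteorder
    )
    print(alert_id)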
17 changes: 6 additions & 11 deletions ampel/ztf/alert/ZTFFPbotForcedPhotometryAlertSupplier.py
@@ -15,8 +15,6 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
-
-# from appdirs import user_cache_dir
from astropy.time import Time
from bson import encode
from scipy.stats import median_abs_deviation
@@ -103,7 +101,7 @@ def get_fpbot_baseline(
min_det_per_field_band: int = 10,
zp_max_deviation_from_median: float = 0.5,
reference_days_before_peak: None | float = 50.0,
-) -> pd.DataFrame:
+) -> tuple[pd.DataFrame, dict[str, Any]]:
"""
For each unique baseline combination, estimate and store baseline.
Partially taken from
@@ -126,10 +124,7 @@

"""
df["fcqfid"] = np.array(
-        df.fieldid.values * 10000
-        + df.ccdid.values * 100
-        + df.qid.values * 10
-        + df.filterid.values
+        df["fieldid"] * 10000 + df["ccdid"] * 100 + df["qid"] * 10 + df["filterid"]
)

if primary_grid_only:
@@ -157,7 +152,7 @@ def get_fpbot_baseline(
df["zp_median_deviation"] = np.abs(np.log10(median_zp / df.magzp))
df.query("zp_median_deviation < @zp_max_deviation_from_median", inplace=True)

-    unique_fid = np.unique(df.fcqfid.values).astype(int)
+    unique_fid = np.unique(df.fcqfid).astype(int)

# Time index for use for rolling window
df = df.sort_values("obsmjd")
@@ -176,13 +171,13 @@

fcqf_df = df.iloc[this_fcqfid].copy()
# Use the pulls from mean to find largest deviation
-        pull_series = fcqf_df.ampl / fcqf_df["ampl.err"]
+        pull_series = fcqf_df["ampl"] / fcqf_df["ampl.err"]
roll_med = pull_series.rolling(window, center=True).median().values
# Only use medians with a min nbr of values (otherwise we get edge results)
t_max = fcqf_df.obsmjd.values[np.argmax(roll_med)]
# flux_max = np.max(roll_med)
-        flux_max = fcqf_df.ampl.values[np.argmax(roll_med)]
-        flux_scatt = median_abs_deviation(fcqf_df.ampl.values, scale="normal")
+        flux_max = fcqf_df["ampl"].values[np.argmax(roll_med)]
+        flux_scatt = median_abs_deviation(fcqf_df["ampl"].values, scale="normal")
peak_snr = flux_max / flux_scatt
if (peak_snr > min_peak_snr) and (ufid < 10000000):
fcqfid_dict[str(ufid)]["det_sn"] = True
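The fcqfid used throughout get_fpbot_baseline packs field, CCD, quadrant, and filter ids into a single integer; a small worked example of the encoding (the ids are made up):

# fcqfid = fieldid * 10000 + ccdid * 100 + qid * 10 + filterid
fieldid, ccdid, qid, filterid = 700, 5, 3, 2  # made-up ZTF ids (filter 2 = r)
fcqfid = fieldid * 10000 + ccdid * 100 + qid * 10 + filterid
assert fcqfid == 7000532
# fields beyond the primary grid (fieldid >= 1000) yield fcqfid > 10000000,
# which is what the primary_grid_only / secondary-grid cuts key on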