Implement fast centroid fit to sky camera spots #16

Draft

wants to merge 29 commits into base: main

Changes from all commits (29 commits)
4db44d4
start work on new PR
dkirkby Jan 11, 2025
212897f
apply black formatting to sky,util modules
dkirkby Jan 11, 2025
9e0d1a5
precompute grid of shifted spot profiles in ctor
dkirkby Jan 12, 2025
5857847
save best fit (dx,dy) in setraw()
dkirkby Jan 12, 2025
8c13ddd
fix typo
dkirkby Jan 12, 2025
ac69e59
save individual fitted spot centroids
dkirkby Jan 16, 2025
e13406f
implement fast centroiding fit
dkirkby Jan 30, 2025
37610f5
implement fast_centroids option in SKY.set_raw()
dkirkby Jan 30, 2025
a297a42
wip
dkirkby Feb 4, 2025
4e22ec5
wip
dkirkby Feb 4, 2025
4088157
use err=inf for masked fibers instead of err=0
dkirkby Feb 5, 2025
f9c40bf
wip
dkirkby Feb 5, 2025
213a5db
save results before temp correction
dkirkby Feb 5, 2025
41b6299
Save fast-centroid fit grids by default
dkirkby Feb 8, 2025
b3c797e
Add fn to plot fast vs slow centroid fits
dkirkby Feb 8, 2025
9426cd1
fix typos
dkirkby Feb 8, 2025
2155ff1
implement separate x,y grids for fast centroid fit
dkirkby Feb 8, 2025
30143e2
fix typo
dkirkby Feb 8, 2025
da60a97
handle coarse fit on edge
dkirkby Feb 9, 2025
4b8fe3b
better handling of centroid fit at edge
dkirkby Feb 9, 2025
08804b4
add support for reprocessing sky data
dkirkby Feb 9, 2025
aa4a592
use sqlalchemy for db access
dkirkby Feb 9, 2025
f62d5d6
apply black formatting
dkirkby Feb 9, 2025
a6bbff0
fix import
dkirkby Feb 9, 2025
298ea43
add options to sky.process_night
dkirkby Feb 9, 2025
4544277
add timing to output
dkirkby Feb 9, 2025
53cbdd1
fix typo
dkirkby Feb 9, 2025
7c97bd9
add fast/slow option, fix elapsed
dkirkby Feb 9, 2025
82556e8
make DB optional
dkirkby Feb 10, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.20] - Unreleased

## [0.1.19] - 2024-10-16
### Changed
- Make default SkyCam processing identical to pre 0.1.18, i.e. refit by default and do not apply centroid fitting or temperature corrections. Centroid fitting is not ready to deploy yet since it runs too slowly. Temperature corrections will require new plumbing with ICS.
2 changes: 1 addition & 1 deletion desietc/_version.py
@@ -1 +1 @@
__version__ = "0.1.19"
__version__ = "0.1.20dev"
177 changes: 106 additions & 71 deletions desietc/db.py
@@ -5,9 +5,10 @@
are installed (for a direct connection), or else that requests is installed
(for an indirect http connection).
"""

import collections
import datetime
import os.path
import pathlib
import io

try:
@@ -30,7 +31,7 @@
class DB(object):
"""Initialize a connection to the database.

To force a direct connection using psycopg2, set ``http_fallback``
To force a direct connection using sqlalchemy and psycopg2, set ``http_fallback``
to ``False``. To force an indirect http connection using requests,
set ``config_name`` to ``None``. By default, will attempt a
direct connection then fall back to an indirect connection.
@@ -49,65 +50,81 @@ class DB(object):
Use an indirect http connection when a direct connection fails
if True.
"""
def __init__(self, config_name='db.yaml', http_fallback=True):
self.method = 'indirect'
if os.path.exists(config_name):

def __init__(
self,
config_name="/global/cfs/cdirs/desi/engineering/focalplane/db.yaml",
http_fallback=True,
):
self.method = "indirect"
if pathlib.Path(config_name).exists():
# Try a direct connection.
try:
import yaml
except ImportError:
raise RuntimeError('The pyyaml package is not installed.')
with open(config_name, 'r') as f:
raise RuntimeError("The pyyaml package is not installed.")
with open(config_name, "r") as f:
db_config = yaml.safe_load(f)
try:
import psycopg2
self.conn = psycopg2.connect(**db_config)
self.method = 'direct'
import sqlalchemy

self.engine = sqlalchemy.create_engine(
"postgresql://{user}:{password}@{host}:{port}/{dbname}".format(
**db_config
)
)
self.method = "direct"
except ImportError:
if not http_fallback:
raise RuntimeError('The psycopg2 package is not installed.')
raise RuntimeError("The sqlalchemy package is not installed.")
except Exception as e:
if not http_fallback:
raise RuntimeError(f'Unable to establish a database connection:\n{e}')
if self.method == 'indirect' and http_fallback:
raise RuntimeError(
f"Unable to establish a database connection:\n{e}"
)
if self.method == "indirect" and http_fallback:
try:
import requests
except ImportError:
raise RuntimeError('The requests package is not installed.')
logging.info(f'Established {self.method} database connection.')
raise RuntimeError("The requests package is not installed.")
logging.info(f"Established {self.method} database connection.")

def query(self, sql, maxrows=10, dates=None):
"""Perform a query using arbitrary SQL. Returns a pandas dataframe.
Use maxrows=None to remove any limit on the number of returned rows.
"""
logging.debug(f'SQL: {sql}')
if 'limit ' in sql.lower():
raise ValueError('Must specify SQL LIMIT using maxrows.')
logging.debug(f"SQL: {sql}")
if "limit " in sql.lower():
raise ValueError("Must specify SQL LIMIT using maxrows.")
if maxrows is None:
maxrows = 'NULL'
if self.method == 'direct':
return pd.read_sql(sql + f' LIMIT {maxrows}', self.conn, parse_dates=dates)
maxrows = "NULL"
if self.method == "direct":
return pd.read_sql(
sql + f" LIMIT {maxrows}", self.engine, parse_dates=dates
)
else:
return self.indirect(dict(sql_statement=sql, maxrows=maxrows), dates)
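For example, a hedged sketch of a query (the column names are illustrative; the row cap always comes from ``maxrows``):

```python
# Illustrative columns; embedding LIMIT in the SQL itself raises ValueError.
df = db.query("select id, night, exptime from exposure.exposure", maxrows=20)
```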

def indirect(self, params, dates=None):
"""Perform an indirect query using an HTTP request. Returns a pandas dataframe."""
url = 'https://replicator.desi.lbl.gov/QE/DESI/app/query'
params['dbname'] = 'desi'
url = "https://replicator.desi.lbl.gov/QE/DESI/app/query"
params["dbname"] = "desi"
# Use tab-separated output since the web interface does not escape embedded
# special characters, and there are instances of commas in useful
# string columns like PROGRAM.
#params['output_type'] = 'text,' # comma separated
params['output_type'] = 'text' # tab separated
logging.debug(f'INDIRECT PARAMS: {params}')
# params['output_type'] = 'text,' # comma separated
params["output_type"] = "text" # tab separated
logging.debug(f"INDIRECT PARAMS: {params}")
req = requests.get(url, params=params)
if req.status_code != requests.codes.ok:
if req.status_code == 401:
raise RuntimeError('Authentication failed: have you set up your .netrc file?')
raise RuntimeError(
"Authentication failed: have you set up your .netrc file?"
)
req.raise_for_status()
# The server response ends each line with "\t\r\n" so we replace that with "\n" here.
text = req.text.replace('\t\r\n', '\n')
return pd.read_csv(io.StringIO(text), sep='\t', parse_dates=dates)
text = req.text.replace("\t\r\n", "\n")
return pd.read_csv(io.StringIO(text), sep="\t", parse_dates=dates)
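A minimal sketch of why the tab-separated output and the ``"\t\r\n"`` cleanup matter, using a synthetic payload (no server needed):

```python
import io
import pandas as pd

# Synthetic replicator payload: rows end in "\t\r\n" and string columns
# such as PROGRAM may contain commas, so tabs are the safe separator.
raw = "id\tprogram\t\r\n210123\tcalib, dark\t\r\n"
text = raw.replace("\t\r\n", "\n")
df = pd.read_csv(io.StringIO(text), sep="\t")
print(df.loc[0, "program"])  # "calib, dark"
```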

@staticmethod
def where(**kwargs):
@@ -134,56 +151,58 @@ def where(**kwargs):
lo, hi = spec
assert lo is None or hi is None or lo < hi
if lo == None:
where.append(f'{col}<={hi}')
where.append(f"{col}<={hi}")
elif hi == None:
where.append(f'{col}>={lo}')
where.append(f"{col}>={lo}")
else:
where.append(f'({col} BETWEEN {lo} AND {hi})')
except (ValueError,TypeError,AssertionError):
where.append(f"({col} BETWEEN {lo} AND {hi})")
except (ValueError, TypeError, AssertionError):
try:
# Try to interpret spec as a string.
has_wildcard = any([wc in spec for wc in '%_'])
has_wildcard = any([wc in spec for wc in "%_"])
if has_wildcard:
where.append(f"{col} LIKE '{spec}'")
else:
where.append(f"{col}='{spec}'")
except TypeError:
# Assume that spec is a single numeric value.
where.append(f'{col}={spec}')
return ' AND '.join(where)
where.append(f"{col}={spec}")
return " AND ".join(where)

def select(self, table, what, where=None, maxrows=10, order=None, dates=None):
sql = f'select {what} from {table}'
sql = f"select {what} from {table}"
if where is not None:
sql += f' where {where}'
sql += f" where {where}"
if order is not None:
sql += f' order by {order}'
sql += f" order by {order}"
return self.query(sql, maxrows, dates)


class Exposures(object):
"""Cacheing wrapper class for the exposure database.
Note that the exposures table uses 'ID' for the exposure id (not EXPID).
"""
def __init__(self, db, columns='*', cachesize=5000):

def __init__(self, db, columns="*", cachesize=5000):
# Run a test query.
test = db.select('exposure.exposure', columns, maxrows=1)
test = db.select("exposure.exposure", columns, maxrows=1)
self.columns = list(test.columns)
logging.debug(f'exposure table columns: {self.columns}')
self.what = ','.join(self.columns)
logging.debug(f"exposure table columns: {self.columns}")
self.what = ",".join(self.columns)
self.db = db
self.cache = collections.OrderedDict()
self.cachesize = cachesize

def __call__(self, expid, what=None):
"""Lookup a single exposure and cache the results.
"""
"""Lookup a single exposure and cache the results."""
if what is not None and what not in self.columns:
raise ValueError(f'Invalid column name: "{what}".')
if expid not in self.cache:
row = self.db.select('exposure.exposure', self.what, where=f'id={expid}', maxrows=1)
row = self.db.select(
"exposure.exposure", self.what, where=f"id={expid}", maxrows=1
)
if row is None:
raise ValueError('No such exposure id {0}.'.format(expid))
raise ValueError("No such exposure id {0}.".format(expid))
# Cache the results.
self.cache[expid] = row.values[0]
# Trim the cache if necessary.
@@ -196,30 +215,39 @@ def __call__(self, expid, what=None):
return values[self.columns.index(what)]

def select(self, where, maxrows=10):
"""Get exposures selected by where. Results are not cached.
"""
return self.db.select('exposure.exposure', self.what, where=where, maxrows=maxrows)
"""Get exposures selected by where. Results are not cached."""
return self.db.select(
"exposure.exposure", self.what, where=where, maxrows=maxrows
)
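A usage sketch for the caching wrapper (the exposure id and column list are illustrative):

```python
exposures = Exposures(db, columns="id,night,exptime")
row = exposures(210123)                  # fetches once, then serves from cache
exptime = exposures(210123, "exptime")   # single column from the cached row
recent = exposures.select("night=20250209", maxrows=100)  # uncached query
```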


class NightTelemetry(object):
"""Lookup telemetry using a cache of local noon-noon results.
"""
def __init__(self, db, tablename, columns='*', cachesize=10, timestamp='time_recorded', verbose=False):
"""Lookup telemetry using a cache of local noon-noon results."""

def __init__(
self,
db,
tablename,
columns="*",
cachesize=10,
timestamp="time_recorded",
verbose=False,
):
# Run a test query.
test = db.select('telemetry.' + tablename, columns, maxrows=1)
test = db.select("telemetry." + tablename, columns, maxrows=1)
self.db = db
self.cachesize = int(cachesize)
self.tablename = tablename
self.columns = list(test.columns)
if timestamp not in self.columns:
self.columns.append(timestamp)
self.what = ','.join(self.columns)
self.what = ",".join(self.columns)
self.timestamp = timestamp
if verbose:
print(f'Initialized telemetry from {self.tablename} for {self.what}.')
print(f"Initialized telemetry from {self.tablename} for {self.what}.")
self.cache = collections.OrderedDict()
self.MJD_epoch = pd.Timestamp('1858-11-17', tz='UTC')
self.one_day = pd.Timedelta('1 days')
self.MJD_epoch = pd.Timestamp("1858-11-17", tz="UTC")
self.one_day = pd.Timedelta("1 days")
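A quick sanity check of the MJD bookkeeping these two constants support, using pandas arithmetic alone:

```python
import pandas as pd

# MJD counts days since 1858-11-17 00:00 UTC; local midnight for
# night 20250209 is 2025-02-10 07:00 UTC under the local = UTC-7 rule below.
epoch = pd.Timestamp("1858-11-17", tz="UTC")
midnight = pd.Timestamp("2025-02-10 07:00", tz="UTC")
print((midnight - epoch) / pd.Timedelta("1 days"))  # 60716.2916...
```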

def __call__(self, night, what=None, MJD=None):
"""Return the telemetry for a single night.
@@ -248,34 +276,39 @@ def __call__(self, night, what=None, MJD=None):
if what is not None and what not in self.columns:
raise ValueError(f'Invalid column name "{what}". Pick from {self.what}.')
if MJD is not None and what is None:
raise ValueError(f'Must specify a column (what) with MJD values.')
raise ValueError(f"Must specify a column (what) with MJD values.")
# Calculate local midnight on night = YYYYMMDD as midnight UTC + 31 hours (assuming local = UTC-7)
try:
midnight = datetime.datetime.strptime(str(night), '%Y%m%d') + datetime.timedelta(days=1, hours=7)
midnight = datetime.datetime.strptime(
str(night), "%Y%m%d"
) + datetime.timedelta(days=1, hours=7)
except ValueError:
raise ValueError(f'Badly formatted or invalid night: "{night}".')
self.midnight = pd.Timestamp(midnight, tz='UTC')
self.midnight = pd.Timestamp(midnight, tz="UTC")
if night not in self.cache or MJD is not None:
# Fetch data from local noon on YYYYMMDD until local noon the next day.
tmin = self.midnight - pd.Timedelta(12, 'hours')
tmax = self.midnight + pd.Timedelta(12, 'hours')
tmin = self.midnight - pd.Timedelta(12, "hours")
tmax = self.midnight + pd.Timedelta(12, "hours")
if MJD is not None:
MJD = np.asarray(MJD)
# Check that the min MJD is within our range.
timestamp = self.MJD_epoch + MJD.min() * self.one_day
if timestamp < tmin or timestamp > tmax:
raise ValueError(f'MJD {MJD.min()} ({timestamp}) not in night {night}.')
raise ValueError(f"MJD {MJD.min()} ({timestamp}) not in night {night}.")
# Check that the max MJD is within our range.
timestamp = self.MJD_epoch + MJD.max() * self.one_day
if timestamp < tmin or timestamp > tmax:
raise ValueError(f'MJD {MJD.max()} ({timestamp}) not in night {night}.')
raise ValueError(f"MJD {MJD.max()} ({timestamp}) not in night {night}.")
if night not in self.cache:
# Fetch the results.
results = self.db.select(
self.tablename, self.what, maxrows=None,
where=f"{self.timestamp}>=TIMESTAMP '{tmin}' and {self.timestamp}<=TIMESTAMP '{tmax}'")
self.tablename,
self.what,
maxrows=None,
where=f"{self.timestamp}>=TIMESTAMP '{tmin}' and {self.timestamp}<=TIMESTAMP '{tmax}'",
)
# Convert the timestamp column to MJD.
results['MJD'] = (results[self.timestamp] - self.MJD_epoch) / self.one_day
results["MJD"] = (results[self.timestamp] - self.MJD_epoch) / self.one_day
# Cache the results.
self.cache[night] = results
# Trim the cache if necessary.
Expand All @@ -287,11 +320,13 @@ def __call__(self, night, what=None, MJD=None):
if what is None:
return results
# Select the specified column (in addition to MJD).
results = results[['MJD', what]]
results = results[["MJD", what]]
if MJD is None:
return results
# Interpolate to the specified time (assuming "what" is numeric).
dtype = results[what].dtype
if not np.issubdtype(dtype, np.number):
raise ValueError(f'Nearest neighbor lookup not implemented yet for dtype "{dtype}".')
return np.interp(MJD, results['MJD'], results[what])
raise ValueError(
f'Nearest neighbor lookup not implemented yet for dtype "{dtype}".'
)
return np.interp(MJD, results["MJD"], results[what])
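Finally, an end-to-end sketch; the telemetry table and column names here are assumptions for illustration, not something this PR pins down:

```python
telem = NightTelemetry(db, "environmentmonitor_tower", columns="temperature")
df = telem(20250209)  # cached noon-to-noon dataframe, with an MJD column added
# Interpolate to specific times; these MJDs fall inside night 20250209
# (local noon 2025-02-09 through local noon 2025-02-10, local = UTC-7).
temps = telem(20250209, what="temperature", MJD=[60716.0, 60716.3])
```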