Merge pull request #211 from desihub/no-zwarn-new
no zwarn_new in iron and older specprods
moustakas authored Feb 24, 2025
2 parents f293997 + 85a8212 commit ef40ff2
Showing 4 changed files with 93 additions and 40 deletions.
3 changes: 2 additions & 1 deletion bin/mpi-fastspecfit
@@ -375,7 +375,8 @@ def main():
tile=args.tile, night=args.night, outdir_data=outdir_data,
fastfiles_to_merge=fastfiles_to_merge, outsuffix=args.merge_suffix,
mergedir=args.mergedir, overwrite=args.overwrite,
fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp,
split_hdu=True)
return


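For orientation, here is a minimal sketch of what this driver change amounts to: the merge call in main() now forwards split_hdu=True to merge_fastspecfit(). Every keyword below appears in the merge_fastspecfit signature touched by this PR, but the values are placeholders, not the script's actual defaults.

from fastspecfit.mpi import merge_fastspecfit

# Condensed, hypothetical version of the merge call made by bin/mpi-fastspecfit;
# the argument values are illustrative only.
merge_fastspecfit(specprod='loa', coadd_type='healpix',
                  fastphot=False, supermerge=False,
                  mergedir='./merged', overwrite=False,
                  split_hdu=True,  # new keyword added by this PR
                  mp=8)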
8 changes: 8 additions & 0 deletions doc/changes.rst
@@ -7,6 +7,14 @@ Change Log

*

3.1.4 (2025-02-24)
------------------

* Handle no ``ZWARN_NEW`` in Iron and older specprods; split large
merge files into separate HDUs [`PR #211`_].

.. _`PR #211`: https://github.com/desihub/fastspecfit/pull/211

3.1.3 (2025-02-01)
------------------

95 changes: 66 additions & 29 deletions py/fastspecfit/io.py
@@ -52,7 +52,6 @@

# quasarnet and MgII afterburner columns to read
QNLINES = ['C_LYA', 'C_CIV', 'C_CIII', 'C_MgII', 'C_Hbeta', 'C_Halpha', ]
QNCOLS = ['TARGETID', 'Z_NEW', 'ZWARN_NEW', 'IS_QSO_QN_NEW_RR', ] + QNLINES
MGIICOLS = ['TARGETID', 'IS_QSO_MGII']

def one_spectrum(specdata, meta, uncertainty_floor=0.01, RV=3.1,
@@ -912,9 +911,13 @@ def update_qso_redshifts(zb, meta, qnfile, mgiifile, fitindx, specprod):

if specprod in ['fuji', 'guadalupe', 'himalayas', 'iron']:
QNthresh = 0.95
QNCOLS = ['TARGETID', 'Z_NEW', 'IS_QSO_QN_NEW_RR', ] + QNLINES
new_zwarn = False
else:
# updated for Jura, Kibo, Loa, ...
QNthresh = 0.99
QNCOLS = ['TARGETID', 'Z_NEW', 'ZWARN_NEW', 'IS_QSO_QN_NEW_RR', ] + QNLINES
new_zwarn = True

surv_target, surv_mask, surv = main_cmx_or_sv(meta, scnd=True)
if surv == 'cmx':
@@ -941,15 +944,17 @@ def update_qso_redshifts(zb, meta, qnfile, mgiifile, fitindx, specprod):
iqso = IQSO * qn['IS_QSO_QN_NEW_RR'] * qn['IS_QSO_QN_099']
if np.sum(iqso) > 0:
zb['Z'][iqso] = qn['Z_NEW'][iqso]
zb['ZWARN'][iqso] = qn['ZWARN_NEW'][iqso]
if new_zwarn:
zb['ZWARN'][iqso] = qn['ZWARN_NEW'][iqso]
if np.sum(IWISE_VAR_QSO) > 0:
mgii = Table(fitsio.read(mgiifile, 'MGII', rows=fitindx, columns=MGIICOLS))
assert(np.all(mgii['TARGETID'] == meta['TARGETID']))
iwise_var_qso = (((zb['SPECTYPE'] == 'QSO') | mgii['IS_QSO_MGII'] | qn['IS_QSO_QN_099']) & (IWISE_VAR_QSO & qn['IS_QSO_QN_NEW_RR']))
if np.sum(iwise_var_qso) > 0:
zb['Z'][iwise_var_qso] = qn['Z_NEW'][iwise_var_qso]
#zb['Z_ERR'][iwise_var_qso] = qn['ZERR_NEW'][iwise_var_qso]
zb['ZWARN'][iwise_var_qso] = qn['ZWARN_NEW'][iwise_var_qso]
if new_zwarn:
zb['ZWARN'][iwise_var_qso] = qn['ZWARN_NEW'][iwise_var_qso]
del mgii
del qn

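As a standalone restatement of the gating introduced above (a sketch with mock inputs, not the actual update_qso_redshifts code; the column and production names are taken from the diff):

QNLINES = ['C_LYA', 'C_CIV', 'C_CIII', 'C_MgII', 'C_Hbeta', 'C_Halpha']

def qn_columns(specprod):
    # Iron and older productions do not carry ZWARN_NEW in the QuasarNet
    # afterburner files, so ZWARN is left untouched for them; newer
    # productions (Jura, Kibo, Loa, ...) provide it and use it.
    if specprod in ('fuji', 'guadalupe', 'himalayas', 'iron'):
        return ['TARGETID', 'Z_NEW', 'IS_QSO_QN_NEW_RR'] + QNLINES, False
    return ['TARGETID', 'Z_NEW', 'ZWARN_NEW', 'IS_QSO_QN_NEW_RR'] + QNLINES, True

cols, new_zwarn = qn_columns('iron')
assert 'ZWARN_NEW' not in cols and new_zwarn is False

cols, new_zwarn = qn_columns('loa')
assert 'ZWARN_NEW' in cols and new_zwarn is True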
@@ -1570,7 +1575,7 @@ def write_fastspecfit(meta, specphot, fastfit, modelspectra=None, outfile=None,
uncertainty_floor=0.01, minsnr_balmer_broad=2.5,
no_smooth_continuum=False, ignore_photometry=False,
broadlinefit=True, use_quasarnet=True, constrain_age=False,
verbose=True):
split_hdu=False, verbose=True):
"""Write out.
"""
@@ -1585,10 +1590,6 @@ def write_fastspecfit(meta, specphot, fastfit, modelspectra=None, outfile=None,
os.makedirs(outdir, exist_ok=True)

nobj = len(meta)
if nobj == 1:
log.info(f'Writing 1 object to {outfile}')
else:
log.info(f'Writing {nobj:,d} objects to {outfile}')

if outfile.endswith('.gz'):
tmpfile = outfile[:-3]+'.tmp'
@@ -1628,37 +1629,73 @@ def write_fastspecfit(meta, specphot, fastfit, modelspectra=None, outfile=None,
meta.meta['EXTNAME'] = 'METADATA'
specphot.meta['EXTNAME'] = 'SPECPHOT'

hdus = fits.HDUList()
hdus.append(fits.PrimaryHDU(None, primhdr))
hdus.append(fits.convenience.table_to_hdu(meta))
hdus.append(fits.convenience.table_to_hdu(specphot))
hdu_primary = fits.PrimaryHDU(None, primhdr)
hdu_meta = fits.convenience.table_to_hdu(meta)
hdu_specphot = fits.convenience.table_to_hdu(specphot)
if fastfit is not None:
fastfit.meta['EXTNAME'] = 'FASTSPEC'
hdus.append(fits.convenience.table_to_hdu(fastfit))
hdu_fastfit = fits.convenience.table_to_hdu(fastfit)

if modelspectra is not None:
hdu = fits.ImageHDU(name='MODELS')
hdu_data = fits.ImageHDU(name='MODELS')
# [nobj, 3, nwave]
hdu.data = np.swapaxes(np.array([modelspectra['CONTINUUM'].data,
modelspectra['SMOOTHCONTINUUM'].data,
modelspectra['EMLINEMODEL'].data]), 0, 1)
hdu_data.data = np.swapaxes(np.array([modelspectra['CONTINUUM'].data,
modelspectra['SMOOTHCONTINUUM'].data,
modelspectra['EMLINEMODEL'].data]), 0, 1)
for key in modelspectra.meta:
hdu.header[key] = (modelspectra.meta[key][0], modelspectra.meta[key][1]) # all the spectra are identical, right??
hdu_data.header[key] = (modelspectra.meta[key][0], modelspectra.meta[key][1]) # all the spectra are identical, right??

hdus.append(hdu)

hdus.writeto(tmpfile, overwrite=True, checksum=True)
def write(hdus, tmpfile, outfile):
if nobj == 1:
log.info(f'Writing 1 object to {outfile}')
else:
log.info(f'Writing {nobj:,d} objects to {outfile}')
hdus.writeto(tmpfile, overwrite=True, checksum=True)
# compress if needed (via another tempfile), otherwise just rename
if outfile.endswith('.gz'):
tmpfilegz = outfile[:-3]+'.tmp.gz'
with open(tmpfile, 'rb') as f_in:
with gzip.open(tmpfilegz, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.rename(tmpfilegz, outfile)
os.remove(tmpfile)
else:
os.rename(tmpfile, outfile)

# compress if needed (via another tempfile), otherwise just rename
if outfile.endswith('.gz'):
tmpfilegz = outfile[:-3]+'.tmp.gz'
with open(tmpfile, 'rb') as f_in:
with gzip.open(tmpfilegz, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.rename(tmpfilegz, outfile)
os.remove(tmpfile)

# For large merged catalogs (e.g., main/dark), split the HDUs into
# individual files.
if split_hdu:
hdu_list = [hdu_meta, hdu_specphot]
suffix_list = ['metadata', 'specphot']
if fastfit is not None:
hdu_list.append(hdu_fastfit)
suffix_list.append('fastspec')
if modelspectra is not None:
hdu_list.append(hdu_data)
suffix_list.append('models')

for hdu, suffix in zip(hdu_list, suffix_list):
if outfile.endswith('.gz'):
outfile_split = outfile.replace('.fits.gz', f'-{suffix}.fits.gz')
else:
outfile_split = outfile.replace('.fits', f'-{suffix}.fits')
hdus = fits.HDUList()
hdus.append(hdu_primary)
hdus.append(hdu)
write(hdus, tmpfile, outfile_split)
else:
os.rename(tmpfile, outfile)
hdus = fits.HDUList()
hdus.append(hdu_primary)
hdus.append(hdu_meta)
hdus.append(hdu_specphot)
if fastfit is not None:
hdus.append(hdu_fastfit)
if modelspectra is not None:
hdus.append(hdu_data)
write(hdus, tmpfile, outfile)


if verbose:
log.debug(f'Writing out took {time.time()-t0:.2f} seconds.')
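A sketch of what split_hdu=True produces and how one might read it back. Only the extension names and the -metadata/-specphot/-fastspec/-models suffixes come from the code above; the base filename is hypothetical, and fitsio is simply the reader already used elsewhere in io.py.

import fitsio

# With split_hdu=True a merged catalog such as fastspec-example.fits.gz is
# written as one file per table HDU, each carrying the same primary header:
#   fastspec-example-metadata.fits.gz   (METADATA)
#   fastspec-example-specphot.fits.gz   (SPECPHOT)
#   fastspec-example-fastspec.fits.gz   (FASTSPEC, when full spectral fits exist)
#   fastspec-example-models.fits.gz     (MODELS, only if model spectra were written)
meta = fitsio.read('fastspec-example-metadata.fits.gz', 'METADATA')
specphot = fitsio.read('fastspec-example-specphot.fits.gz', 'SPECPHOT')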
27 changes: 17 additions & 10 deletions py/fastspecfit/mpi.py
@@ -375,8 +375,8 @@ def read_to_merge_one(filename, fastphot):
return meta, specphot, fastfit


def _domerge(outfiles, survey=None, program=None, outprefix=None, specprod=None,
coadd_type=None, mergefile=None, fastphot=False, mp=1):
def _domerge(outfiles, outprefix=None, specprod=None, coadd_type=None,
mergefile=None, fastphot=False, split_hdu=False, mp=1):
"""Support routine to merge a set of input files.
"""
@@ -447,13 +447,13 @@ def _domerge(outfiles, survey=None, program=None, outprefix=None, specprod=None,
emlinesfile=deps2['EMLINES_FILE'], inputz=deps['INPUTZ'],
ignore_photometry=deps['NOPHOTO'], broadlinefit=deps['BRDLFIT'],
constrain_age=deps['CONSAGE'], use_quasarnet=deps['USEQNET'],
no_smooth_continuum=deps['NOSCORR'])
no_smooth_continuum=deps['NOSCORR'], split_hdu=split_hdu)


def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
healpix=None, tile=None, night=None, sample=None, outsuffix=None,
fastphot=False, specprod_dir=None, outdir_data='.',
fastfiles_to_merge=None, merge_suffix=None,
split_hdu=False, fastfiles_to_merge=None, merge_suffix=None,
mergedir=None, supermerge=False, overwrite=False, mp=1):
"""Merge all the individual catalogs into a single large catalog. Runs only on
rank 0.
@@ -492,7 +492,8 @@ def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
log.info(f'Merging {len(outfiles):,d} catalogs')
mergefile = os.path.join(mergedir, f'{outprefix}-{outsuffix}.fits')
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, mp=mp)
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot,
split_hdu=split_hdu, mp=mp)
else:
log.info(f'No catalogs found: {_outfiles}')
return
@@ -510,7 +511,8 @@ def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
outdir_data=outdir_data, overwrite=overwrite)
if len(outfiles) > 0:
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, mp=mp)
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot,
split_hdu=split_hdu, mp=mp)

elif coadd_type == 'healpix' and sample is None:
if survey is None or program is None:
@@ -528,9 +530,13 @@ def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
merge=True, fastphot=fastphot, specprod_dir=specprod_dir,
outdir_data=outdir_data, overwrite=overwrite)
if len(outfiles) > 0:
_domerge(outfiles, survey=survey[0], program=program[0],
mergefile=mergefile, outprefix=outprefix, specprod=specprod,
coadd_type=coadd_type, fastphot=fastphot, mp=mp)
# split main/dark and main/bright merge files by HDU
if split_hdu and survey == 'main' and ((program == 'dark') or (program == 'bright')):
do_split_hdu = True
else:
do_split_hdu = False
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix, specprod=specprod,
coadd_type=coadd_type, fastphot=fastphot, split_hdu=do_split_hdu, mp=mp)
else:
mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{coadd_type}.fits')
if os.path.isfile(mergefile) and not overwrite:
@@ -541,4 +547,5 @@ def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
outdir_data=outdir_data, overwrite=overwrite)
if len(outfiles) > 0:
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, mp=mp)
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot,
split_hdu=split_hdu, mp=mp)

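The healpix branch above only splits the very largest merges. As a sketch, the decision reduces to a rule like the following (the helper is illustrative, not part of the code; the survey/program values are examples):

def wants_split(split_hdu, survey, program):
    # Per-HDU splitting is requested only for the huge main/dark and
    # main/bright healpix merges; everything else stays in one file.
    return bool(split_hdu and survey == 'main' and program in ('dark', 'bright'))

assert wants_split(True, 'main', 'dark')
assert wants_split(True, 'main', 'bright')
assert not wants_split(True, 'sv1', 'dark')     # smaller merges stay monolithic
assert not wants_split(False, 'main', 'dark')   # splitting must be requested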