Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 zcatalog #2396

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
248 changes: 169 additions & 79 deletions py/desispec/scripts/zcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
from desispec.zcatalog import find_primary_spectra
from desispec.io.util import get_tempfilename, checkgzip, replace_prefix, write_bintable
from desispec.io.table import read_table
from desispec.coaddition import coadd_fibermap
from desispec.util import parse_keyval
from desiutil.annotate import load_csv_units
import desiutil.depend
from desispec import validredshifts

def load_sv1_ivar_w12(hpix, targetids):
"""
Expand Down Expand Up @@ -89,7 +89,7 @@ def _wrap_read_redrock(optdict):
"""read_redrock wrapper to expand dictionary of named args for multiprocessing"""
return read_redrock(**optdict)

def read_redrock(rrfile, group=None, recoadd_fibermap=False, minimal=False, pertile=False, counter=None):
def read_redrock(rrfile, group=None, pertile=False, counter=None):
"""
Read a Redrock file, combining REDSHIFTS, FIBERMAP, and TSNR2 HDUs

Expand All @@ -98,8 +98,6 @@ def read_redrock(rrfile, group=None, recoadd_fibermap=False, minimal=False, pert

Options:
group (str): add group-specific columns for cumulative, pernight, healpix
readcoadd_fibermap (bool): recoadd fibermap from spectra file in same dir
minimal (bool): only propagate minimal subet of columns
pertile (bool): input Redrock file is single tile (not healpix)
counter (tuple): (i,n) log loading ith file out of n

Expand All @@ -121,56 +119,47 @@ def read_redrock(rrfile, group=None, recoadd_fibermap=False, minimal=False, pert
rrfile, hdr['SPGRP'], group))
return None

redshifts = fx['REDSHIFTS'].read()
redshifts = Table(fx['REDSHIFTS'].read())

if recoadd_fibermap:
spectra_filename = checkgzip(replace_prefix(rrfile, 'redrock', 'spectra'))
log.info('Recoadding fibermap from %s', os.path.basename(spectra_filename))
fibermap_orig = read_table(spectra_filename)
fibermap, expfibermap = coadd_fibermap(fibermap_orig, onetile=pertile)
else:
fibermap = Table(fx['FIBERMAP'].read())
expfibermap = fx['EXP_FIBERMAP'].read()
fibermap = Table(fx['FIBERMAP'].read())
expfibermap = Table(fx['EXP_FIBERMAP'].read())

tsnr2 = fx['TSNR2'].read()
assert np.all(redshifts['TARGETID'] == fibermap['TARGETID'])
assert np.all(redshifts['TARGETID'] == tsnr2['TARGETID'])
tsnr2 = Table(fx['TSNR2'].read())

if minimal:
# basic set of target information
fmcols = ['TARGET_RA', 'TARGET_DEC', 'FLUX_G', 'FLUX_R', 'FLUX_Z']
emline_file = rrfile.replace('/redrock-', '/emline-')
qso_mgii_file = rrfile.replace('/redrock-', '/qso_mgii-')
qso_qn_file = rrfile.replace('/redrock-', '/qso_qn-')

# add targeting columns
for colname in fibermap.dtype.names:
if colname.endswith('_TARGET') and colname != 'FA_TARGET':
fmcols.append(colname)
with fitsio.FITS(emline_file) as fx:
emline = Table(fx['EMLINEFIT'].read())
with fitsio.FITS(qso_mgii_file) as fx:
qso_mgii = Table(fx['MGII'].read())
with fitsio.FITS(qso_qn_file) as fx:
qso_qn = Table(fx['QN_RR'].read())

# add columns needed for uniqueness that differ for healpix vs. tiles
extracols = ['TILEID', 'LASTNIGHT', 'HEALPIX', 'SURVEY', 'PROGRAM']
for colname in extracols:
if colname in fibermap.dtype.names:
fmcols.append(colname)
assert np.all(redshifts['TARGETID'] == fibermap['TARGETID'])
assert np.all(redshifts['TARGETID'] == tsnr2['TARGETID'])
assert np.all(redshifts['TARGETID'] == emline['TARGETID'])
assert np.all(redshifts['TARGETID'] == qso_mgii['TARGETID'])
assert np.all(redshifts['TARGETID'] == qso_qn['TARGETID'])

# NIGHT header -> fibermap LASTNIGHT
if ('LASTNIGHT' not in fmcols) and ('NIGHT' in hdr):
fibermap['LASTNIGHT'] = np.int32(hdr['NIGHT'])
fmcols.append('LASTNIGHT')
fmcols = list(fibermap.dtype.names)
fmcols.remove('TARGETID')

data = hstack( [Table(redshifts), Table(fibermap[fmcols])] )
emline_cols = ['OII_FLUX', 'OII_FLUX_IVAR']
qso_mgii_cols = ['IS_QSO_MGII']
qso_qn_cols = ['Z_NEW', 'ZERR_NEW', 'IS_QSO_QN_NEW_RR', 'C_LYA', 'C_CIV', 'C_CIII', 'C_MgII', 'C_Hbeta', 'C_Halpha']

else:
fmcols = list(fibermap.dtype.names)
fmcols.remove('TARGETID')
if tsnr2 is not None:
tsnr2cols = list(tsnr2.dtype.names)
tsnr2cols.remove('TARGETID')
data = hstack([
Table(redshifts),
Table(fibermap[fmcols]),
Table(tsnr2[tsnr2cols]),
])
else:
data = hstack( [Table(redshifts), Table(fibermap[fmcols])] )
tsnr2cols = list(tsnr2.dtype.names)
tsnr2cols.remove('TARGETID')
data = hstack([
redshifts,
fibermap[fmcols],
tsnr2[tsnr2cols],
emline[emline_cols],
qso_mgii[qso_mgii_cols],
qso_qn[qso_qn_cols]
], join_type='exact')

#- Add group specific columns, recognizing some of them may
#- have already been inherited from the fibermap.
Expand Down Expand Up @@ -245,12 +234,10 @@ def parse(options=None):
help="input directory")
parser.add_argument("-o", "--outfile",type=str,
help="output file")
parser.add_argument("--minimal", action='store_true',
help="only include minimal output columns")
parser.add_argument("-t", "--tiles", type=str,
help="ascii file with tileids to include (one per line)")

parser.add_argument("--survey", type=str,
parser.add_argument("--survey", type=str, required=True,
help="DESI survey, e.g. sv1, sv3, main")
parser.add_argument("--program", type=str,
help="DESI program, e.g bright, dark")
Expand All @@ -262,8 +249,6 @@ def parse(options=None):
help="KEYWORD=VALUE entries to add to the output header")
parser.add_argument('--patch-missing-ivar-w12', action='store_true',
help="Use target files to patch missing FLUX_IVAR_W1/W2 values")
parser.add_argument('--recoadd-fibermap', action='store_true',
help="Re-coadd FIBERMAP from spectra files")
parser.add_argument('--add-units', action='store_true',
help="Add units to output catalog from desidatamodel "
"column descriptions")
Expand Down Expand Up @@ -293,16 +278,21 @@ def main(args=None):
log.critical('Unable to import desidatamodel, required to add units (try "module load desidatamodel" first)')
sys.exit(1)

survey = args.survey

if args.indir:
indir = args.indir
redrockfiles = sorted(io.iterfiles(f'{indir}', prefix='redrock', suffix='.fits'))
pertile = (args.group != 'healpix') # assume tile-based input unless explicitly healpix
elif args.group == 'healpix':
pertile = False
survey = args.survey if args.survey is not None else "*"
program = args.program if args.program is not None else "*"
indir = os.path.join(io.specprod_root(), 'healpix')

#- special case for NERSC; use read-only mount regardless of $DESI_SPECTRO_REDUX
if indir.startswith('/global/cfs/cdirs'):
indir = indir.replace('/global/cfs/cdirs', '/dvs_ro/cfs/cdirs')

#- specprod/healpix/SURVEY/PROGRAM/HPIXGROUP/HPIX/redrock*.fits
globstr = os.path.join(indir, survey, program, '*', '*', 'redrock*.fits')
log.info(f'Looking for healpix redrock files in {globstr}')
Expand All @@ -318,12 +308,11 @@ def main(args=None):

log.info(f'Loading tiles from {tilefile}')
tiles = Table.read(tilefile)
if args.survey is not None:
keep = tiles['SURVEY'] == args.survey
tiles = tiles[keep]
if len(tiles) == 0:
log.critical(f'No tiles kept after filtering by SURVEY={args.survey}')
sys.exit(1)
keep = tiles['SURVEY'] == survey
tiles = tiles[keep]
if len(tiles) == 0:
log.critical(f'No tiles kept after filtering by SURVEY={survey}')
sys.exit(1)

if args.program is not None:
keep = tiles['PROGRAM'] == args.program
Expand Down Expand Up @@ -367,16 +356,16 @@ def main(args=None):
read_args = list()
for ifile, rrfile in enumerate(redrockfiles):
read_args.append(dict(rrfile=rrfile, group=args.group, pertile=pertile,
recoadd_fibermap=args.recoadd_fibermap, minimal=args.minimal,
counter=(ifile+1, nfiles)))

#- Read individual Redrock files
if args.nproc>1:
from multiprocessing import Pool
with Pool(args.nproc) as pool:
results = pool.map(_wrap_read_redrock, read_args)
results = pool.map(_wrap_read_redrock, read_args, chunksize=1)
else:
results = [_wrap_read_redrock(a) for a in read_args]
log.info("Successfully read {} redrock files".format(nfiles))

#- Stack catalogs
zcatdata = list()
Expand All @@ -392,17 +381,14 @@ def main(args=None):
exp_fibermaps.append(expfibermap)

log.info('Stacking zcat')
zcat = vstack(zcatdata)
zcat = vstack(zcatdata, join_type='exact')
desiutil.depend.mergedep(dependencies, zcat.meta)
if exp_fibermaps:
log.info('Stacking exposure fibermaps')
expfm = np.hstack(exp_fibermaps)
else:
expfm = None
log.info('Stacking exposure fibermaps')
expfm = np.hstack(exp_fibermaps)

#- Add FIRSTNIGHT for tile-based cumulative catalogs
#- (LASTNIGHT was added while reading from NIGHT header keyword)
if args.group == 'cumulative' and expfm is not None and 'FIRSTNIGHT' not in zcat.colnames:
if args.group == 'cumulative' and 'FIRSTNIGHT' not in zcat.colnames:
log.info('Adding FIRSTNIGHT per tile')
icol = zcat.colnames.index('LASTNIGHT')
zcat.add_column(np.zeros(len(zcat), dtype=np.int32),
Expand Down Expand Up @@ -460,8 +446,78 @@ def main(args=None):
except KeyError:
log.warning(f'TARGETID {tid} (row {i}) not found in sv1 targets')

# Add redshift quality flags
zqual = validredshifts.actually_validate(zcat)
good_spec = validredshifts.get_good_fiberstatus(zcat)
good_spec &= zcat['OBJTYPE']=='TGT'
zqual['GOOD_SPEC'] = good_spec.copy() # GOOD_SPEC: true if it is a science spectrum with good hardware status
zqual['Z_CONF'] = np.uint8(0) # Z_CONF: 0 if no confidence
for col in ['GOOD_Z_BGS', 'GOOD_Z_LRG', 'GOOD_Z_ELG', 'GOOD_Z_QSO']:
zqual[col] = zqual[col] & zqual['GOOD_SPEC'] # require good hardware quality for GOOD_Z_TRACER

# evaluate Z_CONF
if survey in ['main', 'sv1', 'sv2', 'sv3']:
if survey=='main':
desi_target_col = 'DESI_TARGET'
else:
desi_target_col = survey.upper()+'_DESI_TARGET'

# The BGS_ANY, LRG, ELG and QSO target bits are the same in SV1 to main
is_bgs = zcat[desi_target_col] & 2**60 > 0
is_lrg = zcat[desi_target_col] & 2**0 > 0
is_elg = zcat[desi_target_col] & 2**1 > 0
is_qso = zcat[desi_target_col] & 2**2 > 0

# GOOD_Z_TRACER: False if it is not a Tracer target or if it is a TRACER target but fails TRACER redshift quality cut; True if it is a TRACER target and passes TRACER redshift quality cut
# They apply to the Z column
zqual['GOOD_Z_BGS'] &= is_bgs
zqual['GOOD_Z_LRG'] &= is_lrg
zqual['GOOD_Z_ELG'] &= is_elg

# GOOD_Z_QSO: True if Z_QSO is a confident redshift; it only applies to the Z_QSO column
# definition: False if it is not a QSO target or if it is a QSO target but fails QSO redshift quality cut; True if it is a QSO target and passes QSO redshift quality cut
zqual['GOOD_Z_QSO'] &= is_qso

mask = zqual['GOOD_Z_BGS'] | zqual['GOOD_Z_LRG'] | zqual['GOOD_Z_ELG']
zqual['Z_CONF'][mask] = 2 # Z_CONF=2: highly confident; definition: if Z is a confident redshift

mask = (zqual['Z_CONF']!=2) & zqual['GOOD_SPEC'] & (zcat['ZWARN']==0)
zqual['Z_CONF'][mask] = 1 # Z_CONF=1: less confident and Z is likely to be a catastrophically wrong redshift; definition: if the criteria for 2 is not met but has GOOD_SPEC==True and ZWARN==0

else:
for col in ['GOOD_Z_BGS', 'GOOD_Z_LRG', 'GOOD_Z_ELG', 'GOOD_Z_QSO']:
zqual[col] = False
mask = (zqual['Z_CONF']!=2) & zqual['GOOD_SPEC'] & (zcat['ZWARN']==0)
zqual['Z_CONF'][mask] = 1

zcat = hstack([zcat, zqual], join_type='exact')

columns_basic = ['TARGETID', 'TILEID', 'HEALPIX', 'LASTNIGHT', 'Z', 'ZERR', 'ZWARN', 'CHI2', 'SPECTYPE', 'SUBTYPE', 'DELTACHI2', 'PETAL_LOC', 'FIBER', 'COADD_FIBERSTATUS', 'TARGET_RA', 'TARGET_DEC', 'DESINAME', 'OBJTYPE', 'FIBERASSIGN_X', 'FIBERASSIGN_Y', 'PRIORITY', 'DESI_TARGET', 'BGS_TARGET', 'MWS_TARGET', 'SCND_TARGET', 'CMX_TARGET', 'SV1_DESI_TARGET', 'SV1_BGS_TARGET', 'SV1_MWS_TARGET', 'SV1_SCND_TARGET', 'SV2_DESI_TARGET', 'SV2_BGS_TARGET', 'SV2_MWS_TARGET', 'SV2_SCND_TARGET', 'SV3_DESI_TARGET', 'SV3_BGS_TARGET', 'SV3_MWS_TARGET', 'SV3_SCND_TARGET', 'COADD_NUMEXP', 'COADD_EXPTIME', 'COADD_NUMNIGHT', 'COADD_NUMTILE', 'MIN_MJD', 'MAX_MJD', 'MEAN_MJD', 'TSNR2_BGS', 'TSNR2_ELG', 'TSNR2_LRG', 'TSNR2_LYA', 'TSNR2_QSO', 'GOOD_SPEC', 'Z_CONF', 'GOOD_Z_QSO', 'Z_QSO', 'ZERR_QSO']
columns_imaging = ['PMRA', 'PMDEC', 'REF_EPOCH', 'RELEASE', 'BRICKNAME', 'BRICKID', 'BRICK_OBJID', 'MORPHTYPE', 'EBV', 'FLUX_G', 'FLUX_R', 'FLUX_Z', 'FLUX_W1', 'FLUX_W2', 'FLUX_IVAR_G', 'FLUX_IVAR_R', 'FLUX_IVAR_Z', 'FLUX_IVAR_W1', 'FLUX_IVAR_W2', 'FIBERFLUX_G', 'FIBERFLUX_R', 'FIBERFLUX_Z', 'FIBERTOTFLUX_G', 'FIBERTOTFLUX_R', 'FIBERTOTFLUX_Z', 'MASKBITS', 'SERSIC', 'SHAPE_R', 'SHAPE_E1', 'SHAPE_E2', 'REF_ID', 'REF_CAT', 'GAIA_PHOT_G_MEAN_MAG', 'GAIA_PHOT_BP_MEAN_MAG', 'GAIA_PHOT_RP_MEAN_MAG', 'PARALLAX', 'PHOTSYS']
assert len(np.intersect1d(columns_basic, columns_imaging))==0

# Remove main-survey target bits for non-main surveys (they are not the actual main-survey target bits)
if survey!='main':
for col in ['DESI_TARGET', 'BGS_TARGET', 'MWS_TARGET', 'SCND_TARGET']:
if col in zcat.colnames:
zcat.remove_column(col)

# Remove the columns that do not exist
columns_basic = [col for col in columns_basic if col in zcat.colnames]
columns_imaging = ['TARGETID', 'TILEID'] + columns_imaging
columns_imaging = [col for col in columns_imaging if col in zcat.colnames] # remove columns that do not exist
columns_extra = ['TARGETID', 'TILEID'] + [col for col in zcat.colnames if (col not in columns_basic and col not in columns_imaging)]
columns_extra = [col for col in columns_extra if col in zcat.colnames] # remove columns that do not exist

zcat_basic = zcat[columns_basic].copy()
zcat_imaging = zcat[columns_imaging].copy()
zcat_extra = zcat[columns_extra].copy()

#- we're done adding columns, convert to numpy array for fitsio
zcat = np.array(zcat)
zcat_basic = np.array(zcat_basic)
zcat_imaging = np.array(zcat_imaging)
zcat_extra = np.array(zcat_extra)

#- Inherit header from first input, but remove keywords that don't apply
#- across multiple files
Expand All @@ -487,8 +543,8 @@ def main(args=None):
key, value = parse_keyval(keyval)
header[key] = value

if args.survey is not None:
header['SURVEY'] = args.survey
if survey is not None:
header['SURVEY'] = survey

if args.program is not None:
header['PROGRAM'] = args.program
Expand All @@ -503,16 +559,50 @@ def main(args=None):
units = dict()
comments = dict()

log.info(f'Writing {args.outfile}')
tmpfile = get_tempfilename(args.outfile)

write_bintable(tmpfile, zcat, header=header, extname='ZCATALOG',
if not os.path.isdir(os.path.dirname(args.outfile)):
os.makedirs(os.path.dirname(args.outfile))

# outfile_all = os.path.join(os.path.dirname(args.outfile), 'merged', os.path.basename(args.outfile)+'.fits')
# if not os.path.isdir(os.path.dirname(outfile_all)):
# os.makedirs(os.path.dirname(outfile_all))
# log.info(f'Writing {outfile_all}')
# tmpfile = get_tempfilename(outfile_all)
# write_bintable(tmpfile, zcat, header=header, extname='MERGEDZCAT',
# units=units, clobber=True)
# write_bintable(tmpfile, expfm, extname='EXP_FIBERMAP', units=units)
# os.rename(tmpfile, outfile_all)
# log.info("Successfully wrote {}".format(outfile_all))

outfile_basic = args.outfile+'.fits'
log.info(f'Writing {outfile_basic}')
tmpfile = get_tempfilename(outfile_basic)
write_bintable(tmpfile, zcat_basic, header=header, extname='ZCATALOG',
units=units, clobber=True)
os.rename(tmpfile, outfile_basic)
log.info("Successfully wrote {}".format(outfile_basic))

if not args.minimal and expfm is not None:
write_bintable(tmpfile, expfm, extname='EXP_FIBERMAP', units=units)

os.rename(tmpfile, args.outfile)

log.info("Successfully wrote {}".format(args.outfile))
outfile_imaging = args.outfile+'-imaging.fits'
log.info(f'Writing {outfile_imaging}')
tmpfile = get_tempfilename(outfile_imaging)
write_bintable(tmpfile, zcat_imaging, header=header, extname='ZCATALOG_IMAGING',
units=units, clobber=True)
os.rename(tmpfile, outfile_imaging)
log.info("Successfully wrote {}".format(outfile_imaging))

outfile_extra = args.outfile+'-extra.fits'
log.info(f'Writing {outfile_extra}')
tmpfile = get_tempfilename(outfile_extra)
write_bintable(tmpfile, zcat_extra, header=header, extname='ZCATALOG_EXTRA',
units=units, clobber=True)
os.rename(tmpfile, outfile_extra)
log.info("Successfully wrote {}".format(outfile_extra))

outfile_expfm = os.path.join(os.path.dirname(args.outfile), 'exp_fibermap', os.path.basename(args.outfile)+'-expfibermap.fits')
if not os.path.isdir(os.path.dirname(outfile_expfm)):
os.makedirs(os.path.dirname(outfile_expfm))
log.info(f'Writing {outfile_expfm}')
tmpfile = get_tempfilename(outfile_expfm)
write_bintable(tmpfile, expfm, header=header, extname='EXP_FIBERMAP',
units=units, clobber=True)
os.rename(tmpfile, outfile_expfm)
log.info("Successfully wrote {}".format(outfile_expfm))
Loading
Loading