adding unit support in utils.combine_fits_files()
abhi0395 committed Aug 15, 2024
1 parent 83aeac1 commit 08a53b2
Showing 1 changed file with 24 additions and 43 deletions: qsoabsfind/utils.py
@@ -222,16 +222,14 @@ def save_plot(x, y, plot_filename='qsoabsfind_plot.png', xlabel='X-axis', ylabel
 
 def combine_fits_files(directory, output_filename='combined.fits'):
     """
-    Combine data from specified HDUs in all FITS files in a directory (either same as input or user-provided) into a single FITS file.
+    Combine data from specified HDUs in all FITS files in a directory into a single FITS file.
 
     Args:
-        directory: str, path to the directory containing FITS files (absorber file for each spectra file).
+        directory: str, path to the directory containing FITS files.
         output_filename: str, name of the output combined FITS file (default: 'combined.fits').
 
     Note:
-        This function assumes that all the corresponding absorber FITS files are in the input directory,
-        and follow the same structure as the output of qsoabsfind. Also, there should not be any other
-        FITS files there. Otherwise, the script will fail.
+        This function assumes that all the FITS files in the input directory have the same structure.
     """
     # Ensure the directory exists
     if not os.path.isdir(directory):
@@ -240,10 +238,7 @@ def combine_fits_files(directory, output_filename='combined.fits'):
     # Initialize lists to hold data and headers for the HDUs
     primary_hdu = None
     absorber_data = []
-    absorber_headers = []
     metadata_data = []
-    metadata_headers = []
-    units_dict = {}
 
     # Iterate over all FITS files in the directory
     for filename in os.listdir(directory):
@@ -261,52 +256,38 @@ def combine_fits_files(directory, output_filename='combined.fits'):
             # Process the ABSORBER HDU
             if 'ABSORBER' in hdul:
                 absorber_hdu = hdul['ABSORBER']
-                absorber_table = Table(absorber_hdu.data)
-
-                # Remove "INDEX_SPEC" column from absorber data
-                if "INDEX_SPEC" in absorber_table.colnames:
-                    absorber_table.remove_column("INDEX_SPEC")
-
-                absorber_data.append(absorber_table.as_array())
-                absorber_headers.append(absorber_hdu.header)
-
-                # Extract units from the ABSORBER HDU header
-                for colname in absorber_table.colnames:
-                    unit_key = f'TUNIT{absorber_table.colnames.index(colname) + 1}'
-                    if unit_key in absorber_hdu.header:
-                        units_dict[colname] = absorber_hdu.header[unit_key]
-                print(f"Added data from 'ABSORBER' HDU of {filename} (without 'INDEX_SPEC')")
+                absorber_data.append(absorber_hdu.data)
 
             # Process the METADATA HDU
             if 'METADATA' in hdul:
                 metadata_hdu = hdul['METADATA']
-                metadata_data.append(metadata_hdu.data)
-                metadata_headers.append(metadata_hdu.header)
+                metadata_table = Table(metadata_hdu.data)
 
-                # Extract units from the METADATA HDU header
-                for colname in metadata_hdu.columns.names:
-                    unit_key = f'TUNIT{metadata_hdu.columns.names.index(colname) + 1}'
-                    if unit_key in metadata_hdu.header:
-                        units_dict[colname] = metadata_hdu.header[unit_key]
-                print(f"Added data from 'METADATA' HDU of {filename}")
+                # Remove "INDEX_SPEC" column from METADATA if it exists
+                if "INDEX_SPEC" in metadata_table.colnames:
+                    metadata_table.remove_column("INDEX_SPEC")
+
+                metadata_data.append(metadata_table.as_array())
 
     # Concatenate data for each HDU
     combined_absorber_data = np.concatenate(absorber_data, axis=0)
     combined_metadata_data = np.concatenate(metadata_data, axis=0)
 
     # Create new HDUs with concatenated data
-    combined_absorber_hdu = fits.BinTableHDU(data=combined_absorber_data, header=absorber_headers[0], name='ABSORBER')
-    combined_metadata_hdu = fits.BinTableHDU(data=combined_metadata_data, header=metadata_headers[0], name='METADATA')
-
-    # Add the units back to the headers
-    for colname, unit in units_dict.items():
-        unit_key = f'TUNIT{combined_absorber_hdu.columns.names.index(colname) + 1}' if colname in combined_absorber_hdu.columns.names else None
-        if unit_key:
-            combined_absorber_hdu.header[unit_key] = unit
-
-        unit_key = f'TUNIT{combined_metadata_hdu.columns.names.index(colname) + 1}' if colname in combined_metadata_hdu.columns.names else None
-        if unit_key:
-            combined_metadata_hdu.header[unit_key] = unit
+    combined_absorber_hdu = fits.BinTableHDU(data=combined_absorber_data, header=absorber_hdu.header, name='ABSORBER')
+    combined_metadata_hdu = fits.BinTableHDU(data=combined_metadata_data, header=metadata_hdu.header, name='METADATA')
+
+    # Remove any references to INDEX_SPEC in the metadata header
+    ttype_keys = [key for key in combined_metadata_hdu.header.keys() if key.startswith('TTYPE')]
+    tunit_keys = [key for key in combined_metadata_hdu.header.keys() if key.startswith('TUNIT')]
+    tform_keys = [key for key in combined_metadata_hdu.header.keys() if key.startswith('TFORM')]
+
+    for i, key in enumerate(ttype_keys):
+        if combined_metadata_hdu.header[key] == "INDEX_SPEC":
+            del combined_metadata_hdu.header[key]
+            del combined_metadata_hdu.header[tunit_keys[i]]
+            del combined_metadata_hdu.header[tform_keys[i]]
+            break  # Assumes INDEX_SPEC is only present once
 
     # Create HDU list for the combined file
     combined_hdul = fits.HDUList([primary_hdu, combined_absorber_hdu, combined_metadata_hdu])
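
For context, a minimal usage sketch (not part of this commit): it assumes the per-spectrum absorber FITS files live in a local directory (the path below is made up) and that the combined file can be read back with astropy to confirm that the TUNITn keywords carried over from the original headers appear as column units. Where the combined file is actually written depends on code below the visible hunk, so adjust the read path if needed.

from astropy.table import Table

from qsoabsfind.utils import combine_fits_files

# Combine every FITS file found in the directory into a single output file.
# 'absorber_runs/' is a hypothetical path used only for illustration.
combine_fits_files('absorber_runs/', output_filename='combined.fits')

# Because the combined ABSORBER HDU reuses a per-file header, its TUNITn
# keywords are interpreted by astropy as column units on read-back
# (columns without a TUNIT keyword print None).
absorbers = Table.read('combined.fits', hdu='ABSORBER')
for name in absorbers.colnames:
    print(name, absorbers[name].unit)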