
Commit

utils.combine_fits_files() updated
abhi0395 committed Aug 15, 2024
1 parent 08a53b2 commit d9cba60
Showing 1 changed file with 80 additions and 70 deletions.
qsoabsfind/utils.py: 150 changes (80 additions & 70 deletions)
@@ -220,86 +220,96 @@ def save_plot(x, y, plot_filename='qsoabsfind_plot.png', xlabel='X-axis', ylabel

print(f"Plot saved as {plot_path}")

-def combine_fits_files(directory, output_filename='combined.fits'):
-    """
-    Combine data from specified HDUs in all FITS files in a directory into a single FITS file.
-
-    Args:
-        directory: str, path to the directory containing FITS files.
-        output_filename: str, name of the output combined FITS file (default: 'combined.fits').
-
-    Note:
-        This function assumes that all the FITS files in the input directory have the same structure.
-    """
-    # Ensure the directory exists
-    if not os.path.isdir(directory):
-        raise FileNotFoundError(f"The directory {directory} does not exist.")
-
-    # Initialize lists to hold data and headers for the HDUs
-    primary_hdu = None
-    absorber_data = []
-    metadata_data = []
-
-    # Iterate over all FITS files in the directory
-    for filename in os.listdir(directory):
-        if filename.endswith(".fits"):
-            file_path = os.path.join(directory, filename)
-            print(f"Processing file: {file_path}")
-
-            # Open the FITS file
-            with fits.open(file_path, memmap=True) as hdul:
-                # Assume HDU0 is the same for all files, use the first one encountered
-                if primary_hdu is None:
-                    primary_hdu = fits.PrimaryHDU(header=hdul[0].header)
-                    print(f"Set primary HDU from {filename}")
-
-                # Process the ABSORBER HDU
-                if 'ABSORBER' in hdul:
-                    absorber_hdu = hdul['ABSORBER']
-                    absorber_data.append(absorber_hdu.data)
-
-                # Process the METADATA HDU
-                if 'METADATA' in hdul:
-                    metadata_hdu = hdul['METADATA']
-                    metadata_table = Table(metadata_hdu.data)
-
-                    # Remove "INDEX_SPEC" column from METADATA if it exists
-                    if "INDEX_SPEC" in metadata_table.colnames:
-                        metadata_table.remove_column("INDEX_SPEC")
-
-                    metadata_data.append(metadata_table.as_array())
-
-    # Concatenate data for each HDU
-    combined_absorber_data = np.concatenate(absorber_data, axis=0)
-    combined_metadata_data = np.concatenate(metadata_data, axis=0)
-
-    # Create new HDUs with concatenated data
-    combined_absorber_hdu = fits.BinTableHDU(data=combined_absorber_data, header=absorber_hdu.header, name='ABSORBER')
-    combined_metadata_hdu = fits.BinTableHDU(data=combined_metadata_data, header=metadata_hdu.header, name='METADATA')
-
-    # Remove any references to INDEX_SPEC in the metadata header
-    ttype_keys = [key for key in combined_metadata_hdu.header.keys() if key.startswith('TTYPE')]
-    tunit_keys = [key for key in combined_metadata_hdu.header.keys() if key.startswith('TUNIT')]
-    tform_keys = [key for key in combined_metadata_hdu.header.keys() if key.startswith('TFORM')]
-
-    for i, key in enumerate(ttype_keys):
-        if combined_metadata_hdu.header[key] == "INDEX_SPEC":
-            del combined_metadata_hdu.header[key]
-            del combined_metadata_hdu.header[tunit_keys[i]]
-            del combined_metadata_hdu.header[tform_keys[i]]
-            break  # Assumes INDEX_SPEC is only present once
-
-    # Create HDU list for the combined file
-    combined_hdul = fits.HDUList([primary_hdu, combined_absorber_hdu, combined_metadata_hdu])
-
-    # Define the full output file path
-    output_file_path = output_filename
-    if not os.path.isabs(output_file_path):
-        output_file_path = os.path.join(directory, output_filename)
-
-    # Save the combined HDU list to a new FITS file
-    combined_hdul.writeto(output_file_path, overwrite=True)
-    print(f"Combined FITS file saved as {output_file_path}")
+def modify_units(col_name, col):
+    """
+    Modify the unit of a column based on the column name.
+
+    Args:
+        col_name (str): The name of the column.
+        col (Column): The column object.
+
+    Returns:
+        str: The modified unit if conditions are met, otherwise the original unit.
+    """
+    if 'EW' in col_name.upper():
+        return 'Angstrom'
+    else:
+        return str(col.unit) if col.unit is not None else None
+
+
+def combine_fits_files(directory, output_file):
+    """
+    Combines data from several FITS files in a directory into a single FITS file.
+
+    Note:
+        This function assumes that all FITS files have the same HDU structure and that all HDUs are Tables.
+        Any column with 'EW' in its name will have its unit set to Angstrom. The `INDEX_SPEC` column is not
+        concatenated. The primary HDU from the first FITS file is copied to the final combined file. So make
+        sure that the directory contains only qsoabsfind output files.
+
+    Args:
+        directory (str): Path to the directory containing the FITS files.
+        output_file (str): Path to the output FITS file.
+
+    Returns:
+        None: The combined FITS file is saved to the specified output path.
+    """
+    from astropy.table import vstack
+
+    # Initialize a dictionary to store tables for each HDU
+    combined_tables = {}
+    primary_hdu = None
+    formats = {}
+
+    # Loop through each file in the directory
+    for i, file_name in enumerate(os.listdir(directory)):
+        if file_name.endswith('.fits'):
+            file_path = os.path.join(directory, file_name)
+            print(f"Processing file: {file_path}")
+
+            with fits.open(file_path) as hdul:
+                if primary_hdu is None:
+                    # Copy the primary HDU from the first file
+                    primary_hdu = fits.PrimaryHDU(header=hdul[0].header)
+                    print("Primary HDU copied from the first file.")
+                    for hdu in hdul[1:]:
+                        hdu_name = hdu.name
+                        formats[hdu_name] = {}
+                        for col in hdu.columns:
+                            formats[hdu.name][col.name] = col.format
+
+                for hdu in hdul:
+                    if isinstance(hdu, fits.BinTableHDU):
+                        hdu_name = hdu.name
+                        table = Table(hdu.data)
+
+                        # Remove the INDEX_SPEC column if it exists
+                        if 'INDEX_SPEC' in table.colnames:
+                            table.remove_column('INDEX_SPEC')
+                            print(f"Removed 'INDEX_SPEC' column from HDU '{hdu_name}'.")
+
+                        if hdu_name in combined_tables:
+                            combined_tables[hdu_name] = vstack([combined_tables[hdu_name], table], metadata_conflicts='silent')
+                            print(f"Concatenated data to HDU '{hdu_name}'.")
+                        else:
+                            combined_tables[hdu_name] = table
+                            print(f"Initialized HDU '{hdu_name}' with data from file {i + 1}.")
+
+    # Create the HDUs to write to the output file
+    hdul_out = fits.HDUList([primary_hdu])
+
+    for hdu_name, table in combined_tables.items():
+        # Explicitly handle units by creating columns with modified unit information
+        columns = []
+        for col_name in table.colnames:
+            col = table[col_name]
+
+            # Modify the unit based on the column name
+            unit = modify_units(col_name, col)
+
+            # Create the FITS column, using the format recorded from the input files
+            fits_format = formats[hdu_name][col_name]
+            columns.append(fits.Column(name=col_name, array=col.data, format=fits_format, unit=unit))
+
+        hdu_out = fits.BinTableHDU.from_columns(columns, name=hdu_name)
+        hdul_out.append(hdu_out)
+        print(f"Added HDU '{hdu_name}' to the output file.")
+
+    # Write the combined data to the output FITS file
+    hdul_out.writeto(output_file, overwrite=True)
+    print(f"Combined FITS file saved to {output_file}")


def validate_sizes(conv_arr, unmsk_residual, spec_index):
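For context, and not part of the commit itself, here is a minimal sketch of how the updated combine_fits_files() could be driven. It assumes the import path implied by qsoabsfind/utils.py and uses placeholder directory and output paths; the ABSORBER HDU name comes from the code above.

    # Minimal usage sketch (hypothetical paths): stack per-chunk qsoabsfind
    # outputs into one catalog, then inspect the combined file.
    from astropy.io import fits
    from qsoabsfind.utils import combine_fits_files

    # Directory assumed to contain only qsoabsfind output FITS files
    # (see the Note in the docstring above); both paths are placeholders.
    combine_fits_files('results/chunks', 'results/combined_absorbers.fits')

    with fits.open('results/combined_absorbers.fits') as hdul:
        hdul.info()                            # PRIMARY + one BinTableHDU per input HDU name
        absorbers = hdul['ABSORBER'].data      # rows stacked from every input file
        print(len(absorbers), hdul['ABSORBER'].columns.names)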

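Similarly, a small illustration (again not from the commit) of what the new modify_units() helper returns, using throwaway astropy Columns; the column names are hypothetical examples of catalog columns.

    # Illustration only: behaviour of modify_units() on two throwaway Columns.
    from astropy.table import Column
    from qsoabsfind.utils import modify_units

    ew_col = Column([1.2, 0.8], name='EW_TOTAL_MGII')   # name contains 'EW'
    z_col = Column([2.1, 2.3], name='Z_ABS')            # no unit attached

    print(modify_units('EW_TOTAL_MGII', ew_col))  # -> 'Angstrom'
    print(modify_units('Z_ABS', z_col))           # -> None (column has no unit)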