diff --git a/alphadia/constants/default.yaml b/alphadia/constants/default.yaml index e4033d61..8fbcc741 100644 --- a/alphadia/constants/default.yaml +++ b/alphadia/constants/default.yaml @@ -312,7 +312,9 @@ transfer_library: # if true, the library is created for transfer learning enabled: False - # semicolon separated list of fragment types to include in the library. possible values are 'a', 'b', 'c', 'x', 'y', 'z' + # semicolon separated list of fragment types to include in the library. possible values are + # 'a', 'b', 'c', 'x', 'y', 'z' + # 'b_H2O', 'y_H2O', 'b_NH3', 'y_NH3', 'b_modloss', 'y_modloss', 'c_lossH', 'z_addH' fragment_types: 'b;y' # maximum charge for fragments diff --git a/alphadia/outputaccumulator.py b/alphadia/outputaccumulator.py index 9f1e8547..fd0b701a 100644 --- a/alphadia/outputaccumulator.py +++ b/alphadia/outputaccumulator.py @@ -37,168 +37,124 @@ logger = logging.getLogger() -class SpecLibFlatFromOutput(SpecLibFlat): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - def _calculate_fragment_position(self): - """ - Calculate the position of the fragments based on the type and number of the fragment. - """ - # Fragtypes from ascii to char - available_frag_types = self._fragment_df["type"].unique() - self.frag_types_as_char = {i: chr(i) for i in available_frag_types} - - mapped_frag_types = self._fragment_df["type"].map(self.frag_types_as_char) - a_b_c_fragments = mapped_frag_types.isin(["a", "b", "c"]) - x_y_z_fragments = mapped_frag_types.isin(["x", "y", "z"]) - - precursor_idx_to_nAA = ( - self._precursor_df[["precursor_idx", "nAA"]] - .set_index("precursor_idx") - .to_dict()["nAA"] - ) - # For X,Y,Z frags calculate the position as being the nAA of the precursor - number of the fragment - x_y_z_number = ( - self._fragment_df.loc[x_y_z_fragments, "precursor_idx"].map( - precursor_idx_to_nAA - ) - - self._fragment_df.loc[x_y_z_fragments, "number"] - ) - self._fragment_df.loc[x_y_z_fragments, "position"] = x_y_z_number - 1 - - # For A,B,C frags calculate the position as being the number of the fragment - self._fragment_df.loc[a_b_c_fragments, "position"] = ( - self._fragment_df.loc[a_b_c_fragments, "number"] - 1 - ) - - # Change position to int - self._fragment_df["position"] = self._fragment_df["position"].astype(int) - - def parse_output_folder( - self, - folder: str, - mandatory_precursor_columns: list[str] | None = None, - optional_precursor_columns: list[str] | None = None, - ) -> tuple[pd.DataFrame, pd.DataFrame]: - """ - Parse the output folder to get a precursor and fragment dataframe in the flat format. +def process_folder( + folder: str, + mandatory_precursor_columns: list[str] | None = None, + optional_precursor_columns: list[str] | None = None, + charged_frag_types: list[str] | None = None, +) -> SpecLibFlat: + """ + Parse an output folder and return a SpecLibFlat object containing the precursor and fragment data. - Parameters - ---------- - folder : str - The output folder to be parsed. - mandatory_precursor_columns : list, optional - The columns to be selected from the precursor dataframe, by default ['precursor_idx', 'sequence', 'flat_frag_start_idx', 'flat_frag_stop_idx', 'charge', 'rt_library', 'mobility_library', 'mz_library', 'proteins', 'genes', 'mods', 'mod_sites', 'proba'] + Parameters + ---------- + folder : str + The output folder to be parsed. + mandatory_precursor_columns : list[str], optional + The columns to be selected from the precursor dataframe + optional_precursor_columns : list[str], optional + Additional optional columns to include if present - Returns - ------- - pd.DataFrame - The precursor dataframe. - pd.DataFrame - The fragment dataframe. + Returns + ------- + SpecLibFlat + A spectral library object containing the parsed data + """ + speclib = SpecLibFlat() + + if mandatory_precursor_columns is None: + mandatory_precursor_columns = [ + "precursor_idx", + "sequence", + "flat_frag_start_idx", + "flat_frag_stop_idx", + "charge", + "rt_library", + "rt_observed", + "mobility_library", + "mobility_observed", + "mz_library", + "mz_observed", + "proteins", + "genes", + "mods", + "mod_sites", + "proba", + "decoy", + ] + if optional_precursor_columns is None: + optional_precursor_columns = [ + "rt_calibrated", + "mz_calibrated", + ] - """ - if mandatory_precursor_columns is None: - mandatory_precursor_columns = [ - "precursor_idx", - "sequence", - "flat_frag_start_idx", - "flat_frag_stop_idx", - "charge", - "rt_library", - "rt_observed", - "mobility_library", - "mobility_observed", - "mz_library", - "mz_observed", - "proteins", - "genes", - "mods", - "mod_sites", - "proba", - "decoy", - ] - - if optional_precursor_columns is None: - optional_precursor_columns = [ - "rt_calibrated", - "mz_calibrated", - ] - - psm_df = pd.read_parquet(os.path.join(folder, "psm.parquet")) - frag_df = pd.read_parquet(os.path.join(folder, "frag.parquet")) - - if not set(mandatory_precursor_columns).issubset(psm_df.columns): - raise ValueError( - f"mandatory_precursor_columns must be a subset of psm_df.columns didnt find {set(mandatory_precursor_columns) - set(psm_df.columns)}" - ) + psm_df = pd.read_parquet(os.path.join(folder, "psm.parquet")) + frag_df = pd.read_parquet(os.path.join(folder, "frag.parquet")) - available_columns = sorted( - list( - set(mandatory_precursor_columns) - | (set(optional_precursor_columns) & set(psm_df.columns)) - ) + if not set(mandatory_precursor_columns).issubset(psm_df.columns): + raise ValueError( + f"mandatory_precursor_columns must be a subset of psm_df.columns didnt find {set(mandatory_precursor_columns) - set(psm_df.columns)}" ) - psm_df = psm_df[available_columns] - # get foldername of the output folder - foldername = os.path.basename(folder) - psm_df["raw_name"] = foldername + available_columns = sorted( + list( + set(mandatory_precursor_columns) + | (set(optional_precursor_columns) & set(psm_df.columns)) + ) + ) + psm_df = psm_df[available_columns] - # remove decoy precursors - # assert that decoy is int - psm_df["decoy"] = psm_df["decoy"].astype(int) - psm_df = psm_df[psm_df["decoy"] == 0].reset_index(drop=True) + foldername = os.path.basename(folder) + psm_df["raw_name"] = foldername - self._precursor_df = pd.DataFrame() - for col in psm_df.columns: - self._precursor_df[col] = psm_df[col] + psm_df["decoy"] = psm_df["decoy"].astype(int) + psm_df = psm_df[psm_df["decoy"] == 0].reset_index(drop=True) - # self._precursor_df.set_index('precursor_idx', inplace=True) - # Change the data type of the mods column to string - self._precursor_df["mods"] = self._precursor_df["mods"].astype(str) + speclib._precursor_df = pd.DataFrame() + for col in psm_df.columns: + speclib._precursor_df[col] = psm_df[col] - self._precursor_df["mod_sites"] = self._precursor_df["mod_sites"].astype(str) + speclib._precursor_df["mods"] = speclib._precursor_df["mods"].astype(str) + speclib._precursor_df["mod_sites"] = speclib._precursor_df["mod_sites"].astype(str) + speclib._precursor_df["mods"] = speclib._precursor_df["mods"].replace("nan", "") + speclib._precursor_df["mod_sites"] = speclib._precursor_df["mod_sites"].replace( + "nan", "" + ) - # Replace nan with empty string - self._precursor_df["mods"] = self._precursor_df["mods"].replace("nan", "") - self._precursor_df["mod_sites"] = self._precursor_df["mod_sites"].replace( - "nan", "" - ) + speclib.calc_precursor_mz() - self.calc_precursor_mz() - - for col in ["rt", "mz", "mobility"]: - if f"{col}_observed" in psm_df.columns: - values = psm_df[f"{col}_observed"] - elif "{col}_calibrated" in psm_df.columns: - values = psm_df["{col}_calibrated"] - else: - values = psm_df[f"{col}_library"] - self._precursor_df[col] = values - - # ----------------- Fragment ----------------- - # Filer fragments that are not used in the precursors - frag_df = frag_df[ - frag_df["precursor_idx"].isin(self._precursor_df["precursor_idx"]) + for col in ["rt", "mz", "mobility"]: + if f"{col}_observed" in psm_df.columns: + values = psm_df[f"{col}_observed"] + elif "{col}_calibrated" in psm_df.columns: + values = psm_df["{col}_calibrated"] + else: + values = psm_df[f"{col}_library"] + speclib._precursor_df[col] = values + + frag_df = frag_df[ + frag_df["precursor_idx"].isin(speclib._precursor_df["precursor_idx"]) + ] + speclib._fragment_df = frag_df[ + [ + "mz", + "intensity", + "precursor_idx", + "frag_idx", + "correlation", + "number", + "type", + "charge", + "loss_type", + "position", ] - self._fragment_df = frag_df[ - ["mz", "intensity", "precursor_idx", "frag_idx", "correlation"] - ].copy() + ].copy() - for col in ["number", "type", "charge"]: - if col in self.custom_fragment_df_columns: - self._fragment_df.loc[:, col] = frag_df.loc[:, col] - - if "position" in self.custom_fragment_df_columns: - if "position" in frag_df.columns: - self._fragment_df.loc[:, "position"] = frag_df.loc[:, "position"] - else: - self._calculate_fragment_position() - - return self._precursor_df, self._fragment_df + return speclib.to_speclib_base( + charged_frag_types=charged_frag_types, + additional_columns=["intensity", "correlation"], + ) class BaseAccumulator: @@ -226,34 +182,6 @@ def post_process(self) -> None: raise NotImplementedError("Subclasses must implement the post_process method") -def process_folder(folder): - """ - Process a folder and return the speclibase object. - It does so by parsing the output folderto get SpecLibFlat object and then converting it to SpecLibBase object. - And for now it assumes that the loss_type is 0 for all the fragments. - - Parameters - ---------- - folder : str - The folder to be processed. - - Returns - ------- - SpecLibBase - The SpecLibBase object obtained from the output folder. - """ - speclibflat_object = SpecLibFlatFromOutput() - psm, frag_df = speclibflat_object.parse_output_folder(folder) - speclibflat_object._fragment_df["loss_type"] = 0 - speclibase = speclibflat_object.to_SpecLibBase() - # sort columns - for dense_df_name in speclibase.available_dense_fragment_dfs(): - df = getattr(speclibase, dense_df_name) - setattr(speclibase, dense_df_name, df[df.columns.sort_values()]) - - return speclibase - - def error_callback(e): logger.error(e, exc_info=True) @@ -264,11 +192,14 @@ class AccumulationBroadcaster: And broadcasts the output of each folder to the subscribers. """ - def __init__(self, folders: list, number_of_processes: int): - self._folders = folders + def __init__( + self, folder_list: list, number_of_processes: int, processing_kwargs: dict + ): + self._folder_list = folder_list self._number_of_processes = number_of_processes self._subscribers = [] self._lock = threading.Lock() # Lock to prevent two processes trying to update the same subscriber at the same time + self._processing_kwargs = processing_kwargs def subscribe(self, subscriber: BaseAccumulator): self._subscribers.append(subscriber) @@ -290,10 +221,11 @@ def _post_process(self): def run(self): with multiprocessing.Pool(processes=self._number_of_processes) as pool: - for folder in self._folders: + for folder in self._folder_list: _ = pool.apply_async( process_folder, (folder,), + self._processing_kwargs, callback=self._broadcast, error_callback=error_callback, ) diff --git a/alphadia/outputtransform.py b/alphadia/outputtransform.py index d98ad9cd..3b182096 100644 --- a/alphadia/outputtransform.py +++ b/alphadia/outputtransform.py @@ -10,7 +10,7 @@ import directlfq.utils as lfqutils import numpy as np import pandas as pd -from alphabase.peptide import precursor +from alphabase.peptide import fragment, precursor from alphabase.spectral_library import base from alphabase.spectral_library.base import SpecLibBase from sklearn.model_selection import train_test_split @@ -481,8 +481,16 @@ def build_transfer_library( ], ) accumulationBroadcaster = AccumulationBroadcaster( - folder_list, number_of_processes + folder_list=folder_list, + number_of_processes=number_of_processes, + processing_kwargs={ + "charged_frag_types": fragment.get_charged_frag_types( + self.config["transfer_library"]["fragment_types"].split(";"), + self.config["transfer_library"]["max_charge"], + ) + }, ) + accumulationBroadcaster.subscribe(transferAccumulator) accumulationBroadcaster.run() logger.info( diff --git a/alphadia/plexscoring.py b/alphadia/plexscoring.py index 4ae4e674..4b157ab2 100644 --- a/alphadia/plexscoring.py +++ b/alphadia/plexscoring.py @@ -767,6 +767,9 @@ def process( psm_proto_df.fragment_charge[self.output_idx, : len(fragments.charge)] = ( fragments.charge ) + psm_proto_df.fragment_loss_type[ + self.output_idx, : len(fragments.loss_type) + ] = fragments.loss_type # ============= FRAGMENT MOBILITY CORRELATIONS ============= # will be skipped if no mobility dimension is present @@ -1294,6 +1297,7 @@ class OuptutPsmDF: fragment_number: nb.uint8[:, ::1] fragment_type: nb.uint8[:, ::1] fragment_charge: nb.uint8[:, ::1] + fragment_loss_type: nb.uint8[:, ::1] def __init__(self, n_psm, top_k_fragments): self.valid = np.zeros(n_psm, dtype=np.bool_) @@ -1321,6 +1325,7 @@ def __init__(self, n_psm, top_k_fragments): self.fragment_number = np.zeros((n_psm, top_k_fragments), dtype=np.uint8) self.fragment_type = np.zeros((n_psm, top_k_fragments), dtype=np.uint8) self.fragment_charge = np.zeros((n_psm, top_k_fragments), dtype=np.uint8) + self.fragment_loss_type = np.zeros((n_psm, top_k_fragments), dtype=np.uint8) def to_fragment_df(self): mask = self.fragment_mz_library.flatten() > 0 @@ -1339,6 +1344,7 @@ def to_fragment_df(self): self.fragment_number.flatten()[mask], self.fragment_type.flatten()[mask], self.fragment_charge.flatten()[mask], + self.fragment_loss_type.flatten()[mask], ) def to_precursor_df(self): @@ -1823,6 +1829,7 @@ def collect_fragments( "number", "type", "charge", + "loss_type", ] df = pd.DataFrame( { diff --git a/alphadia/workflow/peptidecentric.py b/alphadia/workflow/peptidecentric.py index b03c05c0..2846a333 100644 --- a/alphadia/workflow/peptidecentric.py +++ b/alphadia/workflow/peptidecentric.py @@ -1154,7 +1154,7 @@ def requantify_fragments( config = plexscoring.CandidateConfig() config.update( { - "top_k_fragments": 1000, # Use all fragments ever expected, needs to be larger than charged_frag_types(8)*max_sequence_len(100?) + "top_k_fragments": 9999, # Use all fragments ever expected, needs to be larger than charged_frag_types(8)*max_sequence_len(100?) "precursor_mz_tolerance": self.config["search"]["target_ms1_tolerance"], "fragment_mz_tolerance": self.config["search"]["target_ms2_tolerance"], }