transfer library building with diverse frag types #423

Open · wants to merge 2 commits into base: main
**alphadia/constants/default.yaml** (4 changes: 3 additions & 1 deletion)
```diff
@@ -312,7 +312,9 @@ transfer_library:
   # if true, the library is created for transfer learning
   enabled: False

-  # semicolon separated list of fragment types to include in the library. possible values are 'a', 'b', 'c', 'x', 'y', 'z'
+  # semicolon separated list of fragment types to include in the library. possible values are
+  # 'a', 'b', 'c', 'x', 'y', 'z',
+  # 'b_H2O', 'y_H2O', 'b_NH3', 'y_NH3', 'b_modloss', 'y_modloss', 'c_lossH', 'z_addH'
   fragment_types: 'b;y'

   # maximum charge for fragments
```
> **Collaborator** (on lines +315 to 318): note that this will cause a merge conflict with #424 .. maybe omit it here to make life easier (assuming it's the same information that is added)


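For context, this setting is consumed by the transfer-library builder: it is split on `;` and expanded into charged fragment types downstream (see alphadia/outputtransform.py below). A minimal sketch of the parsing step; the `ALLOWED` set mirrors the yaml comment above, but the validation itself is illustrative and not code from this PR:

```python
# illustrative parsing of transfer_library.fragment_types;
# the allowed values are taken from the yaml comment above
ALLOWED = {
    "a", "b", "c", "x", "y", "z",
    "b_H2O", "y_H2O", "b_NH3", "y_NH3",
    "b_modloss", "y_modloss", "c_lossH", "z_addH",
}

fragment_types = "b;y;y_H2O".split(";")

unknown = set(fragment_types) - ALLOWED
if unknown:
    raise ValueError(f"unknown fragment types: {sorted(unknown)}")
```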
**alphadia/outputaccumulator.py** (288 changes: 110 additions & 178 deletions)
```diff
@@ -37,168 +37,124 @@

 logger = logging.getLogger()
```

Removed: the `SpecLibFlatFromOutput(SpecLibFlat)` class. Its `parse_output_folder` method returned a `(precursor_df, fragment_df)` tuple, and most of its body carries over into the new module-level `process_folder` below; the parts dropped outright are the `_calculate_fragment_position` helper and the `custom_fragment_df_columns` handling shown here:

```python
class SpecLibFlatFromOutput(SpecLibFlat):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _calculate_fragment_position(self):
        """
        Calculate the position of each fragment from its type and number.
        """
        # fragment types are stored as ASCII codes; map them back to characters
        available_frag_types = self._fragment_df["type"].unique()
        self.frag_types_as_char = {i: chr(i) for i in available_frag_types}

        mapped_frag_types = self._fragment_df["type"].map(self.frag_types_as_char)
        a_b_c_fragments = mapped_frag_types.isin(["a", "b", "c"])
        x_y_z_fragments = mapped_frag_types.isin(["x", "y", "z"])

        precursor_idx_to_nAA = (
            self._precursor_df[["precursor_idx", "nAA"]]
            .set_index("precursor_idx")
            .to_dict()["nAA"]
        )
        # for x/y/z fragments, the position is the precursor nAA minus the fragment number
        x_y_z_number = (
            self._fragment_df.loc[x_y_z_fragments, "precursor_idx"].map(
                precursor_idx_to_nAA
            )
            - self._fragment_df.loc[x_y_z_fragments, "number"]
        )
        self._fragment_df.loc[x_y_z_fragments, "position"] = x_y_z_number - 1

        # for a/b/c fragments, the position is simply the fragment number
        self._fragment_df.loc[a_b_c_fragments, "position"] = (
            self._fragment_df.loc[a_b_c_fragments, "number"] - 1
        )

        self._fragment_df["position"] = self._fragment_df["position"].astype(int)

    def parse_output_folder(
        self,
        folder: str,
        mandatory_precursor_columns: list[str] | None = None,
        optional_precursor_columns: list[str] | None = None,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Parse the output folder to get a precursor and fragment dataframe in the flat format.
        """
        # ... body carried over into process_folder() below, except for the
        # fragment-column handling, which previously read:
        self._fragment_df = frag_df[
            ["mz", "intensity", "precursor_idx", "frag_idx", "correlation"]
        ].copy()

        for col in ["number", "type", "charge"]:
            if col in self.custom_fragment_df_columns:
                self._fragment_df.loc[:, col] = frag_df.loc[:, col]

        if "position" in self.custom_fragment_df_columns:
            if "position" in frag_df.columns:
                self._fragment_df.loc[:, "position"] = frag_df.loc[:, "position"]
            else:
                self._calculate_fragment_position()

        return self._precursor_df, self._fragment_df
```
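Restated as a standalone sketch, the position convention the removed helper implemented: N-terminal ions (a/b/c) of number k sit at 0-based position k - 1, while C-terminal ions (x/y/z) sit at nAA - k - 1. The `frag_position` function below is illustrative, not part of the codebase:

```python
def frag_position(frag_type: str, number: int, n_aa: int) -> int:
    """0-based fragment position, following the removed helper's convention."""
    if frag_type in ("a", "b", "c"):
        # a/b/c ions count residues from the N-terminus
        return number - 1
    if frag_type in ("x", "y", "z"):
        # x/y/z ions count residues from the C-terminus
        return n_aa - number - 1
    raise ValueError(f"unsupported fragment type: {frag_type}")


# for an 8-residue peptide: b3 covers residues 1-3, y3 covers residues 6-8
assert frag_position("b", 3, 8) == 2
assert frag_position("y", 3, 8) == 4
```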
Added in its place: a module-level function. The body carries over from `parse_output_folder`, dedented to module scope and writing into a local `speclib` instead of `self`, and it now returns a base spectral library built with the requested `charged_frag_types`:

```python
def process_folder(
    folder: str,
    mandatory_precursor_columns: list[str] | None = None,
    optional_precursor_columns: list[str] | None = None,
    charged_frag_types: list[str] | None = None,
) -> SpecLibFlat:
```

> **Collaborator** (on `def process_folder(`): could we find a more specific name here? `build_speclibflat_from_folder`?

```python
    """
    Parse an output folder and return a SpecLibFlat object containing the
    precursor and fragment data.
```

> **Collaborator** (on the docstring summary): parse a folder? maybe be a bit more specific here

```python
    Parameters
    ----------
    folder : str
        The output folder to be parsed.
    mandatory_precursor_columns : list[str], optional
        The columns to be selected from the precursor dataframe.
    optional_precursor_columns : list[str], optional
        Additional optional columns to include if present.

    Returns
    -------
    SpecLibFlat
        A spectral library object containing the parsed data.
    """
    speclib = SpecLibFlat()

    if mandatory_precursor_columns is None:
        mandatory_precursor_columns = [
            "precursor_idx",
            "sequence",
            "flat_frag_start_idx",
            "flat_frag_stop_idx",
            "charge",
            "rt_library",
            "rt_observed",
            "mobility_library",
            "mobility_observed",
            "mz_library",
            "mz_observed",
            "proteins",
            "genes",
            "mods",
            "mod_sites",
            "proba",
            "decoy",
        ]

    if optional_precursor_columns is None:
        optional_precursor_columns = [
            "rt_calibrated",
            "mz_calibrated",
        ]

    psm_df = pd.read_parquet(os.path.join(folder, "psm.parquet"))
    frag_df = pd.read_parquet(os.path.join(folder, "frag.parquet"))

    if not set(mandatory_precursor_columns).issubset(psm_df.columns):
        raise ValueError(
            f"mandatory_precursor_columns must be a subset of psm_df.columns, "
            f"didn't find {set(mandatory_precursor_columns) - set(psm_df.columns)}"
        )

    available_columns = sorted(
        list(
            set(mandatory_precursor_columns)
            | (set(optional_precursor_columns) & set(psm_df.columns))
        )
    )
    psm_df = psm_df[available_columns]

    foldername = os.path.basename(folder)
    psm_df["raw_name"] = foldername
```

> **Collaborator** (on lines +108 to +109): (nit) `psm_df["raw_name"] = os.path.basename(folder)`

```python
    # remove decoy precursors (casting decoy to int first)
    psm_df["decoy"] = psm_df["decoy"].astype(int)
    psm_df = psm_df[psm_df["decoy"] == 0].reset_index(drop=True)

    speclib._precursor_df = pd.DataFrame()
    for col in psm_df.columns:
        speclib._precursor_df[col] = psm_df[col]
```

> **Collaborator** (on lines +114 to +116): is there a reason why we not just (deep)copy here?

```python
    # cast the mods columns to string and replace "nan" with empty strings
    speclib._precursor_df["mods"] = speclib._precursor_df["mods"].astype(str)
    speclib._precursor_df["mod_sites"] = speclib._precursor_df["mod_sites"].astype(str)
    speclib._precursor_df["mods"] = speclib._precursor_df["mods"].replace("nan", "")
    speclib._precursor_df["mod_sites"] = speclib._precursor_df["mod_sites"].replace(
        "nan", ""
    )

    speclib.calc_precursor_mz()

    # prefer observed values, then calibrated values, then library values
    for col in ["rt", "mz", "mobility"]:
        if f"{col}_observed" in psm_df.columns:
            values = psm_df[f"{col}_observed"]
        elif f"{col}_calibrated" in psm_df.columns:
            values = psm_df[f"{col}_calibrated"]
        else:
            values = psm_df[f"{col}_library"]
        speclib._precursor_df[col] = values

    # filter out fragments that do not belong to the retained precursors
    frag_df = frag_df[
        frag_df["precursor_idx"].isin(speclib._precursor_df["precursor_idx"])
    ]
    speclib._fragment_df = frag_df[
        [
            "mz",
            "intensity",
            "precursor_idx",
            "frag_idx",
            "correlation",
            "number",
            "type",
            "charge",
            "loss_type",
            "position",
        ]
    ].copy()

    return speclib.to_speclib_base(
        charged_frag_types=charged_frag_types,
        additional_columns=["intensity", "correlation"],
    )
```
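A minimal usage sketch of the new function; the folder path is hypothetical, and the charged fragment types mirror what `build_transfer_library` passes in outputtransform.py below:

```python
from alphabase.peptide.fragment import get_charged_frag_types

# hypothetical alphaDIA output folder containing psm.parquet and frag.parquet
lib = process_folder(
    "/path/to/output/run_01",
    charged_frag_types=get_charged_frag_types(["b", "y"], 2),
)
print(lib.precursor_df.head())
```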


```diff
@@ -226,34 +182,6 @@ class BaseAccumulator
     def post_process(self) -> None:
         raise NotImplementedError("Subclasses must implement the post_process method")
```

Also removed: the previous module-level `process_folder`, superseded by the new version above:

```python
def process_folder(folder):
    """
    Process a folder and return the speclibase object.
    It does so by parsing the output folder to get a SpecLibFlat object and
    then converting it to a SpecLibBase object.
    For now it assumes that the loss_type is 0 for all the fragments.

    Parameters
    ----------
    folder : str
        The folder to be processed.

    Returns
    -------
    SpecLibBase
        The SpecLibBase object obtained from the output folder.
    """
    speclibflat_object = SpecLibFlatFromOutput()
    psm, frag_df = speclibflat_object.parse_output_folder(folder)
    speclibflat_object._fragment_df["loss_type"] = 0
    speclibase = speclibflat_object.to_SpecLibBase()
    # sort columns
    for dense_df_name in speclibase.available_dense_fragment_dfs():
        df = getattr(speclibase, dense_df_name)
        setattr(speclibase, dense_df_name, df[df.columns.sort_values()])

    return speclibase
```


The `error_callback` helper is unchanged:

```python
def error_callback(e):
    logger.error(e, exc_info=True)
```

```diff
@@ -264,11 +192,14 @@ class AccumulationBroadcaster:
     And broadcasts the output of each folder to the subscribers.
     """

-    def __init__(self, folders: list, number_of_processes: int):
-        self._folders = folders
+    def __init__(
+        self, folder_list: list, number_of_processes: int, processing_kwargs: dict
+    ):
+        self._folder_list = folder_list
         self._number_of_processes = number_of_processes
         self._subscribers = []
         self._lock = threading.Lock()  # Lock to prevent two processes trying to update the same subscriber at the same time
+        self._processing_kwargs = processing_kwargs

     def subscribe(self, subscriber: BaseAccumulator):
         self._subscribers.append(subscriber)
```
```diff
@@ -290,10 +221,11 @@ def _post_process(self):

     def run(self):
         with multiprocessing.Pool(processes=self._number_of_processes) as pool:
-            for folder in self._folders:
+            for folder in self._folder_list:
                 _ = pool.apply_async(
                     process_folder,
                     (folder,),
+                    self._processing_kwargs,
```
> **Collaborator** (on `self._processing_kwargs`): this looks wrong (but isn't, I know ;-)) maybe
> ```python
> args=(folder,),
> kwds=self._processing_kwargs,
> ```
```diff
                     callback=self._broadcast,
                     error_callback=error_callback,
                 )
```
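The reviewer's point: `Pool.apply_async(func, args, kwds, callback, error_callback)` accepts the keyword dictionary as its third positional parameter, so passing `self._processing_kwargs` positionally is correct, just easy to misread. A standalone demonstration with a toy function (not code from the PR):

```python
import multiprocessing


def greet(name, punctuation="!"):
    return f"hello {name}{punctuation}"


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        # args and kwds passed positionally, as in the PR ...
        r1 = pool.apply_async(greet, ("world",), {"punctuation": "?"})
        # ... is equivalent to naming them explicitly, as the reviewer suggests
        r2 = pool.apply_async(greet, args=("world",), kwds={"punctuation": "?"})
        assert r1.get() == r2.get() == "hello world?"
```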
**alphadia/outputtransform.py** (12 changes: 10 additions & 2 deletions)

```diff
@@ -10,7 +10,7 @@
 import directlfq.utils as lfqutils
 import numpy as np
 import pandas as pd
-from alphabase.peptide import precursor
+from alphabase.peptide import fragment, precursor
 from alphabase.spectral_library import base
 from alphabase.spectral_library.base import SpecLibBase
 from sklearn.model_selection import train_test_split
```
```diff
@@ -481,8 +481,16 @@ def build_transfer_library(
             ],
         )
         accumulationBroadcaster = AccumulationBroadcaster(
-            folder_list, number_of_processes
+            folder_list=folder_list,
+            number_of_processes=number_of_processes,
+            processing_kwargs={
+                "charged_frag_types": fragment.get_charged_frag_types(
+                    self.config["transfer_library"]["fragment_types"].split(";"),
```
> **Collaborator** (on the `fragment_types` split): note that this is not compatible with #424
```diff
+                    self.config["transfer_library"]["max_charge"],
+                )
+            },
         )

         accumulationBroadcaster.subscribe(transferAccumulator)
         accumulationBroadcaster.run()
         logger.info(
```
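For illustration, `get_charged_frag_types` expands each configured fragment type into one entry per charge state up to `max_charge`; with the default `fragment_types: 'b;y'` and `max_charge: 2`, the resulting `charged_frag_types` should look roughly as follows (the exact naming is assumed from alphabase's `<type>_z<charge>` convention):

```python
from alphabase.peptide.fragment import get_charged_frag_types

# 'b;y' from transfer_library.fragment_types, max_charge of 2
charged = get_charged_frag_types("b;y".split(";"), 2)
print(charged)  # expected along the lines of ['b_z1', 'b_z2', 'y_z1', 'y_z2']
```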