Refactor flat to dense conversion #264

Merged 9 commits on Jan 16, 2025
Changes from 6 commits
269 changes: 269 additions & 0 deletions alphabase/peptide/fragment.py
@@ -1596,3 +1596,272 @@ def _calc_fragment_cardinality(
)

return pd.DataFrame(fragment_cardinality, columns=fragment_mz_df.columns)


def _calc_column_indices(
fragment_df: pd.DataFrame,
charged_frag_types: list,
) -> np.ndarray:
"""
Calculate the column indices for a dense fragment matrix.
Columns are sorted according to `fragment.sort_charged_frag_types`

Parameters
----------
fragment_df : pd.DataFrame
Flat fragment dataframe with columns 'type', 'loss_type', 'charge'

charged_frag_types : list
List of charged fragment types as generated by `fragment.get_charged_frag_types`

Returns
-------
np.ndarray
Column indices with shape (n_fragments,)
"""
# features.LOSS_INVERSE but with separator '_' for non-empty values
_loss_inverse_separator = {
key: ("_" + value if value != "" else value)
for key, value in LOSS_INVERSE.items()
}

sorted_charged_frag_types = sort_charged_frag_types(charged_frag_types)

# mapping of charged fragment types to indices
inverse_frag_type_mapping = dict(
zip(sorted_charged_frag_types, range(len(sorted_charged_frag_types)))
)

# mapping of fragment type, loss type, charge to a dense column name
frag_type_list = (
fragment_df["type"].map(SERIES_INVERSE)
+ fragment_df["loss_type"].map(_loss_inverse_separator)
+ FRAGMENT_CHARGE_SEPARATOR
+ fragment_df["charge"].astype(str)
)

# Convert to integer array, using -1 for any unmapped values
return (
frag_type_list.map(inverse_frag_type_mapping)
.fillna(-1)
.astype(np.int32)
.to_numpy()
)
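
The lookup above is essentially a pandas map with a -1 sentinel for fragment types that have no requested dense column. A minimal, self-contained sketch of that idea, with hypothetical string keys standing in for the output of sort_charged_frag_types:

import numpy as np
import pandas as pd

dense_columns = ["b_z1", "b_z2", "y_z1", "y_z2"]  # hypothetical sorted charged fragment types
column_to_index = {name: i for i, name in enumerate(dense_columns)}

flat_names = pd.Series(["b_z1", "y_z2", "y_z3"])  # "y_z3" is not among the requested columns
col_idx = flat_names.map(column_to_index).fillna(-1).astype(np.int32).to_numpy()
# col_idx -> array([ 0,  3, -1], dtype=int32); -1 entries are dropped later via match_mask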


def _calc_row_indices(
precursor_naa: np.ndarray,
fragment_position: np.ndarray,
precursor_df_idx: np.ndarray,
fragment_df_idx: np.ndarray,
frag_start_idx: None | np.ndarray = None,
frag_stop_idx: None | np.ndarray = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate new start and stop index mapping for flat fragments.

Returns the row indices into the dense fragment matrix, shape (n_fragments,),
and the per-precursor start and stop indices into that matrix, shape (n_precursors,)

Parameters
----------
precursor_naa : np.ndarray
Array of precursor nAA values
fragment_position : np.ndarray
Array of fragment positions
precursor_df_idx : np.ndarray
Array of precursor indices
fragment_df_idx : np.ndarray
Array of fragment indices
frag_start_idx : None | np.ndarray, optional
Precomputed per-precursor fragment start indices; rebuilt from precursor_naa if not given
frag_stop_idx : None | np.ndarray, optional
Precomputed per-precursor fragment stop indices; rebuilt from precursor_naa if not given

Returns
-------
tuple[np.ndarray, np.ndarray, np.ndarray]
(row_indices, frag_start_idx, frag_stop_idx)
"""
if len(fragment_position) != len(fragment_df_idx):
raise ValueError(
"fragment_position and fragment_df_idx must have the same length"
)

if len(precursor_naa) != len(precursor_df_idx):
raise ValueError("precursor_naa and precursor_df_idx must have the same length")

build_index = (frag_start_idx is None) and (frag_stop_idx is None)
if build_index:
frag_stop_idx = (precursor_naa - 1).cumsum()

# The start index of each precursor is the cumulative fragment count (nAA - 1) of all previous precursors; the first precursor starts at 0
frag_start_idx = np.zeros_like(frag_stop_idx)
frag_start_idx[1:] = frag_stop_idx[
:-1
] # shift values right by 1, first element remains 0

else:
if (frag_start_idx is None) or (frag_stop_idx is None):
raise ValueError(
"frag_start_idx and frag_stop_idx must both be provided if one is provided"
)
elif len(frag_start_idx) != len(frag_stop_idx):
raise ValueError(
"frag_start_idx and frag_stop_idx must have the same length"
)

# The row index of a fragment is its precursor's start index plus the fragment position
precursor_idx_to_accumulated_nAA = dict(zip(precursor_df_idx, frag_start_idx))
# Convert numpy array to pandas Series for mapping
# This massively speeds up the mapping
row_indices = (
pd.Series(fragment_df_idx).map(
precursor_idx_to_accumulated_nAA, na_action="ignore"
)
).to_numpy() + fragment_position

# fill nan with -1 and cast to int32
row_indices[np.isnan(row_indices)] = -1
row_indices = row_indices.astype(np.int32)

return row_indices, frag_start_idx, frag_stop_idx
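
As a worked sketch of the index bookkeeping above (illustrative values only): a precursor with nAA residues owns nAA - 1 rows in the dense matrix, so the stop indices are a cumulative sum and each start index is the previous precursor's stop.

import numpy as np

precursor_naa = np.array([4, 3])                 # two hypothetical precursors
frag_stop_idx = (precursor_naa - 1).cumsum()     # -> array([3, 5])
frag_start_idx = np.zeros_like(frag_stop_idx)
frag_start_idx[1:] = frag_stop_idx[:-1]          # -> array([0, 3])

# a fragment of the second precursor at position 1 lands in dense row 3 + 1 = 4
assert frag_start_idx[1] + 1 == 4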


def _start_stop_to_idx(
precursor_df: pd.DataFrame,
fragment_df: pd.DataFrame,
index_column: str = "precursor_idx",
) -> tuple[np.ndarray, np.ndarray]:
"""
Convert start/stop indices to precursor and fragment indices.

Parameters
----------
precursor_df : pd.DataFrame
DataFrame containing flat_frag_start_idx and flat_frag_stop_idx columns
fragment_df : pd.DataFrame
DataFrame containing fragment information
index_column : str, optional
Name of the index column to use, by default "precursor_idx"

Returns
-------
tuple[np.ndarray, np.ndarray]
(precursor_df_idx, fragment_df_idx) - numpy arrays mapping each precursor row and each fragment row to a precursor index
"""
# Handle empty DataFrames
if precursor_df.empty or fragment_df.empty:
return np.array([], dtype=np.int64), np.array([], dtype=np.int64)

# Work on a copy of the flat index columns, then sort it by 'flat_frag_start_idx'
precursor_df_sorted = (
precursor_df[["flat_frag_start_idx", "flat_frag_stop_idx"]]
.copy()
.reset_index(drop=True)
)
precursor_df_sorted = precursor_df_sorted.sort_values("flat_frag_start_idx")

# Add precursor_idx to precursor_df as 0,1,2,3...
precursor_df_sorted[index_column] = np.arange(precursor_df_sorted.shape[0])

# Add precursor_idx to fragment_df
fragment_df_idx = np.repeat(
precursor_df_sorted[index_column].to_numpy(),
precursor_df_sorted["flat_frag_stop_idx"].to_numpy()
- precursor_df_sorted["flat_frag_start_idx"].to_numpy(),
)

if len(fragment_df_idx) != fragment_df.shape[0]:
raise ValueError(
f"Number of fragments {len(fragment_df_idx)} is not equal to the number of rows in fragment_df {fragment_df.shape[0]}"
)

# Restore original order of precursor_df
precursor_df_resorted = precursor_df_sorted.sort_index()
precursor_df_idx = precursor_df_resorted[index_column].to_numpy()

return precursor_df_idx, fragment_df_idx
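
The conversion above is, at its core, an argsort followed by np.repeat; the function wraps this in pandas to also restore the original precursor order. A self-contained sketch with two hypothetical precursors whose fragment blocks are stored in reverse order:

import numpy as np

flat_frag_start_idx = np.array([4, 0])   # precursor 0 owns flat fragments 4..6
flat_frag_stop_idx = np.array([7, 4])    # precursor 1 owns flat fragments 0..3

order = np.argsort(flat_frag_start_idx)                       # -> array([1, 0])
counts = (flat_frag_stop_idx - flat_frag_start_idx)[order]    # -> array([4, 3])
fragment_df_idx = np.repeat(np.arange(len(order)), counts)    # -> [0 0 0 0 1 1 1]

precursor_df_idx = np.empty(len(order), dtype=np.int64)
precursor_df_idx[order] = np.arange(len(order))               # -> array([1, 0])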


def _create_dense_matrices(
precursor_df: pd.DataFrame,
fragment_df: pd.DataFrame,
charged_frag_types: list,
flat_columns: list | None = None,
) -> tuple[dict, np.ndarray, np.ndarray]:
"""
Create dense matrices for fragment dataframes.

Parameters
----------
precursor_df : pd.DataFrame
Precursor dataframe
fragment_df : pd.DataFrame
Fragment dataframe
charged_frag_types : list
List of charged fragment types
flat_columns : list | None, optional
List of flat fragment columns to convert to dense matrices; defaults to ['intensity'] if None

Returns
-------
dict
Dictionary mapping 'mz' and each flat column to a dense fragment dataframe
np.ndarray
Per-precursor start indices into the dense fragment dataframes
np.ndarray
Per-precursor stop indices into the dense fragment dataframes
"""

if flat_columns is None:
flat_columns = ["intensity"]

optional_columns = [
col
for col in ["precursor_idx", "flat_frag_start_idx", "flat_frag_stop_idx"]
if col in precursor_df.columns
]
precursor_df_copy = precursor_df[
["sequence", "mods", "mod_sites", "charge", "nAA"] + optional_columns
].copy()
fragment_mz_df = create_fragment_mz_dataframe(
precursor_df_copy,
charged_frag_types,
)

if ("precursor_idx" in precursor_df_copy.columns) and (
"precursor_idx" in fragment_df.columns
):
precursor_df_idx = precursor_df_copy["precursor_idx"]
fragment_df_idx = fragment_df["precursor_idx"]

elif ("flat_frag_start_idx" in precursor_df_copy.columns) and (
"flat_frag_stop_idx" in precursor_df_copy.columns
):
precursor_df_idx, fragment_df_idx = _start_stop_to_idx(
precursor_df_copy, fragment_df
)

else:
raise ValueError(
"Mapping of fragment indices to precursor indices failed, no 'precursor_idx' or 'flat_frag_start_idx' and 'flat_frag_stop_idx' columns found."
)

column_indices = _calc_column_indices(fragment_df, charged_frag_types)
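# 'frag_start_idx' / 'frag_stop_idx' are expected to have been added to precursor_df_copy by create_fragment_mz_dataframe above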
row_indices, frag_start_idx, frag_stop_idx = _calc_row_indices(
precursor_df_copy["nAA"].to_numpy(),
fragment_df["position"].to_numpy(),
precursor_df_idx,
fragment_df_idx,
precursor_df_copy["frag_start_idx"].to_numpy(),
precursor_df_copy["frag_stop_idx"].to_numpy(),
)

# remove all fragments that could not be mapped to a column
match_mask = column_indices != -1
column_indices = column_indices[match_mask]
row_indices = row_indices[match_mask]

# create a dictionary with the mz matrix and the flat columns
df_collection = {"mz": fragment_mz_df}
for column_name in flat_columns:
matrix = np.zeros_like(fragment_mz_df.values, dtype=PEAK_INTENSITY_DTYPE)
matrix[row_indices, column_indices] = fragment_df[column_name].values[
match_mask
]
df_collection[column_name] = pd.DataFrame(matrix, columns=charged_frag_types)

return df_collection, frag_start_idx, frag_stop_idx
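
The per-column loop above is a plain NumPy scatter: each surviving flat value is written into its (row, column) cell of a zero-initialised dense matrix. A minimal sketch with placeholder indices and intensities:

import numpy as np

row_indices = np.array([0, 0, 2, 4])       # placeholder row indices (after match_mask filtering)
column_indices = np.array([0, 3, 1, 2])    # placeholder column indices (after match_mask filtering)
flat_intensities = np.array([10.0, 20.0, 30.0, 40.0], dtype=np.float32)

dense = np.zeros((5, 4), dtype=np.float32)         # placeholder dense shape (rows, columns)
dense[row_indices, column_indices] = flat_intensities

In the function itself each such matrix is wrapped in a DataFrame and returned alongside frag_start_idx and frag_stop_idx, which delimit every precursor's block of dense rows.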