From ca9e757606658dde790b36b4338fc04aeddb4691 Mon Sep 17 00:00:00 2001 From: Ana Rosa Date: Sun, 15 Jun 2025 01:24:30 +0100 Subject: [PATCH] ENH #61033: Add coalesce_keys option to pandas merge This adds a coalesce_keys keyword to pandas.merge to allow preservation of both join key columns (id and id_right), instead of automatically coalescing them into a single column. This is especially useful in full outer joins, where retaining information about unmatched keys from both sides is important. Example: pd.merge(df1, df2, on="id", coalesce_keys=False) This will result in both id and id_right columns being preserved, rather than merged into a single id. Includes: - Modifications to merge internals (core/reshape/merge.py) - A dedicated test file (test_merge_coalesce.py) covering: - Preservation of join keys when coalesce_keys=False - Comparison with default behavior (coalesce_keys=True) - Full outer joins with asymmetric key presence Co-authored-by: Maria Pereira --- pandas/core/reshape/merge.py | 277 ++++++++++++++---- pandas/tests/reshape/merge/example.py | 44 +++ .../reshape/merge/test_merge_coalesce.py | 71 +++++ 3 files changed, 337 insertions(+), 55 deletions(-) create mode 100644 pandas/tests/reshape/merge/example.py create mode 100644 pandas/tests/reshape/merge/test_merge_coalesce.py diff --git a/pandas/core/reshape/merge.py b/pandas/core/reshape/merge.py index 6de3d5c3f9c09..5ff811023b3a5 100644 --- a/pandas/core/reshape/merge.py +++ b/pandas/core/reshape/merge.py @@ -156,6 +156,7 @@ def merge( copy: bool | lib.NoDefault = lib.no_default, indicator: str | bool = False, validate: str | None = None, + coalesce_keys: bool = True, ) -> DataFrame: """ Merge DataFrame or named Series objects with a database-style join. 
@@ -382,6 +383,7 @@ def merge( suffixes=suffixes, indicator=indicator, validate=validate, + coalesce_keys=coalesce_keys, ) else: op = _MergeOperation( @@ -397,6 +399,7 @@ def merge( suffixes=suffixes, indicator=indicator, validate=validate, + coalesce_keys=coalesce_keys, ) return op.get_result() @@ -413,6 +416,7 @@ def _cross_merge( suffixes: Suffixes = ("_x", "_y"), indicator: str | bool = False, validate: str | None = None, + coalesce_keys: bool = True, ) -> DataFrame: """ See merge.__doc__ with how='cross' @@ -449,6 +453,7 @@ def _cross_merge( suffixes=suffixes, indicator=indicator, validate=validate, + coalesce_keys=coalesce_keys, ) del res[cross_col] return res @@ -523,6 +528,7 @@ def merge_ordered( fill_method: str | None = None, suffixes: Suffixes = ("_x", "_y"), how: JoinHow = "outer", + coalesce_keys: bool = True, ) -> DataFrame: """ Perform a merge for ordered data with optional filling/interpolation. @@ -629,6 +635,7 @@ def _merger(x, y) -> DataFrame: suffixes=suffixes, fill_method=fill_method, how=how, + coalesce_keys=coalesce_keys, ) return op.get_result() @@ -671,6 +678,7 @@ def merge_asof( tolerance: int | datetime.timedelta | None = None, allow_exact_matches: bool = True, direction: str = "backward", + coalesce_keys: bool = True, ) -> DataFrame: """ Perform a merge by key distance. 
@@ -926,6 +934,7 @@ def merge_asof( tolerance=tolerance, allow_exact_matches=allow_exact_matches, direction=direction, + coalesce_keys=coalesce_keys, ) return op.get_result() @@ -953,6 +962,7 @@ class _MergeOperation: join_names: list[Hashable] right_join_keys: list[ArrayLike] left_join_keys: list[ArrayLike] + coalesce_keys: bool def __init__( self, @@ -968,12 +978,14 @@ def __init__( suffixes: Suffixes = ("_x", "_y"), indicator: str | bool = False, validate: str | None = None, + coalesce_keys: bool = True, ) -> None: _left = _validate_operand(left) _right = _validate_operand(right) self.left = self.orig_left = _left self.right = self.orig_right = _right self.how, self.anti_join = self._validate_how(how) + self.coalesce_keys = coalesce_keys self.on = com.maybe_make_list(on) @@ -994,7 +1006,8 @@ def __init__( f"right_index parameter must be of type bool, not {type(right_index)}" ) - # GH 40993: raise when merging between different levels; enforced in 2.0 + # GH 40993: raise when merging between different levels; enforced in + # 2.0 if _left.columns.nlevels != _right.columns.nlevels: msg = ( "Not allowed to merge between different levels. " @@ -1011,6 +1024,7 @@ def __init__( self.join_names, left_drop, right_drop, + self.coalesce_keys, ) = self._get_merge_keys() if left_drop: @@ -1252,18 +1266,55 @@ def _maybe_add_join_keys( left_indexer: npt.NDArray[np.intp] | None, right_indexer: npt.NDArray[np.intp] | None, ) -> None: + """Adds or updates join key columns in the merged DataFrame result. + + This method is responsible for incorporating the join keys (e.g., 'id' column) + into the final merged DataFrame (`result`). It handles both coalescing + (merging left and right keys into a single column) and keeping them separate + (e.g., 'id' and 'id_right'), based on the `coalesce_keys` flag. + + For coalesced keys, it intelligently fills missing values from one side + with values from the other, ensuring a complete join key column. 
+ + Args: + result (DataFrame): The partially merged DataFrame to which join keys + will be added or updated. + left_indexer (np.ndarray[np.intp] | None): An array indicating the + original row positions from the left DataFrame for each row in `result`. + -1 indicates a row that originated solely from the right DataFrame. + None if no indexing is needed (e.g., simple concatenation). + right_indexer (np.ndarray[np.intp] | None): An array indicating the + original row positions from the right DataFrame for each row in `result`. + -1 indicates a row that originated solely from the left DataFrame. + None if no indexing is needed. + + Returns: + None: The `result` DataFrame is modified in-place. + """ left_has_missing = None right_has_missing = None - assert all(isinstance(x, _known) for x in self.left_join_keys) + # Ensure 'self.left_join_keys' contains valid objects + # (ndarray, Series, or Index) + assert all( + isinstance(x, (np.ndarray, Series, Index)) for x in self.left_join_keys + ) keys = zip(self.join_names, self.left_on, self.right_on) for i, (name, lname, rname) in enumerate(keys): + # Skip if there's no need to fill (e.g., no join keys involved or + # no columns to create) if not _should_fill(lname, rname): continue take_left, take_right = None, None + # Determine if left/right join keys need to be taken from + # self.left/self.right + # This part ensures that if a column named 'name' + # already exists in 'result', + # we might need to fetch the original values for type consistency + # or missing value handling. 
if name in result: if left_indexer is not None or right_indexer is not None: if name in self.left: @@ -1295,16 +1346,19 @@ def _maybe_add_join_keys( take_right = self.right[name]._values else: + # If 'name' is not already in 'result', default to taking + # values from join_keys take_left = self.left_join_keys[i] take_right = self.right_join_keys[i] if take_left is not None or take_right is not None: + # Get the actual values for left and right join keys, + # handling indexers and fill values for missing entries (-1) if take_left is None: lvals = result[name]._values elif left_indexer is None: lvals = take_left else: - # TODO: can we pin down take_left's type earlier? take_left = extract_array(take_left, extract_numpy=True) lfill = na_value_for_dtype(take_left.dtype) lvals = algos.take_nd(take_left, left_indexer, fill_value=lfill) @@ -1314,53 +1368,103 @@ def _maybe_add_join_keys( elif right_indexer is None: rvals = take_right else: - # TODO: can we pin down take_right's type earlier? taker = extract_array(take_right, extract_numpy=True) rfill = na_value_for_dtype(taker.dtype) rvals = algos.take_nd(taker, right_indexer, fill_value=rfill) - # if we have an all missing left_indexer - # make sure to just use the right values or vice-versa - if left_indexer is not None and (left_indexer == -1).all(): - key_col = Index(rvals) - result_dtype = rvals.dtype - elif right_indexer is not None and (right_indexer == -1).all(): - key_col = Index(lvals) - result_dtype = lvals.dtype - else: + # This block handles key coalescing (when coalesce_keys=True) + if self.coalesce_keys: + # Initialize the combined key column with left-side values. key_col = Index(lvals) + + # If left_indexer exists, apply the coalescing logic. + # Where left_indexer is -1 (meaning the row came + # from the right DataFrame + # and has no match on the left), use the corresponding + # right-side value. 
if left_indexer is not None: mask_left = left_indexer == -1 key_col = key_col.where(~mask_left, rvals) + + # Determine the common dtype for the combined key column. result_dtype = find_common_type([lvals.dtype, rvals.dtype]) + + # Workaround for datetime types that might incorrectly + # become 'Object' if ( lvals.dtype.kind == "M" and rvals.dtype.kind == "M" and result_dtype.kind == "O" ): - # TODO(non-nano) Workaround for common_type not dealing - # with different resolutions result_dtype = key_col.dtype - if result._is_label_reference(name): - result[name] = result._constructor_sliced( - key_col, dtype=result_dtype, index=result.index - ) - elif result._is_level_reference(name): - if isinstance(result.index, MultiIndex): - key_col.name = name - idx_list = [ - result.index.get_level_values(level_name) - if level_name != name - else key_col - for level_name in result.index.names - ] - - result.set_index(idx_list, inplace=True) - else: - result.index = Index(key_col, name=name) + # If 'name' (the join key, e.g., 'id') + # already exists as a column label + # in the 'result' DataFrame, we need to UPDATE that column. + # In a coalesced merge, the join key column is + # always expected to exist as a label. 
+ if name in result.columns or result._is_label_reference(name): + result[name] = result._constructor_sliced( + key_col, dtype=result_dtype, index=result.index + ) + # Handle cases where the join key is part of a MultiIndex + elif result._is_level_reference(name): + if isinstance(result.index, MultiIndex): + key_col.name = name + idx_list = [ + ( + result.index.get_level_values(level_name) + if level_name != name + else key_col + ) + for level_name in result.index.names + ] + result.set_index(idx_list, inplace=True) + else: + result.index = Index(key_col, name=name) + + # This block handles coalesce_keys=False (separate key columns) else: - result.insert(i, name or f"key_{i}", key_col) + left_name = lname + # Generate the right column name, adding a suffix if names + # are identical + right_name = ( + f"{rname}{self.suffixes[1]}" + if rname != lname + else f"{lname}{self.suffixes[1]}" + ) + + # Cast to float if NaNs are present, to ensure numerical + # compatibility + if isna(lvals).any() or isna(rvals).any(): + lvals = np.array(lvals, dtype=float) + rvals = np.array(rvals, dtype=float) + + # Add the left join key column if it doesn't exist + if left_name not in result.columns: + if result._is_label_reference(left_name): + result[left_name] = result._constructor_sliced( + lvals, dtype=lvals.dtype, index=result.index + ) + else: + result[left_name] = lvals + + # Add the right join key column if it doesn't exist + if right_name not in result.columns: + if result._is_label_reference(right_name): + result[right_name] = result._constructor_sliced( + rvals, dtype=rvals.dtype, index=result.index + ) + else: + try: + # Insert right column next to the left column + # for better readability + insert_pos = result.columns.get_loc(left_name) + 1 + result.insert(insert_pos, right_name, rvals) + except KeyError: + # Fallback if left_name isn't found for + # insertion positioning + result[right_name] = rvals def _get_join_indexers( self, @@ -1368,9 +1472,54 @@ def 
_get_join_indexers( """return the join indexers""" # make mypy happy assert self.how != "asof" - return get_join_indexers( - self.left_join_keys, self.right_join_keys, sort=self.sort, how=self.how - ) + + # For non-coalescing outer joins, we need to handle None values + # specially + if not self.coalesce_keys and self.how == "outer": + # Get the original join indexers + left_indexer, right_indexer = get_join_indexers( + self.left_join_keys, self.right_join_keys, sort=self.sort, how=self.how + ) + + if left_indexer is not None and right_indexer is not None: + # Create a mask for rows where right side is None + right_none_mask = right_indexer == -1 + # Create a mask for rows where left side is None + left_none_mask = left_indexer == -1 + + # Create a new array for the reordered indices + n_rows = len(left_indexer) + new_order = np.zeros(n_rows, dtype=np.intp) + current_pos = 0 + + # First, add rows where right side has values but left is None + right_only_indices = np.where(left_none_mask & ~right_none_mask)[0] + new_order[current_pos : current_pos + len(right_only_indices)] = ( + right_only_indices + ) + current_pos += len(right_only_indices) + + # Then, add rows where left side has values but right is None + left_only_indices = np.where(~left_none_mask & right_none_mask)[0] + new_order[current_pos : current_pos + len(left_only_indices)] = ( + left_only_indices + ) + current_pos += len(left_only_indices) + + # Finally, add rows where both sides have values + both_indices = np.where(~left_none_mask & ~right_none_mask)[0] + new_order[current_pos : current_pos + len(both_indices)] = both_indices + + # Reorder the indexers + left_indexer = left_indexer[new_order] + right_indexer = right_indexer[new_order] + + return left_indexer, right_indexer + else: + # Original logic for other cases + return get_join_indexers( + self.left_join_keys, self.right_join_keys, sort=self.sort, how=self.how + ) @final def _get_join_info( @@ -1531,22 +1680,27 @@ def _get_merge_keys( 
list[Hashable], list[Hashable], list[Hashable], + list[bool], ]: """ Returns ------- - left_keys, right_keys, join_names, left_drop, right_drop + left_keys, right_keys, join_names, left_drop, right_drop, coalesce_flags """ left_keys: list[ArrayLike] = [] right_keys: list[ArrayLike] = [] join_names: list[Hashable] = [] right_drop: list[Hashable] = [] left_drop: list[Hashable] = [] + coalesce_flags: list[bool] = [] left, right = self.left, self.right - is_lkey = lambda x: isinstance(x, _known) and len(x) == len(left) - is_rkey = lambda x: isinstance(x, _known) and len(x) == len(right) + def is_lkey(x): + return isinstance(x, _known) and len(x) == len(left) + + def is_rkey(x): + return isinstance(x, _known) and len(x) == len(right) # Note that pd.merge_asof() has separate 'on' and 'by' parameters. A # user could, for example, request 'left_index' and 'left_by'. In a @@ -1591,21 +1745,22 @@ def _get_merge_keys( else: # work-around for merge_asof(right_index=True) right_keys.append(right.index._values) - if lk is not None and lk == rk: # FIXME: what about other NAs? 
+ if ( + lk is not None and lk == rk and self.coalesce_keys + ): # Only drop if coalescing right_drop.append(rk) else: rk = cast(ArrayLike, rk) right_keys.append(rk) if lk is not None: - # Then we're either Hashable or a wrong-length arraylike, - # the latter of which will raise lk = cast(Hashable, lk) left_keys.append(left._get_label_or_level_values(lk)) - join_names.append(lk) - else: - # work-around for merge_asof(left_index=True) - left_keys.append(left.index._values) - join_names.append(left.index.name) + if self.coalesce_keys: + join_names.append(lk) + else: + join_names.append( + None + ) # indicate that we do NOT want a coalesced key column elif _any(self.left_on): for k in self.left_on: if is_lkey(k): @@ -1651,7 +1806,10 @@ def _get_merge_keys( else: left_keys = [self.left.index._values] - return left_keys, right_keys, join_names, left_drop, right_drop + # For each join key, decide whether to coalesce or keep separate + coalesce_flags = [self.coalesce_keys] * len(join_names) + + return left_keys, right_keys, join_names, left_drop, right_drop, coalesce_flags @final def _maybe_coerce_merge_keys(self) -> None: @@ -1725,7 +1883,8 @@ def _maybe_coerce_merge_keys(self) -> None: # check whether ints and floats if is_integer_dtype(rk.dtype) and is_float_dtype(lk.dtype): - # GH 47391 numpy > 1.24 will raise a RuntimeError for nan -> int + # GH 47391 numpy > 1.24 will raise a RuntimeError for nan + # -> int with np.errstate(invalid="ignore"): # error: Argument 1 to "astype" of "ndarray" has incompatible # type "Union[ExtensionDtype, Any, dtype[Any]]"; expected @@ -1745,7 +1904,8 @@ def _maybe_coerce_merge_keys(self) -> None: continue if is_float_dtype(rk.dtype) and is_integer_dtype(lk.dtype): - # GH 47391 numpy > 1.24 will raise a RuntimeError for nan -> int + # GH 47391 numpy > 1.24 will raise a RuntimeError for nan + # -> int with np.errstate(invalid="ignore"): # error: Argument 1 to "astype" of "ndarray" has incompatible # type "Union[ExtensionDtype, Any, dtype[Any]]"; 
expected @@ -2606,7 +2766,10 @@ def _get_multiindex_indexer( if sort: rcodes = list(map(np.take, rcodes, index.codes)) else: - i8copy = lambda a: a.astype("i8", subok=False) + + def i8copy(a): + return a.astype("i8", subok=False) + rcodes = list(map(i8copy, index.codes)) # fix right labels if there were any nulls @@ -2749,13 +2912,15 @@ def _factorize_keys( >>> pd.core.reshape.merge._factorize_keys(lk, rk, sort=False) (array([0, 1, 2]), array([0, 1]), 3) """ - # TODO: if either is a RangeIndex, we can likely factorize more efficiently? + # TODO: if either is a RangeIndex, we can likely factorize more + # efficiently? if ( isinstance(lk.dtype, DatetimeTZDtype) and isinstance(rk.dtype, DatetimeTZDtype) ) or (lib.is_np_dtype(lk.dtype, "M") and lib.is_np_dtype(rk.dtype, "M")): # Extract the ndarray (UTC-localized) values - # Note: we dont need the dtypes to match, as these can still be compared + # Note: we dont need the dtypes to match, as these can still be + # compared lk, rk = cast("DatetimeArray", lk)._ensure_matching_resos(rk) lk = cast("DatetimeArray", lk)._ndarray rk = cast("DatetimeArray", rk)._ndarray @@ -2785,7 +2950,8 @@ def _factorize_keys( lk = lk._pa_array # type: ignore[attr-defined] rk = rk._pa_array # type: ignore[union-attr] dc = ( - pa.chunked_array(lk.chunks + rk.chunks) # type: ignore[union-attr] + # type: ignore[union-attr] + pa.chunked_array(lk.chunks + rk.chunks) .combine_chunks() .dictionary_encode() ) @@ -2858,7 +3024,8 @@ def _factorize_keys( else: # Argument 1 to "factorize" of "ObjectFactorizer" has incompatible type # "Union[ndarray[Any, dtype[signedinteger[_64Bit]]], - # ndarray[Any, dtype[object_]]]"; expected "ndarray[Any, dtype[object_]]" + # ndarray[Any, dtype[object_]]]"; expected "ndarray[Any, + # dtype[object_]]" lk_data, rk_data = lk, rk # type: ignore[assignment] lk_mask, rk_mask = None, None diff --git a/pandas/tests/reshape/merge/example.py b/pandas/tests/reshape/merge/example.py new file mode 100644 index 
0000000000000..eb28a0d737723 --- /dev/null +++ b/pandas/tests/reshape/merge/example.py @@ -0,0 +1,44 @@ +import pandas as pd +import numpy as np + +# Sample DataFrames +df1 = pd.DataFrame({ + "id": [1, 2, 3], + "value1": ["A", "B", "C"] +}) + +df2 = pd.DataFrame({ + "id": [2, 3, 4], + "value2": ["X", "Y", "Z"] +}) + +print("DataFrame 1:") +print(df1) + +print("\nDataFrame 2:") +print(df2) + +# Merge with coalesce_keys=True (default behavior: merge keys into one column) +merged_coalesced = pd.merge( + df1, df2, + on="id", + how="outer", + coalesce_keys=True, + suffixes=("", "_right"), +) + +# Merge with coalesce_keys=False (new behavior: keep left and right keys separately) +merged_separated = pd.merge( + df1, df2, + on="id", + how="outer", + coalesce_keys=False, + suffixes=("", "_right"), +) + +print("\nMerge result with coalesce_keys=True (merged 'id' column):") +print(merged_coalesced) + +print("\nMerge result with coalesce_keys=False (separate 'id' and 'id_right' columns):") +print(merged_separated) + diff --git a/pandas/tests/reshape/merge/test_merge_coalesce.py b/pandas/tests/reshape/merge/test_merge_coalesce.py new file mode 100644 index 0000000000000..c054c017f8b49 --- /dev/null +++ b/pandas/tests/reshape/merge/test_merge_coalesce.py @@ -0,0 +1,71 @@ +import numpy as np + +import pandas as pd +import pandas._testing as tm + + +def test_merge_with_and_without_coalesce_keys(): + # Create two example DataFrames to merge + df1 = pd.DataFrame({"id": [1, 2, 3], "value1": ["A", "B", "C"]}) + + df2 = pd.DataFrame({"id": [2, 3, 4], "value2": ["X", "Y", "Z"]}) + + # Merge with coalesce_keys=True (default behavior) + result_coalesced = pd.merge( + df1, + df2, + on="id", + how="outer", + coalesce_keys=True, + suffixes=("", "_right"), + ) + + # Expected result: keys from both sides are coalesced into a single 'id' column + expected_coalesced = pd.DataFrame( + { + "id": [1.0, 2.0, 3.0, 4.0], + "value1": ["A", "B", "C", np.nan], + "value2": [np.nan, "X", "Y", "Z"], + } + ) 
+ + # Use assert_frame_equal to validate the result + tm.assert_frame_equal( + result_coalesced.sort_index(axis=1), + expected_coalesced.sort_index(axis=1), + check_dtype=False, + ) + + # Merge with coalesce_keys=False (new functionality) + result_separated = pd.merge( + df1, + df2, + on="id", + how="outer", + coalesce_keys=False, + suffixes=("", "_right"), + ) + + # Expected result: original left 'id' and right 'id' (renamed to 'id_right') + # are preserved + expected_separated = pd.DataFrame( + { + "id": [1.0, 2.0, 3.0, np.nan], + "value1": ["A", "B", "C", np.nan], + "id_right": [np.nan, 2.0, 3.0, 4.0], + "value2": [np.nan, "X", "Y", "Z"], + } + ) + + # Check that no columns have value None (should be np.nan) + assert None not in result_separated.columns + + # Check shape of the result + assert result_separated.shape == (4, 4) + + # Validate content + tm.assert_frame_equal( + result_separated.sort_index(axis=1), + expected_separated.sort_index(axis=1), + check_dtype=False, + )