From 8537719b0fc0e00469e36330ca7f3279cc05f3bb Mon Sep 17 00:00:00 2001 From: "Egor.Kraev" Date: Mon, 18 Nov 2024 13:26:30 +0000 Subject: [PATCH] Parallelize tree solver dimension search --- wise_pizza/explain.py | 1 + wise_pizza/slicer.py | 5 +++-- wise_pizza/solve/partition.py | 11 ++++++----- wise_pizza/solve/tree.py | 26 ++++++++++++++++++++++---- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/wise_pizza/explain.py b/wise_pizza/explain.py index 6b35f48..f6c07b5 100644 --- a/wise_pizza/explain.py +++ b/wise_pizza/explain.py @@ -482,6 +482,7 @@ def explain_timeseries( solver=solver, verbose=verbose, groupby_dims=groupby_dims, + cluster_values=False, ) # TODO: insert back the normalized bits? diff --git a/wise_pizza/slicer.py b/wise_pizza/slicer.py index 75e9ae2..a916a61 100644 --- a/wise_pizza/slicer.py +++ b/wise_pizza/slicer.py @@ -27,7 +27,7 @@ def _summary(obj) -> str: { k: v for k, v in s.items() - if k in ["segment", "total", "seg_size", "naive_avg"] + if k in ["segment", "total", "seg_size", "naive_avg", "impact"] } for s in obj.segments ], @@ -420,7 +420,8 @@ def relevant_cluster_names(self): relevant_clusters = {} for s in self.segments: for c in s["segment"].values(): - if c in self.cluster_names: + if c in self.cluster_names and ";" not in c: + # Then cluster names containing ; are snumerations, don't need explanation relevant_clusters[c] = self.cluster_names[c].replace("@@", ", ") return relevant_clusters diff --git a/wise_pizza/solve/partition.py b/wise_pizza/solve/partition.py index dca2f35..ab820fe 100644 --- a/wise_pizza/solve/partition.py +++ b/wise_pizza/solve/partition.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd + from .weighted_quantiles import weighted_quantiles @@ -73,8 +74,10 @@ def kmeans_partition(df: pd.DataFrame, dim: str, groupby_dims: List[str]): vector_dict[c] = (weights.loc[c], joint_mat[:, i]) cluster1, cluster2 = weighted_kmeans_two_clusters(vector_dict) - - return [(cluster1, cluster2)] + if cluster1 is None: + return [] + else: + return [(cluster1, cluster2)] def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries=10): @@ -124,9 +127,7 @@ def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries= centroids = new_centroids - raise ValueError( - "Failed to find a valid clustering with non-empty clusters after maximum retries." - ) + return None, None def fill_gaps(x: np.ndarray, num_iter=50): diff --git a/wise_pizza/solve/tree.py b/wise_pizza/solve/tree.py index 8fec440..6292a17 100644 --- a/wise_pizza/solve/tree.py +++ b/wise_pizza/solve/tree.py @@ -4,6 +4,7 @@ import numpy as np import pandas as pd from scipy.sparse import csc_matrix +from joblib import Parallel, delayed from .fitter import AverageFitter, Fitter, TimeFitterModel, TimeFitter @@ -17,6 +18,7 @@ def tree_solver( fitter: Fitter, max_depth: Optional[int] = None, num_leaves: Optional[int] = None, + parallel_processes: int = 10, ): """ Partition the data into segments using a greedy binary tree approach @@ -38,6 +40,7 @@ def tree_solver( dims=dims, time_col=None if isinstance(fitter, AverageFitter) else "__time", max_depth=max_depth, + parallel_processes=parallel_processes, ) build_tree(root=root, num_leaves=num_leaves, max_depth=max_depth) @@ -85,6 +88,7 @@ def __init__( time_col: str = None, max_depth: Optional[int] = None, dim_split: Optional[Dict[str, List]] = None, + parallel_processes: int = 10, ): self.df = df.copy().sort_values(dims + fitter.groupby_dims) self.fitter = fitter @@ -98,6 +102,7 @@ def __init__( self.model = None # For dimension splitting candidates, hardwired for now self.num_bins = 10 + self.parallel_processes = parallel_processes @property def depth(self): @@ -134,9 +139,9 @@ def error_improvement(self): else: iter_dims = self.dims - for dim in iter_dims: + def error_improvement_for_dim(dim): if len(self.df[dim].unique()) == 1: - continue + return float("inf"), (None, None) elif len(self.df[dim].unique()) == 2: vals = self.df[dim].unique() @@ -150,7 +155,11 @@ def error_improvement(self): partitions = kmeans_partition( self.df, dim, self.fitter.groupby_dims ) + if len(partitions) == 0: + return float("inf"), (None, None) + best_error = float("inf") + candidates = (None, None) for dim_values1, dim_values2 in partitions: left = self.df[self.df[dim].isin(dim_values1)] right = self.df[self.df[dim].isin(dim_values2)] @@ -174,8 +183,17 @@ def error_improvement(self): err = left_candidate.error + right_candidate.error if err < best_error: best_error = err - self._error_improvement = self.error - best_error - self._best_submodels = (left_candidate, right_candidate) + candidates = (left_candidate, right_candidate) + return best_error, candidates + + results = Parallel(n_jobs=self.parallel_processes)( + delayed(error_improvement_for_dim)(i) for i in iter_dims + ) + for err, candidates in results: + if err < best_error: + best_error = err + self._best_submodels = candidates + self._error_improvement = self.error - best_error return self._error_improvement