Skip to content

Commit

Permalink
Parallelize tree solver dimension search
Browse files Browse the repository at this point in the history
  • Loading branch information
EgorKraevTransferwise committed Nov 18, 2024
1 parent 0900f73 commit 8537719
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 11 deletions.
1 change: 1 addition & 0 deletions wise_pizza/explain.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ def explain_timeseries(
solver=solver,
verbose=verbose,
groupby_dims=groupby_dims,
cluster_values=False,
)

# TODO: insert back the normalized bits?
Expand Down
5 changes: 3 additions & 2 deletions wise_pizza/slicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def _summary(obj) -> str:
{
k: v
for k, v in s.items()
if k in ["segment", "total", "seg_size", "naive_avg"]
if k in ["segment", "total", "seg_size", "naive_avg", "impact"]
}
for s in obj.segments
],
Expand Down Expand Up @@ -420,7 +420,8 @@ def relevant_cluster_names(self):
relevant_clusters = {}
for s in self.segments:
for c in s["segment"].values():
if c in self.cluster_names:
if c in self.cluster_names and ";" not in c:
# Then cluster names containing ; are snumerations, don't need explanation
relevant_clusters[c] = self.cluster_names[c].replace("@@", ", ")
return relevant_clusters

Expand Down
11 changes: 6 additions & 5 deletions wise_pizza/solve/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pandas as pd


from .weighted_quantiles import weighted_quantiles


Expand Down Expand Up @@ -73,8 +74,10 @@ def kmeans_partition(df: pd.DataFrame, dim: str, groupby_dims: List[str]):
vector_dict[c] = (weights.loc[c], joint_mat[:, i])

cluster1, cluster2 = weighted_kmeans_two_clusters(vector_dict)

return [(cluster1, cluster2)]
if cluster1 is None:
return []
else:
return [(cluster1, cluster2)]


def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries=10):
Expand Down Expand Up @@ -124,9 +127,7 @@ def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries=

centroids = new_centroids

raise ValueError(
"Failed to find a valid clustering with non-empty clusters after maximum retries."
)
return None, None


def fill_gaps(x: np.ndarray, num_iter=50):
Expand Down
26 changes: 22 additions & 4 deletions wise_pizza/solve/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import numpy as np
import pandas as pd
from scipy.sparse import csc_matrix
from joblib import Parallel, delayed


from .fitter import AverageFitter, Fitter, TimeFitterModel, TimeFitter
Expand All @@ -17,6 +18,7 @@ def tree_solver(
fitter: Fitter,
max_depth: Optional[int] = None,
num_leaves: Optional[int] = None,
parallel_processes: int = 10,
):
"""
Partition the data into segments using a greedy binary tree approach
Expand All @@ -38,6 +40,7 @@ def tree_solver(
dims=dims,
time_col=None if isinstance(fitter, AverageFitter) else "__time",
max_depth=max_depth,
parallel_processes=parallel_processes,
)

build_tree(root=root, num_leaves=num_leaves, max_depth=max_depth)
Expand Down Expand Up @@ -85,6 +88,7 @@ def __init__(
time_col: str = None,
max_depth: Optional[int] = None,
dim_split: Optional[Dict[str, List]] = None,
parallel_processes: int = 10,
):
self.df = df.copy().sort_values(dims + fitter.groupby_dims)
self.fitter = fitter
Expand All @@ -98,6 +102,7 @@ def __init__(
self.model = None
# For dimension splitting candidates, hardwired for now
self.num_bins = 10
self.parallel_processes = parallel_processes

@property
def depth(self):
Expand Down Expand Up @@ -134,9 +139,9 @@ def error_improvement(self):
else:
iter_dims = self.dims

for dim in iter_dims:
def error_improvement_for_dim(dim):
if len(self.df[dim].unique()) == 1:
continue
return float("inf"), (None, None)

elif len(self.df[dim].unique()) == 2:
vals = self.df[dim].unique()
Expand All @@ -150,7 +155,11 @@ def error_improvement(self):
partitions = kmeans_partition(
self.df, dim, self.fitter.groupby_dims
)
if len(partitions) == 0:
return float("inf"), (None, None)

best_error = float("inf")
candidates = (None, None)
for dim_values1, dim_values2 in partitions:
left = self.df[self.df[dim].isin(dim_values1)]
right = self.df[self.df[dim].isin(dim_values2)]
Expand All @@ -174,8 +183,17 @@ def error_improvement(self):
err = left_candidate.error + right_candidate.error
if err < best_error:
best_error = err
self._error_improvement = self.error - best_error
self._best_submodels = (left_candidate, right_candidate)
candidates = (left_candidate, right_candidate)
return best_error, candidates

results = Parallel(n_jobs=self.parallel_processes)(
delayed(error_improvement_for_dim)(i) for i in iter_dims
)
for err, candidates in results:
if err < best_error:
best_error = err
self._best_submodels = candidates
self._error_improvement = self.error - best_error

return self._error_improvement

Expand Down

0 comments on commit 8537719

Please sign in to comment.