Add clustering-based tree partitioning and minor fixes
EgorKraevTransferwise committed Nov 14, 2024
1 parent 8c684ed commit 92ab151
Showing 7 changed files with 409 additions and 77 deletions.
228 changes: 195 additions & 33 deletions notebooks/Finding interesting segments in time series.ipynb

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions tests/timeseries_wip_entrypoint.py
@@ -31,8 +31,7 @@
sf = explain_timeseries(
df=df,
dims=dims,
min_segments=5,
min_depth=1,
max_segments=7,
max_depth=2,
total_name=totals,
size_name=size,
@@ -41,6 +40,6 @@
solver="tree",
fit_sizes=True,
)
sf.plot(plot_is_static=False, height=1000, width=1000, average_name="VPC")
sf.plot(plot_is_static=False, height=1500, width=1000, average_name="VPC")
print(sf.summary())
print("yay!")
11 changes: 9 additions & 2 deletions wise_pizza/cluster.py
@@ -96,8 +96,15 @@ def nice_cluster_names(x: List[Dict[str, List[str]]]) -> Tuple[List[Dict], Dict]
for dim, clusters in cluster_strings.items():
reverse_cluster_names[dim] = {}
for i, c in enumerate(clusters):
cluster_names[f"{dim}_cluster_{i + 1}"] = c
reverse_cluster_names[dim][c] = f"{dim}_cluster_{i + 1}"
ugly_name = f"{dim}_cluster_{i + 1}"
nice_name = c.replace("@@", ";")
if len(nice_name) < 1.2 * len(ugly_name):
name = nice_name
else:
name = ugly_name

cluster_names[name] = c
reverse_cluster_names[dim][c] = name

col_defs = []
for xx in x:
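The new logic prefers the human-readable label (cluster members joined with ";") unless it is much longer than the synthetic placeholder. A toy illustration with hypothetical values, not lines from the file:

```python
dim = "country"
c = "DE@@FR@@PL"                    # cluster members joined with "@@"
ugly_name = f"{dim}_cluster_1"      # "country_cluster_1", 17 characters
nice_name = c.replace("@@", ";")    # "DE;FR;PL", 8 characters
name = nice_name if len(nice_name) < 1.2 * len(ugly_name) else ugly_name
# -> "DE;FR;PL" is short enough, so it replaces "country_cluster_1" in the output
```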
11 changes: 7 additions & 4 deletions wise_pizza/plotting_time_tree.py
@@ -89,17 +89,20 @@ def preprocess_for_ts_plot(

time_df = this_df.groupby("time", as_index=False).sum()

segment_name = (
str(s["segment"]).replace(",", "<br>").replace(";", ",").replace("'", "")
)
data1 = PlotData(
regression=time_df["pred_totals"] / time_df["weights"],
bars=time_df["totals"] / time_df["weights"],
subtitle=f"{average_name} for <br> {s['segment']}",
subtitle=f"{average_name} for <br> {segment_name}",
)

if sf.weight_total_prediction is None:
data2 = PlotData(
regression=time_df["pred_totals"],
bars=time_df["totals"],
subtitle=f"{sf.total_name} for <br> {s['segment']}",
subtitle=f"{sf.total_name} for <br> {segment_name}",
)
out.append([data1, data2])
else:
@@ -109,12 +112,12 @@
* time_df["pred_totals"]
/ time_df["weights"],
bars=time_df["totals"],
subtitle=f"{sf.total_name} for <br> {s['segment']}",
subtitle=f"{sf.total_name} for <br> {segment_name}",
)
data3 = PlotData(
regression=time_df["w_pred_totals"],
bars=time_df["weights"],
subtitle=f"{sf.size_name} for <br> {s['segment']}",
subtitle=f"{sf.size_name} for <br> {segment_name}",
)
out.append([data3, data1, data2])

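The extracted segment_name only changes how the plot subtitles are rendered. On a hypothetical segment dict the cleanup works like this:

```python
s = {"segment": {"product": "widgets", "country": "DE;FR;PL"}}
segment_name = (
    str(s["segment"]).replace(",", "<br>").replace(";", ",").replace("'", "")
)
# str(s["segment"]) == "{'product': 'widgets', 'country': 'DE;FR;PL'}"
# -> "{product: widgets<br> country: DE,FR,PL}"
# dict commas become line breaks, ";"-joined cluster members become comma
# lists, and the quotes are dropped.
```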
13 changes: 9 additions & 4 deletions wise_pizza/slicer.py
@@ -23,11 +23,16 @@
def _summary(obj) -> str:
out = {
"task": obj.task,
"segments": {
k: v
"segments": [
{
k: v
for k, v in s.items()
if k in ["segment", "total", "seg_size", "naive_avg"]
}
for s in obj.segments
for k, v in s.items()
if k in ["segment", "total", "seg_size", "naive_avg"]
],
"relevant_clusters": {
k: v for k, v in obj.relevant_cluster_names.items() if "_cluster_" in k
},
}
return json.dumps(out)
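After this change the summary serializes segments as a list of per-segment dicts and adds a relevant_clusters map so that synthetic cluster labels can be decoded. Shape only, with hypothetical values:

```python
print(sf.summary())
# {"task": "...",
#  "segments": [{"segment": {"country": "country_cluster_1"}, "total": 1234.5,
#                "seg_size": 100.0, "naive_avg": 12.3}],
#  "relevant_clusters": {"country_cluster_1": "DE;FR;PL"}}
```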
168 changes: 168 additions & 0 deletions wise_pizza/solve/partition.py
@@ -0,0 +1,168 @@
from typing import List

import numpy as np
import pandas as pd

from .weighted_quantiles import weighted_quantiles


def target_encode(df: pd.DataFrame, dim: str) -> dict:
df = df[[dim, "totals", "weights"]]
agg = df.groupby(dim, as_index=False).sum()
agg["__avg"] = agg["totals"] / agg["weights"]
agg["__avg"] = agg["__avg"].fillna(agg["__avg"].mean())
enc_map = {k: v for k, v in zip(agg[dim], agg["__avg"])}

if np.isnan(np.array(list(enc_map.values()))).any():
raise ValueError("NaNs in encoded values")
return enc_map
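A toy run of target_encode, for illustration only and not part of the committed file (the country column is hypothetical; the function only requires totals and weights alongside it):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "country": ["US", "US", "DE", "FR"],
        "totals": [10.0, 20.0, 6.0, 4.0],
        "weights": [1.0, 1.0, 2.0, 1.0],
    }
)
target_encode(df, "country")
# -> {"DE": 3.0, "FR": 4.0, "US": 15.0}
# i.e. sum of totals divided by sum of weights within each category
```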


def target_encoding_partitions(df: pd.DataFrame, dim: str, num_bins: int):
enc_map = target_encode(df, dim)
df[dim + "_encoded"] = df[dim].apply(lambda x: enc_map[x])
if np.any(np.isnan(df[dim + "_encoded"])): # pragma: no cover
raise ValueError("NaNs in encoded values")
# Get split candidates for brute force search
deciles = np.array([q / num_bins for q in range(1, num_bins)])

splits = weighted_quantiles(df[dim + "_encoded"], deciles, df["weights"])

partitions = []
for split in np.unique(splits):
left = df[df[dim + "_encoded"] < split]
right = df[df[dim + "_encoded"] >= split]
if len(left) == 0 or len(right) == 0:
continue
dim_values1 = [k for k, v in enc_map.items() if v < split]
dim_values2 = [k for k, v in enc_map.items() if v >= split]
partitions.append((dim_values1, dim_values2))

return partitions
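Continuing the toy frame above: categories are mapped to their encoded averages and candidate binary splits are taken at weighted quantiles of that encoding, so each partition separates low-average categories from high-average ones. The exact thresholds depend on weighted_quantiles; this sketch is illustrative, not from the file:

```python
partitions = target_encoding_partitions(df, "country", num_bins=3)
# Each entry is a (low_values, high_values) pair of category lists,
# e.g. (["DE", "FR"], ["US"]) for a threshold falling between 4.0 and 15.0.
```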


def kmeans_partition(df: pd.DataFrame, dim: str, groupby_dims: List[str]):
assert len(df[dim].unique()) >= 3
# Get split candidates
agg_df = df.groupby([dim] + groupby_dims, as_index=False).sum()
agg_df["__avg"] = agg_df["totals"] / agg_df["weights"]
pivot_df = agg_df.pivot(
index=groupby_dims, columns=dim, values="__avg"
).reset_index()
nice_mats = {}
for chunk in ["Average", "Weights"]:
this_df = pivot_df[pivot_df["chunk"] == chunk]
value_cols = [c for c in this_df.columns if c not in groupby_dims]
nice_values = fill_gaps(this_df[value_cols].values)
if chunk == "Weights":
nice_values = (
np.mean(nice_mats["Average"])
* nice_values
/ np.sum(nice_values, axis=0, keepdims=True)
)
nice_mats[chunk] = nice_values
joint_mat = np.concatenate([nice_mats["Average"], nice_mats["Weights"]], axis=0)
weights = pivot_df[value_cols].T.sum(axis=1)
vector_dict = {}
for i, c in enumerate(value_cols):
vector_dict[c] = (weights.loc[c], joint_mat[:, i])

cluster1, cluster2 = weighted_kmeans_two_clusters(vector_dict)

return [(cluster1, cluster2)]
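kmeans_partition assumes groupby_dims contains a "chunk" column taking the values "Average" and "Weights" (the filter in the loop relies on it). Each category of dim becomes a vector of its average profile stacked with a rescaled weight profile, and those vectors are split into two groups. A toy sketch, illustrative only and not part of the committed file:

```python
import numpy as np
import pandas as pd

rows = []
for country, level in [("US", 10.0), ("DE", 11.0), ("FR", 1.0)]:
    for t in [1, 2, 3]:
        rows.append({"country": country, "chunk": "Average", "__time": t,
                     "totals": level * t, "weights": t})
        rows.append({"country": country, "chunk": "Weights", "__time": t,
                     "totals": t, "weights": 1.0})
df = pd.DataFrame(rows)

[(group1, group2)] = kmeans_partition(df, "country", ["chunk", "__time"])
# "US" and "DE" (similar average profiles) end up in one group, "FR" in the
# other; which group comes first depends on the random initialisation.
```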


def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries=10):
keys = list(data_dict.keys())
weights = np.array([data_dict[key][0] for key in keys])
data = np.array([data_dict[key][1] for key in keys])

rng = np.random.default_rng()

for retry in range(max_retries):
# Initialize centroids by randomly choosing two data points
centroids = data[rng.choice(len(data), size=2, replace=False)]

for iteration in range(max_iter):
# Compute weighted distances to each centroid
distances = np.array(
[np.linalg.norm(data - centroid, axis=1) for centroid in centroids]
)

# Assign points to the closest centroid
labels = np.argmin(distances, axis=0)

# Check if any cluster is empty
if not np.any(labels == 0) or not np.any(labels == 1):
# If a cluster is empty, reinitialize centroids and restart
print(
f"Empty cluster detected on retry {retry + 1}, reinitializing centroids."
)
break

# Update centroids with weighted averages
new_centroids = np.array(
[
np.average(data[labels == i], axis=0, weights=weights[labels == i])
for i in range(2)
]
)

# Check for convergence
if np.linalg.norm(new_centroids - centroids) < tol:
# Successful clustering with no empty clusters
centroids = new_centroids
return (
[keys[i] for i in range(len(keys)) if labels[i] == 0],
[keys[i] for i in range(len(keys)) if labels[i] == 1],
)

centroids = new_centroids

raise ValueError(
"Failed to find a valid clustering with non-empty clusters after maximum retries."
)
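A self-contained sketch of the two-cluster helper on made-up vectors (keys map to (weight, vector) pairs); illustrative only:

```python
import numpy as np

data = {
    "a": (1.0, np.array([0.0, 0.1])),
    "b": (2.0, np.array([0.1, 0.0])),
    "c": (1.0, np.array([5.0, 5.1])),
    "d": (1.0, np.array([5.1, 4.9])),
}
left, right = weighted_kmeans_two_clusters(data)
# One group contains "a" and "b", the other "c" and "d"; which comes back as
# `left` depends on the random centroid initialisation.
```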


def fill_gaps(x: np.ndarray, num_iter=50):
nans = np.isnan(x)
# calculate the marginal, fill the gaps, use that to interpolate individual columns

est = x
for _ in range(num_iter):
marg = np.nanmean(est, axis=1)
nice_marg = interpolate_and_extrapolate(marg)
tile_marg = np.tile(nice_marg, (x.shape[1], 1)).T
tile_marg[nans] = np.nan
reg = np.nanmedian(x) * 1e-6
coeffs = (np.nansum(x * tile_marg, axis=0) + reg) / (
np.nansum(tile_marg * tile_marg, axis=0) + reg
)
interp = coeffs[None, :] * nice_marg[:, None]
est[nans] = interp[nans]
return x
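A minimal sketch of fill_gaps on a 3x2 matrix with one missing entry; note that est aliases x, so the input array is also filled in place. Illustrative only:

```python
import numpy as np

x = np.array([[1.0, 2.0],
              [np.nan, 4.0],
              [3.0, 6.0]])
filled = fill_gaps(x)
# The hole at (1, 0) is replaced by an estimate scaled from the row marginals;
# `filled` is the same array object as `x` and contains no NaNs.
assert not np.isnan(filled).any()
```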


def interpolate_and_extrapolate(arr: np.ndarray) -> np.ndarray:
# Check if input is a numpy array
if not isinstance(arr, np.ndarray):
raise TypeError("Input must be a numpy ndarray.")

# Find indices of valid (non-NaN) and NaN values
nans = np.isnan(arr)
not_nans = ~nans

# If there are no NaNs, return the array as is
if not nans.any():
return arr

# Perform linear interpolation for NaNs within valid values
arr[nans] = np.interp(np.flatnonzero(nans), np.flatnonzero(not_nans), arr[not_nans])

# Perform constant extrapolation for edges
if nans[0]: # If the first values are NaNs, fill with the first non-NaN value
arr[: np.flatnonzero(not_nans)[0]] = arr[not_nans][0]
if nans[-1]: # If the last values are NaNs, fill with the last non-NaN value
arr[np.flatnonzero(not_nans)[-1] + 1 :] = arr[not_nans][-1]

return arr
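For example, interior NaNs are linearly interpolated against the observed positions and edge NaNs take the nearest observed value (the array is modified in place); illustrative only:

```python
import numpy as np

arr = np.array([np.nan, 1.0, np.nan, 3.0, np.nan])
interpolate_and_extrapolate(arr)
# -> array([1., 1., 2., 3., 3.])
```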
50 changes: 19 additions & 31 deletions wise_pizza/solve/tree.py
@@ -5,8 +5,9 @@
import pandas as pd
from scipy.sparse import csc_matrix

from .weighted_quantiles import weighted_quantiles

from .fitter import AverageFitter, Fitter, TimeFitterModel, TimeFitter
from .partition import target_encoding_partitions, kmeans_partition
from wise_pizza.cluster import nice_cluster_names


@@ -75,18 +76,6 @@ def error(x: np.ndarray, y: np.ndarray) -> float:
return np.sum((x - y) ** 2)


def target_encode(df: pd.DataFrame, dim: str) -> dict:
df = df[[dim, "totals", "weights"]]
agg = df.groupby(dim, as_index=False).sum()
agg["__avg"] = agg["totals"] / agg["weights"]
agg["__avg"] = agg["__avg"].fillna(agg["__avg"].mean())
enc_map = {k: v for k, v in zip(agg[dim], agg["__avg"])}

if np.isnan(np.array(list(enc_map.values()))).any():
raise ValueError("NaNs in encoded values")
return enc_map


class ModelNode:
def __init__(
self,
@@ -148,24 +137,23 @@ def error_improvement(self):
for dim in iter_dims:
if len(self.df[dim].unique()) == 1:
continue
enc_map = target_encode(self.df, dim)
self.df[dim + "_encoded"] = self.df[dim].apply(lambda x: enc_map[x])
if np.any(np.isnan(self.df[dim + "_encoded"])): # pragma: no cover
raise ValueError("NaNs in encoded values")
# Get split candidates for brute force search
deciles = np.array([q / self.num_bins for q in range(1, self.num_bins)])

splits = weighted_quantiles(
self.df[dim + "_encoded"], deciles, self.df["weights"]
)

for split in np.unique(splits):
left = self.df[self.df[dim + "_encoded"] < split]
right = self.df[self.df[dim + "_encoded"] >= split]
if len(left) == 0 or len(right) == 0:
continue
dim_values1 = [k for k, v in enc_map.items() if v < split]
dim_values2 = [k for k, v in enc_map.items() if v >= split]

elif len(self.df[dim].unique()) == 2:
vals = self.df[dim].unique()
partitions = [([vals[0]], [vals[1]])]
else:
if isinstance(self.fitter, AverageFitter):
partitions = target_encoding_partitions(
self.df, dim, self.num_bins
)
else:
partitions = kmeans_partition(
self.df, dim, self.fitter.groupby_dims
)

for dim_values1, dim_values2 in partitions:
left = self.df[self.df[dim].isin(dim_values1)]
right = self.df[self.df[dim].isin(dim_values2)]
left_candidate = ModelNode(
df=left,
fitter=self.fitter,
