diff --git a/notebooks/Finding interesting segments in time series.ipynb b/notebooks/Finding interesting segments in time series.ipynb index 7838f7a..f717f1f 100644 --- a/notebooks/Finding interesting segments in time series.ipynb +++ b/notebooks/Finding interesting segments in time series.ipynb @@ -42,20 +42,10 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 10, "id": "961bc9d1", "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "C:\\Users\\EgorKraev\\AppData\\Local\\Temp\\ipykernel_29848\\3308931027.py:2: DeprecationWarning:\n", - "\n", - "Importing display from IPython.core.display is deprecated since IPython 7.14, please import from IPython.display\n", - "\n" - ] - }, { "data": { "text/html": [ @@ -71,7 +61,8 @@ ], "source": [ "# this makes the notebook expand to full width of the browser window\n", - "from IPython.core.display import display, HTML\n", + "from IPython.core.display import HTML\n", + "from IPython.display import display\n", "display(HTML(\"\"))" ] }, @@ -150,28 +141,26 @@ "source": [ "# Finding the juiciest slices\n", "\n", - "**explain_timeseries**: Find the most unusual segments in the timeseries\n", + "The most important choice you have to make here is whether you just want to look at the time series behavior of the averages, or also at that of the weights - this is controlled by the `fit_sizes` parameter. `max_depth` works as usual, controlling the maximum number of dimensions any segment can constrain.\n", + "\n", + "**explain_timeseries**: \n", + "\n", + "This function divides a time series panel dataset into segments that are as distinct as possible.\n", + "\n", + "Parameters:\n", + "\n", + "- **df**: A pandas DataFrame with the time series data.\n", + "- **dims**: Discrete dimensions to segment by.\n", + "- **total_name**: Name of the column containing totals.\n", + "- **time_name**: Name of the column containing the time values.\n", + "- **num_segments**: Number of segments to find.\n", + "- **size_name** (Optional): Name of the column containing the size of the segment.\n", + "- **max_depth** (Optional, defaults to 2): Maximum number of dimensions to constrain per segment.\n", + "- **fit_sizes** (Optional): Whether to fit the sizes of the segments or just the averages.\n", + "- **n_jobs** (Optional, defaults to 10): Number of jobs to run in parallel when finding segments.\n", + "- **num_breaks** (Optional, defaults to 3): Number of breaks in the stylized time series used for comparing segments.\n", + "\n", "\n", - "- `df`: Dataset\n", - "- `dims`: List of discrete dimensions\n", - "- `total_name`: Name of column that contains totals per segment\n", - "- `size_name`: Name of column containing segment sizes\n", - "- `min_segments`: Minimum number of segments to find\n", - "- `max_segments`: Maximum number of segments to find, defaults to min_segments\n", - "- `min_depth`: Minimum number of dimension to constrain in segment definition\n", - "- `max_depth`: Maximum number of dimension to constrain in segment definition\n", - "- `solver`: If this equals to \"lp\" uses the LP solver, else uses the (recommended) Lasso solver\n", - "    - `\"lasso\"`: Lasso-based finder of unusual segments\n", - "    - `\"lp\"`: LP-based finder of unusual segments\n", - "- `cluster_values`: In addition to single-value slices, consider slices that consist of a\n", - "    group of segments from the same dimension with similar naive averages\n", - "    - `True`: to use cluster values, you can them using `sf.relevant_cluster_names`\n",
- " - `False`: to use simple segments\n", - "- `verbose`: If set to a truish value, lots of debug info is printed to console, also you can check progressbar\n", - " - `True`: to get info\n", - " - `False`: to get result without info\n", - " \n", - " \n", "- Use `.plot()` to see the plot after fitting:\n", " - `plot_is_static`: static (True) or dynamic (False) plotly result\n", " - `True`: to get static plots (Doesn't work on all platforms yet)\n", @@ -188,48 +177,6 @@ "scrolled": false }, "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "yay!\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "C:\\Users\\EgorKraev\\Documents\\Code\\wise-pizza\\wise_pizza\\slicer.py:213: UserWarning:\n", - "\n", - "Ignoring cluster_values argument as tree solver makes its own clusters\n", - "\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Adding node 1...\n", - "Done!\n", - "Adding node 2...\n", - "Done!\n", - "Adding node 3...\n", - "Done!\n", - "Adding node 4...\n", - "Done!\n", - "Adding node 5...\n", - "Done!\n", - "Adding node 6...\n", - "Done!\n", - "0 {'segment': {'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_2', 'PRODUCT': 'Credit;Spend'}, 'index': 1, 'orig_i': 1, 'total': 8215853399.18479, 'seg_size': 28914190.0, 'naive_avg': 284.14606804426444, 'dummy': array([1, 1, 1, ..., 0, 0, 0])}\n", - "1 {'segment': {'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_4'}, 'index': 5, 'orig_i': 5, 'total': 5407855256.179745, 'seg_size': 39528930.0, 'naive_avg': 136.807529477265, 'dummy': array([0, 0, 0, ..., 0, 0, 0])}\n", - "2 {'segment': {'SOURCE_CURRENCY': 'PGK;SHP', 'PRODUCT': 'Credit;Spend'}, 'index': 4, 'orig_i': 4, 'total': 1496636816.867681, 'seg_size': 4120685.0, 'naive_avg': 363.20097674723525, 'dummy': array([0, 0, 0, ..., 0, 0, 0])}\n", - "3 {'segment': {'SOURCE_CURRENCY': 'CZK', 'PRODUCT': 'Spend'}, 'index': 3, 'orig_i': 3, 'total': 1253650889.0440717, 'seg_size': 2018425.0, 'naive_avg': 621.1035282678681, 'dummy': array([0, 0, 0, ..., 0, 0, 0])}\n", - "4 {'segment': {'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_3'}, 'index': 6, 'orig_i': 6, 'total': 930638724.9999521, 'seg_size': 12992130.0, 'naive_avg': 71.63095851103338, 'dummy': array([0, 0, 0, ..., 1, 1, 1])}\n", - "5 {'segment': {'SOURCE_CURRENCY': 'CZK', 'PRODUCT': 'Credit'}, 'index': 2, 'orig_i': 2, 'total': 801995917.6628782, 'seg_size': 1962900.0, 'naive_avg': 408.577063356706, 'dummy': array([0, 0, 0, ..., 0, 0, 0])}\n", - "6 {'segment': {'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_5', 'PRODUCT': 'Transfer'}, 'index': 0, 'orig_i': 0, 'total': 168988001.49668851, 'seg_size': 7289425.0, 'naive_avg': 23.18262434920292, 'dummy': array([0, 0, 0, ..., 0, 0, 0])}\n" - ] - }, { "data": { "text/html": [ @@ -262,9 +209,9 @@ { "data": { "text/html": [ - "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, { "name": "stdout", "output_type": "stream", "text": [ - "{'relevant_clusters': {'SOURCE_CURRENCY_cluster_2': 'AFN, GBP, KRW, MVR, NZD, '\n", - " 'PHP, SEK, UYU',\n", - " 'SOURCE_CURRENCY_cluster_3': 'AED, ALL, AMD, ANG, AOA, '\n", - " 'ARS, AUD, AWG, AZN, BAM, '\n", - " 'BBD, BDT, BIF, BMD, BND, '\n", - " 'BOB, BSD, BTN, BYN, CAD, '\n", - " 'CLP, CRC, CUP, DKK, DOP, '\n", - " 'DZD, EGP, ETB, FJD, FKP, '\n", - " 'GEL, GHS, GIP, GMD, GNF, '\n", - " 'GTQ, GYD, HNL, HRK, HTG, '\n", - " 'HUF, IDR, ILS, INR, IQD, '\n", - " 'ISK, JMD, JOD, KES, KGS, '\n", - " 'KHR, KWD, KYD, LAK, MAD, '\n", - " 'MGA, MKD, MMK, MNT, MOP, '\n", - " 'MRU, MWK, MYR, MZN, NAD, '\n", - " 'NIO, OMR, PAB, PEN, PKR, '\n", - " 'PYG, QAR, RON, RSD, RUB, '\n", - " 'RWF, SAR, SBD, SCR, SDG, '\n", - " 'SLE, SLL, SRD, STN, SYP, '\n", - " 'SZL, THB, TJS, TOP, TRY, '\n", - " 'TTD, TWD, TZS, UAH, UZS, '\n", - " 'VES, VUV, WST, XCD, XOF, '\n", - " 'XPF, YER, ZAR, ZMW, ZWL',\n", - " 'SOURCE_CURRENCY_cluster_4': 'BGN, BHD, BRL, BWP, CDF, '\n", - " 'CHF, CNY, COP, CVE, EUR, '\n", - " 'HKD, KMF, LSL, LYD, MDL, '\n", - " 'MUR, MXN, NOK, NPR, PLN, '\n", - " 'SGD, TMT, TND, UGX, USD, '\n", - " 'VND, XAF',\n", - " 'SOURCE_CURRENCY_cluster_5': 'AFN, CZK, GBP, KRW, MVR, '\n", - " 'NZD, PGK, PHP, SEK, SHP, '\n", - " 'UYU'},\n", - " 'segments': [{'naive_avg': 284.14606804426444,\n", - " 'seg_size': 28914190.0,\n", - " 'segment': {'PRODUCT': 'Credit;Spend',\n", - " 'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_2'},\n", - " 'total': 8215853399.18479},\n", - " {'naive_avg': 136.807529477265,\n", - " 'seg_size': 39528930.0,\n", - " 'segment': {'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_4'},\n", - " 'total': 5407855256.179745},\n", - " {'naive_avg': 363.20097674723525,\n", - " 'seg_size': 4120685.0,\n", - " 'segment': {'PRODUCT': 'Credit;Spend',\n", - " 'SOURCE_CURRENCY': 'PGK;SHP'},\n", - " 'total': 1496636816.867681},\n", - " {'naive_avg': 621.1035282678681,\n", - " 'seg_size': 2018425.0,\n", - " 'segment': {'PRODUCT': 'Spend', 'SOURCE_CURRENCY': 'CZK'},\n", - " 'total': 1253650889.0440717},\n", - " {'naive_avg': 71.63095851103338,\n", - " 'seg_size': 12992130.0,\n", - " 'segment': {'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_3'},\n", - " 'total': 930638724.9999521},\n", - " {'naive_avg': 408.577063356706,\n", - " 'seg_size': 1962900.0,\n", - " 'segment': {'PRODUCT': 'Credit', 'SOURCE_CURRENCY': 'CZK'},\n", - " 'total': 801995917.6628782},\n", - " {'naive_avg': 23.18262434920292,\n", - " 'seg_size': 7289425.0,\n", - " 'segment': {'PRODUCT': 'Transfer',\n", - " 'SOURCE_CURRENCY': 'SOURCE_CURRENCY_cluster_5'},\n", - " 'total': 168988001.49668851}],\n", - " 'task': 'time'}\n" + "{'TARGET_CURRENCY_cluster_3': 'AED, ARS, AUD, BDT, BWP, CAD, CHF, CLP, CNY, '\n", + " 'CRC, CZK, DKK, EUR, GBP, GEL, GHS, HKD, HRK, '\n", + " 'HUF, IDR, ILS, INR, JPY, KES, KRW, LKR, MXN, '\n", + " 'MYR, NGN, NOK, NPR, NZD, PEN, PKR, PLN, RUB, '\n", + " 'SEK, THB, TRY, TZS, UAH, UGX, UYU, XOF, ZAR',\n", + " 'TARGET_CURRENCY_cluster_5': 'AED, ARS, AUD, BDT, BGN, BWP, CAD, CHF, CLP, '\n", + " 'CNY, COP, CRC, CZK, DKK, EUR, GBP, GEL, GHS, '\n", + " 'HKD, HRK, HUF, IDR, ILS, INR, JPY, KES, KRW, '\n", + " 'LKR, MXN, MYR, NGN, NOK, NPR, NZD, PEN, PHP, '\n", + " 'PKR, PLN, RON, RUB, SEK, SGD, THB, TRY, TZS, '\n", + " 'UAH, UGX, USD, UYU, VND, XOF, ZAR, ZMW'}\n" ] } ], "source": [ - "import json, pprint\n", - "pprint.pprint(json.loads(sf.summary()))" + "# And here is a run that jointly segments by the trends in the 
averages and the segment sizes\n", + "\n", + "sf = explain_timeseries(\n", + "    df=df,\n", + "    dims=dims,\n", + "    num_segments=7,\n", + "    max_depth=2,\n", + "    total_name=totals,\n", + "    size_name=size,\n", + "    time_name=time,\n", + "    verbose=False,\n", + "    fit_sizes=True,\n", + ")\n", + "sf.plot(plot_is_static=plot_is_static, height=1500, width=1000, average_name=\"VPC\")\n", + "pprint(sf.relevant_cluster_names)" ] }, { @@ -402,9 +384,9 @@ ], "metadata": { "kernelspec": { - "display_name": "Python [conda env:wise-pizza3.10]", + "display_name": "Python [conda env:wise-pizza3.11]", "language": "python", - "name": "conda-env-wise-pizza3.10-py" + "name": "conda-env-wise-pizza3.11-py" }, "language_info": { "codemirror_mode": { @@ -416,7 +398,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.15" + "version": "3.11.10" } }, "nbformat": 4, diff --git a/requirements.txt b/requirements.txt index fcb3ef1..f92baa4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,5 +9,5 @@ scipy>=1.8.0 tqdm cloudpickle pivottablejs -streamlit==1.32.0 +streamlit>=1.32.0 nbformat>=4.2.0 diff --git a/tests/test_fit.py b/tests/test_fit.py index 1ba9802..4bb8eaf 100644 --- a/tests/test_fit.py +++ b/tests/test_fit.py @@ -139,7 +139,7 @@ def test_time_series_tree_solver(fit_sizes: bool): sf = explain_timeseries( df=data.data, dims=data.dimensions, - max_segments=7, + num_segments=7, max_depth=2, total_name=data.segment_total, size_name=data.segment_size, diff --git a/tests/timeseries_wip_entrypoint.py b/tests/timeseries_wip_entrypoint.py index fffb4fc..84c197a 100644 --- a/tests/timeseries_wip_entrypoint.py +++ b/tests/timeseries_wip_entrypoint.py @@ -30,14 +30,15 @@ sf = explain_timeseries( df=df, dims=dims, - max_segments=7, + num_segments=7, max_depth=2, total_name=totals, size_name=size, time_name=time, verbose=False, solver="tree", - fit_sizes=False, + fit_sizes=True, + num_breaks=100, ) sf.plot(plot_is_static=False, height=1500, width=1000, average_name="VPC") print(sf.summary()) diff --git a/tests/timeseries_wip_entrypoint_2.py b/tests/timeseries_wip_entrypoint_2.py new file mode 100644 index 0000000..809092f --- /dev/null +++ b/tests/timeseries_wip_entrypoint_2.py @@ -0,0 +1,47 @@ +import os, sys +import pandas as pd + +root_path = os.path.realpath("../..") +print(root_path) + +# this assumes that all of the following files are checked out in the same directory +sys.path.append(os.path.join(root_path, "wise-pizza")) + +# create data-related directories +data_dir = os.path.realpath(os.path.join(root_path, "wise-pizza/data")) +if not os.path.isdir(data_dir): + os.mkdir(data_dir) +print(data_dir) + +from wise_pizza import explain_timeseries + +df = pd.read_csv( + os.path.join(data_dir, "volume_data_new.csv") +) # replace this variable with your data +dims = [ + "CUSTOMER_TYPE", + "STRATEGIC_PRODUCT", + "SOURCE_CURRENCY", + "TARGET_CURRENCY", + "PRODUCT_USE_CASE", + "REGION", + "TRANS_VOL_BUCKET", +] # dimensions to find segments +totals = "VOLUME_GBP" # value to analyze +size = "NUM_CUSTOMERS" #'NUM_TRANSACTIONS' # number of objects +time = "ACTION_YM" +sf = explain_timeseries( + df=df, + dims=dims, + num_segments=7, + max_depth=2, + total_name=totals, + size_name=size, + time_name=time, + verbose=False, + solver="tree", + fit_sizes=True, +) +sf.plot(plot_is_static=False, height=1500, width=1000, average_name="VPC") +print(sf.summary()) +print("yay!") diff --git a/wise_pizza/explain.py b/wise_pizza/explain.py index 6b35f48..747f57c 100644 ---
a/wise_pizza/explain.py +++ b/wise_pizza/explain.py @@ -367,18 +367,37 @@ def explain_timeseries( total_name: str, time_name: str, size_name: Optional[str] = None, - min_segments: int = None, - max_segments: int = None, - min_depth: int = 1, + num_segments: int = None, max_depth: int = 2, solver: str = "tree", verbose: bool = False, time_basis: Optional[pd.DataFrame] = None, fit_log_space: bool = False, fit_sizes: Optional[bool] = None, - num_breaks: int = 2, + num_breaks: int = 3, + n_jobs: int = 10, + ignore_averages: bool = True, log_space_weight_sc: float = 0.5, ): + """ + Split a time series panel dataset into segments that are as different as possible + :param df: A pandas DataFrame with the time series data + :param dims: Discrete dimensions to segment by + :param total_name: Name of the column containing totals + :param time_name: Name of the column containing the time values + :param num_segments: Number of segments to find + :param size_name: (Optional) Name of the column containing the size of the segment + :param max_depth: (Optional, defaults to 2) Maximum number of dimensions to constrain per segment + :param fit_sizes: (Optional) Whether to fit the sizes of the segments, or just the averages + :param n_jobs: (Optional, defaults to 10) Number of jobs to run in parallel when finding segments + :param num_breaks: (Optional, defaults to 3) Number of breaks in stylized time series used for comparing segments + :param ignore_averages: If set to True (recommended), the level (across time) of each segment is ignored when calculating similarity + :param time_basis: A DataFrame with the time basis to use. Only use if you know what you're doing. + :param solver: (Optional) The solver to use, currently only "tree" is supported + :param fit_log_space: Do not use + :param log_space_weight_sc: Do not use + :return: + """ assert ( solver == "tree" ), "Only the tree solver is supported for time series at the moment" @@ -450,16 +469,31 @@ def explain_timeseries( time_basis = ( pd.concat([time_basis, re_basis], axis=0).fillna(0.0).reset_index(drop=True) ) - print("yay!") groupby_dims = ["chunk", "__time"] else: groupby_dims = ["__time"] df2["_target"] = df2[total_name] df2["__time"] = df2[time_name] - df2["total_adjustment"] = 0.0 - avg_df = 0.0 - average = 0.0 + + # Adds the column of the time average over each dimension combination + if ignore_averages: + df2, avg_df = add_average_over_time( + df2, + dims=dims, + total_name=total_name, + size_name=size_name, + time_name="__time", + groupby_dims=groupby_dims, + cartesian=False, + ) + else: + df2["total_adjustment"] = 0.0 + avg_df = None + + # The join in the above function could have messed up the ordering + df2 = df2.sort_values(by=dims + groupby_dims) + average = df2[total_name].sum() / df2[size_name].sum() sf = SliceFinder() sf.global_average = average @@ -468,20 +502,20 @@ def explain_timeseries( sf.time_name = time_name sf.y_adj = df2["total_adjustment"].values sf.avg_df = avg_df - sf.time_values = df2[time_name].unique() + sf.time_values = df2["__time"].unique() sf.fit( - df2[dims + groupby_dims], - df2["_target"], - time_col=df2[time_name], + df2[dims + groupby_dims + ["total_adjustment"]], + df2[total_name], + time_col=df2["__time"], time_basis=time_basis, weights=df2[size_name], - min_segments=min_segments, - max_segments=max_segments, - min_depth=min_depth, + max_segments=num_segments, max_depth=max_depth, solver=solver, verbose=verbose, groupby_dims=groupby_dims, + cluster_values=False, + n_jobs=n_jobs, ) # TODO: insert back the 
normalized bits? diff --git a/wise_pizza/plotting_time_tree.py b/wise_pizza/plotting_time_tree.py index 04efa5a..98b2e49 100644 --- a/wise_pizza/plotting_time_tree.py +++ b/wise_pizza/plotting_time_tree.py @@ -75,7 +75,7 @@ def preprocess_for_ts_plot( ) -> List[List[PlotData]]: out = [] for row, s in enumerate(sf.segments): - print(row, s) + # print(row, s) this_df = pd.DataFrame( { "time": sf.time, @@ -158,3 +158,4 @@ def simple_ts_plot( row=row_num, col=col_num, ) + fig.update_layout(xaxis=dict(autorange=True), yaxis=dict(autorange=True)) diff --git a/wise_pizza/slicer.py b/wise_pizza/slicer.py index 75e9ae2..f2aba60 100644 --- a/wise_pizza/slicer.py +++ b/wise_pizza/slicer.py @@ -27,7 +27,7 @@ def _summary(obj) -> str: { k: v for k, v in s.items() - if k in ["segment", "total", "seg_size", "naive_avg"] + if k in ["segment", "total", "seg_size", "naive_avg", "impact"] } for s in obj.segments ], @@ -124,6 +124,7 @@ def fit( constrain_signs: bool = True, cluster_values: bool = True, groupby_dims: Optional[List[str]] = None, + n_jobs: int = 1, ): """ Function to fit slicer and find segments @@ -143,6 +144,9 @@ def fit( group of segments from the same dimension with similar naive averages """ + dim_df = dim_df.copy() + if groupby_dims is None: + groupby_dims = [] assert solver.lower() in ["lasso", "tree", "omp", "lp"] min_segments, max_segments = clean_up_min_max(min_segments, max_segments) @@ -160,18 +164,20 @@ def fit( assert np.sum(np.abs(totals[weights == 0])) == 0 # Cast all dimension values to strings - dim_df = dim_df.astype(str) + for c in dim_df.columns: + if c not in groupby_dims + ["total_adjustment"]: + dim_df[c] = dim_df[c].astype(str) dims = list(dim_df.columns) - if groupby_dims is not None: - dims = [d for d in dims if d not in groupby_dims] + if groupby_dims: + dims = [d for d in dims if d not in groupby_dims + ["total_adjustment"]] # sort the dataframe by dimension values, # making sure the other vectors stay aligned dim_df = dim_df.reset_index(drop=True) dim_df["totals"] = totals dim_df["weights"] = weights - if groupby_dims is not None: + if groupby_dims: dim_df = pd.merge(dim_df, time_basis, on=groupby_dims) sort_dims = dims + groupby_dims else: @@ -220,6 +226,8 @@ def fit( num_leaves=max_segments, max_depth=max_depth, fitter=AverageFitter(), + n_jobs=n_jobs, + verbose=verbose, ) Xw = csc_matrix(diags(self.weights) @ self.X) @@ -256,6 +264,8 @@ def fit( fitter=fitter, num_leaves=max_segments, max_depth=max_depth, + n_jobs=n_jobs, + verbose=verbose, ) self.nonzeros = np.array(range(self.X.shape[1])) @@ -420,7 +430,8 @@ def relevant_cluster_names(self): relevant_clusters = {} for s in self.segments: for c in s["segment"].values(): - if c in self.cluster_names: + if c in self.cluster_names and ";" not in c: + # Cluster names containing ";" are plain enumerations, so they don't need a separate explanation relevant_clusters[c] = self.cluster_names[c].replace("@@", ", ") return relevant_clusters diff --git a/wise_pizza/solve/fitter.py b/wise_pizza/solve/fitter.py index f002cdd..a1b3c64 100644 --- a/wise_pizza/solve/fitter.py +++ b/wise_pizza/solve/fitter.py @@ -48,7 +48,6 @@ def debug_plot(X, y, y_pred, w): plt.plot(X_agg["y_pred"] / X_agg["weights"], label="y_pred") plt.legend() plt.show() - print("yay!") class TimeFitterModel(ABC): diff --git a/wise_pizza/solve/partition.py b/wise_pizza/solve/partition.py index dca2f35..5ea3381 100644 --- a/wise_pizza/solve/partition.py +++ b/wise_pizza/solve/partition.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd + from .weighted_quantiles import
weighted_quantiles @@ -41,9 +42,15 @@ def target_encoding_partitions(df: pd.DataFrame, dim: str, num_bins: int): return partitions -def kmeans_partition(df: pd.DataFrame, dim: str, groupby_dims: List[str]): +def kmeans_partition( + df: pd.DataFrame, + dim: str, + groupby_dims: List[str], + normalize_averages: bool = False, +): assert len(df[dim].unique()) >= 3 # Get split candidates + # Get time profiles split by the dimension we are evaluating agg_df = df.groupby([dim] + groupby_dims, as_index=False).sum() agg_df["__avg"] = agg_df["totals"] / agg_df["weights"] pivot_df = agg_df.pivot( @@ -56,16 +63,31 @@ def kmeans_partition(df: pd.DataFrame, dim: str, groupby_dims: List[str]): for chunk in ["Average", "Weights"]: this_df = pivot_df[pivot_df["chunk"] == chunk] nice_values = fill_gaps(this_df[value_cols].values) - if chunk == "Weights": - nice_values = ( - np.mean(nice_mats["Average"]) - * nice_values - / np.sum(nice_values, axis=0, keepdims=True) + + if normalize_averages: + # Normalize both subsegments separately: weights and averages + nice_values /= ( + np.linalg.norm(nice_values, ord=2, axis=0, keepdims=True) + 1e-6 ) + else: + if chunk == "Weights": + nice_values = ( + np.mean(nice_mats["Average"]) + * nice_values + / ( + np.linalg.norm(nice_values, ord=2, axis=0, keepdims=True) + + 1e-6 + ) + ) nice_mats[chunk] = nice_values joint_mat = np.concatenate([nice_mats["Average"], nice_mats["Weights"]], axis=0) else: - joint_mat = fill_gaps(pivot_df[value_cols].values) + nice_values = fill_gaps(pivot_df[value_cols].values) + if normalize_averages: + nice_values /= ( + np.linalg.norm(nice_values, ord=2, axis=0, keepdims=True) + 1e-6 + ) + joint_mat = nice_values weights = pivot_df[value_cols].T.sum(axis=1) vector_dict = {} @@ -73,8 +95,10 @@ def kmeans_partition(df: pd.DataFrame, dim: str, groupby_dims: List[str]): vector_dict[c] = (weights.loc[c], joint_mat[:, i]) cluster1, cluster2 = weighted_kmeans_two_clusters(vector_dict) - - return [(cluster1, cluster2)] + if cluster1 is None: + return [] + else: + return [(cluster1, cluster2)] def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries=10): @@ -106,12 +130,20 @@ def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries= break # Update centroids with weighted averages - new_centroids = np.array( - [ - np.average(data[labels == i], axis=0, weights=weights[labels == i]) - for i in range(2) - ] - ) + try: + new_centroids = np.array( + [ + np.average( + data[labels == i], axis=0, weights=weights[labels == i] + ) + for i in range(2) + ] + ) + except ZeroDivisionError: + print( + f"Zero division error detected on retry {retry + 1}, reinitializing centroids." + ) + break # Check for convergence if np.linalg.norm(new_centroids - centroids) < tol: @@ -124,9 +156,7 @@ def weighted_kmeans_two_clusters(data_dict, tol=1e-4, max_iter=100, max_retries= centroids = new_centroids - raise ValueError( - "Failed to find a valid clustering with non-empty clusters after maximum retries." 
- ) + return None, None def fill_gaps(x: np.ndarray, num_iter=50): @@ -139,7 +169,7 @@ def fill_gaps(x: np.ndarray, num_iter=50): nice_marg = interpolate_and_extrapolate(marg) tile_marg = np.tile(nice_marg, (x.shape[1], 1)).T tile_marg[nans] = np.nan - reg = np.nanmedian(x) * 1e-6 + reg = np.nanmedian(x) * 1e-6 + 1e-6 coeffs = (np.nansum(x * tile_marg, axis=0) + reg) / ( np.nansum(tile_marg * tile_marg, axis=0) + reg ) diff --git a/wise_pizza/solve/tree.py b/wise_pizza/solve/tree.py index 8fec440..a2eaf47 100644 --- a/wise_pizza/solve/tree.py +++ b/wise_pizza/solve/tree.py @@ -4,6 +4,7 @@ import numpy as np import pandas as pd from scipy.sparse import csc_matrix +from joblib import Parallel, delayed from .fitter import AverageFitter, Fitter, TimeFitterModel, TimeFitter @@ -17,6 +18,8 @@ def tree_solver( fitter: Fitter, max_depth: Optional[int] = None, num_leaves: Optional[int] = None, + n_jobs: int = 10, + verbose: bool = False, ): """ Partition the data into segments using a greedy binary tree approach @@ -25,10 +28,15 @@ def tree_solver( :param fitter: A model to fit on the chunks :param max_depth: max depth of the tree :param num_leaves: num leaves to generate + :param n_jobs: number of parallel jobs + :param verbose: print progress :return: Segment description, column definitions, and cluster names """ df = dim_df.copy().reset_index(drop=True) + if "total_adjustment" not in df.columns: + df["total_adjustment"] = 0.0 + df["totals"] -= df["total_adjustment"] df["__avg"] = df["totals"] / df["weights"] df["__avg"] = df["__avg"].fillna(df["__avg"].mean()) @@ -38,9 +46,10 @@ def tree_solver( dims=dims, time_col=None if isinstance(fitter, AverageFitter) else "__time", max_depth=max_depth, + n_jobs=n_jobs, ) - build_tree(root=root, num_leaves=num_leaves, max_depth=max_depth) + build_tree(root=root, num_leaves=num_leaves, max_depth=max_depth, verbose=verbose) leaves = get_leaves(root) @@ -53,6 +62,10 @@ def tree_solver( re_df = pd.concat([leaf.df for leaf in leaves]).sort_values( dims + fitter.groupby_dims ) + # Put back the averages over time by segment + re_df["prediction"] += re_df["total_adjustment"] / re_df["weights"] + + # re_df["totals"] += re_df["total_adjustment"] if len(fitter.groupby_dims) == 2: # Time series with weights re_df_w = re_df[re_df["chunk"] == "Weights"].copy() @@ -85,6 +98,7 @@ def __init__( time_col: str = None, max_depth: Optional[int] = None, dim_split: Optional[Dict[str, List]] = None, + n_jobs: int = 10, ): self.df = df.copy().sort_values(dims + fitter.groupby_dims) self.fitter = fitter @@ -98,6 +112,7 @@ def __init__( self.model = None # For dimension splitting candidates, hardwired for now self.num_bins = 10 + self.parallel_processes = n_jobs @property def depth(self): @@ -134,9 +149,9 @@ def error_improvement(self): else: iter_dims = self.dims - for dim in iter_dims: + def error_improvement_for_dim(dim): if len(self.df[dim].unique()) == 1: - continue + return float("inf"), (None, None) elif len(self.df[dim].unique()) == 2: vals = self.df[dim].unique() @@ -150,7 +165,11 @@ def error_improvement(self): partitions = kmeans_partition( self.df, dim, self.fitter.groupby_dims ) + if len(partitions) == 0: + return float("inf"), (None, None) + best_error = float("inf") + candidates = (None, None) for dim_values1, dim_values2 in partitions: left = self.df[self.df[dim].isin(dim_values1)] right = self.df[self.df[dim].isin(dim_values2)] @@ -174,8 +193,17 @@ def error_improvement(self): err = left_candidate.error + right_candidate.error if err < best_error: best_error = err - 
self._error_improvement = self.error - best_error - self._best_submodels = (left_candidate, right_candidate) + candidates = (left_candidate, right_candidate) + return best_error, candidates + + results = Parallel(n_jobs=self.parallel_processes)( + delayed(error_improvement_for_dim)(i) for i in iter_dims + ) + for err, candidates in results: + if err < best_error: + best_error = err + self._best_submodels = candidates + self._error_improvement = self.error - best_error return self._error_improvement @@ -196,15 +224,23 @@ def get_best_subtree_result( return node2 -def build_tree(root: ModelNode, num_leaves: int, max_depth: Optional[int] = 1000): +def build_tree( + root: ModelNode, + num_leaves: int, + max_depth: Optional[int] = 1000, + verbose: bool = False, +): for i in range(num_leaves - 1): - print(f"Adding node {i+1}...") + if verbose: + print(f"Adding node {i+1}...") best_node = get_best_subtree_result(root, max_depth) if best_node.error_improvement > 0: best_node.children = best_node._best_submodels - print("Done!") + if verbose: + print("Done!") else: - print("No more improvement, stopping") + if verbose: + print("No more improvement, stopping") break diff --git a/wise_pizza/time.py b/wise_pizza/time.py index e50e569..bb7d2e4 100644 --- a/wise_pizza/time.py +++ b/wise_pizza/time.py @@ -45,6 +45,7 @@ def prune_time_basis( dtrend_cols = [t for t in time_basis.columns if "dtrend" in t] chosen_cols = [] # from all the possible kinks, choose evenly spaced num_breaks ones + num_breaks = min(num_breaks, len(dtrend_cols) - 1) for i in range(1, num_breaks + 1): chosen_cols.append(dtrend_cols[int(i * len(dtrend_cols) / (num_breaks + 1))]) pre_basis = time_basis[["Intercept", "Slope"] + chosen_cols].copy() @@ -90,36 +91,48 @@ def add_average_over_time( total_name: str, size_name: str, time_name: str, + groupby_dims: List[str] = None, cartesian: bool = False, ) -> Tuple[pd.DataFrame, pd.DataFrame]: - avgs = df[dims + [total_name, size_name]].groupby(dims, as_index=False).sum() - avgs["avg"] = avgs[total_name] / avgs[size_name] - if cartesian: - # make sure that the cartesian product of dimension combinations x time is present, - # without changing the totals - times = df[[time_name]].groupby(time_name, as_index=False).sum() - times["key"] = 1 - avgs["key"] = 1 - cartesian_df = pd.merge(avgs, times, on="key").drop(columns=["key"]) - joined = pd.merge( - df, - cartesian_df[dims + [time_name]], - on=dims + [time_name], - how="right", - ) - joined[size_name] = joined[size_name].fillna( - np.nanmean(joined[size_name].values) - ) - joined[total_name] = joined[total_name].fillna(0.0) - df = joined - - avgs = df[dims + [total_name, size_name]].groupby(dims, as_index=False).sum() + groupby_dims = groupby_dims or [time_name] + + # get the average of the total over time + group_dims = dims + [c for c in groupby_dims if c != time_name] + avgs = ( + df[group_dims + [total_name, size_name]] + .groupby(group_dims, as_index=False) + .sum() + ) + avgs["avg"] = avgs[total_name] / avgs[size_name] - joined = pd.merge(df, avgs[dims + ["avg"]], on=dims) + # if cartesian: + # # make sure that the cartesian product of dimension combinations x time is present, + # # without changing the totals + # times = df[[time_name]].groupby(time_name, as_index=False).sum() + # times["key"] = 1 + # avgs["key"] = 1 + # cartesian_df = pd.merge(avgs, times, on="key").drop(columns=["key"]) + # joined = pd.merge( + # df, + # cartesian_df[dims + [time_name]], + # on=dims + [time_name], + # how="right", + # ) + # joined[size_name] = 
joined[size_name].fillna( + # np.nanmean(joined[size_name].values) + # ) + # joined[total_name] = joined[total_name].fillna(0.0) + # df = joined + + # avgs = df[dims + [total_name, size_name]].groupby(dims, as_index=False).sum() + # avgs["avg"] = avgs[total_name] / avgs[size_name] + + joined = pd.merge(df, avgs[group_dims + ["avg"]], on=group_dims) joined["total_adjustment"] = joined[size_name] * joined["avg"] - out = joined[dims + [total_name, size_name, time_name, "total_adjustment"]] - tmp = out[dims + [total_name, "total_adjustment"]].groupby(dims).sum() + + out = joined[group_dims + [total_name, size_name, time_name, "total_adjustment"]] + tmp = out[group_dims + [total_name, "total_adjustment"]].groupby(dims).sum() assert (tmp[total_name] - tmp["total_adjustment"]).abs().sum() < 1e-6 * df[ total_name ].abs().max()
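Below is a minimal, hypothetical usage sketch of the `explain_timeseries` API as it looks after this patch (`num_segments` replacing the old `min_segments`/`max_segments` pair, plus the `fit_sizes`, `n_jobs` and `num_breaks` arguments). The CSV path and column names are taken from `tests/timeseries_wip_entrypoint.py` and are assumptions; substitute your own data.

```python
# Minimal sketch of the updated explain_timeseries call, assuming a panel dataset
# with the same columns as the WIP entrypoint scripts in this patch use.
import pandas as pd
from wise_pizza import explain_timeseries

df = pd.read_csv("data/volume_data_new.csv")  # hypothetical path; use your own data

sf = explain_timeseries(
    df=df,
    dims=["SOURCE_CURRENCY", "TARGET_CURRENCY", "PRODUCT_USE_CASE"],  # discrete dimensions to segment by
    total_name="VOLUME_GBP",    # column with the totals to analyze
    size_name="NUM_CUSTOMERS",  # column with segment sizes (weights)
    time_name="ACTION_YM",      # time column
    num_segments=7,             # replaces the old min_segments/max_segments pair
    max_depth=2,                # at most two dimensions constrained per segment
    fit_sizes=True,             # also segment by the time profile of the weights
    n_jobs=10,                  # parallel jobs when evaluating candidate splits
    num_breaks=3,               # kinks in the stylized time series used for comparison
)
sf.plot(plot_is_static=False, height=1500, width=1000, average_name="VPC")
print(sf.summary())
print(sf.relevant_cluster_names)
```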