diff --git a/notebooks/RunExperiments/cluster_config.yaml b/notebooks/RunExperiments/cluster_config.yaml index 9e4dbbf..d3c9775 100644 --- a/notebooks/RunExperiments/cluster_config.yaml +++ b/notebooks/RunExperiments/cluster_config.yaml @@ -6,7 +6,7 @@ cluster_name: default # The maximum number of workers nodes to launch in addition to the head # node. -max_workers: 2 +max_workers: 20 # The autoscaler will scale up the cluster faster with higher upscaling speed. # E.g., if the task requires adding more nodes then autoscaler will gradually @@ -41,7 +41,7 @@ idle_timeout_minutes: 5 provider: type: gcp region: us-west1 - availability_zone: us-west1-a + availability_zone: us-west1-b project_id: motleys # Globally unique project id # How Ray will authenticate with newly launched nodes. @@ -60,7 +60,7 @@ auth: available_node_types: ray_head_default: # The resources provided by this node type. - resources: {"CPU": 2} + resources: {"CPU": 0} # Provider-specific config for the head node, e.g. instance type. By default # Ray will auto-configure unspecified fields such as subnets and ssh-keys. # For more documentation on available fields, see: @@ -93,7 +93,7 @@ available_node_types: min_workers: 1 # The maximum number of worker nodes of this type to launch. # This takes precedence over min_workers. - max_workers: 5 + max_workers: 20 # The resources provided by this node type. resources: {"CPU": 2} # Provider-specific config for the head node, e.g. instance type. By default @@ -101,7 +101,7 @@ available_node_types: # For more documentation on available fields, see: # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert node_config: - machineType: n1-standard-2 + machineType: n2-highmem-2 disks: - boot: true autoDelete: true diff --git a/notebooks/RunExperiments/runners/experiment_runner.py b/notebooks/RunExperiments/runners/experiment_runner.py index 3d774e0..a77af58 100644 --- a/notebooks/RunExperiments/runners/experiment_runner.py +++ b/notebooks/RunExperiments/runners/experiment_runner.py @@ -4,24 +4,29 @@ import os import pickle import sys +import time import warnings -from datetime import datetime from typing import List, Union +import cloudpickle import matplotlib import matplotlib.pyplot as plt import numpy as np import pandas as pd +import ray from sklearn.model_selection import train_test_split + +from causaltune import CausalTune +from causaltune.data_utils import CausalityDataset +from causaltune.models.passthrough import passthrough_model + # Ensure CausalTune is in the Python path root_path = os.path.realpath("../../../../..") # noqa: E402 sys.path.append(os.path.join(root_path, "causaltune")) # noqa: E402 # Import CausalTune and other custom modules after setting up the path -from causaltune import CausalTune # noqa: E402 from causaltune.datasets import load_dataset # noqa: E402 -from causaltune.models.passthrough import passthrough_model # noqa: E402 from causaltune.search.params import SimpleParamService # noqa: E402 from causaltune.score.scoring import ( metrics_to_minimize, # noqa: E402 @@ -31,6 +36,8 @@ # Configure warnings warnings.filterwarnings("ignore") +RAY_NAMESPACE = "causaltune_experiments" + def parse_arguments(): parser = argparse.ArgumentParser(description="Run CausalTune experiments") @@ -47,9 +54,7 @@ def parse_arguments(): help="Datasets to use (format: Size Name, e.g., Small Linear_RCT)", ) parser.add_argument("--n_runs", type=int, default=1, help="Number of runs") - parser.add_argument( - "--num_samples", type=int, default=-1, help="Maximum number 
of iterations" - ) + parser.add_argument("--num_samples", type=int, default=-1, help="Maximum number of iterations") parser.add_argument("--outcome_model", type=str, default="nested", help="Outcome model type") parser.add_argument( @@ -90,7 +95,7 @@ def get_estimator_list(dataset_name): return [est for est in estimator_list if "Dummy" not in est] -def run_experiment(args, dataset_path: str, use_ray: bool = False): +def run_experiment(args, dataset_path: str, use_ray: bool): # Process datasets data_sets = {} for dataset in args.datasets: @@ -104,11 +109,7 @@ def run_experiment(args, dataset_path: str, use_ray: bool = False): file_path = f"{dataset_path}/{size}/{name}.pkl" data_sets[f"{size} {name}"] = load_dataset(file_path) - if args.timestamp_in_dirname: - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - out_dir = f"EXPERIMENT_RESULTS_{timestamp}_{args.identifier}" - else: - out_dir = f"EXPERIMENT_RESULTS_{args.identifier}" + out_dir = f"../EXPERIMENT_RESULTS_{args.identifier}" os.makedirs(out_dir, exist_ok=True) out_dir = os.path.realpath(os.path.join(out_dir, size)) @@ -116,143 +117,103 @@ def run_experiment(args, dataset_path: str, use_ray: bool = False): print(f"Loaded datasets: {list(data_sets.keys())}") - # Set time budgets properly - if args.time_budget is not None and args.components_time_budget is not None: - raise ValueError("Please specify either time_budget or components_time_budget, not both.") - elif args.time_budget is None and args.components_time_budget is None: - args.components_time_budget = 30 # Set default components budget - - # If only time_budget is specified, derive components_time_budget from it - if args.time_budget is not None: - args.components_time_budget = max(30, args.time_budget / 4) # Ensure minimum budget - args.time_budget = None # Use only components_time_budget - - for dataset_name, cd in data_sets.items(): - # Extract case while preserving original string checking logic - if "KCKP" in dataset_name: - case = "KCKP" - elif "KC" in dataset_name: - case = "KC" - elif "IV" in dataset_name: - case = "IV" - else: - case = "RCT" - - os.makedirs(f"{out_dir}/{case}", exist_ok=True) - - for i_run in range(1, args.n_runs + 1): - cd_i = copy.deepcopy(cd) - train_df, test_df = train_test_split(cd_i.data, test_size=args.test_size) - test_df = test_df.reset_index(drop=True) - cd_i.data = train_df - + already_running = False + if use_ray: + try: + runner = ray.get_actor("TaskRunner") + print("\n" * 4) + print( + "!!! Found an existing detached TaskRunner. Will assume the tasks have already been submitted." + ) + print( + "!!! 
If you want to re-run the experiments from scratch, "
+                'run ray.kill(ray.get_actor("TaskRunner", namespace="{}")) or recreate the cluster.'.format(
+                    RAY_NAMESPACE
+                )
+            )
+            print("\n" * 4)
+            already_running = True
+        except ValueError:
+            print("Ray: no detached TaskRunner found, creating...")
+            # The detached actor stays alive even after the host program exits;
+            # it must be killed explicitly: ray.kill(ray.get_actor("TaskRunner"))
+            runner = TaskRunner.options(name="TaskRunner", lifetime="detached").remote()
+
+    out = []
+    if not already_running:
+        tasks = []
+        i_run = 1
+
+        for dataset_name, cd in data_sets.items():
+            estimators = get_estimator_list(dataset_name)
+            # Extract case while preserving original string checking logic
+            if "KCKP" in dataset_name:
+                case = "KCKP"
+            elif "KC" in dataset_name:
+                case = "KC"
+            elif "IV" in dataset_name:
+                case = "IV"
+            else:
+                case = "RCT"
+
+            os.makedirs(f"{out_dir}/{case}", exist_ok=True)
             for metric in args.metrics:
-                if metric == "ate":  # this is not something to optimize
+                fn = make_filename(metric, dataset_name, i_run)
+                out_fn = os.path.join(out_dir, case, fn)
+                if os.path.isfile(out_fn):
+                    print(f"File {out_fn} exists, skipping...")
                     continue
-
-                print(f"Optimizing {metric} for {dataset_name} (run {i_run})")
-                try:
-                    fn = make_filename(metric, dataset_name, i_run)
-                    out_fn = os.path.join(out_dir, case, fn)
-                    if os.path.isfile(out_fn):
-                        print(f"File {out_fn} exists, skipping...")
-                        continue
-
-                    # Set propensity model using string checking like original version
-                    if "KCKP" in dataset_name:
-                        print(f"Using passthrough propensity model for {dataset_name}")
-                        propensity_model = passthrough_model(
-                            cd_i.propensity_modifiers, include_control=False
+                if use_ray:
+                    tasks.append(
+                        runner.remote_single_run.remote(
+                            dataset_name,
+                            cd,
+                            metric,
+                            args.test_size,
+                            args.num_samples,
+                            args.components_time_budget,
+                            out_dir,
+                            out_fn,
+                            estimators,
                        )
-                    elif "KC" in dataset_name:
-                        print(f"Using auto propensity model for {dataset_name}")
-                        propensity_model = "auto"
-                    else:
-                        print(f"Using dummy propensity model for {dataset_name}")
-                        propensity_model = "dummy"
-
-                    ct = CausalTune(
-                        metric=metric,
-                        estimator_list=get_estimator_list(dataset_name),
-                        num_samples=args.num_samples,
-                        components_time_budget=args.components_time_budget,  # Use this instead
-                        verbose=1,
-                        components_verbose=1,
-                        store_all_estimators=True,
-                        propensity_model=propensity_model,
-                        outcome_model=args.outcome_model,
-                        use_ray=use_ray,
                    )
-
-                    ct.fit(
-                        data=cd_i,
-                        treatment="treatment",
-                        outcome="outcome",
+                else:
+                    results = single_run(
+                        dataset_name,
+                        cd,
+                        metric,
+                        args.test_size,
+                        args.num_samples,
+                        args.components_time_budget,
+                        out_dir,
+                        out_fn,
+                        estimators,
                    )
+                    out.append(results)
 
-                    # Compute scores and save results
-                    results = compute_scores(ct, metric, test_df)
-
-                    with open(out_fn, "wb") as f:
-                        pickle.dump(results, f)
-                except Exception as e:
-                    print(f"Error processing {dataset_name}_{metric}_{i_run}: {e}")
-
-    return out_dir
-
-
-def compute_scores(ct, metric, test_df):
-    datasets = {"train": ct.train_df, "validation": ct.test_df, "test": test_df}
-    estimator_scores = {est: [] for est in ct.scores.keys() if "NewDummy" not in est}
+    if use_ray:
+        while True:
+            completed, in_progress = ray.get(runner.get_progress.remote())
+            print(f"Ray: {completed}/{completed + in_progress} tasks completed")
+            if not in_progress:
+                print("Ray: all tasks completed!")
+                break
+            time.sleep(10)
 
-    all_scores = []
-    for trial in ct.results.trials:
-        try:
-            estimator_name = trial.last_result["estimator_name"]
-            if "estimator" in trial.last_result and trial.last_result["estimator"]:
"estimator" in trial.last_result and trial.last_result["estimator"]: - estimator = trial.last_result["estimator"] - scores = {} - for ds_name, df in datasets.items(): - scores[ds_name] = {} - est_scores = ct.scorer.make_scores( - estimator, - df, - metrics_to_report=ct.metrics_to_report, - ) - est_scores["estimator_name"] = estimator_name + print("Ray: fetching results...") + out = ray.get(runner.get_results.remote()) - scores[ds_name]["CATE_estimate"] = np.squeeze(estimator.estimator.effect(df)) - scores[ds_name]["CATE_groundtruth"] = np.squeeze(df["true_effect"]) - est_scores["MSE"] = np.mean( - (scores[ds_name]["CATE_estimate"] - scores[ds_name]["CATE_groundtruth"]) - ** 2 - ) - scores[ds_name]["scores"] = est_scores - scores["optimization_score"] = trial.last_result.get("optimization_score") - estimator_scores[estimator_name].append(copy.deepcopy(scores)) - # Will use this in the nex - all_scores.append(scores) - except Exception as e: - print(f"Error processing trial: {e}") - - for k in estimator_scores.keys(): - estimator_scores[k] = sorted( - estimator_scores[k], - key=lambda x: x["validation"]["scores"][metric], - reverse=metric not in metrics_to_minimize(), - ) + for out_fn, results in out: + with open(out_fn, "wb") as f: + pickle.dump(results, f) - # Debugging: Log final result structure - print(f"Returning scores for metric {metric}: Best estimator: {ct.best_estimator}") + if use_ray: + destroy = input("Ray: seems like the results fetched OK. Destroy TaskRunner? ") + if destroy.lower().startswith("y"): + print("Destroying TaskRunner... ", end="") + ray.kill(runner) + print("success!") - return { - "best_estimator": ct.best_estimator, - "best_config": ct.best_config, - "best_score": ct.best_score, - "optimised_metric": metric, - "scores_per_estimator": estimator_scores, - "all_scores": all_scores, - } + return out_dir def extract_metrics_datasets(out_dir: str): @@ -333,11 +294,8 @@ def generate_plots( def plot_grid(title): # Use determined problem type instead of hardcoding "backdoor" - all_metrics = [ - m - for m in supported_metrics(problem, False, False) - if m.lower() != "ate" and m.lower() != "norm_erupt" - ] + files = os.listdir(out_dir) + all_metrics = sorted(list(set([f.split("-")[0] for f in files]))) fig, axs = plt.subplots( len(all_metrics), len(datasets), figsize=(20, 5 * len(all_metrics)), dpi=300 @@ -351,7 +309,7 @@ def plot_grid(title): # For multiple metrics in args.metrics, use the first one that has a results file results_files = {} for dataset in datasets: - for metric in args.metrics: + for metric in all_metrics: filename = make_filename(metric, dataset, 1) filepath = os.path.join(out_dir, filename) if os.path.exists(filepath): @@ -375,11 +333,7 @@ def plot_grid(title): try: # Find best estimator for this metric best_estimator = None - best_score = ( - float("inf") - if metric in metrics_to_minimize() - else float("-inf") - ) + best_score = float("inf") if metric in metrics_to_minimize() else float("-inf") estimator_name = None for score in results["all_scores"]: @@ -389,24 +343,16 @@ def plot_grid(title): if current_score < best_score: best_score = current_score best_estimator = score - estimator_name = score["test"]["scores"][ - "estimator_name" - ] + estimator_name = score["test"]["scores"]["estimator_name"] else: if current_score > best_score: best_score = current_score best_estimator = score - estimator_name = score["test"]["scores"][ - "estimator_name" - ] + estimator_name = score["test"]["scores"]["estimator_name"] if best_estimator: - CATE_gt = 
-                            CATE_gt = np.array(
-                                best_estimator["test"]["CATE_groundtruth"]
-                            ).flatten()
-                            CATE_est = np.array(
-                                best_estimator["test"]["CATE_estimate"]
-                            ).flatten()
+                            CATE_gt = np.array(best_estimator["test"]["CATE_groundtruth"]).flatten()
+                            CATE_est = np.array(best_estimator["test"]["CATE_estimate"]).flatten()

                            # Plotting
                            ax.scatter(CATE_gt, CATE_est, s=40, alpha=0.5)
@@ -445,9 +391,7 @@ def plot_grid(title):
                            )

                    except Exception as e:
-                        print(
-                            f"Error processing metric {metric} for dataset {dataset}: {e}"
-                        )
+                        print(f"Error processing metric {metric} for dataset {dataset}: {e}")
                        ax.text(
                            0.5,
                            0.5,
@@ -466,9 +410,7 @@ def plot_grid(title):
                    labelpad=5,  # Reduce padding between label and plot
                )
                if i == 0:
-                    ax.set_title(
-                        dataset, fontsize=font_size + 14, fontweight="bold", pad=15
-                    )
+                    ax.set_title(dataset, fontsize=font_size + 14, fontweight="bold", pad=15)

                ax.set_xticks([])
                ax.set_yticks([])
@@ -488,7 +430,10 @@ def plot_mse_grid(title):
        est_names = sorted(df["estimator_name"].unique())

        # Problem type already determined at top level
-        all_metrics = [c for c in df.columns if c in supported_metrics(problem, False, False)and c.lower() != "ate"
+        all_metrics = [
+            c
+            for c in df.columns
+            if c in supported_metrics(problem, False, False) and c.lower() != "ate"
        ]

        fig, axs = plt.subplots(
@@ -570,9 +515,7 @@ def plot_mse_grid(title):
                    labelpad=5,
                )
                if i == 0:
-                    ax.set_title(
-                        dataset, fontsize=font_size + 14, fontweight="bold", pad=15
-                    )
+                    ax.set_title(dataset, fontsize=font_size + 14, fontweight="bold", pad=15)

        plt.suptitle(
            f"MSE vs. Scores: {title}",
@@ -582,6 +525,11 @@ def plot_mse_grid(title):

        # Match spacing style with plot_grid
        plt.tight_layout(rect=[0.1, 0, 1, 0.96], h_pad=1.0, w_pad=0.5)
+
+        # Render the shared legend in its own figure and save it separately, then
+        # make the grid figure current again so the plt.savefig calls below save
+        # the grid rather than the legend (MSE_legend.png is a new output name).
+        fig_legend, ax_legend = plt.subplots(figsize=(6, 6))
+        ax_legend.legend(handles=legend_elements, loc="center", fontsize=10)
+        ax_legend.axis("off")
+        fig_legend.savefig(os.path.join(out_dir, "MSE_legend.png"), format="png", bbox_inches="tight")
+        plt.close(fig_legend)
+        plt.figure(fig.number)
+
        plt.savefig(os.path.join(out_dir, "MSE_grid.pdf"), format="pdf", bbox_inches="tight")
        plt.savefig(os.path.join(out_dir, "MSE_grid.png"), format="png", bbox_inches="tight")
        plt.close()
@@ -600,10 +548,7 @@ def plot_mse_grid(title):


 def run_batch(
-    identifier: str,
-    kind: str,
-    metrics: List[str],
-    dataset_path: str,
+    identifier: str, kind: str, metrics: List[str], dataset_path: str, use_ray: bool = False
 ):
    args = parse_arguments()
    args.identifier = identifier
@@ -613,54 +558,172 @@ def run_batch(
    args.num_samples = 100
    args.timestamp_in_dirname = False
    args.outcome_model = "auto"  # or use "nested" for the old-style nested model
+    args.components_time_budget = 120

-    # os.environ["RAY_ADDRESS"] = "ray://127.0.0.1:8265"
-
-    use_ray = True
    if use_ray:
        import ray

        # Assuming we port-mapped already by running ray dashboard
        ray.init(
-            "ray://localhost:10001", runtime_env={"pip": ["causaltune", "catboost"]}
-        )  # "34.82.184.148:6379"
+            "ray://localhost:10001",
+            runtime_env={"working_dir": ".", "pip": ["causaltune", "catboost"]},
+            namespace=RAY_NAMESPACE,
+        )
+
    out_dir = run_experiment(args, dataset_path=dataset_path, use_ray=use_ray)
    return out_dir


-if __name__ == "__main__":
+@ray.remote
+def remote_single_run(*args):
+    return single_run(*args)

-    args = parse_arguments()
-    args.identifier = "Egor_test"
-    args.metrics = supported_metrics("backdoor", False, False)
-    # run_experiment assumes we don't mix large and small datasets in the same call
-    args.datasets = ["Large Linear_RCT", "Large NonLinear_RCT"]
-    args.num_samples = 100
-    args.timestamp_in_dirname = False
-    args.outcome_model = "auto"  # or use "nested" for the old-style nested model
-    use_ray = True
-    if use_ray:
-        import ray

+@ray.remote
+class TaskRunner:
+    # A detached actor that owns the ObjectRefs of all submitted runs, so
+    # progress and results survive even if the submitting driver disconnects.
+    def __init__(self):
+        self.futures = {}

-        ray.init()
-    out_dir = run_experiment(args, dataset_path="../RunDatasets", use_ray=use_ray)
+    def remote_single_run(self, *args):
+        ref = remote_single_run.remote(*args)
+        self.futures[ref.hex()] = ref
+        return ref.hex()

-    # plot results
-    upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
-    lower_bounds = {"erupt": 0.06, "bite": 0.75}
+    def get_results(self):
+        return ray.get(list(self.futures.values()))

-    # Determine case from datasets
-    if any("IV" in dataset for dataset in args.datasets):
-        case = "IV"
-    elif any("KC" in dataset for dataset in args.datasets):
-        case = "KC"
-    elif any("KCKP" in dataset for dataset in args.datasets):
-        case = "KCKP"
-    else:
-        case = "RCT"
-    # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
-    # lower_bounds = {"erupt": 0.06, "bite": 0.75}
-    generate_plots(
-        os.path.join(out_dir, case), font_size=8
-    )  # , upper_bounds=upper_bounds, lower_bounds=lower_bounds)
+    def get_single_result(self, ref_hex):
+        return ray.get(self.futures[ref_hex])
+
+    def is_ready(self, ref_hex):
+        ready, _ = ray.wait([self.futures[ref_hex]], timeout=0, fetch_local=False)
+        return bool(ready)
+
+    def all_tasks_ready(self):
+        _, in_progress = ray.wait(list(self.futures.values()), timeout=0, fetch_local=False)
+        return not bool(in_progress)
+
+    def get_progress(self):
+        completed, in_progress = ray.wait(
+            list(self.futures.values()), num_returns=len(self.futures), timeout=0, fetch_local=False
+        )
+        return len(completed), len(in_progress)
+
+
+def single_run(
+    dataset_name: str,
+    cd: CausalityDataset,
+    metric: str,
+    test_size: float,
+    num_samples: int,
+    components_time_budget: int,
+    out_dir: str,
+    out_fn: str,
+    estimators: List[str],
+    outcome_model: str = "auto",
+    i_run: int = 1,
+):
+
+    cd_i = copy.deepcopy(cd)
+    train_df, test_df = train_test_split(cd_i.data, test_size=test_size)
+    test_df = test_df.reset_index(drop=True)
+    cd_i.data = train_df
+    print(f"Optimizing {metric} for {dataset_name} (run {i_run})")
+    try:
+
+        # Set propensity model using string checking like original version
+        if "KCKP" in dataset_name:
+            print(f"Using passthrough propensity model for {dataset_name}")
+            propensity_model = passthrough_model(cd_i.propensity_modifiers, include_control=False)
+        elif "KC" in dataset_name:
+            print(f"Using auto propensity model for {dataset_name}")
+            propensity_model = "auto"
+        else:
+            print(f"Using dummy propensity model for {dataset_name}")
+            propensity_model = "dummy"
+
+        ct = CausalTune(
+            metric=metric,
+            estimator_list=estimators,
+            num_samples=num_samples,
+            components_time_budget=components_time_budget,
+            verbose=1,
+            components_verbose=1,
+            store_all_estimators=True,
+            propensity_model=propensity_model,
+            outcome_model=outcome_model,
+            use_ray=False,  # each run already executes inside a single Ray task
+        )
+
+        ct.fit(
+            data=cd_i,
+            treatment="treatment",
+            outcome="outcome",
+        )
+
+        # compute_scores is defined inside single_run so that the whole task
+        # body serializes cleanly when shipped to Ray workers
+
+        def compute_scores(ct, metric, test_df):
+            datasets = {"train": ct.train_df, "validation": ct.test_df, "test": test_df}
+            estimator_scores = {est: [] for est in ct.scores.keys() if "NewDummy" not in est}
+
+            all_scores = []
+            for trial in ct.results.trials:
+                try:
+                    estimator_name = trial.last_result["estimator_name"]
+                    if "estimator" in trial.last_result and trial.last_result["estimator"]:
+                        estimator = trial.last_result["estimator"]
+                        scores = {}
+                        for ds_name, df in datasets.items():
+                            scores[ds_name] = {}
+                            est_scores = ct.scorer.make_scores(
+                                estimator,
+                                df,
+                                metrics_to_report=ct.metrics_to_report,
+                            )
+                            est_scores["estimator_name"] = estimator_name
+
+                            scores[ds_name]["CATE_estimate"] = np.squeeze(
+                                estimator.estimator.effect(df)
+                            )
+                            scores[ds_name]["CATE_groundtruth"] = np.squeeze(df["true_effect"])
+                            est_scores["MSE"] = np.mean(
+                                (
+                                    scores[ds_name]["CATE_estimate"]
+                                    - scores[ds_name]["CATE_groundtruth"]
+                                )
+                                ** 2
+                            )
+                            scores[ds_name]["scores"] = est_scores
+                        scores["optimization_score"] = trial.last_result.get("optimization_score")
+                        estimator_scores[estimator_name].append(copy.deepcopy(scores))
+                        # Will use this in the next step
+                        all_scores.append(scores)
+                except Exception as e:
+                    print(f"Error processing trial: {e}")
+
+            for k in estimator_scores.keys():
+                estimator_scores[k] = sorted(
+                    estimator_scores[k],
+                    key=lambda x: x["validation"]["scores"][metric],
+                    reverse=metric not in metrics_to_minimize(),
+                )
+
+            # Debugging: Log final result structure
+            print(f"Returning scores for metric {metric}: Best estimator: {ct.best_estimator}")
+
+            return {
+                "best_estimator": ct.best_estimator,
+                "best_config": ct.best_config,
+                "best_score": ct.best_score,
+                "optimised_metric": metric,
+                "scores_per_estimator": estimator_scores,
+                "all_scores": all_scores,
+            }
+
+        # Compute scores and save results
+        results = compute_scores(ct, metric, test_df)
+
+        return out_fn, results
+    except Exception as e:
+        print(f"Error processing {dataset_name}_{metric}_{i_run}: {e}")
+        # Return a placeholder so callers that unpack (out_fn, results) don't crash
+        return out_fn, None
diff --git a/notebooks/RunExperiments/runners/iv.py b/notebooks/RunExperiments/runners/iv.py
index 8e975e8..fde3434 100644
--- a/notebooks/RunExperiments/runners/iv.py
+++ b/notebooks/RunExperiments/runners/iv.py
@@ -1,12 +1,15 @@
 import os
 
 from experiment_runner import run_batch, generate_plots
 
 identifier = "Egor_test"
 kind = "IV"
 metrics = ["energy_distance", "frobenius_norm", "codec"]
+use_ray = False
 
-out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
+out_dir = run_batch(
+    identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"), use_ray=use_ray
+)
 
 # plot results
 # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
 # lower_bounds = {"erupt": 0.06, "bite": 0.75}
diff --git a/notebooks/RunExperiments/runners/kc.py b/notebooks/RunExperiments/runners/kc.py
index 3168719..570c575 100644
--- a/notebooks/RunExperiments/runners/kc.py
+++ b/notebooks/RunExperiments/runners/kc.py
@@ -15,8 +15,10 @@
     "codec",  # NEW
     "bite",  # NEW
 ]
-
-out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
+use_ray = True
+out_dir = run_batch(
+    identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"), use_ray=use_ray
+)
 
 # plot results
 # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
 # lower_bounds = {"erupt": 0.06, "bite": 0.75}
diff --git a/notebooks/RunExperiments/runners/kckp.py b/notebooks/RunExperiments/runners/kckp.py
index 79955ca..e6ac376 100644
--- a/notebooks/RunExperiments/runners/kckp.py
+++ b/notebooks/RunExperiments/runners/kckp.py
@@ -15,8 +15,10 @@
     "codec",  # NEW
     "bite",  # NEW
 ]
-
-out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
+use_ray = True
+out_dir = run_batch(
+    identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"), use_ray=use_ray
+)
 
 # plot results
 # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
 # lower_bounds = {"erupt": 0.06, "bite": 0.75}
diff --git a/notebooks/RunExperiments/runners/rct.py b/notebooks/RunExperiments/runners/rct.py
index 797bc69..efc6afb 100644 --- a/notebooks/RunExperiments/runners/rct.py +++ b/notebooks/RunExperiments/runners/rct.py @@ -15,8 +15,10 @@ "codec", # NEW "bite", # NEW ] - -out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets")) +use_ray = True +out_dir = run_batch( + identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"), use_ray=use_ray +) # plot results # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} # lower_bounds = {"erupt": 0.06, "bite": 0.75}
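
A quick way to exercise the detached TaskRunner workflow above from a separate session, e.g. to check on a long batch or to clean up afterwards. This is a sketch, assuming the cluster from cluster_config.yaml is up and the Ray client port is forwarded as in run_batch(); the file name monitor_runner.py and the 10-second poll interval are illustrative choices, not part of this change:

# monitor_runner.py -- illustrative helper, not included in this diff
import time

import ray

# Same address and namespace that run_batch() uses
ray.init("ray://localhost:10001", namespace="causaltune_experiments")

try:
    # The detached actor created by run_experiment()
    runner = ray.get_actor("TaskRunner")
except ValueError:
    raise SystemExit("No detached TaskRunner in this namespace - nothing to monitor.")

# Poll the same progress counters that run_experiment() prints
while True:
    completed, in_progress = ray.get(runner.get_progress.remote())
    print(f"{completed}/{completed + in_progress} tasks completed")
    if not in_progress:
        break
    time.sleep(10)

# Fetch all results without resubmitting anything, then release the actor
results = ray.get(runner.get_results.remote())
ray.kill(runner)

Because the actor is created with lifetime="detached", this can run hours after the submitting driver has exited; killing the actor (or recreating the cluster) is what resets the "already submitted" state that run_experiment checks for.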