diff --git a/README.md b/README.md index 16fe482..e007a91 100644 --- a/README.md +++ b/README.md @@ -4,37 +4,4 @@ Traffic-Waves is a voluntary project focused on daily traffic predictions in Par ## Overview -The project leverages deep learning techniques to analyze historical traffic data and make predictions for daily traffic patterns in Paris. This aids in providing insights for commuters and city planners alike. - - -### 1. Deployed System on AWS -The system is deployed on AWS, utilizing various services to process and analyze the traffic data efficiently. - -![Architecture](assets/aws-traffic.png) - -1. __AWS EventBridge Trigger for EC2 Instance Start__ - - This Lambda function is designed to start an EC2 instance of type t2.large upon receiving an event trigger from AWS EventBridge. The function is intended to automate the process of starting EC2 instances everyday according to specified configurations. - -2. __EC2 Instance Bootstrap with Custom User Data__ - - This documentation outlines the process of bootstrapping an EC2 instance using custom user data. Upon boot, the instance executes a sequence of scripts to set up a Python environment with the necessary dependencies such as Python 3, PyTorch, MLFlow, and Pandas. Additionally, it sequentially runs specific scripts to initialize the environment and perform further configuration. - - a) __Paris Traffic Data Aggregator__ : This Python script is designed to aggregate real-time traffic data for specified locations in Paris using the Open Paris API. The aggregated data is then merged into a single DataFrame and uploaded to an Amazon S3 bucket as a CSV file for further analysis. - - b) __Data Processing and Upload__: This Python script is designed to run on an EC2 instance to process and upload traffic data to an Amazon S3 bucket. It retrieves data from various CSV files stored in the specified S3 bucket, performs data manipulation and merging operations, and then uploads the processed data back to S3 for further analysis. - * It merges the static attributes data with the real-time traffic data based on the Paris ID. - * Timestamps are converted to a numeric time index for temporal analysis. - * Additional features such as day of the week and hour of the day are extracted from timestamps. - * Missing values in the real-time traffic data are filled using corresponding values from the historical trends data. - - c) __Inference Script for Traffic Prediction__: This Python script is designed to run on an EC2 instance for making traffic predictions using a pre-trained machine learning model. The script retrieves preprocessed data from an Amazon S3 bucket, loads the pre-trained model using MLflow, performs inference on the test data, and then uploads the prediction results back to the S3 bucket. - - d) __Data Aggregation and Normalization Script__: This Python script is designed to aggregate, normalize, and upload traffic data from various sources to an Amazon S3 bucket. It retrieves real-time traffic data, prediction results from previous days, and prediction results for the current day from the specified S3 bucket, processes and normalizes the data, and then uploads it as a JSON file to another S3 bucket for Web visualization. - - e) __Real-time Traffic Visualization Web Application__: A S3 bucket is configured for static website hosting. A HTML file is used for visualizing real-time traffic data using D3.js. 
The application fetches traffic data from a JSON file stored in the same Amazon S3 bucket, creates dynamic line plots for each detector (Paris ID), and displays them on the web page. - * You can access the daily batch predictions on th [S3 Website](http://traffiq-paris.s3-website.eu-north-1.amazonaws.com/) - -3. __AWS EventBridge Trigger for EC2 Instance Stop__ - - This Lambda function is designed to stop the running EC2 instance upon receiving an event trigger from AWS EventBridge. The function is stop after the daily inference task is complete and automated to avoid unnecessary EC2 costs. \ No newline at end of file +The project leverages ML and DL techniques to analyze historical traffic data and make predictions for daily traffic patterns in Paris. This aids in providing insights for commuters and city planners alike. \ No newline at end of file diff --git a/purged/README.md b/purged/README.md new file mode 100644 index 0000000..16fe482 --- /dev/null +++ b/purged/README.md @@ -0,0 +1,40 @@ +# Traffic Waves + +Traffic-Waves is a voluntary project focused on daily traffic predictions in Paris, utilizing data from [Open Data Paris](https://opendata.paris.fr/explore/dataset/comptages-routiers-permanents/information) + +## Overview + +The project leverages deep learning techniques to analyze historical traffic data and make predictions for daily traffic patterns in Paris. This aids in providing insights for commuters and city planners alike. + + +### 1. Deployed System on AWS +The system is deployed on AWS, utilizing various services to process and analyze the traffic data efficiently. + +![Architecture](assets/aws-traffic.png) + +1. __AWS EventBridge Trigger for EC2 Instance Start__ + + This Lambda function is designed to start an EC2 instance of type t2.large upon receiving an event trigger from AWS EventBridge. The function is intended to automate the process of starting EC2 instances everyday according to specified configurations. + +2. __EC2 Instance Bootstrap with Custom User Data__ + + This documentation outlines the process of bootstrapping an EC2 instance using custom user data. Upon boot, the instance executes a sequence of scripts to set up a Python environment with the necessary dependencies such as Python 3, PyTorch, MLFlow, and Pandas. Additionally, it sequentially runs specific scripts to initialize the environment and perform further configuration. + + a) __Paris Traffic Data Aggregator__ : This Python script is designed to aggregate real-time traffic data for specified locations in Paris using the Open Paris API. The aggregated data is then merged into a single DataFrame and uploaded to an Amazon S3 bucket as a CSV file for further analysis. + + b) __Data Processing and Upload__: This Python script is designed to run on an EC2 instance to process and upload traffic data to an Amazon S3 bucket. It retrieves data from various CSV files stored in the specified S3 bucket, performs data manipulation and merging operations, and then uploads the processed data back to S3 for further analysis. + * It merges the static attributes data with the real-time traffic data based on the Paris ID. + * Timestamps are converted to a numeric time index for temporal analysis. + * Additional features such as day of the week and hour of the day are extracted from timestamps. + * Missing values in the real-time traffic data are filled using corresponding values from the historical trends data. 
+ + c) __Inference Script for Traffic Prediction__: This Python script is designed to run on an EC2 instance for making traffic predictions using a pre-trained machine learning model. The script retrieves preprocessed data from an Amazon S3 bucket, loads the pre-trained model using MLflow, performs inference on the test data, and then uploads the prediction results back to the S3 bucket. + + d) __Data Aggregation and Normalization Script__: This Python script is designed to aggregate, normalize, and upload traffic data from various sources to an Amazon S3 bucket. It retrieves real-time traffic data, prediction results from previous days, and prediction results for the current day from the specified S3 bucket, processes and normalizes the data, and then uploads it as a JSON file to another S3 bucket for Web visualization. + + e) __Real-time Traffic Visualization Web Application__: A S3 bucket is configured for static website hosting. A HTML file is used for visualizing real-time traffic data using D3.js. The application fetches traffic data from a JSON file stored in the same Amazon S3 bucket, creates dynamic line plots for each detector (Paris ID), and displays them on the web page. + * You can access the daily batch predictions on th [S3 Website](http://traffiq-paris.s3-website.eu-north-1.amazonaws.com/) + +3. __AWS EventBridge Trigger for EC2 Instance Stop__ + + This Lambda function is designed to stop the running EC2 instance upon receiving an event trigger from AWS EventBridge. The function is stop after the daily inference task is complete and automated to avoid unnecessary EC2 costs. \ No newline at end of file diff --git a/src/app.py b/src/app.py new file mode 100644 index 0000000..de3a7fe --- /dev/null +++ b/src/app.py @@ -0,0 +1,42 @@ +from flask import Flask, render_template, jsonify + +import pandas as pd +from config_interface import * +from frontend import DashboardData + +pd.options.mode.chained_assignment = None + +dashboard_object = DashboardData( + path_o_t_1=f"../data/processed_data/inference_data_{INFERENCE_INPUT_DATE_FMT}.csv", + path_pt_1=f"../predictions/knn_{INFERENCE_INPUT_DATE_FMT}.csv", + path_pt=f"../predictions/knn_{INFERENCE_PREDICTION_DATE_FMT}.csv", + path_variance=f"../data/variance/df_var_2023.csv", +) + +dashboard_object.read_data() + +app = Flask(__name__) + + +@app.route("/") +def index(): + return render_template("index.html") + + +@app.route("/data.json") +def data(): + data_asset = dashboard_object.processing_pipeline() + + return DashboardData.write_to_json("../frontend/data.json", data_asset) + + +@app.after_request +def add_header(response): + response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate" + response.headers["Pragma"] = "no-cache" + response.headers["Expires"] = "0" + return response + + +if __name__ == "__main__": + app.run(debug=True) diff --git a/src/call_data_api.py b/src/call_data_api.py index ed65b34..3017e51 100644 --- a/src/call_data_api.py +++ b/src/call_data_api.py @@ -7,13 +7,12 @@ import os from dataclasses import dataclass import requests -from tqdm import tqdm import pandas as pd from utils import setup_logging -from config import URL, LINKS, INFERENCE_DATA_DATE, data_folder +from config_data import URL, LINKS, input_date_formatted, BASE_PATH_DATA -temp_path = data_folder / "raw_data" +temp_path = BASE_PATH_DATA / "raw_data" logging = setup_logging(file_name="call_data_api.log") @@ -65,7 +64,7 @@ def merge_data(self): df = pd.read_csv(f"{self.read_path}/raw_data_{i}.csv") df["t_1h"] = 
pd.to_datetime(df["t_1h"]) - if str(df["t_1h"].dt.date.min()) == INFERENCE_DATA_DATE: + if str(df["t_1h"].dt.date.min()) == input_date_formatted: full_data_list.append(df) else: logging.info("Data for %s detector is not available", i) @@ -89,7 +88,7 @@ def clean_data(self): def data_collector(limit=24, offset=0, timezone="Europe/Berlin"): """Wrapper fucntion to collect and save the data""" - for link in tqdm(LINKS): + for link in LINKS: # Define the query parameters params = { @@ -109,7 +108,7 @@ def data_collector(limit=24, offset=0, timezone="Europe/Berlin"): api_handler.call_open_api() data_merger = DataMerger( - path=f"{temp_path}/raw_data_{INFERENCE_DATA_DATE}.csv", + path=f"{temp_path}/raw_data_{input_date_formatted}.csv", list_links=LINKS, read_path=temp_path, ) diff --git a/src/config.py b/src/config_data.py similarity index 71% rename from src/config.py rename to src/config_data.py index adb19d6..ab4d5bb 100644 --- a/src/config.py +++ b/src/config_data.py @@ -1,5 +1,4 @@ from datetime import datetime, timedelta -import os from pathlib import Path # API URL @@ -8,35 +7,6 @@ "comptages-routiers-permanents/records" ) -# Previous day's input data i.e., to make predictions for today, we use yesterday's data -INFERENCE_DATA_DATE = (datetime.today() - timedelta(1)).strftime("%Y-%m-%d") - -# Path handling - -# Define the base paths -data_folder = Path("../data") - -# Define the specific paths using the base paths -file_raw_input = data_folder / "raw_data" / f"raw_data_{INFERENCE_DATA_DATE}.csv" -file_train_input = data_folder / "historical_data" / "paris_trunk_june_july.csv" -file_static_attributes = data_folder / "processed_data" / "link_static_attributes.csv" -file_historical_trends = data_folder / "processed_data" / "link_historical_trends.csv" -file_processed_input = ( - data_folder / "processed_data" / f"inference_data_{INFERENCE_DATA_DATE}.csv" -) - -# column names -list_column_order = [ - "time_idx", - "day", - "hour", - "maxspeed", - "length", - "lanes", - "paris_id", - "q", -] - # Network detector IDs used to query the data LINKS = [ "5169", @@ -155,3 +125,39 @@ "5455", "5456", ] + +# Define the base paths +BASE_PATH_DATA = Path("../data") + +# column names +LIST_COLUMN_ORDER = [ + "time_idx", + "day", + "hour", + "maxspeed", + "length", + "lanes", + "paris_id", + "q", +] + + +# Previous day's input data i.e., to make predictions for today, we use yesterday's data +input_date = datetime.today() - timedelta(1) +input_date_formatted = input_date.strftime("%Y-%m-%d") +prediction_date = datetime.today() +prediction_date_formatted = prediction_date.strftime("%Y-%m-%d") + +# Define the specific paths using the base paths +file_raw_input = BASE_PATH_DATA / "raw_data" / f"raw_data_{input_date_formatted}.csv" +file_train_input = BASE_PATH_DATA / "historical_data" / "paris_trunk_june_july.csv" +file_model_train = BASE_PATH_DATA / "historical_data" / "paris_trunk_june_july.csv" +file_static_attributes = ( + BASE_PATH_DATA / "processed_data" / "link_static_attributes.csv" +) +file_historical_trends = ( + BASE_PATH_DATA / "processed_data" / "link_historical_trends.csv" +) +file_processed_input = ( + BASE_PATH_DATA / "processed_data" / f"inference_data_{input_date_formatted}.csv" +) diff --git a/src/config_model.py b/src/config_model.py new file mode 100644 index 0000000..cc843de --- /dev/null +++ b/src/config_model.py @@ -0,0 +1,86 @@ +from config_data import file_model_train + +TRAINING_PARAMS = { + "metric": "smape", + "training": True, + "data_path": file_model_train, + "model_output_dir": 
"modeloutput/", + "seed": 46, + "test_proportion": 0.15, + "validation_proportion": 0.15, + "patience": 25, + "train_episodes": 1000, + "batch_size": 512, +} + +FORECASTING_PARAMS = { + "lb": 24, + "ph": 24, +} + +## month and day are considered as static for the forecasting horizon, short-term forecasting +FEATURE_SETTINGS = { + "dyn_to_static": ["day", "hour"], + "include_occupancy": False, + "dynamic_categorical_features": ["day", "hour"], + "static_categorical_features": [], + "dynamic_continous_features": [ + # "speed_kph_mean", + # "speed_kph_stddev", + "q", + ], + "static_continous_features": [ + "maxspeed", + "lanes", + "length", + ], + "other_columns": ["time_idx", "paris_id"], + "occupancy_column": ["k"], + "target_as_autoregressive_feature": ["q"], + "target_column": ["qt"], +} + +# Using the CONFIG dictionary +metric = TRAINING_PARAMS["metric"] +training = TRAINING_PARAMS["training"] +data_path = TRAINING_PARAMS["data_path"] +train_episodes = TRAINING_PARAMS["train_episodes"] + +dynamic_continous_features = FEATURE_SETTINGS["dynamic_continous_features"] +dynamic_categorical_features = FEATURE_SETTINGS["dynamic_categorical_features"] +static_continous_features = FEATURE_SETTINGS["static_continous_features"] +static_categorical_features = FEATURE_SETTINGS["static_categorical_features"] + +continous_features = [*static_continous_features, *dynamic_continous_features] +categorical_features = [*static_categorical_features, *dynamic_categorical_features] + +dyn_to_static = FEATURE_SETTINGS["dyn_to_static"] + +occupancy_column = FEATURE_SETTINGS["occupancy_column"] +other_columns = FEATURE_SETTINGS["other_columns"] +target_as_autoregressive_feature = FEATURE_SETTINGS["target_as_autoregressive_feature"] +target_column = FEATURE_SETTINGS["target_column"] + +if not FEATURE_SETTINGS["include_occupancy"]: + pass +else: + dynamic_continous_features.append(*occupancy_column) + +if FEATURE_SETTINGS["include_occupancy"]: + dynamic_features = [ + # TODO: ordering as first: categorical and second: continous in the list + # is important for preprocess data fucntion + *dynamic_categorical_features, + *dynamic_continous_features, + *occupancy_column, + *target_column, + ] +else: + dynamic_features = [ + # TODO: ordering as first: categorical and second: continous in the list + # is important for preprocess data fucntion + *dynamic_categorical_features, + *dynamic_continous_features, + *target_column, + ] +static_features = [*static_categorical_features, *static_continous_features] diff --git a/src/dataset.py b/src/dataset.py new file mode 100644 index 0000000..a6450e4 --- /dev/null +++ b/src/dataset.py @@ -0,0 +1,227 @@ +from abc import ABC, abstractmethod +import random +from itertools import groupby +from operator import itemgetter +from typing import List, Tuple +import numpy as np +import pandas as pd + +import joblib +from sklearn.preprocessing import StandardScaler + + +class Dataset: + def __init__(self, path) -> None: + self.path = path + self.df = self.read_data() + super().__init__() + + def read_data(self): + df = pd.read_csv(self.path) + return df + + @property + def get_groups(self): + return list(self.df.paris_id.unique()) + + +class DataSplitter(Dataset): + def __init__(self, path) -> None: + super().__init__(path) + + def split_groups(self, seed, val_prop, test_prop): + + len_groups = len(self.get_groups) + + n_test = int(test_prop * len_groups) + self.test_ids = list(random.sample(self.get_groups, n_test)) + non_test_ids = [] + + random.seed(seed) + for i in self.get_groups: + if i 
not in self.test_ids: + non_test_ids.append(i) + + self.val_ids = list(random.sample(non_test_ids, int(val_prop * len_groups))) + new_train_ids = [] + for i in non_test_ids: + if i not in self.val_ids: + new_train_ids.append(i) + self.train_ids = new_train_ids + + def split_data(self): + train = self.df[self.df.paris_id.isin(self.train_ids)] + test = self.df[self.df.paris_id.isin(self.test_ids)] + val = self.df[self.df.paris_id.isin(self.val_ids)] + return train, test, val + + +class DataScaler(ABC): + def __init__(self) -> None: + super().__init__() + + @abstractmethod + def scaler_fit(self): + pass + + @abstractmethod + def scaler_transform(self): + pass + + @abstractmethod + def save_scaler(self): + pass + + @abstractmethod + def load_scaler(self): + pass + + +class TimeSeriesScaler(DataScaler): + + def __init__( + self, + continous_features, + categorical_features, + other_columns, + original_target_column, + duplicated_target_column, + ) -> None: + self.continous_features = continous_features + self.categorical_features = categorical_features + self.other_columns = other_columns + self.original_target_column = original_target_column + self.duplicated_target_column = duplicated_target_column + super().__init__() + + def copy_target_column(self, _df): + _df.loc[_df.index, self.duplicated_target_column] = _df[ + self.original_target_column + ] + return _df + + def scaler_fit(self, scaler_type, X): + if scaler_type == "minmax": + self.scaler = StandardScaler() + else: + raise NotImplementedError + self.scaler.fit(X[self.continous_features].values) + return self.scaler + + def scaler_transform(self, X): + scaled_features = self.scaler.transform(X[self.continous_features].values) + scaled_features_df = pd.DataFrame( + scaled_features, index=X.index, columns=self.continous_features + ) + scaled_features_df[self.categorical_features] = X[self.categorical_features] + scaled_features_df[self.other_columns] = X[self.other_columns] + # Assign 'q' column to scaled DataFrame + scaled_features_df[self.duplicated_target_column] = X[ + self.original_target_column + ] + return scaled_features_df + + def save_scaler(self, path): + joblib.dump(self.scaler, path) + + def load_scaler(self, path): + scaler = joblib.load(path) + self.scaler = scaler + + +class TimeSeriesFormatter: + + def __init__( + self, + lookback_timesteps, + prediction_horizon, + features_static, + features_dynamic, + auto_regressive, + inference, + ) -> None: + self.lb = lookback_timesteps + self.ph = prediction_horizon + self.static_fs = features_static + self.dynamic_fs = features_dynamic + self.auto_regressive = auto_regressive + self.inference = inference + + @staticmethod + def reshape_x(X, W=None, use_static=False): + X_reshaped = np.reshape(X, (X.shape[0], -1)) + if use_static: + X_reshaped = np.hstack((W, X_reshaped)) + return X_reshaped + + def split_sequences( + self, + sequences: np.ndarray, + static: np.ndarray, + id_det: int, + ): + W, X, y, z = list(), list(), list(), list() + + if self.inference: + indices = [0] + else: + indices = range(len(sequences)) + + for i in indices: + end_ix = i + self.lb + + if not self.inference and (end_ix + self.ph > len(sequences)): + break + + if not self.auto_regressive: + seq_x = sequences[i:end_ix, :-1] + else: + seq_x = sequences[i:end_ix, :] + + X.append(seq_x) + W.append(static) + z.append(id_det) + + if not self.inference: + seq_y = sequences[end_ix : end_ix + self.ph, -1] + y.append(seq_y) + + if self.inference: + return np.array(W), np.array(X), np.array(z) + else: + return 
np.array(W), np.array(X), np.array(y), np.array(z) + + def format_data(self, df): + + W_list, X_list, y_list, z_list = list(), list(), list(), list() + for i in df.paris_id.unique(): + temp = df[df.paris_id == i] + temp = temp.sort_values(by="time_idx") + temp.index = temp.time_idx + + w = np.array(temp[self.static_fs].drop_duplicates())[0] + for k, g in groupby(enumerate(list(temp.index)), lambda ix: ix[0] - ix[1]): + temp_list = list(map(itemgetter(1), g)) + + if len(temp_list) >= self.lb: + temp_df = temp.loc[temp_list, self.dynamic_fs] + if self.inference: + W, X, z = self.split_sequences(np.array(temp_df), w, i) + W_list.extend(W) + X_list.extend(X) + z_list.extend(z) + else: + W, X, y, z = self.split_sequences(np.array(temp_df), w, i) + W_list.extend(W) + X_list.extend(X) + y_list.extend(y) + z_list.extend(z) + if self.inference: + return np.array(W_list), np.array(X_list), np.array(z_list) + else: + return ( + np.array(W_list), + np.array(X_list), + np.array(y_list), + np.array(z_list), + ) diff --git a/src/frontend.py b/src/frontend.py new file mode 100644 index 0000000..d10a0c0 --- /dev/null +++ b/src/frontend.py @@ -0,0 +1,135 @@ +from abc import ABC, abstractmethod + +import simplejson +import json +from flask import jsonify +import math +import numpy as np +import pandas as pd + +from config_data import prediction_date, input_date + + +class PlotFormatting(ABC): + def __init__(self) -> None: + super().__init__() + + @abstractmethod + def read_data(): + pass + + +class DashboardData(PlotFormatting): + _prediction_date = prediction_date + _input_date = input_date + + def __init__(self, path_pt, path_pt_1, path_o_t_1, path_variance=None) -> None: + self.path_predictions_t = path_pt + self.path_predictions_t_1 = path_pt_1 + self.path_observed_t_1 = path_o_t_1 + self.path_variance = path_variance + super().__init__() + self.set_dates() + + @staticmethod + def create_date_strings(): + current_date = DashboardData._prediction_date.strftime("%d-%m-%Y") + previous_date = DashboardData._input_date.strftime("%d-%m-%Y") + return previous_date, current_date + + @classmethod + def set_dates(cls): + cls.previous_date, cls.current_date = cls.create_date_strings() + + def read_data(self): + self.df_observed_t_1 = pd.read_csv(self.path_observed_t_1) + self.df_predictions_t_1 = pd.read_csv(self.path_predictions_t_1) + self.df_predictions_t = pd.read_csv(self.path_predictions_t) + if self.path_variance != None: + self.df_variance = pd.read_csv(self.path_variance) + + @staticmethod + def filter_df(df, i): + return df[df.paris_id == i] + + def get_time_and_target(self, filtered_df, col_name, date): + filtered_df.loc[:, "new_time_idx"] = filtered_df["time_idx"].apply( + lambda idx: f"{date}:{idx:02d}" + ) + time_idx = filtered_df["new_time_idx"].tolist() + target_value = filtered_df[col_name].tolist() + return time_idx, target_value + + @staticmethod + def get_variance(df): + var_t_1 = var_t = df["q"].tolist() + var_t.extend(var_t_1) + return var_t + + @staticmethod + def create_confidence_intervals(predictions, variance): + ci_upper = list(np.array(predictions) + np.array(variance)) + ci_lower = list(np.array(predictions) - np.array(variance)) + + ci_upper = [int(np.maximum(x, 0)) if not math.isnan(x) else x for x in ci_upper] + ci_lower = [int(np.maximum(x, 0)) if not math.isnan(x) else x for x in ci_lower] + return ci_lower, ci_upper + + def processing_pipeline(self): + data = [] + for paris_id in self.df_observed_t_1["paris_id"].unique(): + + try: + temp_o = 
DashboardData.filter_df(self.df_observed_t_1, paris_id) + temp_p_t = DashboardData.filter_df(self.df_predictions_t, paris_id) + temp_p_t_1 = DashboardData.filter_df(self.df_predictions_t_1, paris_id) + temp_var = DashboardData.filter_df(self.df_variance, paris_id) + + time_idx_o, q_o = self.get_time_and_target( + temp_o, "q_real", DashboardData.previous_date + ) + time_idx_p_t_1, q_p_t_1 = self.get_time_and_target( + temp_p_t_1, "preds", DashboardData.previous_date + ) + time_idx_p_t, q_p_t = self.get_time_and_target( + temp_p_t, "preds", DashboardData.current_date + ) + + time_idx_p_t_1.extend(time_idx_p_t) + q_p_t_1.extend(q_p_t) + + q_o = DashboardData.array_to_list(q_o) + q_p_t_1 = DashboardData.array_to_list(q_p_t_1) + + if self.path_variance != None: + var_predictions = DashboardData.get_variance(temp_var) + ci_lower, ci_upper = DashboardData.create_confidence_intervals( + q_p_t_1, var_predictions + ) + + data.append( + { + "paris_id": int(paris_id), + "real_time_idx": time_idx_o, + "real_q": q_o, + "predictions_time_idx": time_idx_p_t_1, + "predictions_preds": q_p_t_1, + "lower_bound": ci_lower, + "upper_bound": ci_upper, + } + ) + except ValueError: + pass + return data + + @staticmethod + def array_to_list(X): + return [int(x) if not math.isnan(x) else x for x in X] + + @staticmethod + def write_to_json(path, data): + with open(path, "w") as json_file: + normalized_str = simplejson.dumps(data, ignore_nan=True) + data = simplejson.loads(normalized_str) + json.dump(data, json_file) # , ignore_nan=True) + return jsonify(data) diff --git a/src/models.py b/src/models.py new file mode 100644 index 0000000..55b1eb5 --- /dev/null +++ b/src/models.py @@ -0,0 +1,119 @@ +from abc import ABC, abstractmethod +from typing import Any + +import numpy as np +import pickle + +from sklearn.neighbors import KNeighborsRegressor +from sklearn.metrics import mean_squared_error +from sklearn.model_selection import cross_val_score, RepeatedKFold + +import xgboost as xgb + + +class Model(ABC): + + @abstractmethod + def train_model(self): + pass + + @abstractmethod + def predict_model(self): + pass + + @abstractmethod + def cross_validation(self): + pass + + @abstractmethod + def save_model(self): + pass + + @abstractmethod + def load_model(self): + pass + + +class KNNModel(Model): + + def __init__(self, **kwargs) -> None: + self.model = KNeighborsRegressor(**kwargs) + + def train_model(self, X, y): + self.model.fit(X, y) + return self.model + + def predict_model(self, X): + return self.model.predict(X) + + def cross_validation(self, X, y, lower_k, upper_k): + k_values = list(range(lower_k, upper_k)) + cv_scores = [ + np.mean( + cross_val_score( + KNeighborsRegressor( + n_neighbors=k, weights="distance", algorithm="auto", p=2 + ), + X, + y, + cv=5, + verbose=3, + scoring="neg_root_mean_squared_error", + ) + ) + for k in k_values + ] + optimal_k = k_values[np.argmax(cv_scores)] + return optimal_k + + def save_model(self, path): + saved_model = open(path, "wb") + pickle.dump(self.model, saved_model) + saved_model.close() + + def load_model(self, path): + self.model = pickle.load(open(path, "rb")) + + +class XGBoostModel(Model): + + def __init__(self, **kwargs) -> None: + self.model = xgb.XGBRegressor(**kwargs) + + def train_model(self, X, y): + self.model.fit( + X, + y, + verbose=2, + ) + return self.model + + def predict_model(self, X): + return self.model.predict(X) + + def cross_validation(self, X, y): + cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=1) + self.cv_model = cross_val_score( + 
self.model, X, y, scoring="neg_mean_absolute_error", cv=cv, n_jobs=-1 + ) + return self.cv_model + + def save_model(self, path): + saved_model = open(path, "wb") + pickle.dump(self.model, saved_model) + saved_model.close() + + def load_model(self, path): + self.model = pickle.load(open(path, "rb")) + + +class EvaluationMetrics: + def __init__(self, y, y_hat) -> None: + self.y = y + self.y_hat = y_hat + + def mse(self): + return mean_squared_error(self.y, self.y_hat) + + def rmse(self): + return np.sqrt(self.mse()) diff --git a/src/predict.py b/src/predict.py new file mode 100644 index 0000000..00a9d3f --- /dev/null +++ b/src/predict.py @@ -0,0 +1,69 @@ +from pathlib import Path +import argparse + +from config_model import FORECASTING_PARAMS +from config_model import ( + continous_features, + categorical_features, + other_columns, + target_as_autoregressive_feature, + target_column, + static_features, + dynamic_features, +) +from config_data import prediction_date_formatted, file_processed_input + +from models import KNNModel, XGBoostModel +from dataset import DataSplitter, TimeSeriesScaler, TimeSeriesFormatter +from utils import setup_logging, predicitons_to_df + +lb, ph = (FORECASTING_PARAMS["lb"], FORECASTING_PARAMS["ph"]) + +parser = argparse.ArgumentParser() +parser.add_argument( + "-m", + "--model", + help="type of machine learning model", + choices=["knn", "xgboost"], + default="knn", +) +args = parser.parse_args() + +# Set up logging +logging = setup_logging("predict.log") + +if __name__ == "__main__": + + data_object = DataSplitter(file_processed_input) + X_formatted = data_object.df + + time_series_object = TimeSeriesScaler( + continous_features, + categorical_features, + other_columns, + target_as_autoregressive_feature, + target_column, + ) + scaler = time_series_object.load_scaler("artifacts/minmax_scaler.gz") + scaled_test = time_series_object.scaler_transform(X_formatted) + + series_formatter_obj = TimeSeriesFormatter( + lb, ph, static_features, dynamic_features, True, True + ) + + W_test, X_test, z_test = series_formatter_obj.format_data(scaled_test) + X_test = TimeSeriesFormatter.reshape_x(X_test) + + if args.model == "knn": + traffic_model = KNNModel() + traffic_model.load_model(f"artifacts/{args.model}_model") + elif args.model == "xgboost": + traffic_model = XGBoostModel() + traffic_model.load_model(f"artifacts/{args.model}_model") + + y_test_hat = traffic_model.predict_model(X_test) + + df_test = predicitons_to_df(ph, z_test, y_test_hat) + df_test.to_csv( + Path("..") / "predictions" / f"knn_{prediction_date_formatted}.csv", index=False + ) diff --git a/src/process_data.py b/src/process_data.py index 711f960..ad713a2 100644 --- a/src/process_data.py +++ b/src/process_data.py @@ -6,12 +6,12 @@ from utils import setup_logging -from config import ( +from config_data import ( file_static_attributes, file_train_input, file_historical_trends, file_raw_input, - list_column_order, + LIST_COLUMN_ORDER, file_processed_input, ) @@ -113,7 +113,7 @@ def fill_missing_values(_df): file_static_attributes, file_raw_input, file_historical_trends, - list_column_order, + LIST_COLUMN_ORDER, ) merged_df = fill_missing_values(df) diff --git a/src/train.py b/src/train.py new file mode 100644 index 0000000..06a1bc5 --- /dev/null +++ b/src/train.py @@ -0,0 +1,119 @@ +import argparse + +from config_model import TRAINING_PARAMS, FORECASTING_PARAMS +from config_model import ( + continous_features, + categorical_features, + other_columns, + target_as_autoregressive_feature, + target_column, + 
static_features, + dynamic_features, +) +from config_data import prediction_date_formatted +from utils import setup_logging +from models import KNNModel, EvaluationMetrics, XGBoostModel +from dataset import DataSplitter, TimeSeriesScaler, TimeSeriesFormatter + +# Set up logging +logging = setup_logging("train.log") + + +parser = argparse.ArgumentParser() +parser.add_argument( + "-m", + "--model", + help="type of machine learning model", + choices=["knn", "xgboost"], + default="knn", +) +args = parser.parse_args() + +if __name__ == "__main__": + + data_object = DataSplitter(TRAINING_PARAMS["data_path"]) + X_formatted = data_object.df + + for lb, ph in [(FORECASTING_PARAMS["lb"], FORECASTING_PARAMS["ph"])]: + det_ids = data_object.get_groups + + seed = TRAINING_PARAMS["seed"] + validation_prop = TRAINING_PARAMS["validation_proportion"] + test_prop = TRAINING_PARAMS["test_proportion"] + + data_object.split_groups(seed, validation_prop, test_prop) + + X_formatted_train, X_formatted_val, X_formatted_test = data_object.split_data() + + time_series_object = TimeSeriesScaler( + continous_features, + categorical_features, + other_columns, + target_as_autoregressive_feature, + target_column, + ) + + (X_formatted_train, X_formatted_val, X_formatted_test) = [ + time_series_object.copy_target_column(df) + for df in (X_formatted_train, X_formatted_val, X_formatted_test) + ] + + scaler = time_series_object.scaler_fit("minmax", X_formatted_train) + + (scaled_train, scaled_val, scaled_test) = [ + time_series_object.scaler_transform(df) + for df in (X_formatted_train, X_formatted_val, X_formatted_test) + ] + + series_formatter_obj = TimeSeriesFormatter( + lb, ph, static_features, dynamic_features, True, False + ) + + W_train, X_train, y_train, z_train = series_formatter_obj.format_data( + scaled_train + ) + W_val, X_val, y_val, z_val = series_formatter_obj.format_data(scaled_val) + + W_test, X_test, y_test, z_test = series_formatter_obj.format_data(scaled_test) + + logging.info(f"Column order: {scaled_train.columns}") + + lookback_timesteps = lb + prediction_horizon = ph + + X_train = TimeSeriesFormatter.reshape_x(X_train) + X_val = TimeSeriesFormatter.reshape_x(X_val) + X_test = TimeSeriesFormatter.reshape_x(X_test) + + if args.model == "knn": + optimal_k = 2 + traffic_model = KNNModel( + n_neighbors=optimal_k, weights="uniform", algorithm="kd_tree", p=2 + ) + elif args.model == "xgboost": + traffic_model = XGBoostModel( + n_estimators=300, + max_depth=5, + eta=0.1, + subsample=0.7, + colsample_bytree=0.8, + ) + else: + pass + + traffic_model.train_model(X_train, y_train) + + y_train_hat = traffic_model.predict_model(X_train) + train_rmse = EvaluationMetrics(y_train, y_train_hat).rmse() + # logging.info("RMSE on Train Set:", train_rmse) + + y_val_hat = traffic_model.predict_model(X_val) + val_rmse = EvaluationMetrics(y_val, y_val_hat).rmse() + logging.info(f"RMSE on Validation Set: {val_rmse}") + + y_test_hat = traffic_model.predict_model(X_test) + test_rmse = EvaluationMetrics(y_test, y_test_hat).rmse() + logging.info(f"RMSE on Test Set: {test_rmse}") + + traffic_model.save_model(f"artifacts/{args.model}_model") + time_series_object.save_scaler("artifacts/minmax_scaler.gz") diff --git a/src/utils.py b/src/utils.py index 8f64c4d..e7af2c1 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,4 +1,8 @@ import logging +from itertools import repeat + +import numpy as np +import pandas as pd def setup_logging(file_name="logfile.log"): @@ -11,3 +15,12 @@ def setup_logging(file_name="logfile.log"): filemode="w", ) 
return logging + + +def predicitons_to_df(ph, z_test, y_test_hat): + df_test = pd.DataFrame({"paris_id": [x for x in z_test for _ in repeat(None, ph)]})  # each detector id repeated ph times + + df_test["time_idx"] = np.tile(np.arange(ph), len(z_test))  # forecast step 0..ph-1 for every detector + df_test["preds"] = np.ravel(y_test_hat) + df_test["preds"] = df_test["preds"].astype(int)  # flows stored as integer counts (truncated) + return df_test
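The helper added at the end of `src/utils.py` expands each prediction window into long format; this is the frame `predict.py` writes to `../predictions/knn_<date>.csv` and that `frontend.DashboardData` later reads as its `preds` column. A quick toy run, with made-up detector ids, made-up values, and a 3-step horizon instead of the configured `ph=24`, assuming it is executed from `src/` so `utils` is importable:

```python
# Toy illustration of predicitons_to_df from src/utils.py: two prediction
# windows (one per detector), a 3-step horizon, invented values.
import numpy as np

from utils import predicitons_to_df

z_test = np.array([5169, 5170])      # detector id of each prediction window
y_test_hat = np.array(
    [
        [10.2, 11.7, 9.9],           # ph predictions for detector 5169
        [20.1, 19.4, 21.6],          # ph predictions for detector 5170
    ]
)

df = predicitons_to_df(3, z_test, y_test_hat)
print(df)
#    paris_id  time_idx  preds
# 0      5169         0     10
# 1      5169         1     11
# 2      5169         2      9
# 3      5170         0     20
# 4      5170         1     19
# 5      5170         2     21
```

Note that `astype(int)` truncates toward zero rather than rounding the predicted flows.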
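Further up, `src/dataset.py` introduces `TimeSeriesFormatter`, which slices each detector's hourly series into lookback/horizon windows before the flattened arrays reach KNN or XGBoost. The sketch below is a minimal, self-contained illustration using toy values: one invented detector, a reduced feature set, and `lb=4`/`ph=2` instead of the configured 24/24, again assuming it runs from `src/`:

```python
# Minimal sketch of how TimeSeriesFormatter (src/dataset.py) windows a
# per-detector series into (static, lookback, horizon) samples and flattens
# them for the tabular regressors. All values below are toy placeholders.
import numpy as np
import pandas as pd

from dataset import TimeSeriesFormatter

lb, ph = 4, 2                        # lookback window and forecast horizon (toy)
static_fs = ["maxspeed", "lanes"]    # per-detector static attributes
dynamic_fs = ["hour", "q", "qt"]     # dynamic features; target copy "qt" must be last

rows = 10
df = pd.DataFrame(
    {
        "paris_id": 5169,                      # single toy detector id
        "time_idx": np.arange(rows),           # consecutive hourly index
        "hour": np.arange(rows) % 24,
        "maxspeed": 50.0,
        "lanes": 2.0,
        "q": np.linspace(100.0, 190.0, rows),  # made-up flow values
    }
)
df["qt"] = df["q"]  # duplicated target, as TimeSeriesScaler.copy_target_column does

formatter = TimeSeriesFormatter(
    lb, ph, static_fs, dynamic_fs, auto_regressive=True, inference=False
)
W, X, y, z = formatter.format_data(df)
X_flat = TimeSeriesFormatter.reshape_x(X, W, use_static=True)

print(W.shape, X.shape, y.shape, z.shape)  # (5, 2) (5, 4, 3) (5, 2) (5,)
print(X_flat.shape)                        # (5, 14): 2 static + 4*3 lagged values
```

Flattening each lookback window (optionally prefixed with the static attributes via `use_static=True`) is what lets the scikit-learn KNN and XGBoost regressors treat multi-step forecasting as ordinary tabular regression with a `ph`-dimensional target, which is why `train.py` and `predict.py` call `reshape_x` before fitting or predicting.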