Merge pull request #2 from vishalmhjn/dev
feat: merge major changes
vishalmhjn authored Jun 5, 2024
2 parents 00f732f + 9d26f88 commit d4ee7dc
Showing 13 changed files with 895 additions and 73 deletions.
35 changes: 1 addition & 34 deletions README.md
@@ -4,37 +4,4 @@ Traffic-Waves is a voluntary project focused on daily traffic predictions in Paris

## Overview

The project leverages deep learning techniques to analyze historical traffic data and make predictions for daily traffic patterns in Paris. This aids in providing insights for commuters and city planners alike.


### 1. Deployed System on AWS
The system is deployed on AWS, utilizing various services to process and analyze the traffic data efficiently.

![Architecture](assets/aws-traffic.png)

1. __AWS EventBridge Trigger for EC2 Instance Start__

This Lambda function is designed to start an EC2 instance of type t2.large upon receiving an event trigger from AWS EventBridge. The function automates starting the EC2 instance every day according to the specified configuration.
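
A minimal sketch of such a start handler, assuming boto3 and an `INSTANCE_ID` environment variable (the Lambda source itself is not part of this diff):

```python
import os

import boto3

ec2 = boto3.client("ec2")


def lambda_handler(event, context):
    # EventBridge invokes this once a day via a schedule rule.
    instance_id = os.environ["INSTANCE_ID"]  # assumed configuration
    ec2.start_instances(InstanceIds=[instance_id])
    return {"status": "starting", "instance_id": instance_id}
```

The stop trigger (item 3 below) would be symmetric, calling `ec2.stop_instances` instead.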

2. __EC2 Instance Bootstrap with Custom User Data__

This documentation outlines the process of bootstrapping an EC2 instance using custom user data. Upon boot, the instance executes a sequence of scripts to set up a Python environment with the necessary dependencies such as Python 3, PyTorch, MLflow, and Pandas. It then runs the following scripts in sequence to initialize the environment and perform further configuration.

a) __Paris Traffic Data Aggregator__: This Python script is designed to aggregate real-time traffic data for specified locations in Paris using the Open Paris API. The aggregated data is then merged into a single DataFrame and uploaded to an Amazon S3 bucket as a CSV file for further analysis.
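
A condensed sketch of that collection loop, assuming an illustrative bucket name and endpoint; the query parameters mirror the `data_collector` defaults shown in `src/call_data_api.py` below:

```python
import boto3
import pandas as pd
import requests

# Placeholder endpoint; the real URL is assembled in src/config_data.py.
URL = "https://opendata.paris.fr/api/.../comptages-routiers-permanents/records"


def fetch_link(link: str, limit: int = 24, offset: int = 0,
               timezone: str = "Europe/Berlin") -> pd.DataFrame:
    # One request per detector (Paris ID), 24 hourly records per day.
    params = {"limit": limit, "offset": offset, "timezone": timezone}
    response = requests.get(URL, params=params, timeout=30)
    response.raise_for_status()
    return pd.DataFrame(response.json().get("results", []))


frames = [fetch_link(link) for link in ["5169", "5456"]]  # subset of LINKS
merged = pd.concat(frames, ignore_index=True)

# Upload the merged CSV for downstream processing; bucket/key are illustrative.
boto3.client("s3").put_object(
    Bucket="traffic-waves-data",
    Key="raw_data/raw_data.csv",
    Body=merged.to_csv(index=False).encode("utf-8"),
)
```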

b) __Data Processing and Upload__: This Python script is designed to run on an EC2 instance to process and upload traffic data to an Amazon S3 bucket. It retrieves data from various CSV files stored in the specified S3 bucket, performs data manipulation and merging operations, and then uploads the processed data back to S3 for further analysis. Its main steps, sketched after this list:
* It merges the static attributes data with the real-time traffic data based on the Paris ID.
* Timestamps are converted to a numeric time index for temporal analysis.
* Additional features such as day of the week and hour of the day are extracted from timestamps.
* Missing values in the real-time traffic data are filled using corresponding values from the historical trends data.
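
A hedged pandas sketch of these four steps; the join keys, and any column names beyond those defined in `src/config_data.py` (`paris_id`, `t_1h`, `day`, `hour`, `q`), are assumptions:

```python
import pandas as pd


def build_inference_frame(realtime: pd.DataFrame,
                          static_attrs: pd.DataFrame,
                          trends: pd.DataFrame) -> pd.DataFrame:
    # 1. Merge static link attributes (maxspeed, lanes, length) by Paris ID.
    df = realtime.merge(static_attrs, on="paris_id", how="left")

    # 2. Convert timestamps to a numeric time index for temporal analysis.
    df["t_1h"] = pd.to_datetime(df["t_1h"])
    df["time_idx"] = ((df["t_1h"] - df["t_1h"].min())
                      // pd.Timedelta(hours=1)).astype(int)

    # 3. Extract day-of-week and hour-of-day features from the timestamps.
    df["day"] = df["t_1h"].dt.dayofweek
    df["hour"] = df["t_1h"].dt.hour

    # 4. Fill missing flows (q) from the historical trends data, matched
    #    per detector, day, and hour (assumed join keys).
    df = df.merge(trends, on=["paris_id", "day", "hour"],
                  how="left", suffixes=("", "_trend"))
    df["q"] = df["q"].fillna(df["q_trend"])
    return df.drop(columns=["q_trend"])
```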

c) __Inference Script for Traffic Prediction__: This Python script is designed to run on an EC2 instance for making traffic predictions using a pre-trained machine learning model. The script retrieves preprocessed data from an Amazon S3 bucket, loads the pre-trained model using MLflow, performs inference on the test data, and then uploads the prediction results back to the S3 bucket.
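
The general shape of that inference step might look as follows; the model URI, bucket, and key names are placeholders, not the repository's actual values:

```python
import boto3
import mlflow.pyfunc
import pandas as pd

s3 = boto3.client("s3")

# Fetch the preprocessed features produced by the previous step.
s3.download_file("traffic-waves-data", "processed_data/inference_data.csv",
                 "/tmp/inference_data.csv")
features = pd.read_csv("/tmp/inference_data.csv")

# Load the pre-trained model through MLflow's generic pyfunc interface.
model = mlflow.pyfunc.load_model("models:/traffic-predictor/1")  # assumed URI
features["prediction"] = model.predict(features)

# Persist the predictions for the aggregation step that follows.
features.to_csv("/tmp/predictions.csv", index=False)
s3.upload_file("/tmp/predictions.csv", "traffic-waves-data",
               "predictions/predictions.csv")
```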

d) __Data Aggregation and Normalization Script__: This Python script is designed to aggregate, normalize, and upload traffic data from various sources to an Amazon S3 bucket. It retrieves real-time traffic data, prediction results from previous days, and prediction results for the current day from the specified S3 bucket, processes and normalizes the data, and then uploads it as a JSON file to another S3 bucket for web visualization.
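
A sketch of that aggregation step under the same caveats; the per-detector min-max normalization shown here is an assumption, since the diff does not include the script itself:

```python
import boto3
import pandas as pd


def normalize_per_detector(df: pd.DataFrame) -> pd.DataFrame:
    # Scale flows per detector to [0, 1] so the plots share one axis.
    grouped = df.groupby("paris_id")["q"]
    lo, hi = grouped.transform("min"), grouped.transform("max")
    df["q_norm"] = (df["q"] - lo) / (hi - lo)
    return df


def aggregate_and_publish(observed: pd.DataFrame,
                          past_pred: pd.DataFrame,
                          today_pred: pd.DataFrame) -> None:
    combined = pd.concat([observed, past_pred, today_pred], ignore_index=True)
    payload = normalize_per_detector(combined).to_json(orient="records")
    # Publish to the website bucket for the D3.js front end (key assumed).
    boto3.client("s3").put_object(Bucket="traffiq-paris", Key="data.json",
                                  Body=payload.encode("utf-8"))
```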

e) __Real-time Traffic Visualization Web Application__: An S3 bucket is configured for static website hosting. An HTML file visualizes real-time traffic data using D3.js. The application fetches traffic data from a JSON file stored in the same Amazon S3 bucket, creates dynamic line plots for each detector (Paris ID), and displays them on the web page.
* You can access the daily batch predictions on the [S3 Website](http://traffiq-paris.s3-website.eu-north-1.amazonaws.com/)

3. __AWS EventBridge Trigger for EC2 Instance Stop__

This Lambda function is designed to stop the running EC2 instance upon receiving an event trigger from AWS EventBridge. It runs after the daily inference task is complete and is automated to avoid unnecessary EC2 costs.
The project leverages ML and DL techniques to analyze historical traffic data and make predictions for daily traffic patterns in Paris. This aids in providing insights for commuters and city planners alike.
40 changes: 40 additions & 0 deletions purged/README.md
@@ -0,0 +1,40 @@
# Traffic Waves

Traffic-Waves is a voluntary project focused on daily traffic predictions in Paris, utilizing data from [Open Data Paris](https://opendata.paris.fr/explore/dataset/comptages-routiers-permanents/information)

## Overview

The project leverages deep learning techniques to analyze historical traffic data and make predictions for daily traffic patterns in Paris. This aids in providing insights for commuters and city planners alike.


### 1. Deployed System on AWS
The system is deployed on AWS, utilizing various services to process and analyze the traffic data efficiently.

![Architecture](assets/aws-traffic.png)

1. __AWS EventBridge Trigger for EC2 Instance Start__

This Lambda function is designed to start an EC2 instance of type t2.large upon receiving an event trigger from AWS EventBridge. The function automates starting the EC2 instance every day according to the specified configuration.

2. __EC2 Instance Bootstrap with Custom User Data__

This documentation outlines the process of bootstrapping an EC2 instance using custom user data. Upon boot, the instance executes a sequence of scripts to set up a Python environment with the necessary dependencies such as Python 3, PyTorch, MLflow, and Pandas. It then runs the following scripts in sequence to initialize the environment and perform further configuration.

a) __Paris Traffic Data Aggregator__: This Python script is designed to aggregate real-time traffic data for specified locations in Paris using the Open Paris API. The aggregated data is then merged into a single DataFrame and uploaded to an Amazon S3 bucket as a CSV file for further analysis.

b) __Data Processing and Upload__: This Python script is designed to run on an EC2 instance to process and upload traffic data to an Amazon S3 bucket. It retrieves data from various CSV files stored in the specified S3 bucket, performs data manipulation and merging operations, and then uploads the processed data back to S3 for further analysis.
* It merges the static attributes data with the real-time traffic data based on the Paris ID.
* Timestamps are converted to a numeric time index for temporal analysis.
* Additional features such as day of the week and hour of the day are extracted from timestamps.
* Missing values in the real-time traffic data are filled using corresponding values from the historical trends data.

c) __Inference Script for Traffic Prediction__: This Python script is designed to run on an EC2 instance for making traffic predictions using a pre-trained machine learning model. The script retrieves preprocessed data from an Amazon S3 bucket, loads the pre-trained model using MLflow, performs inference on the test data, and then uploads the prediction results back to the S3 bucket.

d) __Data Aggregation and Normalization Script__: This Python script is designed to aggregate, normalize, and upload traffic data from various sources to an Amazon S3 bucket. It retrieves real-time traffic data, prediction results from previous days, and prediction results for the current day from the specified S3 bucket, processes and normalizes the data, and then uploads it as a JSON file to another S3 bucket for web visualization.

e) __Real-time Traffic Visualization Web Application__: An S3 bucket is configured for static website hosting. An HTML file visualizes real-time traffic data using D3.js. The application fetches traffic data from a JSON file stored in the same Amazon S3 bucket, creates dynamic line plots for each detector (Paris ID), and displays them on the web page.
* You can access the daily batch predictions on the [S3 Website](http://traffiq-paris.s3-website.eu-north-1.amazonaws.com/)

3. __AWS EventBridge Trigger for EC2 Instance Stop__

This Lambda function is designed to stop the running EC2 instance upon receiving an event trigger from AWS EventBridge. It runs after the daily inference task is complete and is automated to avoid unnecessary EC2 costs.
42 changes: 42 additions & 0 deletions src/app.py
@@ -0,0 +1,42 @@
from flask import Flask, render_template, jsonify

import pandas as pd
from config_interface import *
from frontend import DashboardData

pd.options.mode.chained_assignment = None

# Data sources: yesterday's observations (t-1), predictions made for t-1,
# predictions for today (t), and per-detector historical variance.
dashboard_object = DashboardData(
path_o_t_1=f"../data/processed_data/inference_data_{INFERENCE_INPUT_DATE_FMT}.csv",
path_pt_1=f"../predictions/knn_{INFERENCE_INPUT_DATE_FMT}.csv",
path_pt=f"../predictions/knn_{INFERENCE_PREDICTION_DATE_FMT}.csv",
    path_variance="../data/variance/df_var_2023.csv",
)

dashboard_object.read_data()

app = Flask(__name__)


@app.route("/")
def index():
return render_template("index.html")


@app.route("/data.json")
def data():
data_asset = dashboard_object.processing_pipeline()

return DashboardData.write_to_json("../frontend/data.json", data_asset)


@app.after_request
def add_header(response):
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"
return response


if __name__ == "__main__":
app.run(debug=True)
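
Run from `src/` (the data paths are relative) with `python app.py`; Flask's development server then serves the dashboard at `http://127.0.0.1:5000/`.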
11 changes: 5 additions & 6 deletions src/call_data_api.py
@@ -7,13 +7,12 @@
import os
from dataclasses import dataclass
import requests
from tqdm import tqdm
import pandas as pd

from utils import setup_logging
from config import URL, LINKS, INFERENCE_DATA_DATE, data_folder
from config_data import URL, LINKS, input_date_formatted, BASE_PATH_DATA

temp_path = data_folder / "raw_data"
temp_path = BASE_PATH_DATA / "raw_data"

logging = setup_logging(file_name="call_data_api.log")

@@ -65,7 +64,7 @@ def merge_data(self):
df = pd.read_csv(f"{self.read_path}/raw_data_{i}.csv")
df["t_1h"] = pd.to_datetime(df["t_1h"])

if str(df["t_1h"].dt.date.min()) == INFERENCE_DATA_DATE:
if str(df["t_1h"].dt.date.min()) == input_date_formatted:
full_data_list.append(df)
else:
logging.info("Data for %s detector is not available", i)
@@ -89,7 +88,7 @@ def clean_data(self):

def data_collector(limit=24, offset=0, timezone="Europe/Berlin"):
"""Wrapper fucntion to collect and save the data"""
for link in tqdm(LINKS):
for link in LINKS:

# Define the query parameters
params = {
@@ -109,7 +108,7 @@ def data_collector(limit=24, offset=0, timezone="Europe/Berlin"):
api_handler.call_open_api()

data_merger = DataMerger(
path=f"{temp_path}/raw_data_{INFERENCE_DATA_DATE}.csv",
path=f"{temp_path}/raw_data_{input_date_formatted}.csv",
list_links=LINKS,
read_path=temp_path,
)
66 changes: 36 additions & 30 deletions src/config.py → src/config_data.py
@@ -1,5 +1,4 @@
from datetime import datetime, timedelta
import os
from pathlib import Path

# API URL
@@ -8,35 +7,6 @@
"comptages-routiers-permanents/records"
)

# Previous day's input data i.e., to make predictions for today, we use yesterday's data
INFERENCE_DATA_DATE = (datetime.today() - timedelta(1)).strftime("%Y-%m-%d")

# Path handling

# Define the base paths
data_folder = Path("../data")

# Define the specific paths using the base paths
file_raw_input = data_folder / "raw_data" / f"raw_data_{INFERENCE_DATA_DATE}.csv"
file_train_input = data_folder / "historical_data" / "paris_trunk_june_july.csv"
file_static_attributes = data_folder / "processed_data" / "link_static_attributes.csv"
file_historical_trends = data_folder / "processed_data" / "link_historical_trends.csv"
file_processed_input = (
data_folder / "processed_data" / f"inference_data_{INFERENCE_DATA_DATE}.csv"
)

# column names
list_column_order = [
"time_idx",
"day",
"hour",
"maxspeed",
"length",
"lanes",
"paris_id",
"q",
]

# Network detector IDs used to query the data
LINKS = [
"5169",
@@ -155,3 +125,39 @@
"5455",
"5456",
]

# Define the base paths
BASE_PATH_DATA = Path("../data")

# column names
LIST_COLUMN_ORDER = [
"time_idx",
"day",
"hour",
"maxspeed",
"length",
"lanes",
"paris_id",
"q",
]


# Previous day's input data i.e., to make predictions for today, we use yesterday's data
input_date = datetime.today() - timedelta(1)
input_date_formatted = input_date.strftime("%Y-%m-%d")
prediction_date = datetime.today()
prediction_date_formatted = prediction_date.strftime("%Y-%m-%d")

# Define the specific paths using the base paths
file_raw_input = BASE_PATH_DATA / "raw_data" / f"raw_data_{input_date_formatted}.csv"
file_train_input = BASE_PATH_DATA / "historical_data" / "paris_trunk_june_july.csv"
file_model_train = BASE_PATH_DATA / "historical_data" / "paris_trunk_june_july.csv"
file_static_attributes = (
BASE_PATH_DATA / "processed_data" / "link_static_attributes.csv"
)
file_historical_trends = (
BASE_PATH_DATA / "processed_data" / "link_historical_trends.csv"
)
file_processed_input = (
BASE_PATH_DATA / "processed_data" / f"inference_data_{input_date_formatted}.csv"
)
86 changes: 86 additions & 0 deletions src/config_model.py
@@ -0,0 +1,86 @@
from config_data import file_model_train

TRAINING_PARAMS = {
"metric": "smape",
"training": True,
"data_path": file_model_train,
"model_output_dir": "modeloutput/",
"seed": 46,
"test_proportion": 0.15,
"validation_proportion": 0.15,
"patience": 25,
"train_episodes": 1000,
"batch_size": 512,
}

FORECASTING_PARAMS = {
    "lb": 24,  # assumed: look-back window, hours of input history
    "ph": 24,  # assumed: prediction horizon, hours forecast ahead
}

## day and hour are considered static over the forecasting horizon (short-term forecasting)
FEATURE_SETTINGS = {
"dyn_to_static": ["day", "hour"],
"include_occupancy": False,
"dynamic_categorical_features": ["day", "hour"],
"static_categorical_features": [],
"dynamic_continous_features": [
# "speed_kph_mean",
# "speed_kph_stddev",
"q",
],
"static_continous_features": [
"maxspeed",
"lanes",
"length",
],
"other_columns": ["time_idx", "paris_id"],
"occupancy_column": ["k"],
"target_as_autoregressive_feature": ["q"],
"target_column": ["qt"],
}

# Using the CONFIG dictionary
metric = TRAINING_PARAMS["metric"]
training = TRAINING_PARAMS["training"]
data_path = TRAINING_PARAMS["data_path"]
train_episodes = TRAINING_PARAMS["train_episodes"]

dynamic_continous_features = FEATURE_SETTINGS["dynamic_continous_features"]
dynamic_categorical_features = FEATURE_SETTINGS["dynamic_categorical_features"]
static_continous_features = FEATURE_SETTINGS["static_continous_features"]
static_categorical_features = FEATURE_SETTINGS["static_categorical_features"]

continous_features = [*static_continous_features, *dynamic_continous_features]
categorical_features = [*static_categorical_features, *dynamic_categorical_features]

dyn_to_static = FEATURE_SETTINGS["dyn_to_static"]

occupancy_column = FEATURE_SETTINGS["occupancy_column"]
other_columns = FEATURE_SETTINGS["other_columns"]
target_as_autoregressive_feature = FEATURE_SETTINGS["target_as_autoregressive_feature"]
target_column = FEATURE_SETTINGS["target_column"]

if FEATURE_SETTINGS["include_occupancy"]:
    dynamic_continous_features.append(*occupancy_column)

if FEATURE_SETTINGS["include_occupancy"]:
dynamic_features = [
        # TODO: keep categorical features first and continuous features
        # second; the preprocess data function depends on this ordering
*dynamic_categorical_features,
*dynamic_continous_features,
*occupancy_column,
*target_column,
]
else:
dynamic_features = [
        # TODO: keep categorical features first and continuous features
        # second; the preprocess data function depends on this ordering
*dynamic_categorical_features,
*dynamic_continous_features,
*target_column,
]
static_features = [*static_categorical_features, *static_continous_features]