data_utils.py · 226 lines (189 loc) · 10.9 KB
import polars as pl
import os
import json
import plotly.express as px
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder, LabelEncoder
from scipy.stats import zscore
from polars_datatypes import NUMERIC_TYPES, DATA_TYPE_OPTIONS
import ast
from constants import DQ_RULES
# Function to load datasets
def load_datasets(folder_path):
"""Loads CSV file names from the specified folder."""
files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
return files
# Function to load selected dataset
def load_data(file_path, limit=None) -> pl.DataFrame:
    """Loads data from the selected dataset file (read as Parquet) and applies an optional row limit."""
data = pl.read_parquet(file_path)
if limit:
data = data.head(limit)
return data
# Function to apply filters to the dataset
def apply_filters(df, filters):
"""Applies the user-selected filters to the DataFrame."""
for col, val in filters.items():
if val:
df = df.filter(pl.col(col) == val)
return df
# Function to generate a summary of the dataset
def generate_summary(df: pl.DataFrame):
"""Generates a summary of the DataFrame."""
summary = {
'Number of Rows': len(df),
'Number of Columns': len(df.columns),
        'Missing Values': sum(df.null_count().row(0)),
        'Duplicate Rows': df.height - df.unique().height,  # rows in excess of the first occurrence of each duplicate
'Memory Usage (MB)': round(df.estimated_size()/ (1024**2), 2)
}
return summary
# Function to display detailed statistics for each column
def detailed_statistics(df: pl.DataFrame):
"""Displays detailed statistics for each column."""
return df.describe()
# Function to generate a column-level summary
def column_summary(df, col):
"""Generates a detailed summary for a single column."""
column = df[col]
dtype = column.dtype
summary = {
'Data Type': dtype,
'Unique Values': column.n_unique(),
'Missing Values': column.is_null().sum(),
'Mean': column.mean() if dtype in NUMERIC_TYPES else 'N/A',
'Median': column.median() if dtype in NUMERIC_TYPES else 'N/A',
'Mode': column.mode()[0] if dtype in NUMERIC_TYPES else 'N/A',
'Standard Deviation': column.std() if dtype in NUMERIC_TYPES else 'N/A',
'Min': column.min() if dtype in NUMERIC_TYPES else 'N/A',
'Max': column.max() if dtype in NUMERIC_TYPES else 'N/A',
}
return summary
def apply_dq_rules(df, rules):
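    """Evaluates each data-quality rule against the DataFrame and returns a list of violations.

    Each violation is a dict with 'column', 'message', and 'severity' keys. Missing columns
    and unexpected errors are reported as violations rather than raised.
    """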
violations = []
for rule in rules:
target_column = rule.target_column
try:
# Check if the target column exists before applying the rule
if target_column not in df.columns:
raise KeyError(f"Column '{target_column}' not found.")
# Define the condition based on the rule type
condition = ""
if rule.rule_type == DQ_RULES.RANGE_CHECK.value:
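                # The rule condition is stored as a Python lambda string (e.g. "lambda x: 0 <= x <= 100")
                # and evaluated here; only use with trusted rule definitions, since eval executes arbitrary code.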
lambda_func = eval(rule.condition)
condition = pl.when(pl.col(target_column).map_elements(lambda_func)).then(True).otherwise(False)
elif rule.rule_type == DQ_RULES.NULL_CHECK.value:
condition = pl.col(target_column).is_not_null()
elif rule.rule_type == DQ_RULES.UNIQUENESS_CHECK.value:
                # Uniqueness check: the column is unique when its distinct count equals the total row count
condition = (pl.col(target_column).n_unique() == pl.count())
elif rule.rule_type == DQ_RULES.CUSTOM_LAMBDA.value:
lambda_func = eval(rule.condition)
condition = pl.when(pl.col(target_column).map_elements(lambda_func)).then(True).otherwise(False)
# Applying the condition and checking if any violations exist
if not df.select(pl.when(condition).then(True).otherwise(False)).to_series().all():
violations.append({
'column': target_column,
'message': rule.message,
'severity': rule.severity
})
except KeyError:
# Handle the case where the column was dropped during data manipulation
violations.append({
'column': target_column,
'message': f"Column '{target_column}' not found. It may have been dropped during data manipulation.",
'severity': 'Warning' # Adjust severity as needed
})
except Exception as e:
# Handle any other unexpected errors
violations.append({
'column': target_column,
'message': f"Error applying rule: {str(e)}",
'severity': 'Error'
})
return violations
def apply_operations_to_dataset(dataset, operations):
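    """Replays a sequence of stored operations (rename, cast, filter, fill, scale, encode, merge, ...)
    on the given Polars DataFrame and returns the transformed result.

    Each operation carries its type and a JSON-encoded parameter payload.
    """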
for operation in operations:
operation_type = operation.operation_type
parameters = json.loads(operation.parameters)
if operation_type == "Rename Column":
dataset = dataset.rename({parameters["old_name"]: parameters["new_name"]})
elif operation_type == "Change Data Type":
dataset = dataset.with_columns(pl.col(parameters["column"]).cast(DATA_TYPE_OPTIONS[parameters['new_type']]))
elif operation_type == "Delete Column":
dataset = dataset.drop(parameters["columns"])
elif operation_type == "Filter Rows":
dataset = dataset.filter(pl.col(parameters["condition"]))
elif operation_type == "Add Calculated Column":
            dataset = dataset.with_columns(eval(parameters["formula"], {'__builtins__': None}, dataset.to_dict(as_series=False)).alias(parameters["new_column"]))
elif operation_type == "Fill Missing Values":
if parameters["method"] == "Specific Value":
dataset = dataset.with_columns(pl.col(parameters["column"]).fill_null(parameters["value"]))
elif parameters["method"] == "Mean":
dataset = dataset.with_columns(pl.col(parameters["column"]).fill_null(dataset.select(pl.col(parameters["column"]).mean())))
elif parameters["method"] == "Median":
dataset = dataset.with_columns(pl.col(parameters["column"]).fill_null(dataset.select(pl.col(parameters["column"]).median())))
elif parameters["method"] == "Mode":
mode_value = dataset.select(pl.col(parameters["column"]).mode())[0, 0]
dataset = dataset.with_columns(pl.col(parameters["column"]).fill_null(mode_value))
elif operation_type == "Duplicate Column":
dataset = dataset.with_columns(pl.col(parameters["column"]).alias(f"{parameters['column']}_duplicate"))
elif operation_type == "Reorder Columns":
dataset = dataset.select(parameters["new_order"])
elif operation_type == "Replace Values":
col_dtype = dataset.schema[parameters["column"]] # Get the data type of the column
dataset = dataset.with_columns(
pl.when(pl.col(parameters["column"]) == pl.lit(parameters["to_replace"]).cast(col_dtype)) # Cast to column type
.then(pl.lit(parameters["replace_with"]).cast(col_dtype)) # Ensure the replacement value is also casted
.otherwise(pl.col(parameters["column"]))
.alias(parameters["column"]) # Ensure the column is updated with the new values
)
elif operation_type == "Scale Data":
scaler = StandardScaler() if parameters["method"] == "StandardScaler" else MinMaxScaler()
scaled_columns = scaler.fit_transform(dataset.select(parameters["columns"]).to_numpy())
dataset = dataset.with_columns([pl.Series(col, scaled_columns[:, idx]) for idx, col in enumerate(parameters["columns"])])
elif operation_type == "Encode Data":
if parameters["type"] == "OneHotEncoding":
encoder = OneHotEncoder(sparse_output=False, drop='first')
encoded_data = encoder.fit_transform(dataset.select(parameters["columns"]).to_numpy())
encoded_df = pl.DataFrame(encoded_data, schema=encoder.get_feature_names_out(parameters["columns"]).tolist())
dataset = dataset.drop(parameters["columns"]).hstack(encoded_df)
else:
encoder = LabelEncoder()
for col in parameters["columns"]:
dataset = dataset.with_columns(pl.Series(col, encoder.fit_transform(dataset[col].to_numpy())))
elif operation_type == "Impute Missing Values":
for col in parameters["columns"]:
if parameters["method"] == "Mean":
dataset = dataset.with_columns(pl.col(col).fill_null(dataset.select(pl.col(col)).mean()))
elif parameters["method"] == "Median":
dataset = dataset.with_columns(pl.col(col).fill_null(dataset.select(pl.col(col)).median()))
elif parameters["method"] == "Mode":
mode_value = dataset.select(pl.col(col).mode())[0, 0]
dataset = dataset.with_columns(pl.col(col).fill_null(mode_value))
elif operation_type == "Remove Outliers":
if parameters["method"] == "IQR Method":
Q1 = dataset.select(pl.col(parameters["column"]).quantile(0.25)).to_numpy()[0, 0]
Q3 = dataset.select(pl.col(parameters["column"]).quantile(0.75)).to_numpy()[0, 0]
IQR = Q3 - Q1
dataset = dataset.filter((pl.col(parameters["column"]) >= (Q1 - 1.5 * IQR)) & (pl.col(parameters["column"]) <= (Q3 + 1.5 * IQR)))
elif parameters["method"] == "Z-Score Method":
dataset = dataset.filter((pl.col(parameters["column"]) - dataset.select(pl.col(parameters["column"]).mean())) / dataset.select(pl.col(parameters["column"]).std()) < 3)
elif operation_type == "Merge Datasets":
from models import get_db, DatasetOperation, Dataset, DatasetVersion
from sqlalchemy.orm import Session
merge_with = parameters["merge_with"]
merge_column = parameters["join_column"]
join_type = parameters["join_type"]
merge_version_num = parameters["merge_version"]
db: Session = next(get_db())
selected_dataset = db.query(Dataset).filter(Dataset.id == merge_with).first()
selected_version = db.query(DatasetVersion).filter(
DatasetVersion.dataset_id == selected_dataset.id,
DatasetVersion.id == merge_version_num
).first()
selected_data = load_data(selected_version.dataset.filepath)
operations = db.query(DatasetOperation).filter(DatasetOperation.version_id == selected_version.id).all()
if operations:
selected_data = apply_operations_to_dataset(selected_data, operations)
dataset = dataset.join(selected_data, on=merge_column, how=join_type)
return dataset
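

if __name__ == "__main__":
    # Minimal usage sketch, not part of the application flow; the file path below is a
    # hypothetical placeholder for a Parquet file produced elsewhere in the app.
    sample = load_data("datasets/example.parquet", limit=1_000)
    print(generate_summary(sample))
    print(column_summary(sample, sample.columns[0]))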