Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jalencato committed Oct 31, 2023
1 parent 51de128 commit b3ddb9d
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ def _convert_feature(feats: list[dict]) -> list[dict]:
if gconstruct_transform_dict["name"] == "max_min_norm":
gsp_transformation_dict["name"] = "numerical"
gsp_transformation_dict["kwargs"] = {"normalizer": "min-max", "imputer": "mean"}
elif gconstruct_transform_dict["name"] == "bucket_numerical":
elif gconstruct_transform_dict["name"] == "bucket-numerical":
gsp_transformation_dict["name"] = "numerical"
assert "bucket_cnt" in gconstruct_transform_dict, \
"bucket_cnt should be in the gconstruct bucket feature transform field"
assert "range" in gconstruct_transform_dict, \
"range should be in the gconstruct bucket feature transform field"
gsp_transformation_dict["kwargs"] = {"normalizer": "bucket_numerical",
gsp_transformation_dict["kwargs"] = {"normalizer": "bucket-numerical",
"bucket_cnt": gconstruct_transform_dict['bucket_cnt'],
"range": gconstruct_transform_dict['range'],
"slide_window_size": gconstruct_transform_dict['slide_window_size'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
from graphstorm_processing.constants import SUPPORTED_FILE_TYPES
from .label_config_base import LabelConfig, EdgeLabelConfig, NodeLabelConfig
from .feature_config_base import FeatureConfig, NoopFeatureConfig
from .numerical_configs import MultiNumericalFeatureConfig, NumericalFeatureConfig
from .numerical_configs import (MultiNumericalFeatureConfig,
NumericalFeatureConfig,
BucketFeatureConfig)
from .data_config_base import DataStorageConfig


Expand Down Expand Up @@ -56,6 +58,8 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig:
return NumericalFeatureConfig(feature_dict)
elif transformation_name == "multi-numerical":
return MultiNumericalFeatureConfig(feature_dict)
elif transformation_name == "bucket-numerical":
return BucketFeatureConfig(feature_dict)
else:
raise RuntimeError(f"Unknown transformation name: '{transformation_name}'")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import Mapping
from typing import Mapping, List

from graphstorm_processing.constants import VALID_IMPUTERS, VALID_NORMALIZERS
from .feature_config_base import FeatureConfig
Expand Down Expand Up @@ -92,3 +92,61 @@ def __init__(self, config: Mapping):
self.separator = self._transformation_kwargs.get("separator", None)

self._sanity_check()


class BucketFeatureConfig(FeatureConfig):
    """Feature configuration for bucket normalization features.

    The values are read from ``self._transformation_kwargs`` and validated
    once, at construction time, by ``_sanity_check``.

    Supported kwargs
    ----------------
    imputer: str
        A method to fill in missing values in the data. Valid values are:
        "none" (Default), "mean", "median", and "most_frequent". Missing values will be replaced
        with the respective value computed from the data.
    normalizer: str
        A normalization to apply to each column. Valid values are
        "none", "min-max", and "standard".
        The transformation applied will be:

        * "none": (Default) Don't normalize the numerical values during encoding.
        * "min-max": Normalize each value by subtracting the minimum value from it,
          and then dividing it by the difference between the maximum value and the minimum.
        * "standard": Normalize each value by dividing it by the sum of all the values.
    bucket_cnt: int
        The count of bucket lists used in the bucket feature transform. Required.
    range: List[float]
        The range of bucket lists only defining the start and end point.
        Required; must be a list of exactly two numbers (ints or floats).
    slide_window_size: float
        Interval or range within which numeric values are grouped into buckets.
        May be an int or a float; the string "none" marks it as not provided.
    """

    def __init__(self, config: Mapping):
        super().__init__(config)
        kwargs = self._transformation_kwargs
        self.imputer = kwargs.get("imputer", "none")
        self.norm = kwargs.get("normalizer", "none")
        # "none" is used as the "key missing" marker. For bucket_cnt and range
        # the type checks in _sanity_check reject it, which makes them required.
        self.bucket_cnt = kwargs.get("bucket_cnt", "none")
        self.range = kwargs.get("range", "none")
        self.slide_window_size = kwargs.get("slide_window_size", "none")
        self._sanity_check()

    def _sanity_check(self) -> None:
        """Validate the parsed kwargs.

        Raises
        ------
        AssertionError
            If the imputer or normalizer is unknown, ``bucket_cnt`` is not an
            integer, ``range`` is not a list of two numbers, or
            ``slide_window_size`` is neither a number nor "none".
        """
        super()._sanity_check()
        assert (
            self.imputer in VALID_IMPUTERS
        ), f"Unknown imputer requested, expected one of {VALID_IMPUTERS}, got {self.imputer}"
        assert (
            self.norm in VALID_NORMALIZERS
        ), f"Unknown normalizer requested, expected one of {VALID_NORMALIZERS}, got {self.norm}"
        assert isinstance(
            self.bucket_cnt, int
        ), f"Expect bucket_cnt {self.bucket_cnt} be an integer"
        # Accept floats as well as ints: the range is documented as List[float]
        # and JSON configs may carry either form (e.g. [10, 20] or [0.5, 9.5]).
        assert (
            isinstance(self.range, list)
            and len(self.range) == 2
            and all(isinstance(x, (int, float)) for x in self.range)
        ), f"Expect range {self.range} be a list of two numbers"
        # Integer window sizes (e.g. 5) are valid numbers too; a float-only
        # check would reject configs that use them.
        assert (
            isinstance(self.slide_window_size, (int, float))
            or self.slide_window_size == "none"
        ), f"Expect no slide window size or it is a number, got {self.slide_window_size}"
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
NoopTransformation,
DistNumericalTransformation,
DistMultiNumericalTransformation,
DistBucketNumericalTransformation,
)


Expand All @@ -48,6 +49,8 @@ def __init__(self, feature_config: FeatureConfig):
self.transformation = DistNumericalTransformation(**default_kwargs, **args_dict)
elif feat_type == "multi-numerical":
self.transformation = DistMultiNumericalTransformation(**default_kwargs, **args_dict)
elif feat_type == "bucket-numerical":
self.transformation = DistBucketNumericalTransformation(**default_kwargs, **args_dict)
else:
raise NotImplementedError(
f"Feature {feat_name} has type: {feat_type} that is not supported"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
from .dist_label_transformation import DistSingleLabelTransformation, DistMultiLabelTransformation
from .dist_numerical_transformation import (
DistMultiNumericalTransformation,
DistNumericalTransformation,
DistNumericalTransformation
)
from .dist_bucket_numerical_transformation import DistBucketNumericalTransformation
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import List
import math

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType, FloatType
import numpy as np

from .base_dist_transformation import DistributedTransformation
from .dist_numerical_transformation import apply_imputation, apply_norm


class DistBucketNumericalTransformation(DistributedTransformation):
    """Multi-hot bucket encoding of a single numerical column.

    The interval ``[range[0], range[1]]`` is split into ``bucket_cnt`` buckets
    of equal width. Each value is expanded to the window
    ``[value - slide_window_size/2, value + slide_window_size/2]`` and every
    bucket that the (clamped) window touches is set to 1.0.

    Parameters
    ----------
    cols : List[str]
        Column to transform; exactly one column is supported.
    range : List[float]
        Two-element ``[min, max]`` pair delimiting the bucketed interval.
        (The name shadows the builtin but is kept because callers pass it
        by keyword.)
    bucket_cnt : int
        Number of buckets, i.e. the length of each output vector.
    slide_window_size : float
        Width of the window centred on each value.
    normalizer : str
        Normalizer name forwarded to ``apply_norm``. Defaults to "none".
    imputer : str
        Imputer name forwarded to ``apply_imputation``; "most_frequent" is
        translated to Spark's "mode". Defaults to "none".
    """

    def __init__(
        self,
        cols: List[str],
        range: List[float],
        bucket_cnt: int,
        slide_window_size: float,
        normalizer: str = "none",
        imputer: str = "none",
    ) -> None:
        super().__init__(cols)
        self.cols = cols
        assert len(self.cols) == 1, "Bucket numerical transformation only supports single column"
        self.range = range
        self.bucket_count = bucket_cnt
        self.slide_window_size = slide_window_size
        self.shared_norm = normalizer
        # Spark uses 'mode' for the most frequent element
        self.shared_imputation = "mode" if imputer == "most_frequent" else imputer

    @staticmethod
    def get_transformation_name() -> str:
        """Return the name identifying this transformation."""
        return "DistBucketNumericalTransformation"

    def apply(self, input_df: DataFrame) -> DataFrame:
        """Impute, normalize and bucketize the configured column.

        Parameters
        ----------
        input_df : DataFrame
            DataFrame containing the column named in ``self.cols``.

        Returns
        -------
        DataFrame
            A DataFrame with a single column, named like the input column,
            holding an array of ``bucket_cnt`` floats (0.0 or 1.0) per row.
        """
        imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df)
        scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df)
        min_val, max_val = self.range

        # Copy everything the UDF needs into plain locals so the closure does
        # not reference `self`; otherwise Spark has to serialize the whole
        # transformation object to ship the function to the executors.
        bucket_count = self.bucket_count
        half_window = self.slide_window_size / 2

        bucket_size = (max_val - min_val) / bucket_count
        # Offset used to keep a clamped window edge strictly below max_val,
        # so that it maps into the last bucket rather than one past it.
        epsilon = bucket_size / 10

        # Implementation based on graphloader.utils.parse_numerical_multihot_feat
        # TODO: Test if pyspark.ml.feature.Bucketizer covers our requirements and is faster
        def determine_bucket_membership(value: float) -> List[float]:
            # Create value range, value -> [value - slide/2, value + slide/2]
            high_val = value + half_window
            low_val = value - half_window

            # Early exits to avoid numpy calls: values at or beyond the range
            # boundaries belong only to the last / first bucket.
            membership_list = [0.0] * bucket_count
            if value >= max_val:
                membership_list[-1] = 1.0
                return membership_list
            if value <= min_val:
                membership_list[0] = 1.0
                return membership_list

            # Upper and lower threshold the value range
            if low_val < min_val:
                low_val = min_val
            elif low_val >= max_val:
                low_val = max_val - epsilon
            if high_val < min_val:
                high_val = min_val
            elif high_val >= max_val:
                high_val = max_val - epsilon

            # Determine upper and lower bucket membership
            low_val -= min_val
            high_val -= min_val
            low_idx = low_val // bucket_size
            # +1 because np.arange excludes its stop value
            high_idx = (high_val // bucket_size) + 1

            idx = np.arange(start=low_idx, stop=high_idx, dtype=int)
            membership_array = np.zeros(bucket_count, dtype=float)
            membership_array[idx] = 1.0

            return membership_array.tolist()

        bucket_udf = F.udf(determine_bucket_membership, ArrayType(FloatType()))

        col_name = self.cols[0]
        # Only the bucketized column is selected, under its original name.
        bucketized_df = scaled_df.select(bucket_udf(F.col(col_name)).alias(col_name))

        return bucketized_df
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]:
# TODO: Replace with pattern matching after moving to Python 3.10?
if feature_type in ["no-op", "multi-numerical"] or feature_type.startswith("text"):
return StringType
if feature_type in ["numerical", "none"]:
if feature_type in ["numerical", "bucket-numerical", "none"]:
return FloatType
else:
raise NotImplementedError(f"Unknown feature type: {feature_type}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
"features": [
{
"column": "age",
"name": "age_bucket",
"transformation": {
"name": "numerical",
"name": "bucket-numerical",
"kwargs": {
"normalizer": "none",
"bucket_cnt": 3,
"range": [10, 20],
"slide_window_size": 0.0,
"imputer": "mean"
}
}
Expand Down
4 changes: 2 additions & 2 deletions graphstorm-processing/tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
"features": [
{"feature_col": ["citation_time"], "feature_name": "feat"},
{"feature_col": ["num_citations"], "transform": {"name": "max_min_norm"}},
{"feature_col": ["num_citations"], "transform": {"name": "bucket_numerical",
{"feature_col": ["num_citations"], "transform": {"name": "bucket-numerical",
"bucket_cnt": 9,
"range": [10, 100],
"slide_window_size": 5}},
Expand Down Expand Up @@ -260,7 +260,7 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
"column": "num_citations",
"transformation": {
"name": "numerical",
"kwargs": {"normalizer": "bucket_numerical",
"kwargs": {"normalizer": "bucket-numerical",
"bucket_cnt": 9,
"range": [10, 100],
"slide_window_size": 5,
Expand Down

0 comments on commit b3ddb9d

Please sign in to comment.