# Source code for hyperts.utils.transformers

import copy
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

from hyperts.utils import consts
from hyperts.utils._base import get_tool_box
from hypernets.pipeline.base import HyperTransformer


##################################### Define sklearn Transformer #####################################
class TimeSeriesTransformer:
    """Expose the raw values of a time series container.

    Parameters
    ----------
    time_series_col : optional, default=None.
        Stored on the instance unchanged; neither ``fit`` nor ``transform``
        reads it.
    """

    def __init__(self, time_series_col=None):
        self.time_series_col = time_series_col

    def fit(self, X, y=None, **kwargs):
        """Learn nothing and return the transformer itself."""
        # TODO:
        return self

    def transform(self, X, y=None, **kwargs):
        """Return ``X.values``; ``y`` and extra keyword arguments are ignored."""
        # TODO:
        values = X.values
        return values
class LogXplus1Transformer(BaseEstimator, TransformerMixin):
    """Scale each feature by log(x+1).

    Parameters
    ----------
    nan_tolerance : int, default=5.
        Largest number of NaN entries in the log-scaled data for which the
        transformation is still applied.
    eps : float, default=1e-8.
        Lower clipping bound for the log-scaled data, also used as the
        replacement value for tolerated NaN entries.
    copy : bool, default=True.
        If True, deep-copy the input before processing it.

    Notes
    ----------
    ``is_trans`` records whether ``transform`` applied the log scaling.
    It starts as True, is set to False by a ``transform`` call that exceeds
    ``nan_tolerance``, and is never switched back to True.
    """

    def __init__(self, nan_tolerance=5, eps=1e-8, copy=True):
        super().__init__()
        self.nan_tolerance = nan_tolerance
        self.eps = eps
        self.copy = copy
        self.is_trans = True

    def fit(self, X, y=None, **kwargs):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X, y=None, **kwargs):
        """Return log(X + 1), or X as an array when too many NaNs appear."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)

        logged = np.log(data + 1)
        logged = np.clip(logged, self.eps, abs(logged))

        nan_positions = np.where(np.isnan(logged))
        if len(nan_positions[0]) > self.nan_tolerance:
            # Too many invalid entries: give up on scaling and remember it so
            # that inverse_transform becomes a pass-through as well.
            self.is_trans = False
            return data

        logged[nan_positions] = self.eps
        return logged

    def inverse_transform(self, X, y=None, **kwargs):
        """Return exp(X) - 1 if the log scaling is active, otherwise X as an array."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        return np.exp(data) - 1 if self.is_trans else data
class IdentityTransformer(BaseEstimator, TransformerMixin):
    """Pass-through transformer: every method hands its input back untouched."""

    def __init__(self):
        super().__init__()

    def fit(self, X, y=None, **kwargs):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X, y=None, **kwargs):
        """Return ``X`` as received."""
        return X

    def inverse_transform(self, X, y=None, **kwargs):
        """Return ``X`` as received."""
        return X
class StandardTransformer(BaseEstimator, TransformerMixin):
    """Standardize features by removing the mean and scaling by the
    standard deviation.

    Notes
    ----------
    Unlike scikit-learn, it can process 3D time series - (nb_samples, series_length, nb_dims).

    The transformation is given by::

        X_scaled = (X - mean) / sqrt(var + eps),

    where,
    for 1D/2D features (1D input is treated as a single column):
        mean = X.mean(axis=0),
        var = ((X - mean) ** 2).mean(axis=0)
    for 3D features:
        mean = X.mean(axis=0, keepdims=True).mean(axis=1, keepdims=True),
        var = ((X - mean) ** 2).mean(axis=0, keepdims=True).mean(axis=1, keepdims=True).

    Parameters
    ----------
    eps : float, default=1e-8.
        Added to the variance before taking the square root, to prevent
        the division by 0.
    copy : bool, default=True.
        If True, deep-copy the input before processing it.
    """

    def __init__(self, eps=1e-8, copy=True):
        super().__init__()
        self.eps = eps
        self.copy = copy
        self.mean = None
        self.var = None

    def fit(self, X, y=None, **kwargs):
        """Compute and store ``mean`` and ``var`` from X; return self."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)

        if len(data.shape) > 2:
            # Statistics per trailing dimension, keeping broadcastable axes.
            self.mean = data.mean(axis=0, keepdims=True).mean(axis=1, keepdims=True)
            squared_dev = (data - self.mean) ** 2
            self.var = squared_dev.mean(axis=0, keepdims=True).mean(axis=1, keepdims=True)
            return self

        if len(data.shape) == 1:
            data = data.reshape(-1, 1)
        self.mean = data.mean(axis=0)
        self.var = ((data - self.mean) ** 2).mean(axis=0)
        return self

    def transform(self, X, y=None, **kwargs):
        """Return ``(X - mean) / sqrt(var + eps)`` using the fitted statistics."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        scale = np.sqrt(self.var + self.eps)
        return (data - self.mean) / scale

    def inverse_transform(self, X, y=None, **kwargs):
        """Return ``X * sqrt(var + eps) + mean`` using the fitted statistics."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        scale = np.sqrt(self.var + self.eps)
        return data * scale + self.mean
class MinMaxTransformer(BaseEstimator, TransformerMixin):
    """Rescale features using the minimum and maximum seen during ``fit``.

    Notes
    ----------
    Unlike scikit-learn, it can process 3D time series - (nb_samples, series_length, nb_dims).

    The transformation is given by::

        X_scaled = (X - min) / (max - min + eps),

    where,
    for 1D/2D features (1D input is treated as a single column):
        min = X.min(axis=0, initial=None),
        max = X.max(axis=0, initial=None),
    for 3D features:
        min = X.min(axis=0, keepdims=True, initial=None).min(axis=1, keepdims=True),
        max = X.max(axis=0, keepdims=True, initial=None).max(axis=1, keepdims=True).

    Parameters
    ----------
    eps : float, default=1e-8.
        Added to ``max - min`` to prevent the division by 0.
    copy : bool, default=True.
        If True, deep-copy the input before processing it.
    """

    def __init__(self, eps=1e-8, copy=True):
        super().__init__()
        self.eps = eps
        self.copy = copy
        self.min = None
        self.max = None

    def fit(self, X, y=None, **kwargs):
        """Compute and store ``min`` and ``max`` from X; return self."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)

        if len(data.shape) > 2:
            # Reduce over samples, then over time steps, keeping both axes
            # so the result broadcasts against 3D input.
            lowest = data.min(axis=0, keepdims=True, initial=None)
            self.min = lowest.min(axis=1, keepdims=True)
            highest = data.max(axis=0, keepdims=True, initial=None)
            self.max = highest.max(axis=1, keepdims=True)
            return self

        if len(data.shape) == 1:
            data = data.reshape(-1, 1)
        self.min = data.min(axis=0, initial=None)
        self.max = data.max(axis=0, initial=None)
        return self

    def transform(self, X, y=None, **kwargs):
        """Return ``(X - min) / (max - min + eps)`` using the fitted bounds."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        span = self.max - self.min + self.eps
        return (data - self.min) / span

    def inverse_transform(self, X, y=None, **kwargs):
        """Return ``X * (max - min + eps) + min`` using the fitted bounds."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        span = self.max - self.min + self.eps
        return data * span + self.min
class MaxAbsTransformer(BaseEstimator, TransformerMixin):
    """Scale each feature by its maximum absolute value.

    Notes
    ----------
    Unlike scikit-learn, it can process 3D time series - (nb_samples, series_length, nb_dims).

    The transformation is given by::

        X_scaled = X / (max_abs + eps),

    where,
    for 1D/2D features (1D input is treated as a single column):
        max_abs = np.max(np.abs(X), axis=0),
    for 3D features:
        max_abs = np.abs(X).max(axis=0, keepdims=True).max(axis=1, keepdims=True)

    Parameters
    ----------
    eps : float, default=1e-8.
        Added to ``max_abs`` to prevent the division by 0.
    copy : bool, default=True.
        If True, deep-copy the input before processing it.
    """

    def __init__(self, eps=1e-8, copy=True):
        super().__init__()
        self.eps = eps
        self.copy = copy
        self.max_abs = None

    def fit(self, X, y=None, **kwargs):
        """Compute and store ``max_abs`` from X; return self."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)

        if len(data.shape) > 2:
            magnitudes = np.abs(data)
            self.max_abs = magnitudes.max(axis=0, keepdims=True).max(axis=1, keepdims=True)
            return self

        if len(data.shape) == 1:
            data = data.reshape(-1, 1)
        self.max_abs = np.max(np.abs(data), axis=0)
        return self

    def transform(self, X, y=None, **kwargs):
        """Return ``X / (max_abs + eps)`` using the fitted scale."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        scale = self.max_abs + self.eps
        return data / scale

    def inverse_transform(self, X, y=None, **kwargs):
        """Return ``X * (max_abs + eps)`` using the fitted scale."""
        data = copy.deepcopy(X) if self.copy else X
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        scale = self.max_abs + self.eps
        return data * scale
class CategoricalTransformer(BaseEstimator, TransformerMixin):
    """Transform categorical labels to one hot labels.

    Labels are first mapped to integer codes by the label encoder. When more
    than two classes were seen during ``fit`` the codes are additionally
    one-hot encoded; with two classes or fewer the integer codes are
    returned as they are.

    Parameters
    ----------
    label_encoder : An existing Label encoder, default=None.
        A fresh ``LabelEncoder`` is created when None.
    onehot_encoder : An existing OneHot encoder, default=None.
        A fresh dense-output ``OneHotEncoder`` is created when None.
    copy : bool, default=True.
        If True, deep-copy the input before processing it.
    """

    def __init__(self, label_encoder=None, onehot_encoder=None, copy=True):
        super(CategoricalTransformer, self).__init__()
        self.copy = copy
        if label_encoder is None:
            self.label_encoder = LabelEncoder()
        else:
            self.label_encoder = label_encoder
        if onehot_encoder is None:
            self.onehot_encoder = self._make_dense_onehot_encoder()
        else:
            self.onehot_encoder = onehot_encoder
        self.classes_ = None
        self.nb_classes_ = None

    @staticmethod
    def _make_dense_onehot_encoder():
        """Create a dense-output OneHotEncoder on old and new scikit-learn."""
        # scikit-learn 1.2 renamed `sparse` to `sparse_output` and 1.4 removed
        # the old keyword; releases before 1.2 only know `sparse`.
        try:
            return OneHotEncoder(sparse_output=False, categories="auto")
        except TypeError:
            return OneHotEncoder(sparse=False, categories="auto")

    def fit(self, X, y=None, **kwargs):
        """Fit the label encoder on X, then the one-hot encoder on its codes."""
        if self.copy:
            X = copy.deepcopy(X)
        self.label_encoder.fit(X)
        self.classes_ = self.label_encoder.classes_
        self.nb_classes_ = len(self.classes_)
        X = self.label_encoder.transform(X)
        self.onehot_encoder.fit(X.reshape(len(X), 1))
        return self

    def transform(self, X, y=None, **kwargs):
        """Return integer codes, one-hot encoded when there are > 2 classes."""
        if self.copy:
            X = copy.deepcopy(X)
        transform_X = self.label_encoder.transform(X)
        if self.nb_classes_ > 2:  # multiclass
            transform_X = self.onehot_encoder.transform(transform_X.reshape(len(X), 1))
        return transform_X

    def inverse_transform(self, X, y=None, **kwargs):
        """Map encoded labels (codes or one-hot rows) back to original labels."""
        if self.copy:
            X = copy.deepcopy(X)
        if self.nb_classes_ > 2:  # multiclass
            X = self.onehot_encoder.inverse_transform(X)
            # The one-hot inverse is an (n, 1) column; flatten it so the label
            # encoder receives 1D input instead of warning about a column
            # vector. A result with more than one column still fails loudly.
            X = X.reshape(len(X))
        inverse_X = self.label_encoder.inverse_transform(X)
        return inverse_X
class CovariateTransformer(BaseEstimator, TransformerMixin):
    """Clean covariate columns: drop those with too many missing values,
    then run the remaining ones through the tool box data cleaner
    ('drop_constant_columns', 'drop_duplicated_columns',
    'drop_idness_columns', 'replace_inf_values' and so on).

    Parameters
    ----------
    covariables: list[n*str], the covariable column names contained in the data.
        ``fit`` replaces this list with the names that survive the
        missing-value filter.
    data_cleaner_args : dict or None, (default=None).
        Keyword arguments for the data cleaner. When None,
        ``{'correct_object_dtype': False, 'int_convert_to': 'str'}`` is used.
        A definition example is as follows:
            data_cleaner_args = {
                'correct_object_dtype': False,
                'int_convert_to': 'str',
                'drop_constant_columns': True,
                'drop_duplicated_columns': True,
                'drop_idness_columns': True,
                'replace_inf_values': np.nan,
                ...
            }
        Reference for details:
        https://github.com/DataCanvasIO/Hypernets/blob/master/hypernets/tabular/data_cleaner.py
    """

    def __init__(self, covariables, data_cleaner_args=None):
        super().__init__()
        self.covariables = covariables
        if data_cleaner_args is not None:
            self.data_cleaner_args = data_cleaner_args
        else:
            self.data_cleaner_args = {'correct_object_dtype': False,
                                      'int_convert_to': 'str'}
        self.cleaner = None
        self.covariables_ = None
        self.dorp_nan_columns = []

    def fit(self, X, y=None, **kwargs):
        """Select the covariates to keep and fit the data cleaner on them."""
        tb = get_tool_box(X)

        # Covariates whose missing count exceeds the allowed share of rows
        # are recorded so that transform can drop them again.
        missing_counts = X[self.covariables].isnull().sum().to_dict()
        max_missing = len(X) * consts.NAN_DROP_SIZE
        self.dorp_nan_columns.extend(
            column for column, n_missing in missing_counts.items()
            if n_missing > max_missing
        )

        X = X.drop(columns=self.dorp_nan_columns)
        self.covariables = tb.list_diff(self.covariables, self.dorp_nan_columns)

        self.cleaner = tb.data_cleaner(**self.data_cleaner_args)
        cleaned, _ = self.cleaner.fit_transform(X[self.covariables])
        self.covariables_ = cleaned.columns.to_list()
        return self

    def transform(self, X, y=None, **kwargs):
        """Replace the covariate columns of X with their cleaned versions."""
        tb = get_tool_box(X)
        remaining = X.drop(columns=self.dorp_nan_columns)
        cleaned = self.cleaner.transform(remaining[self.covariables])

        # NOTE(review): the raw columns removed here depend on whether any
        # column was dropped for missing values during fit - confirm that
        # this asymmetry is intended.
        if self.dorp_nan_columns:
            replaced = self.covariables_
        else:
            replaced = self.covariables
        remaining = remaining.drop(columns=replaced)
        return tb.concat_df([remaining, cleaned], axis=1)
class OutliersTransformer(BaseEstimator, TransformerMixin):
    """Remove outliers.

    Parameters
    ----------
    method: str, 'clip' or 'fill'.
        'fill' turns outliers into NaN and imputes them; any other value
        clips the outliers.
    std_threshold: int, the threshold of std.
    freq: str, DateOffset or None, default None.
        Only used by the 'fill' method: selects the multi-period loop
        imputer when given, the simple numerical imputer when None.
    copy : bool, default=True.
        If True, deep-copy the input before processing it.
    """

    def __init__(self, method='clip', std_threshold=3, freq=None, copy=True):
        super().__init__()
        self.method = method
        self.std_threshold = std_threshold
        self.freq = freq
        self.copy = copy

    def fit(self, X, y=None, **kwargs):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X, y=None, **kwargs):
        """Treat the outliers of X and return the result via ``tb.df_to_array``."""
        tb = get_tool_box(X)
        data = copy.deepcopy(X) if self.copy else X

        if self.method == 'fill':
            data = tb.nan_to_outliers(data, std_threshold=self.std_threshold)
            if self.freq is None:
                data = tb.simple_numerical_imputer(data)
            else:
                data = tb.multi_period_loop_imputer(data, freq=self.freq)
        else:
            data = tb.clip_to_outliers(data, std_threshold=self.std_threshold)

        return tb.df_to_array(data)

    def inverse_transform(self, X, y=None, **kwargs):
        """Return ``X`` as received; the outlier treatment is not reverted."""
        return X
##################################### Define Hyper Transformer #####################################
class TimeSeriesHyperTransformer(HyperTransformer):
    """HyperTransformer bound to ``TimeSeriesTransformer``.

    ``space``, ``name`` and any extra keyword arguments are forwarded
    unchanged to ``HyperTransformer.__init__``.
    """

    def __init__(self, space=None, name=None, **kwargs):
        super().__init__(TimeSeriesTransformer, space, name, **kwargs)
class LogXplus1HyperTransformer(HyperTransformer):
    """HyperTransformer bound to ``LogXplus1Transformer``.

    ``space``, ``name`` and any extra keyword arguments are forwarded
    unchanged to ``HyperTransformer.__init__``.
    """

    def __init__(self, space=None, name=None, **kwargs):
        super().__init__(LogXplus1Transformer, space, name, **kwargs)