# -*- coding:utf-8 -*-
import os
import time
import math
import collections
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import save_model, load_model
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from hyperts.framework.dl import layers, losses, metrics, optimizers
from hyperts.framework.dl.dl_utils.timeseries import from_array_to_timeseries
from hyperts.framework.dl.dl_utils.metainfo import MetaTSFprocessor, MetaTSCprocessor
from hyperts.utils import consts, get_tool_box
from scipy.stats import binom
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from hypernets.utils import logging, fs
logger = logging.get_logger(__name__)
import warnings
warnings.filterwarnings("ignore")
tf.get_logger().setLevel('ERROR')
[docs]class Metrics(collections.UserDict):
"""A User Dict class to store metrics required for model configuration.
Usage with `compile()` API.
Returns
-------
A tf.keras.metrics.Metirc object.
"""
def __init__(self, *args, **kwargs):
super(Metrics, self).__init__(*args, **kwargs)
self.data = {
'mae': metrics.MeanAbsoluteError(name='mae'),
'mean_absolute_error': metrics.MeanAbsoluteError(name='mae'),
'mape': metrics.MeanAbsolutePercentageError(name='mape'),
'mean_absolute_percentage_error': metrics.MeanAbsolutePercentageError(name='mape'),
'smape': metrics.SymmetricMeanAbsolutePercentageError(name='smape'),
'mse': metrics.MeanSquaredError(name='mse'),
'mean_squared_error': metrics.MeanSquaredError(name='mse'),
'rmse': metrics.RootMeanSquaredError(name='rmse'),
'msle': metrics.MeanSquaredLogarithmicError(name='msle'),
'accuracy': metrics.CategoricalAccuracy(name='acc'),
'auc': metrics.AUC(name='auc'),
'roc_auc_score': metrics.AUC(name='auc'),
'precision': metrics.Precision(name='precison'),
'precision_score': metrics.Precision(name='precison'),
'recall': metrics.Recall(name='recall'),
'recall_score': metrics.Recall(name='recall'),
}
[docs]class Losses(collections.UserDict):
"""A User Dict class to store loss required for model configuration.
Usage with `compile()` API.
Returns
-------
A tf.keras.metrics.Loss object.
"""
def __init__(self, *args, **kwargs):
super(Losses, self).__init__(*args, **kwargs)
self.data = {
'mse': losses.MeanSquaredError(),
'mean_squared_error': losses.MeanSquaredError(),
'mae': losses.MeanAbsoluteError(),
'mean_absolute_error': losses.MeanAbsoluteError(),
'huber_loss': losses.Huber(),
'log_gaussian_loss': losses.LogGaussianLoss(),
'mape': losses.MeanAbsolutePercentageError(),
'mean_absolute_percentage_error': losses.MeanAbsolutePercentageError(),
'smape': losses.SymmetricMeanAbsolutePercentageError(),
'symmetric_mean_absolute_percentage_error': losses.SymmetricMeanAbsolutePercentageError(),
'log_cosh': losses.LogCosh(),
'categorical_crossentropy': losses.CategoricalCrossentropy(),
'binary_crossentropy': losses.BinaryCrossentropy(),
}
[docs]class BaseDeepEstimator(object):
"""Abstract base class representing deep estimator object.
Parameters
----------
task: 'str' or None, default None.
Task could be 'univariate-forecast', 'multivariate-forecast', and 'univariate-binaryclass', etc.
See consts.py for details.
timestamp: 'str' or None, default None, representing time column name (in DataFrame).
window: 'int' or None, default None, length of the time series sequences for a sample.
This must be specified for a forecast task.
horizon: 'int' or None, default None, representing the time interval between the start point
of prediction time and the end point of observation time.
This must be specified for a forecast task.
forecast_length: 'int', default 1.
A forecast field of vision during a forecast task.
monitor_metric: 'str' or None, default None.
Quantity to be monitored.
reducelr_patience: 'int' or None, default None.
Number of epochs with no improvement after which learning rate will be reduced.
earlystop_patience: 'str' or None, default None.
Number of epochs with no improvement after which training will be stopped.
embedding_output_dim: 'int', default 4.
The dimension in which categorical variables are embedded.
continuous_columns: CategoricalColumn class.
Contains some information(name, column_names, input_dim, dtype,
input_name) about continuous variables.
categorical_columns: CategoricalColumn class.
Contains some information(name, vocabulary_size, embedding_dim,
dtype, input_name) about categorical variables.
"""
def __init__(self, task,
timestamp=None,
window=None,
horizon=None,
forecast_length=1,
monitor_metric=None,
reducelr_patience=None,
earlystop_patience=None,
embedding_output_dim=4,
continuous_columns=None,
categorical_columns=None):
self.task = task
self.timestamp = timestamp
self.window = window
self.horizon = horizon
self.forecast_length = forecast_length
self.monitor_metric = monitor_metric
self.reducelr_patience = reducelr_patience
self.earlystop_patience = earlystop_patience
self.embedding_output_dim = embedding_output_dim
self.continuous_columns = continuous_columns
self.categorical_columns = categorical_columns
self.time_columns = None
self.forecast_start = None
self.model = None
self.meta = None
def _build_estimator(self, **kwargs):
"""Build a time series deep neural net model.
Returns
-------
A tf.keras.Model object.
"""
raise NotImplementedError(
'_build_estimator is a protected abstract method, it must be implemented.'
)
def _fit(self, train_X, train_y, valid_X, valid_y, **kwargs):
"""Fit time series model to training data.
Returns
-------
A fitted model and history.
"""
raise NotImplementedError(
'_fit is a protected abstract method, it must be implemented.'
)
def _predict(self, X):
"""Predict target for sequences in X.
Returns
-------
y: 2D numpy.
"""
raise NotImplementedError(
'_predict is a protected abstract method, it must be implemented.'
)
def _reset_parameters(self, **kwargs):
"""Reset init parameters based on fit-kwargs.
"""
if kwargs.get('window') is not None and kwargs.get('window') > 0:
self.window = kwargs.get('window')
if kwargs.get('horizon') is not None and kwargs.get('horizon') > 0:
self.horizon = kwargs.get('horizon')
if kwargs.get('forecast_length') is not None and kwargs.get('forecast_length') > 0:
self.forecast_length = kwargs.get('forecast_length')
if kwargs.get('monitor_metric') is not None and isinstance(kwargs.get('monitor_metric'), str):
self.monitor_metric = kwargs.get('monitor_metric')
if kwargs.get('reducelr_patience') is not None and kwargs.get('reducelr_patience') > 0:
self.reducelr_patience = kwargs.get('reducelr_patience')
if kwargs.get('earlystop_patience') is not None and kwargs.get('earlystop_patience') > 0:
self.earlystop_patience = kwargs.get('earlystop_patience')
if kwargs.get('embedding_output_dim') is not None and kwargs.get('embedding_output_dim') > 0:
self.embedding_output_dim = kwargs.get('embedding_output_dim')
if kwargs.get('loss') is not None and isinstance(kwargs.get('loss'), str):
self.loss = kwargs.get('loss')
if kwargs.get('learning_rate') is not None and kwargs.get('learning_rate') > 0:
self.learning_rate = kwargs.get('learning_rate')
if kwargs.get('lr') is not None and kwargs.get('lr') > 0:
self.learning_rate = kwargs.get('lr')
if kwargs.get('optimizer') is not None and isinstance(kwargs.get('optimizer'), str):
self.optimizer = kwargs.get('optimizer')
if kwargs.get('summary') is not None and isinstance(kwargs.get('summary'), bool):
self.summary = kwargs.get('summary')
if kwargs.get('reconstract_dim') is not None and isinstance(kwargs.get('reconstract_dim'), int):
self.reconstract_dim = kwargs.get('reconstract_dim')
self.model_kwargs = {**self.model_kwargs, **kwargs}
[docs] def fit(self,
X,
y,
batch_size=None,
epochs=1,
verbose=1,
callbacks=None,
validation_split=0.2,
validation_data=None,
shuffle=True,
class_weight=None,
sample_weight=None,
initial_epoch=0,
steps_per_epoch=None,
validation_steps=None,
validation_freq=1,
max_queue_size=10,
workers=1,
use_multiprocessing=False,
**kwargs):
"""Trains the model for a fixed number of epochs (iterations on a dataset).
Parameters
----------
X: 2D DataFrame of shape: (series_length, 1+(n_covariates)) for forecast task,
2D nested DataFrame shape: (sample, nb_covariables(series_length)) for
classification or regression task, 2D DataFrame of shape
(n_samples, 1+(n_covariates)) for anomaly detection.
y: 2D DataFrame of shape: (series_length, n_target_variables) for forecast task,
2D DataFrame of shape: (nb_samples, 1) for classification or regression task,
2D DataFrame of shape (n_samples, reconstract_dim).
batch_size: Integer or `None`.
Number of samples per gradient update.
If unspecified, `batch_size` will default to self-adaption based on number of samples.
Do not specify the `batch_size` if your data is in the
form of datasets, generators, or `keras.utils.Sequence` instances
(since they generate batches).
epochs: Integer. Number of epochs to train the model.
An epoch is an iteration over the entire `x` and `y`
data provided.
Note that in conjunction with `initial_epoch`,
`epochs` is to be understood as "final epoch".
The model is not trained for a number of iterations
given by `epochs`, but merely until the epoch
of index `epochs` is reached.
verbose: 'auto', 0, 1, or 2. Verbosity mode.
0 = silent, 1 = progress bar, 2 = one line per epoch.
'auto' defaults to 1 for most cases, but 2 when used with
`ParameterServerStrategy`. Note that the progress bar is not
particularly useful when logged to a file, so verbose=2 is
recommended when not running interactively (eg, in a production
environment).
callbacks: List of `keras.callbacks.Callback` instances.
List of callbacks to apply during training.
See `tf.keras.callbacks`. Note `tf.keras.callbacks.ProgbarLogger`
and `tf.keras.callbacks.History` callbacks are created automatically
and need not be passed into `model.fit`.
`tf.keras.callbacks.ProgbarLogger` is created or not based on
`verbose` argument to `model.fit`.
Callbacks with batch-level calls are currently unsupported with
`tf.distribute.experimental.ParameterServerStrategy`, and users are
advised to implement epoch-level calls instead with an appropriate
`steps_per_epoch` value.
validation_split: Float between 0 and 1.
Fraction of the training data to be used as validation data.
The model will set apart this fraction of the training data,
will not train on it, and will evaluate
the loss and any model metrics
on this data at the end of each epoch.
The validation data is selected from the last samples
in the `x` and `y` data provided, before shuffling. This argument is
not supported when `x` is a dataset, generator or
`keras.utils.Sequence` instance.
`validation_split` is not yet supported with
`tf.distribute.experimental.ParameterServerStrategy`.
validation_data: Data on which to evaluate
the loss and any model metrics at the end of each epoch.
The model will not be trained on this data. Thus, note the fact
that the validation loss of data provided using `validation_split`
or `validation_data` is not affected by regularization layers like
noise and dropout.
`validation_data` will override `validation_split`.
`validation_data` could be:
- A tuple `(x_val, y_val)` of Numpy arrays or tensors.
- A tuple `(x_val, y_val, val_sample_weights)` of NumPy arrays.
- A `tf.data.Dataset`.
- A Python generator or `keras.utils.Sequence` returning
`(inputs, targets)` or `(inputs, targets, sample_weights)`.
`validation_data` is not yet supported with
`tf.distribute.experimental.ParameterServerStrategy`.
shuffle: Boolean (whether to shuffle the training data
before each epoch) or str (for 'batch'). This argument is ignored
when `x` is a generator or an object of tf.data.Dataset.
'batch' is a special option for dealing
with the limitations of HDF5 data; it shuffles in batch-sized
chunks. Has no effect when `steps_per_epoch` is not `None`.
class_weight: Optional dictionary mapping class indices (integers)
to a weight (float) value, used for weighting the loss function
(during training only).
This can be useful to tell the model to
"pay more attention" to samples from
an under-represented class.
sample_weight: Optional Numpy array of weights for
the training samples, used for weighting the loss function
(during training only). You can either pass a flat (1D)
Numpy array with the same length as the input samples
(1:1 mapping between weights and samples),
or in the case of temporal data,
you can pass a 2D array with shape
`(samples, sequence_length)`,
to apply a different weight to every timestep of every sample. This
argument is not supported when `x` is a dataset, generator, or
`keras.utils.Sequence` instance, instead provide the sample_weights
as the third element of `x`.
initial_epoch: Integer.
Epoch at which to start training
(useful for resuming a previous training run).
steps_per_epoch: Integer or `None`.
Total number of steps (batches of samples)
before declaring one epoch finished and starting the
next epoch. When training with input tensors such as
TensorFlow data tensors, the default `None` is equal to
the number of samples in your dataset divided by
the batch size, or 1 if that cannot be determined. If x is a
`tf.data` dataset, and 'steps_per_epoch'
is None, the epoch will run until the input dataset is exhausted.
When passing an infinitely repeating dataset, you must specify the
`steps_per_epoch` argument. If `steps_per_epoch=-1` the training
will run indefinitely with an infinitely repeating dataset.
This argument is not supported with array inputs.
When using `tf.distribute.experimental.ParameterServerStrategy`:
* `steps_per_epoch=None` is not supported.
validation_steps: Only relevant if `validation_data` is provided and
is a `tf.data` dataset. Total number of steps (batches of
samples) to draw before stopping when performing validation
at the end of every epoch. If 'validation_steps' is None, validation
will run until the `validation_data` dataset is exhausted. In the
case of an infinitely repeated dataset, it will run into an
infinite loop. If 'validation_steps' is specified and only part of
the dataset will be consumed, the evaluation will start from the
beginning of the dataset at each epoch. This ensures that the same
validation samples are used every time.
validation_freq: Only relevant if validation data is provided. Integer
or `collections.abc.Container` instance (e.g. list, tuple, etc.).
If an integer, specifies how many training epochs to run before a
new validation run is performed, e.g. `validation_freq=2` runs
validation every 2 epochs. If a Container, specifies the epochs on
which to run validation, e.g. `validation_freq=[1, 2, 10]` runs
validation at the end of the 1st, 2nd, and 10th epochs.
max_queue_size: Integer. Used for generator or `keras.utils.Sequence`
input only. Maximum size for the generator queue.
If unspecified, `max_queue_size` will default to 10.
workers: Integer. Used for generator or `keras.utils.Sequence` input
only. Maximum number of processes to spin up
when using process-based threading. If unspecified, `workers`
will default to 1.
use_multiprocessing: Boolean. Used for generator or
`keras.utils.Sequence` input only. If `True`, use process-based
threading. If unspecified, `use_multiprocessing` will default to
`False`. Note that because this implementation relies on
multiprocessing, you should not pass non-picklable arguments to
the generator as they can't be passed easily to children processes.
See `tf.keras.model.Model.fit` for details.
"""
start = time.time()
self._reset_parameters(**kwargs)
X, y = self._preprocessor(X, y)
tb = get_tool_box(X)
if validation_data is not None:
validation_data = self.meta.transform(*validation_data)
if len(validation_data) != 2:
raise ValueError(f'Unexpected validation_data length, expected 2 but {len(validation_data)}.')
else:
X_val, y_val = validation_data[0], validation_data[1]
X = tb.concat_df([X, X_val], axis=0)
y = tb.concat_df([y, y_val], axis=0)
if self.window < self.forecast_length:
logger.warning('window must not be smaller than forecast_length, reset forecast_length=1.')
self.forecast_length = 1
X_train, y_train = self._dataloader(X=X, y=y, task=self.task, window=self.window,
horizon=self.horizon, forecast_length=self.forecast_length,
reset_forecast_start=True)
validation_length = int(len(y_train) * validation_split)
if validation_length <= 0:
raise RuntimeError(f'The train set must not be less than {int(1 / validation_split)}.')
if self.task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_DETECTION:
if isinstance(X_train, list):
X_valid = [x[-validation_length:] for x in X_train]
else:
X_valid = X_train[-validation_length:]
y_valid = y_train[-validation_length:]
else:
X_train, X_valid, y_train, y_valid = \
tb.random_train_test_split(X_train, y_train, test_size=validation_length)
if batch_size is None:
data_num = int(len(y_train))
if data_num <= 128:
batch_size = max(int(2 ** (0 + int(math.log(data_num, 4)))), 2)
elif data_num <= 500:
batch_size = max(int(2 ** (1 + int(math.log(data_num, 4)))), 8)
elif data_num <= 1000:
batch_size = max(int(2 ** (2 + int(math.log(data_num, 5)))), 16)
elif data_num <= 60000:
batch_size = min(int(2 ** (3 + int(math.log(data_num, 10)))), 128)
elif data_num <= 240000:
batch_size = min(int(2 ** (4 + int(math.log(data_num, 10)))), 512)
else:
batch_size = 1024
if steps_per_epoch is None:
steps_per_epoch = len(y_train) // batch_size - 1
if steps_per_epoch == 0:
steps_per_epoch = 1
if validation_steps is None:
validation_steps = validation_length // batch_size - 1
if validation_steps <= 1:
validation_steps = 1
if epochs == consts.FINAL_TRAINING_EPOCHS:
if steps_per_epoch <= 16:
epochs = int(epochs * 2)
self.reducelr_patience = 40
self.earlystop_patience = 120
elif steps_per_epoch <= 64:
self.reducelr_patience = 15
self.earlystop_patience = 30
else:
epochs = int(epochs / 1.5)
self.reducelr_patience = 5
self.earlystop_patience = 10
self.learning_rate = self.learning_rate / 2
logger.info(f'Fit epochs is {epochs}, batch_size is {batch_size}.')
callbacks = self._inject_callbacks(callbacks=callbacks, epochs=epochs,
reducelr_patience=self.reducelr_patience,
earlystop_patience=self.earlystop_patience,
verbose=verbose)
model, history = self._fit(X_train, y_train, X_valid, y_valid, epochs=epochs, batch_size=batch_size,
initial_epoch=initial_epoch, verbose=verbose, callbacks=callbacks,
shuffle=shuffle, class_weight=class_weight, sample_weight=sample_weight,
steps_per_epoch=steps_per_epoch, validation_steps=validation_steps,
validation_freq=validation_freq, max_queue_size=max_queue_size, workers=workers,
use_multiprocessing=use_multiprocessing)
self.model = model
logger.info(f'Training finished, total taken {time.time() - start}s.')
return history
[docs] def predict(self, X, batch_size=128):
"""Inference Function.
Task: time series classification or regression.
"""
start = time.time()
probs = self.predict_proba(X, batch_size)
preds = self.proba2predict(probs, encode_to_label=True)
logger.info(f'predict taken {time.time() - start}s')
return preds
[docs] def forecast(self, X):
"""Inference Function.
Task: time series forecast.
"""
start = time.time()
if self.timestamp in X.columns:
steps = X.shape[0]
X = X.drop([self.timestamp], axis=1)
else:
raise ValueError('X is missing the timestamp columns.')
forecast_steps = steps
if steps < self.forecast_length:
aligning_steps = self.forecast_length - steps
steps = self.forecast_length
tail_data = np.reshape(X.values[-1, :], (1, -1))
aligning_X = np.repeat(tail_data, repeats=aligning_steps, axis=0)
aligning_X = pd.DataFrame(aligning_X, columns=X.columns)
X = pd.concat([X, aligning_X], axis=0)
if X.shape[1] >= 1:
X = self.meta.transform_X(X)
X_cont_cols, X_cat_cols = [], []
for c in X.columns:
if c in self.meta.cont_column_names:
X_cont_cols.append(c)
elif c in self.meta.cat_column_names:
X_cat_cols.append(c)
else:
raise ValueError('Unknown column.')
X = X[X_cont_cols + X_cat_cols].values.astype(consts.DATATYPE_TENSOR_FLOAT)
futures = []
data = self.forecast_start.copy()
if X.shape[1] >= 1:
continuous_length = len(self.meta.cont_column_names)
categorical_length = len(self.meta.cat_column_names)
for i in range(math.ceil(steps / self.forecast_length)):
pred = self._predict(data)
futures.append(pred.numpy())
covariable = np.expand_dims(X[i:i + self.forecast_length], 0)
forcast = np.concatenate([pred.numpy(), covariable], axis=-1)
if categorical_length > 0:
data[0] = np.append(data[0], forcast[:, :, :continuous_length]).reshape((1, -1, continuous_length))
data[1] = np.append(data[1], forcast[:, :, continuous_length:]).reshape((1, -1, categorical_length))
data = [data[0][:, -self.window:, :], data[1][:, -self.window:, :]]
else:
data = np.append(data, forcast).reshape((1, -1, continuous_length))
data = data[:, -self.window:, :]
else:
for i in range(math.ceil(steps / self.forecast_length)):
pred = self._predict(data)
futures.append(pred.numpy())
forcast = pred.numpy()
data = np.append(data, forcast).reshape((1, -1, self.meta.classes_))
data = data[:, -self.window:, :]
futures = np.concatenate(futures, axis=0)
futures = futures.reshape(-1, len(self.meta.target_columns))[:steps]
futures = futures[:forecast_steps, :]
logger.info(f'forecast taken {time.time() - start}s')
return futures
[docs] def predict_proba(self, X, batch_size=128):
"""Inference Function.
Task: time series classification/regression.
"""
X = self.meta.transform_X(X)
sample_size = X.shape[0]
if batch_size >= sample_size:
probs = self._predict(X).numpy()
else:
probs = []
iters = sample_size // batch_size + 1
for idx in range(iters):
proba = self._predict(X[idx * batch_size:min((idx + 1) * batch_size, sample_size)])
probs.append(proba.numpy())
probs = np.concatenate(probs, axis=0)
if probs.shape[-1] == 1 and self.task in consts.TASK_LIST_CLASSIFICATION:
probs = np.hstack([1 - probs, probs])
elif probs.shape[-1] == 1 and self.task in consts.TASK_LIST_REGRESSION:
probs = self.meta.inverse_transform_y(probs)
return probs
[docs] def proba2predict(self, proba, encode_to_label=True):
"""Transition Function.
Task: time series classification.
"""
if self.task in consts.TASK_LIST_REGRESSION:
return proba
if proba is None:
raise ValueError('[proba] can not be none.')
if len(proba.shape) == 1:
proba = proba.reshape((-1, 1))
if proba.shape[-1] > 2:
predict = np.zeros(shape=proba.shape)
argmax = proba.argmax(axis=-1)
predict[np.arange(len(argmax)), argmax] = 1
elif proba.shape[-1] == 2:
predict = proba.argmax(axis=-1)
else:
predict = (proba > 0.5).astype('int32').reshape((-1, 1))
if encode_to_label:
logger.info('reverse indicators to labels.')
predict = self.meta.inverse_transform_y(predict)
return predict
def _inject_callbacks(self, callbacks, epochs, reducelr_patience=5, earlystop_patience=10, verbose=1):
"""Inject callbacks. including ReduceLROnPlateau and EarlyStopping.
Parameters
----------
reducelr_patience: 'int' or None, default None.
Number of epochs with no improvement after which learning rate will be reduced.
earlystop_patience: 'str' or None, default None.
Number of epochs with no improvement after which training will be stopped.
verbose: 'int'. 0: quiet, 1: update messages.
"""
lr, es = None, None
if callbacks is not None:
for callback in callbacks:
if isinstance(callback, ReduceLROnPlateau):
lr = callback
if isinstance(callback, EarlyStopping):
es = callback
else:
callbacks = []
if epochs <= 10:
return []
else:
if lr is None and isinstance(reducelr_patience, int) and reducelr_patience > 0:
lr = ReduceLROnPlateau(monitor=self.monitor, factor=0.5,
patience=reducelr_patience, min_lr=0.0001, verbose=verbose)
callbacks.append(lr)
logger.info(f'Injected a callback [ReduceLROnPlateau]. monitor:{lr.monitor}, '
f'patience:{lr.patience}')
if es is None and isinstance(earlystop_patience, int) and earlystop_patience > 0:
es = EarlyStopping(monitor=self.monitor, min_delta=1e-5,
patience=earlystop_patience, verbose=verbose)
callbacks.append(es)
logger.info(f'Injected a callback [EarlyStopping]. monitor:{es.monitor}, '
f'patience:{es.patience}')
return callbacks
def _compile_model(self, model, optimizer, learning_rate=0.001):
"""Configures the model for training.
Parameters
----------
model: `Model` groups layers into an object with training and
inference features. See `tf.keras.models.Model`.
optimizer: String (name of optimizer). See `tf.keras.optimizers`.
learning_rate: 'float', default 0.001.
"""
if self.task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_REGRESSION:
if self.loss == 'auto':
self.loss = 'huber_loss'
if self.metrics == 'auto':
self.metrics = ['rmse']
elif self.task in consts.TASK_LIST_MULTICLASS:
if self.loss == 'auto':
self.loss = 'categorical_crossentropy'
if self.metrics == 'auto':
self.metrics = ['accuracy']
elif self.task in consts.TASK_LIST_BINARYCLASS:
if self.loss == 'auto':
self.loss = 'binary_crossentropy'
if self.metrics == 'auto':
self.metrics = ['auc']
elif self.task in consts.TASK_LIST_DETECTION:
self.loss = None
self.metrics = ['mse']
else:
logger.info('Unsupport this task: {}, Apart from [multiclass, binary, \
forecast, regression, and anomaly detection].'.format(self.task))
if self.loss is None:
loss = None
else:
loss = Losses()[self.loss]
if self.metrics is None:
metrics = None
elif set(self.metrics) < set(Metrics().keys()):
metrics = [Metrics()[m] for m in self.metrics]
else:
if self.task in consts.TASK_LIST_BINARYCLASS:
metrics = [Metrics()['auc']]
elif self.task in consts.TASK_LIST_MULTICLASS:
metrics = [Metrics()['accuracy']]
else:
metrics = [Metrics()['rmse']]
logger.warning(f"In dl model, {self.metrics} is not supported, "
f"so ['{metrics[0].name}'] will be called.")
if loss is not None and metrics is not None:
logger.info(f'The compile loss is `{loss.name}`, metrics is `{metrics[0].name}`.')
elif loss is None and metrics is not None:
logger.info(f'metrics is `{metrics[0].name}`.')
if optimizer.lower() in ['auto', consts.OptimizerADAM]:
optimizer = optimizers.Adam(lr=learning_rate, decay=1e-8, clipnorm=10.)
elif optimizer.lower() == consts.OptimizerADAMP:
optimizer = optimizers.AdamP(lr=learning_rate, weight_decay=0.025)
elif optimizer.lower() == consts.OptimizerLion:
learning_rate = 1e-4 if learning_rate > 1e-4 else learning_rate
optimizer = optimizers.Lion(lr=learning_rate, wd_ratio=1e-2)
elif optimizer.lower() == consts.OptimizerRMSPROP:
optimizer = optimizers.RMSprop(lr=learning_rate, momentum=0.9, decay=1e-8, clipnorm=10.)
elif optimizer.lower() == consts.OptimizerSGD:
optimizer = optimizers.SGD(lr=learning_rate, momentum=0.9, nesterov=True, decay=1e-8, clipnorm=10.)
else:
raise ValueError(f'Unsupport this optimizer: {optimizer}.')
logger.info(f'The compile optimizer is `{optimizer._name}`, learning rate is {learning_rate}.')
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
return model
def _dataloader(self, X, y, task, window=1, horizon=1, forecast_length=1, reset_forecast_start=False):
""" Load data set.
Parameters
----------
Forecast task data format:
X - 2D DataFrame, shape: (series_length, nb_covariables)
y - 2D DataFrame, shape: (series_length, nb_target_variables)
Classification or Regression task data format:
X - 3D array-like, shape: (nb_samples, series_length, nb_variables)
y - 2D array-like, shape: (nb_samples, nb_classes)
window: 'int' or None, default 1, length of the time series sequences for a sample.
This must be specified for a forecast task.
horizon: 'int' or None, default 1, representing the time interval between the start point
of prediction time and the end point of observation time.
This must be specified for a forecast task.
forecast_length: 'int', default 1.
A forecast field of vision during a forecast task.
"""
if task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_DETECTION:
tb = get_tool_box(X, y)
target_length = tb.get_shape(y)[1]
continuous_length = len(self.meta.cont_column_names)
categorical_length = len(self.meta.cat_column_names)
column_names = self.meta.cont_column_names + self.meta.cat_column_names
data = tb.concat_df([y, X], axis=1).drop([self.timestamp], axis=1)
data = tb.df_to_array(data[column_names]).astype(consts.DATATYPE_TENSOR_FLOAT)
target_start = window + horizon - 1
inputs = data.copy()
targets = data[target_start:].copy()
targets_app = np.zeros(inputs.size - targets.size, dtype=consts.DATATYPE_TENSOR_FLOAT)
targets = np.append(targets, targets_app).reshape(inputs.shape)
sequences = from_array_to_timeseries(inputs, targets, window, forecast_length)
try:
X_data, y_data = [], []
for _, batch in enumerate(sequences):
X_batch, y_batch = batch
X_data.append(X_batch.numpy())
y_data.append(y_batch.numpy())
X_data = np.concatenate(X_data, axis=0)
y_data = np.concatenate(y_data, axis=0)[:, :, :target_length]
except:
raise ValueError(f'Reset slide window, which should be less than {len(X)//2}.')
if reset_forecast_start:
self.forecast_start = data[-window:].reshape(1, window, data.shape[1])
if categorical_length != 0:
X_cont = X_data[:, :, :continuous_length]
X_cat = X_data[:, :, continuous_length:]
X_data = [X_cont, X_cat]
if reset_forecast_start:
X_cont_start = self.forecast_start[:, :, :continuous_length]
X_cat_start = self.forecast_start[:, :, continuous_length:]
self.forecast_start = [X_cont_start, X_cat_start]
if task in consts.TASK_LIST_DETECTION:
if categorical_length != 0:
y_data = X_data[0][:, :, :self.reconstract_dim]
else:
y_data = X_data[:, :, :self.reconstract_dim]
else:
tb = get_tool_box(X)
self.window = tb.get_shape(X)[1]
X_data = X.astype(consts.DATATYPE_TENSOR_FLOAT)
y_data = y
return X_data, y_data
def _from_tensor_slices(self, X, y, batch_size, epochs=None, shuffle=False, drop_remainder=True):
"""Creates a `Dataset` whose elements are slices of the given tensors.
Returns
----------
Dataset: A `Dataset`.
"""
data = {}
for c in self.continuous_columns:
if isinstance(X, list):
data[c.name] = X[0].astype(consts.DATATYPE_TENSOR_FLOAT)
else:
data[c.name] = X.astype(consts.DATATYPE_TENSOR_FLOAT)
if self.categorical_columns is not None and len(self.categorical_columns) > 0:
data['input_categorical_vars_all'] = X[1].astype(consts.DATATYPE_TENSOR_FLOAT)
dataset = tf.data.Dataset.from_tensor_slices((data, y))
if epochs is not None:
dataset = dataset.repeat(epochs+1)
if shuffle:
dataset = dataset.shuffle(y.shape[0])
dataset = dataset.batch(batch_size, drop_remainder=drop_remainder and y.shape[0] >= batch_size)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
return dataset
def _preprocessor(self, X, y):
""" The feature is preprocessed and the continuous columns and categorical columns
related information is obtained.
Notes
----------
continuous_columns: CategoricalColumn class.
Contains some information(name, column_names, input_dim, dtype,
input_name) about continuous variables.
categorical_columns: CategoricalColumn class.
Contains some information(name, vocabulary_size, embedding_dim,
dtype, input_name) about categorical variables.
"""
self.n_samples_ = len(X)
if isinstance(X.iloc[0, 0], (np.ndarray, pd.Series)):
self.meta = MetaTSCprocessor(task=self.task)
X, y = self.meta.fit_transform(X, y)
self.continuous_columns = self.meta.continuous_columns
self.categorical_columns = self.meta.categorical_columns
else:
self.meta = MetaTSFprocessor(timestamp=self.timestamp, embedding_output_dim=self.embedding_output_dim)
X, y = self.meta.fit_transform(X, y)
self.continuous_columns = self.meta.continuous_columns
self.categorical_columns = self.meta.categorical_columns
return X, y
@property
def monitor(self):
"""Gets monitor for ReduceLROnPlateau and EarlyStopping.
"""
monitor = self.monitor_metric
if monitor is None:
if self.metrics is not None and len(self.metrics) > 0:
monitor = 'val_' + self.first_metric_name
return monitor
@property
def first_metric_name(self):
"""Get first metric name.
"""
if self.metrics is None or len(self.metrics) <= 0:
raise ValueError('`metrics` is none or empty.')
first_metric = self.metrics[0]
if isinstance(first_metric, str):
return first_metric
if callable(first_metric):
return first_metric.__name__
raise ValueError('`metric` must be string or callable object.')
[docs] def plot_net(self, model_file):
"""Plot net model architecture.
"""
plot_model(self.model, to_file=f'{model_file}/model.png', show_shapes=True)
[docs] def save_model(self, model_file, name='dl_model', external=False):
"""Save the instance object.
"""
import h5py, io
if external:
open_func = open
else:
open_func = fs.open
if model_file.endswith('.pkl'):
model_file = os.path.splitext(model_file)[0]
with open_func(f'{model_file}_{name}.h5', "wb") as fw:
buf = io.BytesIO()
with h5py.File(buf, 'w') as h:
save_model(self.model, h, save_format='h5')
data = buf.getvalue()
buf.close()
fw.write(data)
# del self.model
# self.model = None
tf.keras.backend.clear_session()
logger.info('Save model to disk.')
[docs] @staticmethod
def load_model(model_file, name='dl_model', external=False):
"""Load the instance object.
"""
import h5py, io
if external:
open_func = open
else:
open_func = fs.open
try:
from tensorflow.python import keras
from hyperts.framework.dl.dl_utils.saveconfig import compile_args_from_training_config
keras.saving.saving_utils.compile_args_from_training_config = compile_args_from_training_config
except:
raise ValueError('Perhaps updating version Tensorflow above 2.3.0 will solve the issue.')
custom_objects = {}
custom_objects.update(layers.layers_custom_objects)
custom_objects.update(losses.losses_custom_objects)
custom_objects.update(metrics.metrics_custom_objects)
custom_objects.update(optimizers.optimizer_custom_objects)
if model_file.endswith('.pkl'):
model_file = os.path.splitext(model_file)[0]
with open_func(f'{model_file}_{name}.h5', "rb") as fp:
data = fp.read()
buf = io.BytesIO(data)
del data
with h5py.File(buf, 'r') as h:
model = load_model(h, custom_objects=custom_objects)
logger.info('Loaded model from disk.')
return model
def __getstate__(self):
states = dict(self.__dict__)
if 'model' in states:
del states['model']
return states
[docs]class BaseDeepDetectionMixin:
"""Mixin class for anomaly detection in estimator wrapper.
Parameters
----------
name: str, the name of detection algorithm.
contamination: float, the range in (0., 0.5), optional (default=0.05).
The amount of contamination of the data set, i.e. the proportion of
outliers in the data set. Used when fitting to define the threshold
on the decision function.
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,).
The outlier scores of the training data.
threshold_ : float, the threshold is based on `contamination`.
It is the `n_samples * contamination` most abnormal samples in
`decision_scores_`. The threshold is calculated for generating
binary outlier labels.
labels_ : int, either 0 or 1.
The binary labels of the training data. 0 stands for inliers
and 1 for outliers/anomalies. It is generated by applying
`threshold_` on `decision_scores_`.
classes_: int, default 2.
Default as binary classification.
"""
def __init__(self, name=None, contamination=0.05):
self.name = name
self.contamination = contamination
self.classes_ = 2
self.decision_scores_ = None
self.threshold_ = None
self.labels_ = None
[docs] def decision_function(self, X, y, batch_size=128, **kwargs):
"""Predict raw anomaly score of X using the fitted detector.
Parameters
----------
X : numpy array of shape (n_samples, timestamp, continuous_dim) or
[(n_samples, timestamp, continuous_dim), (n_samples, timestamp, categorical_dim)]
The training input samples.
y : numpy array of shape (n_samples, timestamp, reconstract_dim).
The training input samples.
Returns
-------
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
self._check_is_fitted()
sample_size = X.shape[0]
if batch_size >= sample_size:
y_pred = self._predict(X).numpy()
else:
y_pred = []
iters = sample_size // batch_size + 1
for idx in range(iters):
pred = self._predict(X[idx * batch_size:min((idx + 1) * batch_size, sample_size)])
y_pred.append(pred.numpy())
y_pred = np.concatenate(y_pred, axis=0)
timestamp, reconstract_dim = y.shape[1], y.shape[2]
decision_scores = self._pairwise_distances(
y_true=y,
y_pred=y_pred,
n_samples=kwargs.get('n_samples'),
timestamp=timestamp,
reconstract_dim=reconstract_dim)
return decision_scores
[docs] def predict_outliers(self, X, y):
"""Predict labels for sequences in X.
Parameters
----------
X : DataFrame of shape (n_samples, 1+(n_covariates)).
y : DataFrame of shape (n_samples, reconstract_dim).
Returns
-------
outlier_labels : numpy array of shape (n_samples,)
For each observation, tells whether or not
it should be considered as an outlier according to the
fitted model. 0 stands for inliers and 1 for outliers.
"""
self._check_is_fitted()
X, y = self.meta.transform(X, y)
X_test, y_test = self._dataloader(X=X, y=y, task=self.task, window=self.window,
horizon=self.horizon, forecast_length=self.forecast_length)
decision_func = self.decision_function(X_test, y_test, n_samples=len(X))
is_outlier = np.zeros_like(decision_func, dtype=int)
is_outlier[decision_func > self.threshold_] = 1
return is_outlier
[docs] def predict_outliers_prob(self, X, y, methed='erf'):
"""Predict the probability for sequences in X.
Parameters
----------
X : DataFrame of shape (n_samples, 1+(n_covariates)).
y : DataFrame of shape (n_samples, reconstact_dim).
methed : str, optional {'erf', 'linear'}. Probability conversion method.
Returns
-------
outlier_probability : numpy array of shape (n_samples, n_classes)
For each observation, tells whether or not it should be considered
as an outlier according to the fitted model. Return the outlier
probability, ranging in [0,1]. Note it depends on the number of
classes, which is by default 2 classes ([proba of normal, proba of outliers]).
"""
self._check_is_fitted()
X, y = self.meta.transform(X, y)
X_test, y_test = self._dataloader(X=X, y=y, task=self.task, window=self.window,
horizon=self.horizon, forecast_length=self.forecast_length)
train_scores = self.decision_scores_
mu = np.mean(train_scores)
sigma = np.std(train_scores)
test_scores = self.decision_function(X_test, y_test, n_samples=len(X))
probas = np.zeros((X.shape[0], self.classes_))
if methed == 'linear':
scaler = MinMaxScaler((0, 1))
scaler.fit(train_scores.reshape(-1, 1))
pr = scaler.transform(test_scores.reshape(-1, 1))
else:
pre_erf_score = (test_scores - mu) / (sigma * np.sqrt(24))
pr = erf(pre_erf_score)
pr = pr.ravel().clip(0, 1)
probas[:, 0] = 1. - pr
probas[:, 1] = pr
return probas
[docs] def predict_outliers_confidence(self, X, y):
"""Predict the confidence of model in making the same prediction
under slightly different training sets.
Parameters
-------
X : DataFrame of shape (n_samples, 1+(n_covariates)).
y : DataFrame of shape (n_samples, reconstact_dim).
Returns
-------
confidence : numpy array of shape (n_samples,).
For each observation, tells how consistently the model would
make the same prediction if the training set was perturbed.
Return a probability, ranging in [0,1].
"""
self._check_is_fitted()
X, y = self.meta.transform(X, y)
X_test, y_test = self._dataloader(X=X, y=y, task=self.task, window=self.window,
horizon=self.horizon, forecast_length=self.forecast_length)
test_scores = self.decision_function(X_test, y_test, n_samples=len(X))
n_train_samples = len(self.decision_scores_)
count_instances = np.vectorize(lambda x: np.count_nonzero(self.decision_scores_ <= x))
nb_test_instances = count_instances(test_scores)
posterior_prob = np.vectorize(lambda x: (1+x)/(2+n_train_samples))(nb_test_instances)
confidence = np.vectorize(
lambda p: 1 - binom.cdf(n_train_samples - int(n_train_samples * self.contamination),
n_train_samples, p))(posterior_prob)
prediction = (test_scores > self.threshold_).astype('int32').ravel()
np.place(confidence, prediction == 0, 1 - confidence[prediction == 0])
return confidence
def _get_decision_attributes(self):
"""Calculate key attributes: threshold_ and labels_.
Returns
-------
self : object.
"""
self.threshold_ = np.percentile(self.decision_scores_, 100*(1-self.contamination))
self.labels_ = (self.decision_scores_ > self.threshold_).astype('int').ravel()
self.classes_ = len(np.unique(self.labels_))
logger.info(f'The anomaly threshold: {self.threshold_}')
return self
def _check_is_fitted(self):
"""Check if key attributes 'decision_scores_', 'threshold_',
and 'labels_' are None.
Returns
-------
True or False.
"""
if self.decision_scores_ is None:
return False
elif self.threshold_ is None:
return False
elif self.labels_ is None:
return False
else:
return True
def _update_mixin_params(self, **kwargs):
self.name = kwargs.get('name', None)
self.contamination = kwargs.get('contamination', 0.05)
def _pairwise_distances(self, y_true, y_pred, **kwargs):
"""Utility function to calculate row-wise euclidean distance of two matrix.
Different from pair-wise calculation, this function would not broadcast.
Parameters
----------
y_true : array of shape (n_samples, n_features)
First input samples
y_pred : array of shape (n_samples, n_features)
Second input samples
Returns
-------
distance : array of shape (n_samples,)
Row-wise euclidean distance of y_true and y_pred.
"""
n_samples = kwargs.get('n_samples')
timestamp = kwargs.get('timestamp')
reconstract_dim = kwargs.get('reconstract_dim')
indices = np.arange(len(y_true))[::timestamp]
out_true = y_true[indices].reshape(-1, reconstract_dim)
out_pred = y_pred[indices].reshape(-1, reconstract_dim)
assert out_true.shape == out_pred.shape
n_diff = n_samples - len(out_true)
if n_diff != 0:
out_true = np.concatenate([out_true, y_true[-1][-n_diff:].reshape(-1, reconstract_dim)])
out_pred = np.concatenate([out_pred, y_pred[-1][-n_diff:].reshape(-1, reconstract_dim)])
distance = np.sqrt(np.sum(np.square(out_true - out_pred), axis=1)).ravel()
return distance