Source code for hyperts.framework.dl.models.vae

# -*- coding:utf-8 -*-

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K

from hyperts.utils import consts
from hyperts.framework.dl import layers
from hyperts.framework.dl import BaseDeepEstimator
from hyperts.framework.dl import BaseDeepDetectionMixin

from hypernets.utils import logging
# Module-level logger obtained from hypernets' logging utilities.
logger = logging.get_logger(__name__)


def vae_loss_funcion(input_x, decoder, z_mean, z_log_var, reconstruct_loss='mse', beta=0.1):
    """Loss = Reconstruction loss + beta * Kullback-Leibler loss.

    Parameters
    ----------
    input_x: Tensor, input time series. shape (batch_size, timestamp, features).
    decoder: Tensor, decoder reconstruct output. shape (batch_size, timestamp, features).
    z_mean: Tensor, latent representation mean of encoder. shape (batch_size, latent_dim).
    z_log_var: Tensor, latent representation log_var of encoder. shape (batch_size, latent_dim).
    reconstruct_loss: str, optional {'binary_crossentropy', 'mse'}, default 'mse'.
        Note that 'mse' is the *summed* squared error over time steps and features,
        not a mean.
    beta: positive float, weight of the KL term, default 0.1.

    Returns
    -------
    Scalar tensor: the batch mean of (reconstruction loss + beta * KL loss).

    Raises
    ------
    ValueError: if the static shapes of `input_x` and `decoder` differ, or if
        `reconstruct_loss` is not supported.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    input_shape = K.int_shape(input_x)
    decoder_shape = K.int_shape(decoder)
    if input_shape != decoder_shape:
        raise ValueError(f'Shape mismatch between input_x {input_shape} '
                         f'and decoder output {decoder_shape}.')

    if reconstruct_loss == 'binary_crossentropy':
        # Element-wise BCE summed over time steps and features -> one value per sample.
        reconstruction_loss = K.sum(K.binary_crossentropy(input_x, decoder), axis=[1, 2])
    elif reconstruct_loss == 'mse':
        # Sum of squared errors over time steps and features -> one value per sample.
        reconstruction_loss = K.sum(K.square(input_x - decoder), axis=[1, 2])
    else:
        raise ValueError(f'Unsupported {reconstruct_loss} loss function.')

    # Closed-form KL divergence of N(z_mean, exp(z_log_var)) from N(0, I),
    # summed over the latent dimensions.
    kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
    vae_loss = K.mean(reconstruction_loss + beta * kl_loss)
    return vae_loss
def ConvVAEModel(task, window, nb_outputs, continuous_columns, categorical_columns,
                 latent_dim=2, conv_type='general', cnn_filters=16, kernel_size=1,
                 strides=1, nb_layers=2, activation='relu', drop_rate=0.0,
                 out_activation='linear', **kwargs):
    """Variational AutoEncoder (VAE) with 1D-convolutional encoder and decoder.

    Parameters
    ----------
    task : Str - Only support anomaly detection. See hyperts.utils.consts for details.
    window : Positive Int - Length of the time series sequences for a sample.
    nb_outputs : Positive Int - Number of features reconstructed by the decoder (required).
        The reconstruction target is the first ``nb_outputs`` channels of the model input.
    continuous_columns : Column meta-information of the continuous variables
        (name, column_names, input_dim, dtype, input_name).
    categorical_columns : Column meta-information of the categorical variables
        (name, vocabulary_size, embedding_dim, dtype, input_name).
    latent_dim : Int - Latent representation of encoder, default 2.
    conv_type : Str - Type of 1D convolution used by the encoder, optional
        {'general', 'separable'} (case-insensitive), default 'general'.
    cnn_filters : Positive Int - Number of filters of the first encoder convolution; each
        following layer halves it while it is greater than 4. Default 16.
    kernel_size : Positive Int - Spatial size of the convolution filters. If it is larger
        than ``window`` it is reset to ``max(window // 3, 1)``. Default 1.
    strides : Int or tuple/list of a single integer - Stride length of the convolutions,
        default 1.
    nb_layers : Int - The layers of encoder and decoder, default 2.
    activation : Str - The activation of hidden layers, default 'relu'.
    drop_rate : Float between 0 and 1 - The rate of Dropout for neural nets, default 0.0.
    out_activation : Str - Activation of the reconstruction output, optional
        {'linear', 'sigmoid', 'tanh'}, default 'linear'. 'sigmoid' selects the binary
        cross-entropy reconstruction loss; any other value selects squared error.
    **kwargs : Ignored here; accepted so extra estimator/fit arguments can be passed through.

    Returns
    -------
    tf.keras.Model named 'ConvVAE' whose single output is the decoder reconstruction.
    The VAE loss (reconstruction + 0.1 * KL) is attached with ``model.add_loss``.

    Raises
    ------
    ValueError : if ``task`` is not a detection task or ``conv_type`` is unsupported.
    """
    if task not in consts.TASK_LIST_DETECTION:
        raise ValueError(f'Unsupported task type {task}.')

    conv_type_key = conv_type.lower()
    if conv_type_key == 'general':
        ConvCell = layers.Conv1D
    elif conv_type_key == 'separable':
        ConvCell = layers.SeparableConv1D
    else:
        # Braces are doubled so the options are rendered literally; a single pair would be
        # evaluated as a set expression inside the f-string (with arbitrary ordering).
        raise ValueError(f"Unsupported conv_type {conv_type}, optional {{'general', 'separable'}}.")

    # Reset Keras global state before building a fresh graph.
    K.clear_session()

    continuous_inputs, categorical_inputs = layers.build_input_head(window, continuous_columns, categorical_columns)
    denses = layers.build_denses(continuous_columns, continuous_inputs)
    embeddings = layers.build_embeddings(categorical_columns, categorical_inputs)
    if embeddings is not None:
        denses = layers.LayerNormalization(name='layer_norm')(denses)
        x = layers.Concatenate(axis=-1, name='concat_embeddings_dense_inputs')([denses, embeddings])
    else:
        x = denses

    if kernel_size > window:
        kernel_size = max(int(window // 3), 1)
        logger.warning(f'kernel_size cannot be larger than window, so it is reset to {kernel_size}.')

    # Filters per encoder layer: halve after each layer while the count exceeds 4.
    # The first layer's filter count also sizes the hidden dense layer.
    conv_filters_list = []
    hidden_units = cnn_filters
    for _ in range(nb_layers):
        conv_filters_list.append(cnn_filters)
        cnn_filters = cnn_filters // 2 if cnn_filters > 4 else cnn_filters

    # Encoder
    e = x
    for i, filters in enumerate(conv_filters_list):
        e = ConvCell(filters=filters, kernel_size=kernel_size, strides=strides, activation=activation,
                     padding='same', name=f'encoder_conv1d_{i}')(e)
        e = layers.Dropout(rate=drop_rate, name=f'encoder_dropout_rate{drop_rate}_{i}')(e)
    # Static shape of the last encoder feature map; the decoder is reshaped back to it.
    inter_shape = K.int_shape(e)
    e = layers.Flatten(name='encoder_flatten_conv')(e)
    e = layers.Dense(units=hidden_units, activation=activation, name='encoder_hidden_dense')(e)

    z_mean = layers.Dense(units=latent_dim, name='z_mean')(e)
    z_log_var = layers.Dense(units=latent_dim, name='z_log_var')(e)
    encoder = layers.Sampling(name='sampling')([z_mean, z_log_var])

    # Decoder (always transposed convolutions, independent of conv_type).
    # NOTE(review): with strides > 1 the decoded length may not equal `window` unless the
    # window is divisible by strides ** nb_layers; the loss's shape check would then fail.
    d = layers.Dense(units=np.prod(inter_shape[1:]), name='decoder_inter')(encoder)
    d = layers.Reshape(target_shape=inter_shape[1:], name='decoder_reshape_inter')(d)
    for j, filters in enumerate(conv_filters_list[::-1]):
        d = layers.Conv1DTranspose(filters=filters, kernel_size=kernel_size, strides=strides,
                                   activation=activation, padding='same',
                                   name=f'decoder_conv1dtranspose_{j}')(d)
        d = layers.Dropout(rate=drop_rate, name=f'decoder_dropout_rate{drop_rate}_{j}')(d)
    d = layers.Conv1DTranspose(filters=nb_outputs, kernel_size=kernel_size, padding='same',
                               name='decoder_outputs')(d)
    decoder = layers.Activation(activation=out_activation, name=f'decoder_activation_{out_activation}')(d)

    all_inputs = list(continuous_inputs.values()) + list(categorical_inputs.values())
    all_outputs = [decoder]
    model = tf.keras.models.Model(inputs=all_inputs, outputs=all_outputs, name='ConvVAE')

    # Loss: a sigmoid output lies in (0, 1), so binary cross-entropy is used; otherwise squared error.
    reconstruct_loss = 'binary_crossentropy' if out_activation == 'sigmoid' else 'mse'
    vae_loss = vae_loss_funcion(input_x=x[:, :, :nb_outputs], decoder=decoder,
                                z_mean=z_mean, z_log_var=z_log_var,
                                reconstruct_loss=reconstruct_loss, beta=0.1)
    model.add_loss(vae_loss)

    return model
class ConvVAE(BaseDeepEstimator, BaseDeepDetectionMixin):
    """Convolution Variational AutoEncoder Estimator (VAE) for anomaly detection.

    Parameters
    ----------
    task : Str - Only support anomaly detection. See hyperts.utils.consts for details.
    timestamp : Str or None - Timestamp column name. Required positional argument
        (may be None).
    contamination : Float - Forwarded to the detection mixin via ``_update_mixin_params``;
        presumably the expected fraction of anomalies in the data. Default 0.05.
    window : Positive Int - Length of the time series sequences for a sample, default 3.
    horizon : Positive Int - Length of the prediction horizon, default 1.
    forecast_length : Positive Int - Step of the forecast outputs, default 1.
    latent_dim : Int - Latent representation of encoder, default 2.
    conv_type : Str - Type of 1D convolution, optional {'general', 'separable'},
        default 'separable'.
    cnn_filters : Positive Int - Number of filters of the first encoder convolution,
        default 16.
    kernel_size : Positive Int - Spatial size of the convolution filters, default 1.
    strides : Int or tuple/list of a single integer - Stride length of the convolution,
        default 1.
    nb_layers : Int - The layers of encoder and decoder, default 2.
    activation : Str - The activation of hidden layers, default 'relu'.
    drop_rate : Float between 0 and 1 - The rate of Dropout for neural nets, default 0.2.
    out_activation : Str - Activation of the reconstruction output, optional
        {'linear', 'sigmoid', 'tanh'}, default 'linear'. 'sigmoid' switches the
        reconstruction loss to binary cross-entropy.
    reconstract_dim : Int or None - Number of features reconstructed by the decoder.
        If None, ``self.meta.classes_`` is used. Default None.
    metrics : Str - List of metrics to be evaluated by the model during training and
        testing, default 'auto'.
    monitor_metric : Str - Quality indicator monitored during neural network training,
        default 'val_loss'.
    optimizer : Str or keras Instance - for example, 'adam', 'sgd', and so on.
        Default 'auto'.
    learning_rate : Positive Float - The optimizer's learning rate, default 0.001.
    reducelr_patience : Positive Int - The number of epochs with no improvement after which
        learning rate will be reduced, default 5.
    earlystop_patience : Positive Int - The number of epochs with no improvement after which
        training will be stopped, default 10.
    summary : Bool - Whether to output network structure information, default True.
    continuous_columns : Column meta-information of the continuous variables
        (name, column_names, input_dim, dtype, input_name). Default None.
    categorical_columns : Column meta-information of the categorical variables
        (name, vocabulary_size, embedding_dim, dtype, input_name). Default None.
    name : Str - Estimator name forwarded to the detection mixin, default 'conv_vae'.
    **kwargs : Extra keyword arguments stored and forwarded to ``ConvVAEModel``.
    """

    def __init__(self,
                 task,
                 timestamp,
                 contamination=0.05,
                 window=3,
                 horizon=1,
                 forecast_length=1,
                 latent_dim=2,
                 conv_type='separable',
                 cnn_filters=16,
                 kernel_size=1,
                 strides=1,
                 nb_layers=2,
                 activation='relu',
                 drop_rate=0.2,
                 out_activation='linear',
                 reconstract_dim=None,
                 metrics='auto',
                 monitor_metric='val_loss',
                 optimizer='auto',
                 learning_rate=0.001,
                 reducelr_patience=5,
                 earlystop_patience=10,
                 summary=True,
                 continuous_columns=None,
                 categorical_columns=None,
                 name='conv_vae',
                 **kwargs):
        if task not in consts.TASK_LIST_DETECTION:
            raise ValueError(f'Unsupported task type {task}.')

        self.latent_dim = latent_dim
        self.conv_type = conv_type
        self.cnn_filters = cnn_filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.nb_layers = nb_layers
        self.activation = activation
        self.drop_rate = drop_rate
        self.out_activation = out_activation
        self.reconstract_dim = reconstract_dim
        self.metrics = metrics
        self.optimizer = optimizer
        self.learning_rate = learning_rate
        self.summary = summary
        # Copy so later mutation of the caller's dict cannot leak into the estimator.
        self.model_kwargs = kwargs.copy()

        super(ConvVAE, self).__init__(task=task,
                                      timestamp=timestamp,
                                      window=window,
                                      horizon=horizon,
                                      forecast_length=forecast_length,
                                      monitor_metric=monitor_metric,
                                      reducelr_patience=reducelr_patience,
                                      earlystop_patience=earlystop_patience,
                                      continuous_columns=continuous_columns,
                                      categorical_columns=categorical_columns)
        self._update_mixin_params(name=name, contamination=contamination)

    def _bulid_estimator(self, **kwargs):
        """Build the Keras network via ``ConvVAEModel``.

        The stored constructor ``**kwargs`` override the defaults collected here, and the
        call-time ``kwargs`` override both (later dict entries win).
        """
        if self.reconstract_dim is None:
            nb_outputs = self.meta.classes_
        else:
            nb_outputs = self.reconstract_dim

        model_params = {
            'task': self.task,
            'window': self.window,
            'nb_outputs': nb_outputs,
            'continuous_columns': self.continuous_columns,
            'categorical_columns': self.categorical_columns,
            'latent_dim': self.latent_dim,
            'conv_type': self.conv_type,
            'cnn_filters': self.cnn_filters,
            'kernel_size': self.kernel_size,
            'strides': self.strides,
            'nb_layers': self.nb_layers,
            'activation': self.activation,
            'drop_rate': self.drop_rate,
            'out_activation': self.out_activation,
        }
        model_params = {**model_params, **self.model_kwargs, **kwargs}
        return ConvVAEModel(**model_params)

    def _fit(self, X_train, y_train, X_valid, y_valid, **kwargs):
        """Build, compile and train the network, then score the training data.

        ``kwargs`` must contain ``batch_size`` and ``epochs``. ``batch_size`` is consumed
        when building the datasets and removed. The remaining kwargs are forwarded both
        to the model builder and to ``model.fit``.

        Returns
        -------
        (model, history) : the trained Keras model and its ``fit`` history.
        """
        # y_train is indexed as (n_samples, time steps, features).
        seq_len, reconstract_dim = y_train.shape[1], y_train.shape[2]
        train_ds = self._from_tensor_slices(X=X_train, y=y_train,
                                            batch_size=kwargs['batch_size'],
                                            epochs=kwargs['epochs'],
                                            shuffle=True)
        # batch_size is popped so it is not forwarded to model.fit; the datasets presumably
        # arrive already batched, and Keras expects batch_size to be omitted for them.
        valid_ds = self._from_tensor_slices(X=X_valid, y=y_valid,
                                            batch_size=kwargs.pop('batch_size'),
                                            epochs=kwargs['epochs'],
                                            shuffle=False)
        model = self._bulid_estimator(**kwargs)

        # A missing `verbose` is treated as non-silent instead of raising KeyError.
        if self.summary and kwargs.get('verbose', 1) != 0:
            model.summary()
        else:
            logger.info(f'Number of current ConvVAE params: {model.count_params()}')

        model = self._compile_model(model, self.optimizer, self.learning_rate)
        history = model.fit(train_ds, validation_data=valid_ds, **kwargs)

        # Reconstruction-based anomaly scores on the training data.
        y_pred = model.predict(X_train)
        self.decision_scores_ = self._pairwise_distances(
            y_true=y_train,
            y_pred=y_pred,
            n_samples=self.n_samples_,
            timestamp=seq_len,
            reconstract_dim=reconstract_dim)
        self._get_decision_attributes()

        return model, history

    @tf.function(experimental_relax_shapes=True)
    def _predict(self, X):
        """Run a forward pass of the trained network in inference mode (dropout disabled)."""
        return self.model(X, training=False)