Source code for hyperts.framework.dl.models.lstnet

# -*- coding:utf-8 -*-

import tensorflow as tf
import tensorflow.keras.backend as K

from hyperts.utils import consts
from hyperts.framework.dl import layers
from hyperts.framework.dl import BaseDeepEstimator

from hypernets.utils import logging
logger = logging.get_logger(__name__)


[docs]def LSTNetModel(task, window, rnn_type, skip_rnn_type, continuous_columns, categorical_columns, cnn_filters, kernel_size, rnn_units, rnn_layers, skip_rnn_units, skip_rnn_layers, skip_period, ar_order, drop_rate=0., nb_outputs=1, nb_steps=1, out_activation='linear', **kwargs): """Long-and Short-term Time-series Network Model (LSTNet). Parameters ---------- task : Str - Support forecast, classification, and regression. See hyperts.utils.consts for details. window : Positive Int - Length of the time series sequences for a sample. rnn_type : Str - Type of recurrent neural network, optional {'basic', 'gru', 'lstm}. skip_rnn_type : Str - Type of skip recurrent neural network, optional {'simple_rnn', 'gru', 'lstm}, default = 'gru'. continuous_columns: CategoricalColumn class. Contains some information(name, column_names, input_dim, dtype, input_name) about continuous variables. categorical_columns: CategoricalColumn class. Contains some information(name, vocabulary_size, embedding_dim, dtype, input_name) about categorical variables. cnn_filters: Positive Int - The dimensionality of the output space (i.e. the number of filters in the convolution). kernel_size: Positive Int - A single integer specifying the spatial dimensions of the filters, rnn_units : Positive Int - The dimensionality of the output space for RNN. rnn_layers : Positive Int - The number of the layers for RNN. skip_rnn_units : Positive Int - The dimensionality of the output space for skip RNN. skip_rnn_layers : Positive Int - The number of the layers for skip RNN. skip_period: Positive Int or 0 - The length of skip for recurrent neural network. ar_order : Positive Int or None - The window size of the autoregressive component. drop_rate : Float between 0 and 1 - The rate of Dropout for neural nets. nb_outputs : Int, default 1. nb_steps : Int, The step length of forecast, default 1. out_activation : Str - Forecast the task output activation function, optional {'linear', 'sigmoid', 'tanh'}, default = 'linear'. """ K.clear_session() continuous_inputs, categorical_inputs = layers.build_input_head(window, continuous_columns, categorical_columns) denses = layers.build_denses(continuous_columns, continuous_inputs) embeddings = layers.build_embeddings(categorical_columns, categorical_inputs) if embeddings is not None: denses = layers.LayerNormalization(name='layer_norm')(denses) x = layers.Concatenate(axis=-1, name='concat_embeddings_dense_inputs')([denses, embeddings]) else: x = denses # backbone if ar_order > window: ar_order = max(int(window // 3), 1) logger.warning(f'ar_order cannot be larger than window, so it is reset to {ar_order}.') if kernel_size > window: kernel_size = max(int(window // 3), 1) logger.warning(f'kernel_size cannot be larger than window, so it is reset to {kernel_size}.') pt = int((window - kernel_size + 1) / skip_period) if skip_period > 0 else 0 c = layers.SeparableConv1D(cnn_filters, kernel_size, activation='relu', name='conv1d')(x) c = layers.Dropout(rate=drop_rate, name='conv1d_dropout')(c) r = layers.rnn_forward(c, rnn_units, rnn_layers, rnn_type, name=rnn_type, drop_rate=drop_rate) r = layers.Lambda(lambda k: K.reshape(k, (-1, rnn_units)), name=f'lambda_{rnn_type}')(r) r = layers.Dropout(rate=drop_rate, name=f'lambda_{rnn_type}_dropout')(r) if skip_period*pt > 0: s = layers.Lambda(lambda k: k[:, int(-pt*skip_period):, :], name=f'lambda_skip_{rnn_type}_0')(c) s = layers.Lambda(lambda k: K.reshape(k, (-1, pt, skip_period, cnn_filters)), name=f'lambda_skip_{rnn_type}_1')(s) s = layers.Lambda(lambda k: K.permute_dimensions(k, (0, 2, 1, 3)), name=f'lambda_skip_{rnn_type}_2')(s) s = layers.Lambda(lambda k: K.reshape(k, (-1, pt, cnn_filters)), name=f'lambda_skip_{rnn_type}_3')(s) s = layers.rnn_forward(s, skip_rnn_units, skip_rnn_layers, skip_rnn_type, name='skip_'+skip_rnn_type, drop_rate=drop_rate) s = layers.Lambda(lambda k: K.reshape(k, (-1, skip_period*skip_rnn_units)), name=f'lambda_skip_{rnn_type}_4')(s) s = layers.Dropout(rate=drop_rate, name=f'lambda_skip_{rnn_type}_dropout')(s) r = layers.Concatenate(name=f'{rnn_type}_concat_skip_{rnn_type}')([r, s]) else: s = layers.GRU(skip_rnn_units, return_sequences=True, name='attention_gru')(c) s = layers.FeedForwardAttention(name='ffattention')(s) r = layers.Concatenate(name=f'{rnn_type}_concat_attention_gru')([r, s]) outputs = layers.build_output_tail(r, task, nb_outputs, nb_steps) if task in consts.TASK_LIST_FORECAST and nb_steps == 1 and ar_order > 0 and ar_order < window: z = layers.Lambda(lambda k: k[:, :, :nb_outputs], name='lambda_ar_0')(denses) z = layers.AutoRegressive(order=ar_order, nb_variables=nb_outputs, name='ar')(z) z = layers.Lambda(lambda k: K.reshape(k, (-1, 1, nb_outputs)), name='lambda_ar_1')(z) outputs = layers.Add(name='rnn_add_ar')([outputs, z]) if task in consts.TASK_LIST_FORECAST: outputs = layers.Activation(out_activation, name=f'output_activation_{out_activation}')(outputs) all_inputs = list(continuous_inputs.values()) + list(categorical_inputs.values()) model = tf.keras.models.Model(inputs=all_inputs, outputs=[outputs], name=f'LSTNet') return model
[docs]class LSTNet(BaseDeepEstimator): """Long-and Short-term Time-series Network Estimator (LSTNet). Parameters ---------- task : Str - Support forecast, classification, and regression. See hyperts.utils.consts for details. rnn_type : Str - Type of recurrent neural network, optional {'simple_rnn', 'gru', 'lstm}, default = 'gru'. skip_rnn_type : Str - Type of skip recurrent neural network, optional {'basic', 'gru', 'lstm}, default = 'gru'. cnn_filters: Positive Int - The dimensionality of the output space (i.e. the number of filters in the convolution), default = 16. kernel_size: Positive Int - A single integer specifying the spatial dimensions of the filters, default = 1. rnn_units : Positive Int - The dimensionality of the output space for recurrent neural network, default = 16. rnn_layers : Positive Int - The number of the layers for recurrent neural network, default = 1. skip_rnn_units : Positive Int - The dimensionality of the output space for skip recurrent neural network, default = 16. skip_rnn_layers : Positive Int - The number of the layers for skip recurrent neural network, default = 1. skip_period: Positive Int or 0 - The length of skip for recurrent neural network, default = 0. ar_order : Positive Int or None - The window size of the autoregressive component, default = None. drop_rate : Float between 0 and 1 - The rate of Dropout for neural nets, default = 0. out_activation : Str - Forecast the task output activation function, optional {'linear', 'sigmoid', 'tanh'}, default = 'linear'. timestamp : Str or None - Timestamp name, the forecast task must be given, default None. window : Positive Int - Length of the time series sequences for a sample, default = 7. horizon : Positive Int - Length of the prediction horizon, default = 1. forecast_length : Positive Int - Step of the forecast outputs, default = 1. metrics : Str - List of metrics to be evaluated by the model during training and testing, default = 'auto'. monitor_metric : Str - Quality indicators monitored during neural network training. default = 'val_loss'. optimizer : Str or keras Instance - for example, 'adam', 'sgd', and so on. default = 'auto'. learning_rate : Positive Float - The optimizer's learning rate, default = 0.001. loss : Str - Loss function, for forecsting or regression, optional {'auto', 'mae', 'mse', 'huber_loss', 'mape'}, for classification, optional {'auto', 'categorical_crossentropy', 'binary_crossentropy}, default = 'auto'. reducelr_patience : Positive Int - The number of epochs with no improvement after which learning rate will be reduced, default = 5. earlystop_patience : Positive Int - The number of epochs with no improvement after which training will be stopped, default = 5. summary : Bool - Whether to output network structure information, default = True. """ def __init__(self, task, rnn_type='gru', skip_rnn_type='gru', cnn_filters=16, kernel_size=1, rnn_units=16, rnn_layers=1, skip_rnn_units=16, skip_rnn_layers=1, skip_period=0, ar_order=0, drop_rate=0., out_activation='linear', timestamp=None, window=7, horizon=1, forecast_length=1, metrics='auto', monitor_metric='val_loss', optimizer='auto', learning_rate=0.001, loss='auto', reducelr_patience=5, earlystop_patience=10, summary=True, continuous_columns=None, categorical_columns=None, **kwargs): if task in consts.TASK_LIST_FORECAST and timestamp is None: raise ValueError('The forecast task requires [timestamp] name.') self.rnn_type = rnn_type self.skip_rnn_type = skip_rnn_type self.cnn_filters = cnn_filters self.kernel_size = kernel_size self.rnn_units = rnn_units self.rnn_layers = rnn_layers self.skip_rnn_units = skip_rnn_units self.skip_rnn_layers = skip_rnn_layers self.skip_period = skip_period self.ar_order = ar_order self.drop_rate = drop_rate self.out_activation = out_activation self.metrics = metrics self.optimizer = optimizer self.learning_rate = learning_rate self.loss = loss self.summary = summary self.model_kwargs = kwargs.copy() super(LSTNet, self).__init__(task=task, timestamp=timestamp, window=window, horizon=horizon, forecast_length=forecast_length, monitor_metric=monitor_metric, reducelr_patience=reducelr_patience, earlystop_patience=earlystop_patience, continuous_columns=continuous_columns, categorical_columns=categorical_columns) def _build_estimator(self, **kwargs): model_params = { 'task': self.task, 'window': self.window, 'rnn_type': self.rnn_type, 'skip_rnn_type': self.skip_rnn_type, 'continuous_columns': self.continuous_columns, 'categorical_columns': self.categorical_columns, 'cnn_filters': self.cnn_filters, 'kernel_size': self.kernel_size, 'rnn_units': self.rnn_units, 'rnn_layers': self.rnn_layers, 'skip_rnn_units': self.skip_rnn_units, 'skip_rnn_layers': self.skip_rnn_layers, 'skip_period': self.skip_period, 'ar_order': self.ar_order, 'drop_rate': self.drop_rate, 'nb_outputs': self.meta.classes_, 'nb_steps': self.forecast_length, 'out_activation': self.out_activation } model_params = {**model_params, **self.model_kwargs, **kwargs} return LSTNetModel(**model_params) def _fit(self, train_X, train_y, valid_X, valid_y, **kwargs): train_ds = self._from_tensor_slices(X=train_X, y=train_y, batch_size=kwargs['batch_size'], epochs=kwargs['epochs'], shuffle=True) valid_ds = self._from_tensor_slices(X=valid_X, y=valid_y, batch_size=kwargs.pop('batch_size'), epochs=kwargs['epochs'], shuffle=False) model = self._build_estimator(**kwargs) if self.summary and kwargs['verbose'] != 0: model.summary() else: logger.info(f'Number of current LSTNet params: {model.count_params()}') model = self._compile_model(model, self.optimizer, self.learning_rate) history = model.fit(train_ds, validation_data=valid_ds, **kwargs) return model, history @tf.function(experimental_relax_shapes=True) def _predict(self, X): return self.model(X, training=False)