Source code for hyperts.framework.dl.models.nbeats

# -*- coding:utf-8 -*-

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K

from hyperts.utils import consts
from hyperts.framework.dl import layers
from hyperts.framework.dl import BaseDeepEstimator

from hypernets.utils import logging
logger = logging.get_logger(__name__)


[docs]def linear_space(backcast_length, forecast_length, is_forecast=True): horizon = forecast_length if is_forecast else backcast_length return K.arange(0, horizon) / horizon
[docs]def NBeatsModel(task, continuous_columns, categorical_columns, window=10, nb_steps=1, stack_types=('trend', 'seasonality'), thetas_dim=(4, 8), nb_blocks_per_stack=3, share_weights_in_stack=False, hidden_layer_units=256, nb_outputs=1, nb_harmonics=None, out_activation='linear', **kwargs): """N-BEATS: Neural basis expansion analysis for interpretable time series forecasting. Parameters ---------- task : Str - Support forecast, classification, and regression. See hyperts.utils.consts for details. continuous_columns: CategoricalColumn class. Contains some information(name, column_names, input_dim, dtype, input_name) about continuous variables. categorical_columns: CategoricalColumn class. Contains some information(name, vocabulary_size, embedding_dim, dtype, input_name) about categorical variables. window : Positive Int - Length of the time series sequences for a sample, i.e., backcast_length, default 10. nb_steps : Positive Int - The step length of forecast, i.e., forecast_length, default 1. stack_types : Tuple(Str) - Stack types, optional {'trend', 'seasonality', 'generic'}. thetas_dim : Tuple(Int) - The number of units that make up each dense layer in each block of every stack. nb_blocks_per_stack : Int - The number of block per stack. share_weights_in_stack : Bool - Whether to share weights in stack. hidden_layer_units : Int - The units of hidden layer. nb_outputs : Int, default 1. nb_harmonics : Int or None, -The number of harmonic terms for each stack type, default None. out_activation : Str - Forecast the task output activation function, optional {'linear', 'sigmoid'}, default = 'linear'. References ---------- [1] Boris N. Oreshkin, Dmitri Carpov, Nicolas Chapados, Yoshua Bengio, N-BEATS: Neural basis expansion analysis for interpretable time series forecasting, ICLR 2020. [2] https://github.com/philipperemy/n-beats. """ if task not in consts.TASK_LIST_FORECAST: raise ValueError(f'Unsupported task type {task}.') weights = {} def seasonality_model(thetas, backcast_length, forecast_length, is_forecast): p = thetas.get_shape().as_list()[-1] if p % 2 == 0: p1, p2 = p // 2, p // 2 else: p1, p2 = p // 2, p // 2 + 1 t = linear_space(backcast_length, forecast_length, is_forecast=is_forecast) s1 = K.stack([K.cos(2 * np.pi * i * t) for i in range(p1)]) s2 = K.stack([K.sin(2 * np.pi * i * t) for i in range(p2)]) if p == 1: s = s2 else: s = K.concatenate([s1, s2], axis=0) s = K.cast(s, np.float32) return K.dot(thetas, s) def trend_model(thetas, backcast_length, forecast_length, is_forecast): p = thetas.get_shape().as_list()[-1] t = linear_space(backcast_length, forecast_length, is_forecast=is_forecast) t = K.transpose(K.stack([t ** i for i in range(p)])) t = K.cast(t, np.float32) return K.dot(thetas, K.transpose(t)) def create_block(x, e, stack_id, block_id, stack_type, nb_poly): def n(layer_name): return '/'.join([str(stack_id), str(block_id), stack_type, layer_name]) def reg(layer_with_weights): if share_weights_in_stack: layer_name = layer_with_weights.name.split('/')[-1] try: reused_weights = weights[stack_id][layer_name] return reused_weights except KeyError: pass if stack_id not in weights: weights[stack_id] = {} weights[stack_id][layer_name] = layer_with_weights return layer_with_weights backcast_, forecast_ = {}, {} fc1 = reg(layers.Dense(hidden_layer_units, activation='relu', name=n('fc1'))) fc2 = reg(layers.Dense(hidden_layer_units, activation='relu', name=n('fc2'))) fc3 = reg(layers.Dense(hidden_layer_units, activation='relu', name=n('fc3'))) fc4 = reg(layers.Dense(hidden_layer_units, activation='relu', name=n('fc4'))) if stack_type == 'generic': theta_b = reg(layers.Dense(nb_poly, use_bias=False, name=n('theta_b'))) theta_f = reg(layers.Dense(nb_poly, use_bias=False, name=n('theta_f'))) backcast = reg(layers.Dense(window, name=n('generic_backcast'))) forecast = reg(layers.Dense(nb_steps, name=n('generic_forecast'))) elif stack_type == 'trend': theta_f = theta_b = reg(layers.Dense(nb_poly, use_bias=False, name=n('theta_f_b'))) backcast = layers.Lambda(trend_model, arguments={'is_forecast': False, 'backcast_length': window, 'forecast_length': nb_steps}, name=n('trend_backcast')) forecast = layers.Lambda(trend_model, arguments={'is_forecast': True, 'backcast_length': window, 'forecast_length': nb_steps}, name=n('trend_forecast')) else: # 'seasonality' if nb_harmonics: theta_b = reg(layers.Dense(nb_harmonics, use_bias=False, name=n('theta_b'))) else: theta_b = reg(layers.Dense(nb_steps, use_bias=False, name=n('theta_b'))) theta_f = reg(layers.Dense(nb_steps, use_bias=False, name=n('theta_f'))) backcast = layers.Lambda(seasonality_model, arguments={'is_forecast': False, 'backcast_length': window, 'forecast_length': nb_steps}, name=n('seasonality_backcast')) forecast = layers.Lambda(seasonality_model, arguments={'is_forecast': True, 'backcast_length': window, 'forecast_length': nb_steps}, name=n('seasonality_forecast')) for i in range(input_dim): if not bool(e) and exo_dim > 0: fc0 = layers.Concatenate(name=n(f'concat_x{i}_e'))([x[i]] + [e[j] for j in range(exo_dim)]) else: fc0 = x[i] fc1_ = fc1(fc0) fc2_ = fc2(fc1_) fc3_ = fc3(fc2_) fc4_ = fc4(fc3_) theta_f_ = theta_f(fc4_) theta_b_ = theta_b(fc4_) backcast_[i] = backcast(theta_b_) forecast_[i] = forecast(theta_f_) return backcast_, forecast_ K.clear_session() x_, y_, e_ = {}, {}, {} continuous_inputs, categorical_inputs = layers.build_input_head(window, continuous_columns, categorical_columns) x = layers.build_denses(continuous_columns, continuous_inputs) e = layers.build_embeddings(categorical_columns, categorical_inputs) input_dim = x.shape.as_list()[-1] if input_dim > nb_outputs: s = [nb_outputs, input_dim - nb_outputs] x, c = tf.split(x, num_or_size_splits=s, axis=-1, name='split_target_exo_inputs') input_dim = x.shape.as_list()[-1] else: c = None if e is not None: if c is not None: e = layers.Concatenate(axis=-1, name='concat_continuous_and_categorical_exo_inputs')([c, e]) exo_dim = e.shape.as_list()[-1] for i in range(exo_dim): e_[i] = layers.Lambda(lambda k: k[..., i], name=f'lambda_decompose_exo_dim_{i}')(e) else: exo_dim = 0 for i in range(input_dim): x_[i] = layers.Lambda(lambda k: k[..., i], name=f'lambda_decompose_input_dim_{i}')(x) for stack_id in range(len(stack_types)): stack_type = stack_types[stack_id] nb_poly = thetas_dim[stack_id] for block_id in range(nb_blocks_per_stack): backcast, forecast = create_block(x_, e_, stack_id, block_id, stack_type, nb_poly) for i in range(input_dim): x_[i] = layers.Subtract(name=f'x_sub_stack_{stack_id}_block_{block_id}_input_dim_{i}')([x_[i], backcast[i]]) if stack_id == 0 and block_id == 0: y_[i] = forecast[i] else: y_[i] = layers.Add(name=f'y_sub_stack_{stack_id}_block_{block_id}_input_dim_{i}')([y_[i], forecast[i]]) for i in range(input_dim): y_[i] = layers.Reshape((nb_steps, 1), name=f'y_reshape_input_dim_{i}')(y_[i]) if nb_outputs > 1: outputs = layers.Concatenate(name='concat_y')([y_[i] for i in range(nb_outputs)]) else: outputs = y_[0] if task in consts.TASK_LIST_FORECAST: outputs = layers.Activation(out_activation, name=f'output_activation_{out_activation}')(outputs) all_inputs = list(continuous_inputs.values()) + list(categorical_inputs.values()) model = tf.keras.models.Model(inputs=all_inputs, outputs=[outputs], name=f'N-BEATS') return model
[docs]class NBeats(BaseDeepEstimator): """NBeats Estimator . Parameters ---------- task : Str - Support forecast, classification, and regression. See hyperts.utils.consts for details. stack_types : Tuple(Str) - Stack types, optional {'trend', 'seasonality', 'generic'}. default = ('trend', 'seasonality'). thetas_dim : Tuple(Int) - The number of units that make up each dense layer in each block of every stack. default = (4, 8). nb_blocks_per_stack : Int - The number of block per stack. default = 3. share_weights_in_stack : Bool - Whether to share weights in stack. default = False. hidden_layer_units : Int - The units of hidden layer. default = 256. out_activation : Str - Forecast the task output activation function, optional {'linear', 'sigmoid', 'tanh'}, default = 'linear'. timestamp : Str or None - Timestamp name, the forecast task must be given, default None. window : Positive Int - Length of the time series sequences for a sample, default = 3. horizon : Positive Int - Length of the prediction horizon, default = 1. forecast_length : Positive Int - Step of the forecast outputs, default = 1. metrics : Str - List of metrics to be evaluated by the model during training and testing, default = 'auto'. monitor_metric : Str - Quality indicators monitored during neural network training. default = 'val_loss'. optimizer : Str or keras Instance - for example, 'adam', 'sgd', and so on. default = 'auto'. learning_rate : Positive Float - The optimizer's learning rate, default = 0.001. loss : Str - Loss function, for forecsting or regression, optional {'auto', 'mae', 'mse', 'huber_loss', 'mape'}, for classification, optional {'auto', 'categorical_crossentropy', 'binary_crossentropy}, default = 'auto'. reducelr_patience : Positive Int - The number of epochs with no improvement after which learning rate will be reduced, default = 5. earlystop_patience : Positive Int - The number of epochs with no improvement after which training will be stopped, default = 5. summary : Bool - Whether to output network structure information, default = True. continuous_columns: CategoricalColumn class. Contains some information(name, column_names, input_dim, dtype, input_name) about continuous variables. categorical_columns: CategoricalColumn class. Contains some information(name, vocabulary_size, embedding_dim, dtype, input_name) about categorical variables. """ def __init__(self, task, stack_types=('trend', 'seasonality'), thetas_dim=(4, 8), nb_blocks_per_stack=3, share_weights_in_stack=False, hidden_layer_units=256, out_activation='linear', timestamp=None, window=3, horizon=1, forecast_length=1, metrics='auto', monitor_metric='val_loss', optimizer='auto', learning_rate=0.001, loss='auto', reducelr_patience=5, earlystop_patience=10, summary=True, continuous_columns=None, categorical_columns=None, **kwargs): if task in consts.TASK_LIST_FORECAST and timestamp is None: raise ValueError('The forecast task requires [timestamp] name.') self.stack_types = stack_types self.thetas_dim = thetas_dim self.nb_blocks_per_stack = nb_blocks_per_stack self.share_weights_in_stack = share_weights_in_stack self.hidden_layer_units = hidden_layer_units self.out_activation = out_activation self.metrics = metrics self.optimizer = optimizer self.learning_rate = learning_rate self.loss = loss self.summary = summary self.model_kwargs = kwargs.copy() super(NBeats, self).__init__(task=task, timestamp=timestamp, window=window, horizon=horizon, forecast_length=forecast_length, monitor_metric=monitor_metric, reducelr_patience=reducelr_patience, earlystop_patience=earlystop_patience, continuous_columns=continuous_columns, categorical_columns=categorical_columns) def _build_estimator(self, **kwargs): model_params = { 'task': self.task, 'window': self.window, 'stack_types': self.stack_types, 'continuous_columns': self.continuous_columns, 'categorical_columns': self.categorical_columns, 'thetas_dim': self.thetas_dim, 'nb_blocks_per_stack': self.nb_blocks_per_stack, 'share_weights_in_stack': self.share_weights_in_stack, 'nb_outputs': self.meta.classes_, 'nb_steps': self.forecast_length, 'hidden_layer_units': self.hidden_layer_units, 'out_activation': self.out_activation, } model_params = {**model_params, **self.model_kwargs, **kwargs} return NBeatsModel(**model_params) def _fit(self, train_X, train_y, valid_X, valid_y, **kwargs): train_ds = self._from_tensor_slices(X=train_X, y=train_y, batch_size=kwargs['batch_size'], epochs=kwargs['epochs'], shuffle=True) valid_ds = self._from_tensor_slices(X=valid_X, y=valid_y, batch_size=kwargs.pop('batch_size'), epochs=kwargs['epochs'], shuffle=False) model = self._build_estimator() if self.summary and kwargs['verbose'] != 0: model.summary() else: logger.info(f'Number of current NBeats params: {model.count_params()}') model = self._compile_model(model, self.optimizer, self.learning_rate) history = model.fit(train_ds, validation_data=valid_ds, **kwargs) return model, history @tf.function(experimental_relax_shapes=True) def _predict(self, X): return self.model(X, training=False)