from hyperts.utils import consts
from collections import OrderedDict
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow.keras.backend as K
from hypernets.utils import logging
logger = logging.get_logger(__name__)
[docs]class MultiColEmbedding(layers.Layer):
"""Multi Columns Embedding base on Embedding.
Parameters
----------
input_dims: Integer. A list or tuple. Sizes of the vocabulary for per columns.
output_dims: A list or tuple. Dimension of the dense embedding.
input_length: Integer. Length of input sequences.
"""
def __init__(self, input_dims, output_dims, input_length=None, **kwargs):
super(MultiColEmbedding, self).__init__(**kwargs)
self.input_dims = input_dims
self.output_dims = output_dims
self.input_length = input_length
self.embeddings = {}
for i, (input_dim, output_dim) in enumerate(zip(self.input_dims, self.output_dims)):
self.embeddings[i] = layers.Embedding(input_dim=input_dim, output_dim=output_dim,
input_length=self.input_length)
[docs] def call(self, inputs, **kwargs):
inputs = tf.cast(inputs, dtype=tf.int32)
embeddings = []
for i in range(inputs.shape[-1]):
sub_inp = inputs[:, :, i]
embeddings.append(self.embeddings[i](sub_inp))
outputs = tf.concat(embeddings, axis=2)
return outputs
[docs] def compute_output_shape(self, input_shape):
assert isinstance(input_shape, list)
return (input_shape[0], sum(self.output_dims))
[docs] def get_config(self):
config = {'input_dims': self.input_dims,
'output_dims': self.output_dims,
'input_length': self.input_length}
base_config = super(MultiColEmbedding, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class WeightedAttention(layers.Layer):
"""Weighted Attention based on softmax-Dense.
Parameters
----------
timesteps: time steps.
input: (none, timesteps, nb_variables)
output: (none, timesteps, nb_variables)
"""
def __init__(self, timesteps, **kwargs):
super(WeightedAttention, self).__init__(**kwargs)
self.timesteps = timesteps
self.dense = layers.Dense(timesteps, activation='softmax')
[docs] def call(self, inputs, **kwargs):
x = tf.transpose(inputs, perm=[0, 2, 1])
x = self.dense(x)
x_weights = tf.transpose(x, perm=[0, 2, 1])
x = tf.multiply(inputs, x_weights)
return x
[docs] def get_config(self):
config = {'timesteps': self.timesteps}
base_config = super(WeightedAttention, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class FeedForwardAttention(layers.Layer):
"""Feed Forward Attention.
Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
input: (none, timesteps, nb_variables)
output: (none, nb_variables) or (none, timesteps, nb_variables)
Parameters
----------
return_sequences: Boolean. Whether to return the last output. in the output
sequence, or the full sequence. Default: `False`.
use_bias: Boolean. Whether the layer uses a bias vector. Default: `False`.
activation: Str. Activation function to use. Default: `tanh`.
Example:
model.add(LSTM(64, return_sequences=True))
model.add(FFAttention())
"""
def __init__(self, return_sequences=False, use_bias=True, activation='tanh', **kwargs):
super(FeedForwardAttention, self).__init__(**kwargs)
self.use_bias = use_bias
self.activation = activation
self.return_sequences = return_sequences
self.dense = layers.Dense(units=1, activation=activation, use_bias=use_bias)
[docs] def call(self, inputs, **kwargs):
x = self.dense(inputs)
x = tf.squeeze(x, axis=-1)
x = tf.nn.softmax(x)
x = tf.expand_dims(x, axis=-1)
x = tf.multiply(inputs, x)
if self.return_sequences:
return x
else:
return tf.reduce_sum(x, axis=1)
[docs] def get_config(self):
config = {'use_bias': self.use_bias,
'return_sequences': self.return_sequences,
'activation': self.activation}
base_config = super(FeedForwardAttention, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class AutoRegressive(layers.Layer):
"""AutoRegressive Layer based on Dense.
Parameters
----------
order: lookback step for original sequence.
input: (none, timesteps, nb_variables)
output: (none, nb_variables)
"""
def __init__(self, order, nb_variables, **kwargs):
super(AutoRegressive, self).__init__(**kwargs)
self.order = order
self.nb_variables = nb_variables
self.transform = models.Sequential([
layers.Lambda(self._cut_period),
layers.Lambda(self._permute_dimensions),
layers.Lambda(self._out_reshape)
])
self.dense = layers.Dense(1)
def _cut_period(self, x):
return x[:, -self.order:, :]
def _permute_dimensions(self, x):
return K.permute_dimensions(x, (0, 2, 1))
def _out_reshape(self, x):
return K.reshape(x, (-1, self.order))
[docs] def call(self, inputs, **kwargs):
x = self.transform(inputs)
x = self.dense(x)
x = tf.reshape(x, (-1, self.nb_variables))
return x
[docs] def get_config(self):
config = {'order': self.order, 'nb_variables': self.nb_variables}
base_config = super(AutoRegressive, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class Highway(layers.Layer):
"""Highway to implement skip conenction based on NIN and GlobalAveragePooling1D.
Parameters
----------
input: (none, timesteps, nb_variables)
output: (none, nb_variables)
"""
def __init__(self, nb_variables, **kwargs):
super(Highway, self).__init__(**kwargs)
self.nb_variables = nb_variables
self.nin = layers.Conv1D(nb_variables, 1, activation='relu', kernel_initializer='he_normal')
self.pool = layers.GlobalAveragePooling1D()
[docs] def call(self, inputs, **kwargs):
x = self.nin(inputs)
x = self.pool(x)
return x
[docs] def get_config(self):
config = {'nb_variables': self.nb_variables}
base_config = super(Highway, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class Time2Vec(layers.Layer):
"""The vector representation for time series.
Parameters
----------
kernel_size: int, dimension of vector embedding.
periodic_activation: str, optional {'sin', 'cos'}, default 'sin'.
input: (none, timesteps, nb_variables)
output: (none, timesteps, kernel_size+nb_variables)
"""
def __init__(self, kernel_size=32, periodic_activation='sin', **kwargs):
super(Time2Vec, self).__init__(**kwargs)
self.k = kernel_size
self.actvition = periodic_activation
[docs] def build(self, input_shape):
# trend
self.trend_weights = self.add_weight(name='trend_weights',
shape=(1, input_shape[-1]),
initializer='glorot_uniform',
trainable=True)
self.trend_bias = self.add_weight(name='trend_bias',
shape=(1, input_shape[-1]),
initializer='zeros',
trainable=True)
# periodic
self.periodic_weights = self.add_weight(name='periodic_weights',
shape=(input_shape[-1], self.k),
initializer='glorot_uniform',
trainable=True)
self.periodic_bias = self.add_weight(name='periodic_bias',
shape=(1, self.k),
initializer='zeros',
trainable=True)
super(Time2Vec, self).build(input_shape)
[docs] def call(self, inputs, **kwargs):
trend = inputs * self.trend_weights + self.trend_bias
if self.actvition.startswith('sin'):
periodic = K.sin(K.dot(inputs, self.periodic_weights) + self.periodic_bias)
elif self.actvition.startswith('cos'):
periodic = K.cos(K.dot(inputs, self.periodic_weights) + self.periodic_bias)
else:
periodic1 = K.sin(K.dot(inputs, self.periodic_weights) + self.periodic_bias)
periodic2 = K.cos(K.dot(inputs, self.periodic_weights) + self.periodic_bias)
periodic = periodic1 + periodic2
return K.concatenate([trend, periodic], -1)
[docs] def compute_output_shape(self, input_shape):
return (input_shape[0], input_shape[1] * (self.k + 1))
[docs] def get_config(self):
config = {'kernel_size': self.k}
base_config = super(Time2Vec, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class RevInstanceNormalization(layers.Layer):
"""Reversible Instance Normalization for Accurate Time-Series Forecasting
against Distribution Shift, ICLR2022.
Parameters
----------
eps: float, a value added for numerical stability, default 1e-5.
affine: bool, if True(default), RevIN has learnable affine parameters.
"""
def __init__(self, eps=1e-5, affine=True, **kwargs):
super(RevInstanceNormalization, self).__init__(**kwargs)
self.eps = eps
self.affine = affine
[docs] def build(self, input_shape):
self.affine_weight = self.add_weight(name='affine_weight',
shape=(1, input_shape[-1]),
initializer='ones',
trainable=True)
self.affine_bias = self.add_weight(name='affine_bias',
shape=(1, input_shape[-1]),
initializer='zeros',
trainable=True)
super(RevInstanceNormalization, self).build(input_shape)
[docs] def call(self, inputs, **kwargs):
mode = kwargs.get('mode', None)
if mode == 'norm':
self._get_statistics(inputs)
x = self._normalize(inputs)
elif mode == 'denorm':
x = self._denormalize(inputs)
else:
raise NotImplementedError('Only modes norm and denorm are supported.')
return x
def _get_statistics(self, x):
dim2reduce = tuple(range(1, len(x.shape) - 1))
self.mean = K.stop_gradient(K.mean(x, axis=dim2reduce, keepdims=True))
self.stdev = K.stop_gradient(K.sqrt(K.var(x, axis=dim2reduce, keepdims=True) + self.eps))
print(self.stdev)
def _normalize(self, x):
x = x - self.mean
x = x / self.stdev
if self.affine:
x = x * self.affine_weight
x = x + self.affine_bias
return x
def _denormalize(self, x):
if self.affine:
x = x - self.affine_bias
x = x / (self.affine_weight + self.eps*self.eps)
x = x * self.stdev
x = x + self.mean
return x
[docs] def get_config(self):
config = {'eps': self.eps,
'affine': self.affine}
base_config = super(RevInstanceNormalization, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class Identity(layers.Layer):
"""Identity Layer.
"""
def __init__(self, **kwargs):
super(Identity, self).__init__(**kwargs)
[docs] def call(self, inputs, **kwargs):
return inputs
[docs]class Shortcut(layers.Layer):
""" Shortcut Layer.
Parampers
----------
filters: the dimensionality of the output space for Conv1D.
"""
def __init__(self, filters, activation='relu', **kwargs):
super(Shortcut, self).__init__(**kwargs)
self.filters = filters
self.activation = activation
self.conv = layers.Conv1D(filters, kernel_size=1, padding='same', use_bias=False)
self.bn = layers.BatchNormalization()
[docs] def call(self, inputs, **kwargs):
x = self.conv(inputs)
x = self.bn(x)
return x
[docs] def get_config(self):
config = {'filters': self.filters}
base_config = super(Shortcut, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class InceptionBlock(layers.Layer):
"""InceptionBlock for time series.
Parampers
----------
filters: the dimensionality of the output space for Conv1D.
kernel_size_list: list or tuple, a list of kernel size for Conv1D.
strides: int or tuple, default 1.
use_bottleneck: bool, whether to use bottleneck, default True.
bottleneck_size: int, if use bottleneck, bottleneck_size is 32(default).
activation: str, activation function, default 'relu'.
"""
def __init__(self,
filters=32,
kernel_size_list=(1, 3, 5, 8, 12),
strides=1,
use_bottleneck=True,
bottleneck_size=32,
activation='linear',
**kwargs):
super(InceptionBlock, self).__init__(**kwargs)
self.filters = filters
self.kernel_size_list = kernel_size_list
self.strides = strides
self.use_bottleneck = use_bottleneck
self.bottleneck_size = bottleneck_size
self.activation = activation
if use_bottleneck:
self.head = layers.Conv1D(bottleneck_size, 1, padding='same', activation=activation, use_bias=False)
else:
self.head = Identity()
self.conv_list = []
for kernel_size in kernel_size_list:
self.conv_list.append(layers.Conv1D(filters=filters,
kernel_size=kernel_size,
padding='same',
activation=activation,
use_bias=False))
self.max_pool = layers.MaxPool1D(pool_size=3, strides=1, padding='same')
self.pool_conv = layers.Conv1D(filters, kernel_size=1, padding='same', activation=activation, use_bias=False)
self.concat = layers.Concatenate(axis=2)
self.bn = layers.BatchNormalization()
self.relu = layers.Activation(activation='relu')
[docs] def call(self, inputs, **kwargs):
x = self.head(inputs)
convs = [conv(x) for conv in self.conv_list]
pool = self.max_pool(x)
pool_conv = self.pool_conv(pool)
convs.append(pool_conv)
x = self.concat(convs)
x = self.bn(x)
x = self.relu(x)
return x
[docs] def get_config(self):
config = {'filters': self.filters,
'kernel_size_list': self.kernel_size_list,
'strides': self.strides,
'use_bottleneck': self.use_bottleneck,
'bottleneck_size': self.bottleneck_size,
'activation': self.activation}
base_config = super(InceptionBlock, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class FactorizedReduce(layers.Layer):
"""Factorized reduce for timestemp or variable.
Parampers
----------
period: lookback step for original sequence.
filters: the dimensionality of the output space for Conv1D.
strides: int or tuple, default 1.
"""
def __init__(self, period, filters, strides=1, **kwargs):
super(FactorizedReduce, self).__init__(**kwargs)
self.period = period
self.filters = filters
self.strides = strides
self.cropping1 = models.Sequential([
layers.Conv1D(filters, kernel_size=1, strides=strides, use_bias=False),
layers.Lambda(self._cut_period)
])
self.cropping2 = models.Sequential([
layers.ZeroPadding1D(padding=(0, 1)),
layers.Cropping1D(cropping=(1, 0)),
layers.Conv1D(filters, kernel_size=1, strides=strides, use_bias=False),
layers.Lambda(self._cut_period)
])
self.concat = layers.Concatenate(axis=-1)
self.bn = layers.BatchNormalization()
def _cut_period(self, x):
return x[:, -self.period:, :]
[docs] def call(self, inputs, **kwargs):
c1 = self.cropping1(inputs)
c2 = self.cropping2(inputs)
x = self.concat([c1, c2])
x = self.bn(x)
return x
[docs] def get_config(self):
config = {'period': self.period,
'filters': self.filters,
'strides': self.strides}
base_config = super(FactorizedReduce, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]class Sampling(layers.Layer):
"""Reparametrisation by sampling from Gaussian N(0,I).
Parampers
----------
"""
def __init__(self, **kwargs):
super(Sampling, self).__init__(**kwargs)
[docs] def call(self, inputs, **kwargs):
z_mean, z_log_var = inputs
epsilon = K.random_normal(shape=K.shape(z_mean))
return z_mean + K.exp(0.5 * z_log_var) * epsilon
[docs] def get_config(self):
config = {}
base_config = super(Sampling, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
[docs]def build_denses(continuous_columns, continuous_inputs, use_layernormalization=False):
"""Concatenate continuous inputs.
Parameters
----------
continuous_columns: CategoricalColumn class.
Contains some information(name, column_names, input_dim, dtype,
input_name) about continuous variables.
continuous_inputs: list, tf.keras.layers.Input objects.
use_layernormalization: bool, default False.
"""
if len(continuous_inputs) > 1:
dense_layer = layers.Concatenate(name='concat_continuous_inputs')(
list(continuous_inputs.values()))
else:
dense_layer = list(continuous_inputs.values())[0]
if use_layernormalization:
dense_layer = layers.LayerNormalization(name='continuous_inputs_ln')(dense_layer)
return dense_layer
[docs]def build_embeddings(categorical_columns, categorical_inputs):
"""Build embeddings if there are categorical variables.
Parameters
----------
categorical_columns: CategoricalColumn class.
Contains some information(name, vocabulary_size, embedding_dim,
dtype, input_name) about categorical variables.
categorical_inputs: list, tf.keras.layers.Input objects.
"""
if 'all_categorical_vars' in categorical_inputs:
input_layer = categorical_inputs['all_categorical_vars']
input_dims = [column.vocabulary_size for column in categorical_columns]
output_dims = [column.embedding_dim for column in categorical_columns]
embeddings = MultiColEmbedding(input_dims, output_dims)(input_layer)
else:
embeddings = None
return embeddings
[docs]def build_output_tail(x, task, nb_outputs, nb_steps=1):
"""Build the output tail.
Parameters
----------
task: str, See hyperts.utils.consts for details.
nb_outputs: int, the number of output units.
nb_steps: int, the step length of forecast, default 1.
"""
if task in consts.TASK_LIST_REGRESSION + consts.TASK_LIST_BINARYCLASS:
outputs = layers.Dense(units=1, activation='sigmoid', name='dense_out')(x)
elif task in consts.TASK_LIST_MULTICLASS:
outputs = layers.Dense(units=nb_outputs, activation='softmax', name='dense_out')(x)
elif task in consts.TASK_LIST_FORECAST:
outputs = layers.Dense(units=nb_outputs*nb_steps, activation='linear', name='dense_out')(x)
outputs = layers.Lambda(lambda k: K.reshape(k, (-1, nb_steps, nb_outputs)), name='lambda_out')(outputs)
else:
raise ValueError(f'Unsupported task type {task}.')
return outputs
[docs]def rnn_forward(x, nb_units, nb_layers, rnn_type, name, drop_rate=0., i=0, activation='tanh', return_sequences=False):
"""Multi-RNN layers.
Parameters
----------
nb_units: int, the dimensionality of the output space for
recurrent neural network.
nb_layers: int, the number of the layers for recurrent neural network.
rnn_type: str, type of recurrent neural network,
including {'basic', 'gru', 'lstm'}.
name: recurrent neural network name.
drop_rate: float, the rate of Dropout for neural nets, default 0.
return_sequences: bool, whether to return the last output. in the output
sequence, or the full sequence. default False.
"""
RnnCell = {'lstm': layers.LSTM, 'gru': layers.GRU, 'basic': layers.SimpleRNN}[rnn_type]
for i in range(nb_layers - 1):
x = RnnCell(units=nb_units,
activation=activation,
return_sequences=True,
name=f'{name}_{i}')(x)
if drop_rate > 0. and drop_rate < 0.5:
x = layers.Dropout(rate=drop_rate, name=f'{name}_{i}_dropout')(x)
elif drop_rate >= 0.5:
x = layers.BatchNormalization(name=f'{name}_{i}_norm')(x)
x = RnnCell(units=nb_units,
activation=activation,
return_sequences=return_sequences,
name=f'{name}_{i+1}')(x)
return x
layers_custom_objects = {
'MultiColEmbedding': MultiColEmbedding,
'WeightedAttention': WeightedAttention,
'FeedForwardAttention': FeedForwardAttention,
'AutoRegressive': AutoRegressive,
'Highway': Highway,
'Time2Vec': Time2Vec,
'RevInstanceNormalization': RevInstanceNormalization,
'Identity': Identity,
'Shortcut': Shortcut,
'InceptionBlock': InceptionBlock,
'FactorizedReduce': FactorizedReduce,
'Sampling': Sampling,
}