# -*- coding:utf-8 -*-
"""
"""
import copy
import numpy as np
from hypernets.core import set_random_state
from hypernets.experiment.compete import SteppedExperiment, ExperimentStep, \
SpaceSearchStep, EnsembleStep, FinalTrainStep
from hypernets.utils import logging
from hyperts.utils import consts, get_tool_box
from hyperts.toolbox import generate_anomaly_pseudo_ground_truth
from hyperts.utils.plot import plot_plotly, plot_mpl, enable_plotly, enable_mpl
logger = logging.get_logger(__name__)
def _set_log_level(log_level):
logging.set_level(log_level)
[docs]class TSAFDataPreprocessStep(ExperimentStep):
"""Time Series Anomaly Detection or Forecasting Task Data Preprocess Step.
"""
def __init__(self, experiment, name, timestamp_col=None, freq=None,
cv=False, covariate_cols=None, covariate_cleaner=None,
ensemble_size=None, train_data_periods=None,
anomaly_label_col=None, contamination=0.05):
super().__init__(experiment, name)
timestamp_col = [timestamp_col] if isinstance(timestamp_col, str) else timestamp_col
covariate_cols = [covariate_cols] if isinstance(covariate_cols, str) else covariate_cols
self.cv = cv
self.ensemble_size = ensemble_size
self.freq = freq
self.target_cols = None
self.history_prior = None
self.covariate_cols = covariate_cols
self.covariate_cleaner = covariate_cleaner
self.train_data_periods = train_data_periods
self.anomaly_label_col = anomaly_label_col
self.contamination = contamination
self.timestamp_col = timestamp_col if timestamp_col is not None else consts.TIMESTAMP
[docs] def get_params(self, deep=True):
params = super(TSAFDataPreprocessStep, self).get_params()
return params
[docs] def get_fitted_params(self):
freq = self.freq if self.freq is not None else None
params = super().get_fitted_params()
data_shapes = self.data_shapes if self.data_shapes is not None else {}
return {**params, **data_shapes, 'freq': freq}
[docs]class TSRCDataPreprocessStep(ExperimentStep):
"""Time Series Classification or Regression Task Data Preprocess Step.
"""
def __init__(self, experiment, name, cv=False):
super().__init__(experiment, name)
self.cv = cv
[docs] def get_params(self, deep=True):
params = super(TSRCDataPreprocessStep, self).get_params()
return params
[docs] def get_fitted_params(self):
params = super().get_fitted_params()
data_shapes = self.data_shapes_ if self.data_shapes_ is not None else {}
return {**params, **data_shapes}
[docs]class TSSpaceSearchStep(SpaceSearchStep):
"""Time Series Space Searching.
"""
def __init__(self, experiment, name, cv=False, num_folds=3):
super().__init__(experiment, name, cv=cv, num_folds=num_folds)
[docs] def search(self, X_train, y_train, X_test=None, X_eval=None, y_eval=None, **kwargs):
model = copy.deepcopy(self.experiment.hyper_model)
es = self.find_early_stopping_callback(model.callbacks)
if es is not None and es.time_limit is not None and es.time_limit > 0:
es.time_limit = self.estimate_time_limit(es.time_limit)
model.search(X_train, y_train, X_eval, y_eval, cv=self.cv, num_folds=self.num_folds, **kwargs)
return model
[docs]class TSEnsembleStep(EnsembleStep):
"""Time Series Ensemble.
"""
def __init__(self, experiment, name, mode=None, scorer=None, ensemble_size=10, cv=False, retrain_on_wholedata=False):
super().__init__(experiment, name, scorer=scorer, ensemble_size=ensemble_size)
self.mode = mode
self.cv = cv
self.retrain_on_wholedata = retrain_on_wholedata
[docs] def build_estimator(self, hyper_model, X_train, y_train, X_eval=None, y_eval=None, **kwargs):
trials = self.select_trials(hyper_model)
estimators = [hyper_model.load_estimator(trial.model_file) for trial in trials]
ensemble = self.get_ensemble(estimators, X_train, y_train)
if self.task not in consts.TASK_LIST_FORECAST and \
all(['oof' in trial.memo.keys() for trial in trials]):
logger.info('ensemble with oofs')
oofs = self.get_ensemble_predictions(trials, ensemble)
assert oofs is not None
if hasattr(oofs, 'shape'):
tb = get_tool_box(y_train, oofs)
y_, oofs_ = tb.select_valid_oof(y_train, oofs)
ensemble.fit(None, y_, oofs_)
else:
ensemble.fit(None, y_train, oofs)
elif self.task in consts.TASK_LIST_FORECAST and self.cv:
tb = get_tool_box(X_train, y_train)
period = tb.fft_infer_period(y_train.iloc[:, 0])
if isinstance(self.experiment.eval_size, int):
eval_horizon = self.experiment.eval_size
elif int(X_train.shape[0] * consts.DEFAULT_MIN_EVAL_SIZE) <= period:
eval_horizon = period
else:
eval_horizon = consts.DEFAULT_MIN_EVAL_SIZE
X_train, X_eval, y_train, y_eval = \
tb.temporal_train_test_split(X_train, y_train, test_size=eval_horizon)
estimators = self.est_retrain(trials, hyper_model, X_train, y_train, **kwargs)
# self.retrain_on_wholedata = False
ensemble.estimators = list(estimators)
ensemble.classes_ = None
ensemble.fit(X_eval, y_eval)
elif X_eval is not None and y_eval is not None:
ensemble.fit(X_eval, y_eval)
else:
ensemble.fit(X_train, y_train)
if not isinstance(ensemble.weights_, list):
weights = np.sum(ensemble.weights_, axis=1)
else:
weights = ensemble.weights_
if self.retrain_on_wholedata:
estimators = self.est_retrain(trials, hyper_model, X_train, y_train, X_eval, y_eval, weights, **kwargs)
else:
estimators = np.where(np.array(weights) > 0, estimators, None)
ensemble.estimators = list(estimators)
return ensemble
[docs] def get_ensemble(self, estimators, X_train, y_train):
tb = get_tool_box(X_train, y_train)
target_dims = tb.get_shape(tb.DataFrame(y_train))[1]
if self.task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_REGRESSION:
ensemble_task = 'regression'
elif 'binary' in self.task or self.task in consts.TASK_LIST_DETECTION:
ensemble_task = 'binary'
else:
ensemble_task = 'multiclass'
return tb.greedy_ensemble(ensemble_task, estimators, target_dims=target_dims,
scoring=self.scorer, ensemble_size=self.ensemble_size)
[docs] def est_retrain(self, trials, hyper_model, X_train, y_train, X_eval=None, y_eval=None, weights=None, **kwargs):
estimators = []
if X_eval is not None or y_eval is not None:
tb = get_tool_box(X_train, X_eval)
X_all = tb.concat_df([X_train, X_eval], axis=0)
y_all = tb.concat_df([y_train, y_eval], axis=0)
X_all, y_all = tb.reset_index(X_all), tb.reset_index(y_all)
else:
X_all, y_all = X_train, y_train
if self.mode is not consts.Mode_STATS:
kwargs.update({'epochs': kwargs.pop('final_train_epochs', consts.FINAL_TRAINING_EPOCHS)})
logger.info('Retrain the best trial model with all data ...')
weights = [1]*len(trials) if weights is None else weights
for trial, weight in zip(trials, weights):
if weight > 0:
estimator = hyper_model.final_train(trial.space_sample, X_all, y_all, **kwargs)
estimators.append(estimator)
else:
estimators.append(None)
return estimators
[docs]class TSFinalTrainStep(FinalTrainStep):
def __init__(self, experiment, name, mode=None, retrain_on_wholedata=False):
super().__init__(experiment, name)
self.mode = mode
self.retrain_on_wholedata = retrain_on_wholedata
[docs] def build_estimator(self, hyper_model, X_train, y_train, X_test=None, X_eval=None, y_eval=None, **kwargs):
if self.retrain_on_wholedata:
trial = hyper_model.get_best_trial()
if X_eval is not None or y_eval is not None:
tb = get_tool_box(X_train, X_eval)
X_all = tb.concat_df([X_train, X_eval], axis=0)
y_all = tb.concat_df([y_train, y_eval], axis=0)
X_all, y_all = tb.reset_index(X_all), tb.reset_index(y_all)
else:
X_all, y_all = X_train, y_train
if self.mode is not consts.Mode_STATS:
kwargs.update({'epochs': kwargs.pop('final_train_epochs', consts.FINAL_TRAINING_EPOCHS)})
logger.info('Retrain the best trial model with all data ...')
estimator = hyper_model.final_train(trial.space_sample, X_all, y_all, **kwargs)
else:
estimator = hyper_model.load_estimator(hyper_model.get_best_trial().model_file)
return estimator
[docs]class TSPipeline:
"""Pipeline Extension for Time Series Analysis.
Parameters
----------
sk_pipeline: sklearn pipeline, including data_preprocessing, space_searching, final_training and estimator
steps and so on.
freq: 'str', DateOffset or None, default None.
task: 'str' or None, default None.
Task could be 'univariate-forecast', 'multivariate-forecast', and 'univariate-binaryclass', etc.
See consts.py for details.
mode: str, default 'stats'. Optional {'stats', 'dl', 'nas'}, where,
'stats' indicates that all the models selected in the execution experiment are statistical models.
'dl' indicates that all the models selected in the execution experiment are deep learning models.
'nas' indicates that the selected model of the execution experiment will be a deep network model
for neural architecture search, which is not currently supported.
timestamp: str, forecast task 'timestamp' cannot be None, (default=None).
covariables: list[n*str], if the data contains covariables, specify the covariable column names, (default=None).
target: str or list, optional.
Target feature name for training, which must be one of the train_data columns for classification[str],
regression[str] or unvariate forecast task [list]. For multivariate forecast task, it is multiple columns
of training data.
"""
def __init__(self, sk_pipeline, freq, task, mode, timestamp, covariables, target, history=None, **kwargs):
self.freq = freq
self.task = task
self.mode = mode
self.target = target
self.timestamp = timestamp
self.covariables = covariables
self.sk_pipeline = sk_pipeline
if self.task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_DETECTION:
self.prior = sk_pipeline.named_steps.data_preprocessing.history_prior
self.history = history
self.kwargs = kwargs.copy()
[docs] def predict(self, X, forecast_start=None):
"""Predicts target for sequences in X.
Parameters
----------
X: 'DataFrame'.
For forecast task, X.columns = ['timestamp', (covariate_1), (covariate_2),...].
(covariate_1) indicates that it may not exist.
For classification or regression tasks, X.columns = [variate_1, variate_2,...].
forecast_start : 'DataFrame'. This parameter applies only to 'dl' mode.
Forecast the start fragment, if None, by default the last window fragment of the
train data.
forecast_start.columns = ['timestamp', variate_1, variate_2, ..., (covariate_1), (covariate_2),...].
(covariate_1) indicates that it may not exist.
"""
tb = get_tool_box(X)
if self.task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_DETECTION:
if self.mode == consts.Mode_DL:
if forecast_start is not None:
self.history = copy.deepcopy(forecast_start)
X_timestamp_start = tb.to_datetime(tb.df_to_array(X[self.timestamp])[0])
forecast_timestamp_end = tb.to_datetime(tb.df_to_array(forecast_start[self.timestamp])[-1])
if X_timestamp_start <= forecast_timestamp_end:
raise ValueError(f'The start date of X [{X_timestamp_start}] should be after '
f'the end date of forecast_start [{forecast_timestamp_end}].')
if self.task in consts.TASK_LIST_FORECAST:
self.__preprocess_forecast_start(self.history)
if X[self.timestamp].dtypes == object:
X[self.timestamp] = tb.to_datetime(X[self.timestamp])
if X.shape[0] >= 3:
X_transformed = tb.smooth_missed_ts_rows(X, ts_name=self.timestamp)
else:
X_transformed = X.copy()
y_pred = self.sk_pipeline.predict(X_transformed)
if self.task in consts.TASK_LIST_DETECTION:
y_proba = self.sk_pipeline.predict_proba(X_transformed)
forecast = tb.DataFrame(y_pred, columns=[consts.ANOMALY_LABEL])
forecast[consts.ANOMALY_CONFIDENCE] = y_proba[:, 1]
else:
forecast = tb.DataFrame(y_pred, columns=self.target)
forecast = tb.concat_df([X_transformed[[self.timestamp]], forecast], axis=1)
forecast = tb.join_df(X[[self.timestamp]], forecast, on=self.timestamp)
if self.kwargs.get('train_end_date') is not None:
forecast = tb.drop(forecast, columns=[self.timestamp])
return forecast
else:
y_pred = self.sk_pipeline.predict(X)
return y_pred
[docs] def predict_proba(self, X):
"""Predicts target probabilities for sequences in X for classification task.
Parameters
----------
X: 'DataFrame'.
X.columns = [variate_1, variate_2,...].
"""
if self.task in consts.TASK_LIST_CLASSIFICATION + consts.TASK_LIST_DETECTION:
y_proba = self.sk_pipeline.predict_proba(X)
else:
raise ValueError('predict_proba is used for classification only.')
return y_proba
[docs] def evaluate(self, y_true, y_pred, y_proba=None, metrics=None):
"""Evaluates model performance.
Parameters
----------
y_true: 'np.arrray'.
y_pred: 'pd.DataFrame' or 'np.arrray'.
For forecast task, 'pd.DataFrame', X.columns could be ['timestamp', (covariate_1),
(covariate_2),..., variate_1, variate_2,...].
(covariate_1) indicates that it may not exist.
For classification and regression tasks, 'np.arrray'.
y_proba: 'np.arrray' or None, some metrics should be given, such as AUC.
metrics: list, tuple or None. If None,
For forecast or regression tasks, metrics = ['mae', 'mse', 'rmse', 'mape', 'smape'],
For classification task, metrics = ['accuracy', 'f1', 'precision', 'recall'].
"""
import pandas as pd
from hyperts.utils.metrics import calc_score
pd.set_option('display.max_columns', 10,
'display.max_rows', 10,
'display.float_format', lambda x: '%.4f' % x)
y_true = np.array(y_true) if not isinstance(y_true, np.ndarray) else y_true
y_proba = np.array(y_proba) if not isinstance(y_proba, np.ndarray) else y_proba
if self.task in consts.TASK_LIST_FORECAST+consts.TASK_LIST_REGRESSION and metrics is None:
metrics = ['mae', 'mse', 'rmse', 'mape', 'smape']
elif self.task in consts.TASK_LIST_CLASSIFICATION and metrics is None:
metrics = ['accuracy', 'f1', 'precision', 'recall']
elif self.task in consts.TASK_LIST_DETECTION and metrics is None:
metrics = ['f1', 'precision', 'recall', 'roc_auc_score']
tb = get_tool_box(y_pred)
if self.task in consts.TASK_LIST_FORECAST:
y_pred = tb.df_to_array(y_pred[self.target])
if self.task in consts.TASK_LIST_DETECTION:
y_pred = tb.df_to_array(y_pred[[consts.ANOMALY_LABEL]])
if 'binaryclass' in self.task or self.task in consts.TASK_LIST_DETECTION:
task = 'binary'
elif 'multiclass' in self.task:
task = 'multiclass'
else:
task = self.task
scores = calc_score(y_true, y_pred, y_proba,
metrics=metrics,
task=task,
pos_label=self.kwargs.get('pos_label'))
scores = pd.DataFrame.from_dict(scores, orient='index', columns=['Score'])
scores = scores.reset_index().rename(columns={'index': 'Metirc'})
return scores
[docs] def make_future_dataframe(self, periods=None, start_date=None, covariate_df=None):
"""Simulate the trend using the extrapolated generative model.
Notes
----------
If covariate_df exists, the start time of the covariate_df must be the same as that of start_date.
In addition, the covariate_df length is not less than periods.
Parameters
----------
periods: 'int', number of periods to forecast forward.
start_date: 'str' or datetime-like, forecast the future start date.
Returns
-------
pd.Dataframe that extends forward from the end of history for the
requested number of periods.
"""
tb = get_tool_box(self.history)
if self.kwargs.get('train_end_date') is None:
start_date = self.history[self.timestamp].max() if start_date is None else start_date
else:
start_date = self.kwargs.get('train_end_date')
dates = tb.date_range(start=start_date, periods=periods+1, freq=self.freq)
dates = dates[dates > start_date]
dates = dates[:periods]
futures = tb.DataFrame({self.timestamp: dates})
if covariate_df is not None and len(covariate_df) >= len(futures):
covariate_df = tb.reset_index(covariate_df[self.covariables])
covariate_df = tb.select_1d_forward(covariate_df, periods)
futures = tb.concat_df([futures, covariate_df], axis=1)
return futures
[docs] def plot(self,
forecast,
actual=None,
history=None,
var_id=0,
show_forecast_interval=True,
interactive=True,
figsize=None):
"""Plots forecast trend curves for the forecst task.
Notes
----------
1. This function can plot the curve of only one target variable. If not specified,
index 0 is ploted by default.
2. This function supports ploting of historical observations, future actual values,
and forecast intervals.
Parameters
----------
forecast: 'DataFrame'. The columns need to include the timestamp column
and the target columns.
actual: 'DataFrame' or None. If it is not None, the column needs to include
the time column and the target column.
var_id: 'int' or 'str'. If int, it is the index of the target column. If str,
it is the name of the target column. default 0.
show_forecast_interval: 'bool'. Whether to show the forecast intervals.
Default True.
Returns
----------
fig : 'plotly.graph_objects.Figure'.
"""
history = history if history is not None else self.history
if self.task in consts.TASK_LIST_FORECAST:
forecast_interval = self.forecast_interval(forecast)
else:
forecast_interval = None
if self.task in consts.TASK_LIST_DETECTION:
assert actual is not None, 'actual can not be None in anomaly detection task.'
plot_options = {
'task': self.task,
'anomaly_detection_label': self.kwargs.get('anomaly_detection_label'),
}
if interactive and enable_plotly:
plot_plotly(forecast,
timestamp_col=self.timestamp,
target_col=self.target,
actual=actual,
var_id=var_id,
history=history,
forecast_interval=forecast_interval,
show_forecast_interval=show_forecast_interval,
include_history=False if history is None else True,
**plot_options)
elif not interactive and enable_mpl:
plot_mpl(forecast,
timestamp_col=self.timestamp,
target_col=self.target,
actual=actual,
var_id=var_id,
history=history,
forecast_interval=forecast_interval,
show_forecast_interval=show_forecast_interval,
include_history=False if history is None else True,
figsize=figsize,
**plot_options)
else:
raise ValueError('No install matplotlib or plotly.')
[docs] def forecast_interval(self, forecast, confidence_level=0.9):
"""infer forecast interval.
Parameters
----------
forecast: 'DataFrame'. The columns need to include the timestamp column
and the target columns.
confidence_level: float, default 0.9.
"""
tb = get_tool_box(forecast)
return tb.infer_forecast_interval(forecast[self.target], *self.prior, confidence_level=confidence_level)
[docs] def split_X_y(self, data, smooth=False, impute=False):
"""Splits the data into X and y.
Parameters
----------
data: 'DataFrame', including X and y.
smooth: Whether to smooth missed time series rows. Default False.
Example:
TimeStamp y
2021-03-01 3.4
2021-03-02 5.2
2021-03-04 6.7
2021-03-05 2.3
>>
TimeStamp y
2021-03-01 3.4
2021-03-02 5.2
2021-03-03 NaN
2021-03-04 6.7
2021-03-05 2.3
impute: Whether to impute in missing values. Default False.
Example:
TimeStamp y
2021-03-01 3.4
2021-03-02 5.2
2021-03-03 NaN
2021-03-04 6.7
2021-03-05 2.3
>>
TimeStamp y
2021-03-01 3.4
2021-03-02 5.2
2021-03-03 3.4
2021-03-04 6.7
2021-03-05 2.3
Returns
-------
X, y.
"""
if self.task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_DETECTION:
tb = get_tool_box(data)
data = tb.reset_index(data)
is_skip = self.timestamp in tb.columns_values(data)
if self.kwargs.get('train_end_date') is not None and not is_skip:
pseudo_timestamp = tb.DataFrame({f'{self.timestamp}':
tb.date_range(start=self.kwargs.get('train_end_date'),
periods=len(data)+1,
freq=self.kwargs.get('generate_freq'))})
pseudo_timestamp = tb.select_1d_reverse(pseudo_timestamp, len(data))
pseudo_timestamp = tb.reset_index(pseudo_timestamp)
data = tb.concat_df([pseudo_timestamp, data], axis=1)
if self.covariables is not None:
excluded_variables = tb.list_diff(tb.columns_values(data), self.target)
else:
excluded_variables = [self.timestamp]
data = tb.sort_values(data, ts_name=self.timestamp)
data = tb.drop_duplicated_ts_rows(data, ts_name=self.timestamp)
if smooth is not False:
data = tb.smooth_missed_ts_rows(data, ts_name=self.timestamp, freq=self.freq)
if impute is not False:
data[self.target] = tb.multi_period_loop_imputer(data[self.target], freq=self.freq)
if self.task in consts.TASK_LIST_FORECAST:
X, y = data[excluded_variables], data[self.target]
else:
X = data
if self.kwargs.get('anomaly_label_col') is not None:
y = X.pop(self.kwargs.get('anomaly_label_col'))
else:
y = None
else:
tb = get_tool_box(data)
X = tb.reset_index(data)
y = X.pop(self.target)
return X, y
[docs] def get_pipeline_params(self):
"""Gets sklearn pipeline parameters.
"""
return self.sk_pipeline.get_params
def __preprocess_forecast_start(self, forecast_start):
"""Performs data preprocessing for the external forecast_start.
Parameters
----------
forecast_start : 'DataFrame'. This parameter applies only to 'dl' mode.
Forecast the start fragment, if None, by default the last window fragment of the
train data.
forecast_start.columns = ['timestamp', (covariate_1), (covariate_2),...].
(covariate_1) indicates that it may not exist.
"""
def preprocess(X, y, estimator):
X = copy.deepcopy(X)
y = copy.deepcopy(y)
X = estimator.data_pipeline.transform(X)
y = estimator.model.transform(y)
X, y = estimator.model.model.meta.transform(X, y)
forecast_start = tb.concat_df([X, y], axis=1)
window_length = estimator.model.model.window
cont_column_names = estimator.model.model.meta.cont_column_names
cat_column_names = estimator.model.model.meta.cat_column_names
continuous_length = len(cont_column_names)
categorical_length = len(cat_column_names)
column_names = cont_column_names + cat_column_names
data = forecast_start.drop([self.timestamp], axis=1)
data = tb.df_to_array(data[column_names]).astype(consts.DATATYPE_TENSOR_FLOAT)
forecast_start = data[-window_length:].reshape(1, window_length, data.shape[1])
if categorical_length != 0:
X_cont_start = forecast_start[:, :, :continuous_length]
X_cat_start = forecast_start[:, :, continuous_length:]
forecast_start = [X_cont_start, X_cat_start]
return forecast_start
# 1. transform
tb = get_tool_box(forecast_start)
forecast_start = self.sk_pipeline.named_steps.data_preprocessing.transform(forecast_start)
X, y = self.split_X_y(forecast_start, smooth=True, impute=True)
# 2. perprocessing
if hasattr(self.sk_pipeline.named_steps.estimator, 'ensemble_size'):
for estimator in self.sk_pipeline.named_steps.estimator.estimators:
if estimator is not None:
forecast_start = preprocess(X, y, estimator)
estimator.model.model.forecast_start = forecast_start
else:
forecast_start = preprocess(X, y, self.sk_pipeline.named_steps.estimator)
self.sk_pipeline.named_steps.estimator.model.model.forecast_start = forecast_start
[docs] def save(self, model_file):
"""Save TSPipeline.
Parameters
----------
Parameters
----------
model_file: str, the path where the file is saved. For example, "home/xxx/xxx/models".
"""
from hyperts.utils.models import save_model
save_model(model=self, model_file=model_file)
[docs] @staticmethod
def load(model_file):
"""Load TSPipeline.
Parameters
----------
model_file: str, the path where the file is saved. For example,
1. "home/xxx/xxx/models/stats_models"
2. "home/xxx/xxx/models/dl_models"
3. "home/xxx/xxx/models/nas_models"
"""
from hyperts.utils.models import load_model
load_model(model_file)
[docs]class TSCompeteExperiment(SteppedExperiment):
"""A powerful experiment strategy for Automatic Time Series with a set of advanced features.
Parameters
----------
hyper_model: hypernets.model.HyperModel
A `HyperModel` instance
X_train: Pandas or Dask DataFrame
Feature data for training
y_train: Pandas or Dask Series
Target values for training
X_eval: (Pandas or Dask DataFrame) or None
(default=None), Feature data for evaluation
y_eval: (Pandas or Dask Series) or None, default None.
Target values for evaluation
X_test: (Pandas or Dask Series) or None, default None.
Unseen data without target values for semi-supervised learning
eval_size: 'float' or 'int', default None.
Only valid when ``X_eval`` or ``y_eval`` is None. If float, should be between 0.0 and 1.0 and represent
the proportion of the dataset to include in the eval split. If int, represents the absolute number of
test samples. If None, the value is set to the complement of the train size.
freq: 'str', DateOffset or None, default None.
target_col: 'str' or list[str], default None.
timestamp_col: str or None, default None.
covariate_cols: list[list or None, list or None] or None, default None. covariate_cols needs to contain
a list of original covariates and a list of cleaned covariates.
covariate_data_cleaner_args: 'dict' or None, default None. Suitable for forecast task.
Dictionary of parameters to initialize the `DataCleaner` instance. If None, `DataCleaner` will initialized
with default values.
data_cleaner_args: 'dict' or None, default None. Suitable for classification/regression tasks.
Dictionary of parameters to initialize the `DataCleaner` instance. If None, `DataCleaner` will initialized
with default values.
cv: 'bool', default False.
If True, use cross-validation instead of evaluation set reward to guide the search process.
num_folds: 'int', default 3.
Number of cross-validated folds, only valid when cv is true.
task: 'str' or None, default None.
Task could be 'univariate-forecast', 'multivariate-forecast', and 'univariate-binaryclass', etc.
See consts.py for details.
mode : str, default 'stats'. Optional {'stats', 'dl', 'nas'}, where,
'stats' indicates that all the models selected in the execution experiment are statistical models.
'dl' indicates that all the models selected in the execution experiment are deep learning models.
'nas' indicates that the selected model of the execution experiment will be a deep network model
for neural architecture search, which is not currently supported.
id: trial id, default None.
callbacks: list of callback functions or None, default None.
List of callback functions that are applied at each experiment step. See `hypernets.experiment.ExperimentCallback`
for more information.
log_level: 'int', 'str', or None, default None,
Level of logging, possible values:
-logging.CRITICAL
-logging.FATAL
-logging.ERROR
-logging.WARNING
-logging.WARN
-logging.INFO
-logging.DEBUG
-logging.NOTSET
random_state: 'int' or RandomState instance, default None.
Controls the shuffling applied to the data before applying the split.
scorer: 'str', callable or None, default None.
Scorer to used for feature importance evaluation and ensemble. It can be a single string
(see [get_scorer](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.get_scorer.html))
or a callable (see [make_scorer](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.make_scorer.html)).
Will be inferred from *hyper_model.reward_metric* if it's None.
ensemble_size: 'int', default 10.
The number of estimator to ensemble. During the AutoML process, a lot of models will be generated with different
preprocessing pipelines, different models, and different hyperparameters. Usually selecting some of the models
that perform well to ensemble can obtain better generalization ability than just selecting the single best model.
final_retrain_on_wholedata: bool, after the search, whether to retrain the optimal model on the whole data set.
default True.
"""
def __init__(self, hyper_model, X_train, y_train, X_eval=None, y_eval=None, X_test=None,
eval_size=consts.DEFAULT_EVAL_SIZE,
freq=None,
target_col=None,
timestamp_col=None,
covariate_cols=None,
anomaly_label_col=None,
covariate_cleaner=None,
cv=False,
num_folds=3,
task=None,
mode='stats',
id=None,
forecast_train_data_periods=None,
hist_store_upper_limit=consts.HISTORY_UPPER_LIMIT,
contamination=consts.CONTAMINATION,
callbacks=None,
log_level=None,
random_state=None,
scorer=None,
optimize_direction=None,
ensemble_size=10,
final_retrain_on_wholedata=True,
**kwargs):
if random_state is None:
random_state = np.random.randint(0, 65535)
set_random_state(random_state)
if task is None:
task = hyper_model.task
self.cv = cv
self.freq = freq
self.task = task
self.mode = mode
self.target = target_col
self.timestamp = timestamp_col
self.max_window_length = hist_store_upper_limit
self.train_end_date = kwargs.pop('train_end_date', None)
self.generate_freq = kwargs.pop('generate_freq', None)
self.pos_label = kwargs.get('pos_label', None)
if self.task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_DETECTION:
tb = get_tool_box(X_train, y_train)
all_data = tb.concat_df([X_train, y_train], axis=1)
if X_eval is not None or y_eval is not None:
eval_data = tb.concat_df([X_eval, y_eval], axis=1)
all_data = tb.concat_df([all_data, eval_data], axis=0)
self.history = tb.select_1d_reverse(all_data, hist_store_upper_limit)
else:
self.history = None
if covariate_cols is not None and len(covariate_cols) == 2:
self.covariables = covariate_cols[0]
cleaned_covariables = covariate_cols[1]
elif covariate_cols is not None and len(covariate_cols) != 2:
raise ValueError('covariate_cols needs to contain a list of original '
'covariates and a list of cleaned covariates.')
else:
self.covariables = None
cleaned_covariables = None
self.anomaly_label_col = anomaly_label_col
steps = []
# data clean and preprocessing
if task in consts.TASK_LIST_FORECAST + consts.TASK_LIST_DETECTION:
steps.append(TSAFDataPreprocessStep(self, consts.StepName_DATA_PREPROCESSING,
freq=freq,
cv=cv,
ensemble_size=ensemble_size,
timestamp_col=timestamp_col,
covariate_cols=cleaned_covariables,
covariate_cleaner=covariate_cleaner,
anomaly_label_col=anomaly_label_col,
contamination=contamination,
train_data_periods=forecast_train_data_periods))
else:
steps.append(TSRCDataPreprocessStep(self, consts.StepName_DATA_PREPROCESSING,
cv=cv))
# model selection and hyper-parameters search step
steps.append(TSSpaceSearchStep(self, consts.StepName_SPACE_SEARCHING,
cv=cv,
num_folds=num_folds))
if ensemble_size is not None and ensemble_size > 1:
# model ensemble step
tb = get_tool_box(X_train, y_train)
if scorer is None:
kwargs['pos_label'] = tb.infer_pos_label(y_train, task, kwargs.get('pos_label'))
scorer = tb.metrics.metric_to_scorer(hyper_model.reward_metric, task=task,
pos_label=kwargs.get('pos_label'), optimize_direction=optimize_direction)
steps.append(TSEnsembleStep(self, consts.StepName_FINAL_ENSEMBLE,
mode=mode,
scorer=scorer,
ensemble_size=ensemble_size,
cv=cv,
retrain_on_wholedata=final_retrain_on_wholedata))
else:
# final train step
steps.append(TSFinalTrainStep(self, consts.StepName_FINAL_TRAINING,
mode=mode,
retrain_on_wholedata=final_retrain_on_wholedata))
# ignore warnings
import warnings
warnings.filterwarnings('ignore')
if log_level is not None:
_set_log_level(log_level)
self.run_kwargs = kwargs
super(TSCompeteExperiment, self).__init__(steps,
hyper_model, X_train, y_train, X_eval=X_eval, y_eval=y_eval,
X_test=X_test, eval_size=eval_size, task=task,
id=id,
callbacks=callbacks,
random_state=random_state)
[docs] def run(self, **kwargs):
"""Related parameters that can be reset, mainly involving pipeline stap and model parameters.
Including but not limited to the following:
Parameters
----------
max_trials: int, maximum number of tests (model search), optional, (default=3).
final_train_epochs: int, the searched best DL model performs the final training epochs, optional, (default=120).
epochs: int, DL model training epochs in each trail, optional, (default=120).
batch_size: int, number of samples per gradient update, optional, (default is self-adaption).
optimizer: str, the optimizer of DL model, optional {'adam', 'sgd', 'rmsprop'}, (default='adam').
learning_rate: float, the optimizer's learning rate, optional, (default=0.001).
loss: str, the loss function of DL model, optional, (default is searching).
drop_rate: float, The rate of Dropout for DL model, optional, (default is searching).
dl_forecast_window : int, the sequence length of each sample in DL model, i.e., timestamp of
[batch_size, timestamp, dim], optional, (default is searching).
dl_forecast_length: int, the forecast length for the future in DL model, optional, (default is searching).
seasonality_mode: str, 'additive' (default) or 'multiplicative' in Prophet.
period_offset: int, s of seasonal_order(P,D,Q,s) in ARIMA, optional, (default is searching).
maxlags: int, maximum number of lags to check for order selection in VAR, optional, (default is searching).
...
Notes
----------
Once the above parameters are specified, the search space cannot be searched.
"""
kwargs = self._reset_run_kwargs(**kwargs)
run_kwargs = {**self.run_kwargs, **kwargs}
return super().run(**run_kwargs)
[docs] def to_estimator(self, X_train, y_train, X_test, X_eval, y_eval, steps):
sk_pipeline = super(TSCompeteExperiment, self).to_estimator(
X_train, y_train, X_test, X_eval, y_eval, steps)
return TSPipeline(sk_pipeline,
freq=self.freq,
task=self.task,
mode=self.mode,
timestamp=self.timestamp,
covariables=self.covariables,
target=self.target,
history=self.history,
train_end_date=self.train_end_date,
generate_freq=self.generate_freq,
pos_label=self.pos_label,
anomaly_label_col=self.anomaly_label_col)
[docs] def report_best_trial_params(self):
"""Gets experiment best trial parameters.
"""
if self.hyper_model_ is not None:
best_trial = self.hyper_model_.get_best_trial()
best_trial_params = best_trial.to_df(include_params=True).T
best_trial_params.rename(columns={0: 'value'}, inplace=True)
return best_trial_params
else:
raise RuntimeError('The best trial parameters can be obtained only after running.')
def _repr_html_(self):
return self.__repr__()
def _reset_run_kwargs(self, **kwargs):
if kwargs.get('dl_forecast_window') is not None:
if kwargs.get('dl_forecast_window') <= self.max_window_length:
kwargs['window'] = kwargs.pop('dl_forecast_window')
else:
raise ValueError(f'dl_window_length cannot be greater than {self.max_window_length}.')
if kwargs.get('dl_forecast_length', 0) > 0:
kwargs['forecast_length'] = kwargs.pop('dl_forecast_length')
if self.cv and kwargs.get('epochs') is None:
kwargs['epochs'] = consts.TRAINING_EPOCHS // 2
return kwargs