# Source code for hyperts.utils.tstoolbox
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split as sklearn_tts
from hypernets.tabular.toolbox import ToolBox
from hyperts.utils import tscvsplit, ensemble
from hyperts.utils import consts, metrics as metrics_
from hyperts.utils.holidays import get_holidays
class TSToolBox(ToolBox):
    """Toolbox of time-series helpers built on pandas / numpy / scikit-learn.

    Most members are static wrappers so that callers can use one uniform
    entry point for data-frame manipulation, imputation, outlier handling,
    window inference and train/test splitting.
    """

    @staticmethod
    def DataFrame(data=None, index=None, columns=None, dtype=None, copy=False):
        """Two-dimensional, size-mutable, potentially heterogeneous tabular data.

        Parameters
        ----------
        data : ndarray (structured or homogeneous), Iterable, dict, or DataFrame
            Dict can contain Series, arrays, constants, or list-like objects.
        index : Index or array-like
            Index to use for resulting frame. Will default to RangeIndex if
            no indexing information part of input data and no index provided.
        columns : Index or array-like
            Column labels to use for resulting frame. Will default to
            RangeIndex (0, 1, 2, ..., n) if no column labels are provided.
        dtype : dtype, default None
            Data type to force. Only a single dtype is allowed. If None, infer.
        copy : bool, default False
            Copy data from inputs. Only affects DataFrame / 2d ndarray input.
        """
        return pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)

    @staticmethod
    def join_df(df1: pd.DataFrame, df2: pd.DataFrame, on):
        """Join the columns of ``df2`` onto ``df1``.

        Parameters
        ----------
        df1 : pd.DataFrame
            The calling (left) frame.
        df2 : pd.DataFrame
            The frame whose columns are appended; it is re-indexed by ``on``.
        on : str, list of str, or array-like
            Column name(s) present in both frames to join on.

        Returns
        -------
        DataFrame
            A dataframe containing columns from both ``df1`` and ``df2``.
        """
        return df1.join(df2.set_index(on), on=on)

    @staticmethod
    def to_datetime(df: pd.DataFrame, **kwargs):
        """Convert argument to datetime (delegates to ``pd.to_datetime``)."""
        return pd.to_datetime(df, **kwargs)

    @staticmethod
    def date_range(start=None, end=None, periods=None, freq=None, **kwargs):
        """Return a fixed frequency DatetimeIndex.

        Parameters
        ----------
        start : str or datetime-like, optional
            Left bound for generating dates.
        end : str or datetime-like, optional
            Right bound for generating dates.
        periods : int, optional
            Number of periods to generate.
        freq : str or DateOffset, optional
            Frequency strings can have multiples, e.g. '5H'.
        """
        return pd.date_range(start=start, end=end, periods=periods, freq=freq, **kwargs)

    @staticmethod
    def datetime_format(df: pd.DataFrame, format='%Y-%m-%d %H:%M:%S'):
        """Convert datetime format.

        The input is cast to ``str`` and parsed with ``pd.to_datetime``. When
        ``format`` is not None the parsed values are rendered back to strings
        with that format; otherwise the parsed datetimes are returned.
        """
        parsed = pd.to_datetime(df.astype('str'))
        if format is not None:
            return parsed.dt.strftime(format)
        return parsed

    @staticmethod
    def select_1d_forward(arr, indices):
        """Select the first ``indices`` items along axis 0."""
        if hasattr(arr, 'iloc'):
            return arr.iloc[:indices]
        return arr[:indices]

    @staticmethod
    def select_1d_reverse(arr, indices):
        """Select the last ``indices`` items along axis 0."""
        if hasattr(arr, 'iloc'):
            return arr.iloc[-indices:]
        return arr[-indices:]

    @staticmethod
    def columns_values(df: pd.DataFrame):
        """Get column labels as a numpy array."""
        return df.columns.values

    @staticmethod
    def sort_values(df: pd.DataFrame, ts_name: str = consts.TIMESTAMP):
        """Sort in time order (by the ``ts_name`` column)."""
        return df.sort_values(by=[ts_name])

    @staticmethod
    def drop(df: pd.DataFrame, labels=None, index=None, columns=None, axis: int = 0, inplace: bool = False):
        """Drop specified labels from rows or columns."""
        return df.drop(labels=labels, axis=axis, index=index, columns=columns, inplace=inplace)

    @staticmethod
    def pop(df: pd.DataFrame, item):
        """Return item and drop from frame. Raise KeyError if not found."""
        assert item is not None
        return df.pop(item)

    @staticmethod
    def columns_tolist(df: pd.DataFrame):
        """Return a list of the DataFrame columns."""
        return df.columns.tolist()

    @staticmethod
    def arange(*args):
        """Return evenly spaced values within a given interval."""
        return np.arange(*args)

    @staticmethod
    def infer_ts_freq(df: pd.DataFrame, ts_name: str = consts.TIMESTAMP):
        """Infer the frequency of the time series.

        Parameters
        ----------
        ts_name: 'str', time column name.
        """
        return _infer_ts_freq(df, ts_name)

    @staticmethod
    def multi_period_loop_imputer(df: pd.DataFrame, freq: str, offsets: list = None, max_loops: int = 10):
        """Multiple period loop impute NAN.

        Parameters
        ----------
        freq: str
            'S' - second
            'T' - minute
            'H' - hour
            'D' - day
            'M' - month
            'Y','A', A-DEC' - year
        offsets: list, offset lag. When None, defaults are chosen from ``freq``;
            a caller-supplied list is used as given (except for discrete
            forecasts, which always use ``[-1, 1]``).
        max_loops: 'int', maximum number of loop imputed.

        Returns
        -------
        pd.DataFrame with the same columns as ``df`` and a fresh RangeIndex.
        ``df`` itself is returned unchanged when ``freq`` is not a string.
        """
        if not isinstance(freq, str):
            return df

        if freq == consts.DISCRETE_FORECAST:
            offsets = [-1, 1]
        elif offsets is None:
            # Defaults are only derived when the caller gave no offsets.
            if freq == 'W' or 'W-' in freq or 'WOM-' in freq:
                offsets = [-1, -2, -3, -4, 1, 2, 3, 4]
            elif freq in ['M', 'MS', 'BM', 'CBM', 'CBMS']:
                offsets = [-1, -2, -3, -4, 1, 2, 3, 4]
            elif freq in ['SM', '15D', 'SMS']:
                offsets = [-1, -2, -4, -6, -8, 1, 2, 4, 6, 8]
            elif 'Q' in freq or 'Q-' in freq or 'BQ' in freq or 'BQ-' in freq or 'QS-' in freq or 'BQS-' in freq:
                offsets = [-1, -4, -8, -12, 1, 4, 8, 12]
            elif freq in ['A', 'Y'] or 'A-' in freq or 'BA-' in freq or 'AS-' in freq or 'BAS-' in freq:
                offsets = [-1, -2, -3, -4, 1, 2, 3, 4]
            elif 'S' in freq or 'T' in freq or 'min' in freq:
                offsets = [-60*4, -60*3, -60*2, -60*1, -1, 1, 60*1, 60*2, 60*3, 60*4]
            elif 'BH' in freq or '8H' in freq:
                # Must be tested before the generic 'H' branch, which would
                # otherwise match these frequencies too.
                offsets = [-8*4, -8*3, -8*2, -8*1, -1, 1, 8*1, 8*2, 8*3, 8*4,
                           -40*4, -40*3, -40*2, -40*1, 40*1, 40*2, 40*3, 40*4]
            elif 'H' in freq:
                offsets = [-24*4, -24*3, -24*2, -24*1, -1, 1, 24*1, 24*2, 24*3, 24*4,
                           -168*4, -168*3, -168*2, -168*1, 168*1, 168*2, 168*3, 168*4]
            elif 'D' in freq:
                offsets = [-1, -7, -7*2, -7*3, -7*4, 1, 7, 7*2, 7*3, 7*4]
            elif freq in ['C', 'B']:
                offsets = [-1, -5, -5*2, -5*3, -5*4, 1, 5, 5*2, 5*3, 5*4]
            elif 'L' in freq or 'U' in freq or 'N' in freq or 'ms' in freq:
                offsets = [-1, -50, -100, -200, -1000, 1, 50, 100, 200, 1000]
            else:
                offsets = [-1, 1]

        if freq != consts.DISCRETE_FORECAST:
            # Rescale offsets when the frequency carries a multiple, e.g. '5T'.
            offsets = _expand_list(freq=freq, pre_list=offsets)

        values = df.values.copy()
        loop, missing_rate = 0, 1
        while loop < max_loops and missing_rate > 0:
            values, missing_rate = _impute(values, offsets)
            loop += 1
        # Whatever is still missing after the loops gets the global mean.
        values[np.where(np.isnan(values))] = np.nanmean(values)
        return pd.DataFrame(values, columns=df.columns)

    @staticmethod
    def forward_period_imputer(df: pd.DataFrame, offset: int):
        """Forward period imputer.

        Fills each NaN with the first value of the trailing rolling window of
        size ``offset``.

        Parameters
        ----------
        offset: 'int', offset lag (rolling window size).
        """
        return df.fillna(df.rolling(window=offset, min_periods=1).agg(lambda x: x.iloc[0]))

    @staticmethod
    def simple_numerical_imputer(df: pd.DataFrame, mode='mean'):
        """Fill NaN with the column mean, the column mode, or 0.

        Parameters
        ----------
        mode: 'mean', 'mode', or anything else for a constant 0 fill.
            Columns whose mean / mode is itself NaN are filled with 0.
        """
        if mode == 'mean':
            return df.fillna(df.mean().fillna(0).to_dict())
        if mode == 'mode':
            modes = df.mode()
            if modes.empty:
                return df.fillna(0)
            # ``df.mode()`` is a frame (one row per tied mode); use the first
            # row so that ``fillna`` receives one scalar per column.
            return df.fillna(modes.iloc[0].fillna(0).to_dict())
        return df.fillna(0)

    @staticmethod
    def drop_duplicated_ts_rows(df: pd.DataFrame, ts_name: str = consts.TIMESTAMP, keep_data: str = 'last'):
        """Returns without duplicate time series, the last be kept by default.

        Example:
            TimeStamp      y
            2021-03-01    3.4
            2021-03-02    5.2
            2021-03-03    9.3
            2021-03-03    9.5
            2021-03-04    6.7
            2021-03-05    2.3
            >>
            TimeStamp      y
            2021-03-01    3.4
            2021-03-02    5.2
            2021-03-03    9.5
            2021-03-04    6.7
            2021-03-05    2.3
        """
        assert isinstance(df, pd.DataFrame)
        deduplicated = df.drop_duplicates(subset=[ts_name], keep=keep_data)
        return deduplicated.reset_index(drop=True)

    @staticmethod
    def smooth_missed_ts_rows(df: pd.DataFrame, freq: str = None, ts_name: str = consts.TIMESTAMP):
        """Returns full time series.

        Rows are inserted (with NaN values) for timestamps missing from the
        regular grid defined by ``freq``. When ``freq`` is None it is inferred.

        Example:
            TimeStamp      y
            2021-03-01    3.4
            2021-03-02    5.2
            2021-03-04    6.7
            2021-03-05    2.3
            >>
            TimeStamp      y
            2021-03-01    3.4
            2021-03-02    5.2
            2021-03-03    NaN
            2021-03-04    6.7
            2021-03-05    2.3
        """
        assert isinstance(df, pd.DataFrame)
        if freq is None:
            freq = _infer_ts_freq(df, ts_name)
        if df[ts_name].dtypes == object:
            df[ts_name] = pd.to_datetime(df[ts_name])
        df = df.sort_values(by=ts_name)
        if freq is not None and freq != consts.DISCRETE_FORECAST:
            start, end = df[ts_name].iloc[0], df[ts_name].iloc[-1]
            full_ts = pd.DataFrame(pd.date_range(start=start, end=end, freq=freq), columns=[ts_name])
            # Only re-grid when the generated range lands exactly on the last
            # observed timestamp.
            if full_ts[ts_name].iloc[-1] == df[ts_name].iloc[-1]:
                df = full_ts.join(df.set_index(ts_name), on=ts_name)
        return df

    @staticmethod
    def clip_to_outliers(df, std_threshold: int = 3):
        """Replace outliers above threshold with that threshold.

        Parameters
        ----------
        std_threshold: 'float', the number of standard deviations away from mean to count as outlier.
        """
        if not isinstance(df, pd.DataFrame):
            df = pd.DataFrame(df)
        df_std = df.std(axis=0, skipna=True)
        df_mean = df.mean(axis=0, skipna=True)
        lower = df_mean - (df_std * std_threshold)
        upper = df_mean + (df_std * std_threshold)
        return df.clip(lower=lower, upper=upper, axis=1)

    @staticmethod
    def nan_to_outliers(df, std_threshold: int = 3):
        """Replace outliers beyond the threshold with NaN.

        Parameters
        ----------
        std_threshold: 'float', the number of standard deviations away from mean to count as outlier.
        """
        if not isinstance(df, pd.DataFrame):
            df = pd.DataFrame(df)
        df_std = df.std(axis=0, skipna=True)
        df_mean = df.mean(axis=0, skipna=True)
        outlier_indices = np.abs(df - df_mean) > df_std * std_threshold
        return df.copy().mask(outlier_indices, other=np.nan)

    @staticmethod
    def infer_window_size(max_size: int, freq: str):
        """Infer window of neural net.

        Parameters
        ----------
        max_size: int, maximum time window allowed.
        freq: str or DateOffset.

        Returns
        -------
        list of candidate window sizes (non-zero, each derived from a
        candidate no larger than ``max_size``).

        Raises
        ------
        RuntimeError
            If no candidate window remains.
        """
        if freq == 'W' or 'W-' in freq or 'WOM-' in freq:
            candidates = [7, 7*2, 7*3, 7*4, 52]
        elif freq in ['SM', 'M', 'MS', 'SMS', 'BM', 'CBM', 'CBMS', '15D']:
            candidates = [6, 12, 24, 36, 48]
        elif 'Q' in freq or 'Q-' in freq or 'BQ' in freq or 'BQ-' in freq or 'QS-' in freq or 'BQS-' in freq:
            candidates = [4, 8, 12, 16, 16*2, 16*3]
        elif freq in ['A', 'Y'] or 'A-' in freq or 'BA-' in freq or 'AS-' in freq or 'BAS-' in freq:
            candidates = [3, 6, 12, 24]
        elif 'S' in freq or 'T' in freq or 'min' in freq:
            candidates = [10, 30, 60, 60*2, 60*3]
        elif 'BH' in freq or '8H' in freq:
            # Tested before the generic 'H' branch, which would shadow it.
            candidates = [8, 16, 24, 24*2, 24*7]
        elif 'H' in freq:
            candidates = [24, 48, 48*2, 24*7]
        elif 'D' in freq:
            candidates = [7, 14, 21, 21*2, 21*3]
        elif freq in ['C', 'B']:
            candidates = [10, 15, 20, 20*2, 20*3]
        elif 'L' in freq or 'U' in freq or 'N' in freq or 'ms' in freq:
            candidates = [50, 100, 200, 500, 1000]
        else:
            candidates = [5, 7, 12, 24, 24*2, 24*3, 24*7]

        window = [size for size in candidates if size <= max_size]
        expanded = _expand_list(freq=freq, pre_list=window)
        final_win_list = [size for size in expanded if size != 0]
        if len(final_win_list) != 0:
            return final_win_list
        raise RuntimeError('Unable to infer the sliding window size of dl, please specify dl_forecast_window.')

    @staticmethod
    def fft_infer_period(data):
        """Fourier inference period.

        Returns the integer period of the strongest spectral peak, or 2 when
        the period cannot be computed.

        References
        ----------
        https://github.com/xuawai/AutoPeriod/blob/master/auto_period.ipynb
        """
        try:
            if isinstance(data, pd.DataFrame):
                data = data.values.reshape(-1,)
            ft = np.fft.rfft(data)
            freqs = np.fft.rfftfreq(len(data), 1)
            mags = abs(ft)
            inflection = np.diff(np.sign(np.diff(mags)))
            peaks = (inflection < 0).nonzero()[0] + 1
            peak = peaks[mags[peaks].argmax()]
            signal_freq = freqs[peak]
            period = int(1 / signal_freq)
        except Exception:
            # Deliberate best-effort fallback (e.g. no spectral peak found);
            # unlike a bare ``except`` this lets KeyboardInterrupt through.
            period = 2
        return period

    @staticmethod
    def generate_time_covariates(start_date, periods, freq='H'):
        """Generate covariates about time.

        Parameters
        ----------
        start_date: 'str' or datetime-like.
            Left bound for generating dates.
        periods: 'int'.
            Number of periods to generate.
        freq: str or DateOffset, default 'H'.

        Returns
        -------
        pd.DataFrame with columns 'TimeStamp', 'Hour', 'WeekDay',
        'TimeSegmnet', 'MonthStart', 'MonthEnd', 'SeasonStart', 'SeasonEnd'
        and 'Weekend'.
        """
        dstime = pd.date_range(start_date, periods=periods, freq=freq)
        # A list (not a set) of column labels: sets are unordered and are
        # rejected by recent pandas versions.
        fds = pd.DataFrame(dstime, columns=['TimeStamp'])
        stamps = fds['TimeStamp']
        fds['Hour'] = stamps.dt.hour
        fds['WeekDay'] = stamps.dt.weekday
        # Hour of day -> coarse segment of the day.
        period_dict = {
            23: 0, 0: 0, 1: 0,
            2: 1, 3: 1, 4: 1,
            5: 2, 6: 2, 7: 2,
            8: 3, 9: 3, 10: 3, 11: 3,
            12: 4, 13: 4,
            14: 5, 15: 5, 16: 5, 17: 5,
            18: 6,
            19: 7, 20: 7, 21: 7, 22: 7,
        }
        fds['TimeSegmnet'] = fds['Hour'].map(period_dict)
        fds['MonthStart'] = stamps.dt.is_month_start.astype('int64')
        fds['MonthEnd'] = stamps.dt.is_month_end.astype('int64')
        fds['SeasonStart'] = stamps.dt.is_quarter_start.astype('int64')
        fds['SeasonEnd'] = stamps.dt.is_quarter_end.astype('int64')
        fds['Weekend'] = stamps.dt.dayofweek.isin([5, 6]).astype('int64')
        return fds

    @staticmethod
    def df_mean_std(data: pd.DataFrame):
        """Get the mean and standard deviation of the data."""
        return data.mean(), data.std()

    @staticmethod
    def infer_forecast_interval(forecast, prior_mu, prior_sigma, n: int = 5, confidence_level: float = 0.9):
        """A corruption of Bayes theorem.

        It will be sensitive to the transformations of the data.

        Returns
        -------
        (upper_forecast, lower_forecast) : tuple of pd.DataFrame
            Both indexed like ``forecast``. The lower bound never exceeds the
            forecast and the upper bound never falls below it.
        """
        from scipy.stats import norm

        p_int = 1 - ((1 - confidence_level) / 2)
        adj = norm.ppf(p_int)
        lowers, uppers = [], []
        for _, row in forecast.iterrows():
            data_mu = row
            post_mu = ((prior_mu / prior_sigma ** 2) + ((n * data_mu) / prior_sigma ** 2)
                       ) / ((1 / prior_sigma ** 2) + (n / prior_sigma ** 2))
            lower = pd.DataFrame(post_mu - adj * prior_sigma).transpose()
            lower = lower.where(lower <= data_mu, data_mu, axis=1)
            upper = pd.DataFrame(post_mu + adj * prior_sigma).transpose()
            upper = upper.where(upper >= data_mu, data_mu, axis=1)
            lowers.append(lower)
            uppers.append(upper)
        # Concatenate once instead of growing a frame inside the loop.
        lower_forecast = pd.concat(lowers, axis=0) if lowers else pd.DataFrame()
        upper_forecast = pd.concat(uppers, axis=0) if uppers else pd.DataFrame()
        lower_forecast.index = forecast.index
        upper_forecast.index = forecast.index
        return upper_forecast, lower_forecast

    @staticmethod
    def from_3d_array_to_nested_df(data: np.ndarray,
                                   columns: list = None,
                                   cells_as_array: bool = False):
        """Convert Numpy ndarray with shape (nb_samples, series_length, nb_variables)
        into nested pandas DataFrame (with time series as numpy array or pandas Series in cells)

        Parameters
        ----------
        data : np.ndarray
            3-dimensional Numpy array to convert to nested pandas DataFrame format
        columns: list-like, default = None
            Optional list of names to use for naming nested DataFrame's columns
        cells_as_array : bool, default = False
            If True, then nested cells contain Numpy array
            If False, then nested cells contain pandas Series

        Returns
        ----------
        df : pd.DataFrame

        Raises
        ------
        ValueError
            If ``columns`` is given and its length differs from nb_variables.

        References
        ----------
        sktime_data_processing: https://github.com/Riyabelle25/sktime/blob/main/sktime/utils/data_processing.py
        """
        df = pd.DataFrame()
        nb_samples, series_length, nb_variables = data.shape
        cell = np.array if cells_as_array else pd.Series
        if columns is None:
            columns = [f'Var_{i}' for i in range(nb_variables)]
        elif len(columns) != nb_variables:
            raise ValueError(f'The number of column names supplied [{len(columns)}] '
                             f'does not match the number of data variables [{nb_variables}].')
        for i, columns_name in enumerate(columns):
            df[columns_name] = [cell(data[j, :, i]) for j in range(nb_samples)]
        return df

    @staticmethod
    def from_nested_df_to_3d_array(data: pd.DataFrame):
        """Convert nested pandas DataFrame (with time series as numpy array or pandas Series in cells)
        into Numpy ndarray with shape (nb_samples, series_length, nb_variables).

        Parameters
        ----------
        data : pd.DataFrame
            Nested pandas DataFrame

        Returns
        -------
        data_3d : np.ndarray
            3-dimensional NumPy array

        Raises
        ------
        ValueError
            If any column holds no nested (ndarray / Series) cell.

        References
        ----------
        sktime_data_processing: https://github.com/Riyabelle25/sktime/blob/main/sktime/utils/data_processing.py
        """
        nested_col_mask = [*data.applymap(lambda cell: isinstance(cell, (np.ndarray, pd.Series))).any().values]
        if nested_col_mask.count(True) != len(nested_col_mask):
            raise ValueError('All columns must contain nested cells (np.ndarray or pd.Series).')
        res = np.stack(data.applymap(lambda cell: cell.to_numpy() if isinstance(cell, pd.Series) else cell)
                       .apply(lambda row: np.stack(row), axis=1)
                       .to_numpy())
        return res.transpose(0, 2, 1)

    @staticmethod
    def is_nested_dataframe(data: pd.DataFrame):
        """Determines whether data is a nested Dataframe.

        Returns
        -------
        bool : True when ``data`` is a non-empty DataFrame whose first cell is
            an ndarray or a Series, False otherwise.
        """
        # Check the type (and emptiness) before touching ``iloc`` so that
        # non-DataFrame or empty inputs yield False instead of raising.
        if not isinstance(data, pd.DataFrame) or data.empty:
            return False
        return isinstance(data.iloc[0, 0], (np.ndarray, pd.Series))

    @staticmethod
    def random_train_test_split(*arrays,
                                test_size=None,
                                train_size=None,
                                random_state=None,
                                shuffle=True,
                                stratify=None):
        """Split arrays or matrices into random train and test subsets. This
        is a wrapper of scikit-learn's ``train_test_split`` that has shuffle.
        """
        return sklearn_tts(*arrays,
                           test_size=test_size,
                           train_size=train_size,
                           random_state=random_state,
                           shuffle=shuffle,
                           stratify=stratify)

    @staticmethod
    def temporal_train_test_split(*arrays,
                                  test_size=None,
                                  train_size=None,
                                  test_horizon=None):
        """Split arrays or matrices into sequential train and test subsets. This
        is a wrapper of scikit-learn's ``train_test_split`` that does not shuffle.

        Parameters
        ----------
        *arrays : sequence of indexables with same length / shape[0] Allowed inputs
            are lists, numpy arrays, scipy-sparse matrices or pandas dataframes.
        test_size : float, int or None, optional (default=None)
            If float, should be between 0.0 and 1.0 and represent the proportion
            of the dataset to include in the test split. If int, represents the
            absolute number of test samples. If None, the value is set to the
            complement of the train size. If ``train_size`` is also None, it will
            be set to 0.25.
        train_size : float, int, or None, (default=None)
            If float, should be between 0.0 and 1.0 and represent the
            proportion of the dataset to include in the train split. If
            int, represents the absolute number of train samples. If None,
            the value is automatically set to the complement of the test size.
        test_horizon: int or None, (default=None)
            If int, represents the forecast horizon length; it takes
            precedence over ``test_size``.

        Returns
        -------
        splitting : list, length=2 * len(arrays)
            List containing train-test split of inputs. Series are returned
            as single-column DataFrames.

        Raises
        ------
        ValueError
            If ``test_horizon`` exceeds the number of rows of the first array.
        """
        if test_horizon is not None:
            if test_horizon > arrays[0].shape[0]:
                raise ValueError(f'{test_horizon} is greater than data shape {arrays[0].shape[0]}.')
            test_size = test_horizon
        results = sklearn_tts(
            *arrays,
            test_size=test_size,
            train_size=train_size,
            shuffle=False,
            stratify=None)
        return [pd.DataFrame(item) if isinstance(item, pd.Series) else item for item in results]

    @staticmethod
    def list_diff(p: list, q: list):
        """Gets the difference set of two lists, preserving the order of ``p``.

        Parameters
        ----------
        p: list.
        q: list.

        Returns
        -------
        A list.

        Example
        -------
        p = [1, 2, 3, 4, 5], q = [2, 4]
        >> list_diff(p, q)
        >> [1, 3, 5]
        p = [1, 2, 3, 4, 5], q = []
        >> list_diff(p, q)
        >> [1, 2, 3, 4, 5]
        """
        if q is not None and len(q) > 0:
            return [x for x in p if x not in q]
        return p

    @staticmethod
    def infer_pos_label(y_true, task, label_name=None, pos_label=None):
        """Infer the positive label for classification / detection tasks.

        For detection tasks without ``label_name`` the positive label is 1.
        For classification / detection tasks, a ``pos_label`` that occurs in
        ``y_true`` is kept; otherwise 1, 'yes' and 'true' are tried (only when
        ``pos_label`` is None) before falling back to the least frequent
        label. Any other task yields None.
        """
        if task in consts.TASK_LIST_DETECTION:
            if label_name is None:
                return 1
            label_name = label_name if isinstance(label_name, list) else [label_name]
            y_true = y_true[label_name]

        y_true = np.array(y_true) if not isinstance(y_true, np.ndarray) else y_true

        if task not in consts.TASK_LIST_CLASSIFICATION + consts.TASK_LIST_DETECTION:
            return None

        if pos_label is None:
            if 1 in y_true:
                return 1
            if 'yes' in y_true:
                return 'yes'
            if 'true' in y_true:
                return 'true'
            return _infer_pos_label(y_true)

        if pos_label in y_true:
            return pos_label
        return _infer_pos_label(y_true)

    # Pluggable collaborators; subclasses may override these attributes.
    metrics = metrics_.Metrics
    _preqfold_cls = tscvsplit.PrequentialSplit
    _greedy_ensemble_cls = ensemble.TSGreedyEnsemble

    @classmethod
    def preqfold(cls, strategy='preq-bls', base_size=None, n_splits=5, stride=1, *, max_train_size=None,
                 test_size=None, gap_size=0):
        """Build the prequential cross-validation splitter (``cls._preqfold_cls``)."""
        return cls._preqfold_cls(strategy=strategy, base_size=base_size, n_splits=n_splits, stride=stride,
                                 max_train_size=max_train_size, test_size=test_size, gap_size=gap_size)

    @classmethod
    def greedy_ensemble(cls, task, estimators, need_fit=False, n_folds=5, method='soft', random_state=9527,
                        target_dims=1, scoring='neg_log_loss', ensemble_size=0):
        """Build the greedy ensemble (``cls._greedy_ensemble_cls``)."""
        return cls._greedy_ensemble_cls(task, estimators, need_fit=need_fit, n_folds=n_folds, method=method,
                                        target_dims=target_dims, random_state=random_state, scoring=scoring,
                                        ensemble_size=ensemble_size)
def _infer_ts_freq(df: pd.DataFrame, ts_name: str = consts.TIMESTAMP):
    """Infer the frequency of the time series.

    The whole (sorted) timestamp column is tried first; if that fails, each
    window of three consecutive timestamps is tried in turn and the first
    frequency found is returned.

    Parameters
    ----------
    df: pd.DataFrame, frame holding the time column. Its ``ts_name`` column
        is converted to datetime in place.
    ts_name: 'str', time column name.

    Returns
    -------
    str or None : the inferred frequency, or None when it cannot be inferred
        (including when fewer than three timestamps are available).
    """
    df[ts_name] = pd.to_datetime(df[ts_name])
    df = df.sort_values([ts_name])
    dateindex = pd.DatetimeIndex(df[ts_name])

    # ``pd.infer_freq`` raises ValueError on fewer than three dates.
    if len(dateindex) < 3:
        return None

    freq = pd.infer_freq(dateindex)
    if freq is not None:
        return freq

    # Stop two short of the end so every window really has three dates;
    # shorter trailing windows would raise instead of reaching ``return None``.
    for i in range(len(dateindex) - 2):
        freq = pd.infer_freq(dateindex[i:i + 3])
        if freq is not None:
            return freq
    return None
def _impute(values, offsets):
""" Index slide imputation.
Parameters
----------
offsets: list, offset lag.
"""
indices0, indices1 = np.where(np.isnan(values))
if len(indices0) > 0 and len(indices1) > 0:
padding = []
for offset in offsets:
offset_indices0 = indices0 + offset
start_bound_limit = np.where(indices0 + offset < 0)
end_bound_limit = np.where(indices0 + offset > len(values) - 1)
offset_indices0[start_bound_limit] = indices0[start_bound_limit]
offset_indices0[end_bound_limit] = indices0[end_bound_limit]
padding.append(values[(offset_indices0, indices1)])
values[(indices0, indices1)] = np.nanmean(padding, axis=0)
missing_rate = np.sum(np.isnan(values)) / values.size
else:
missing_rate = 0.
return values, missing_rate
def _infer_pos_label(y):
""" Infer pos label based on a few samples.
"""
y = y.tolist()
y_count_dict = {k: y.count(k) for k in set(y)}
pos_label = sorted(y_count_dict.items(), key=lambda x: x[1])[0][0]
return pos_label
def _expand_list(freq, pre_list):
try:
import re
s = int(re.findall(r'\d+', freq)[0])
return list(map(lambda x: x // s + 1, pre_list))
except:
return pre_list
# Public API of this module: only the toolbox class is exported.
__all__ = [TSToolBox.__name__]