import collections
import numpy as np
import pandas as pd
from hyperts.utils import get_tool_box
from hyperts.framework.meta_learning import tsfeatures
from sklearn.neighbors import KDTree
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_array
from sklearn.utils.validation import check_random_state
from sklearn.utils.random import sample_without_replacement
from hyperts.utils.tstoolbox import TSToolBox
[docs]def from_3d_array_to_nested_df(data: np.ndarray,
columns: str = None,
cells_as_array: bool = False):
"""Convert Numpy ndarray with shape (nb_samples, series_length, nb_variables)
into nested pandas DataFrame (with time series as numpy array or pandas Series in cells)
Parameters
----------
data : np.ndarray
3-dimensional Numpy array to convert to nested pandas DataFrame format
columns: list-like, default = None
Optional list of names to use for naming nested DataFrame's columns
cells_as_array : bool, default = False
If True, then nested cells contain Numpy array
If False, then nested cells contain pandas Series
Returns
----------
df : pd.DataFrame
"""
tb = get_tool_box(data)
return tb.from_3d_array_to_nested_df(data, columns, cells_as_array)
[docs]def from_nested_df_to_3d_array(data: pd.DataFrame):
"""Convert nested pandas DataFrame (with time series as numpy array or pandas Series in cells)
into Numpy ndarray with shape (nb_samples, series_length, nb_variables).
Parameters
----------
data : pd.DataFrame
Nested pandas DataFrame
Returns
-------
data_3d : np.arrray
3-dimensional NumPy array
"""
tb = get_tool_box(data)
return tb.from_nested_df_to_3d_array(data)
[docs]def random_train_test_split(*arrays,
test_size=None,
train_size=None,
random_state=None,
shuffle=True,
stratify=None):
"""Split arrays or matrices into random train and test subsets. This
is a wrapper of scikit-learn's ``train_test_split`` that has shuffle.
"""
tb = get_tool_box(arrays[0])
return tb.random_train_test_split(*arrays,
test_size=test_size,
train_size=train_size,
random_state=random_state,
shuffle=shuffle,
stratify=stratify)
[docs]def temporal_train_test_split(*arrays,
test_size=None,
train_size=None,
test_horizon=None):
"""Split arrays or matrices into sequential train and test subsets.This
is a wrapper of scikit-learn's ``train_test_split`` that does not shuffle.
Parameters
----------
*arrays : sequence of indexables with same length / shape[0] Allowed inputs
are lists, numpy arrays, scipy-sparse matrices or pandas dataframes.
test_size : float, int or None, optional (default=None)
If float, should be between 0.0 and 1.0 and represent the proportion
of the dataset to include in the test split. If int, represents the
absolute number of test samples. If None, the value is set to the
complement of the train size. If ``train_size`` is also None, it will
be set to 0.25.
train_size : float, int, or None, (default=None)
If float, should be between 0.0 and 1.0 and represent the
proportion of the dataset to include in the train split. If
int, represents the absolute number of train samples. If None,
the value is automatically set to the complement of the test size.
test_horizon: int or None, (default=None)
If int, represents the forecast horizon length.
Returns
-------
splitting : list, length=2 * len(arrays)
List containing train-test split of inputs.
"""
tb = get_tool_box(arrays[0])
return tb.temporal_train_test_split(*arrays,
test_size=test_size,
test_horizon=test_horizon,
train_size=train_size)
[docs]def generate_anomaly_pseudo_ground_truth(
X_train,
X_test=None,
local_region_size: int=30,
local_max_features: float=1.0,
local_region_iterations: int=20,
generate_train_label_type: str='iforest',
contamination: float=0.05,
random_state=None):
"""Genrate pseudo ground truth for anomaly detection.
Parameters
----------
X_train : numpy array of shape (n_samples, n_features).
X_test : numpy array of shape (n_samples, n_features).
local_region_size : int, optional (default=30)
Number of training points to consider in each iteration of the local
region generation process (30 by default).
local_max_features : float in (0.5, 1.), optional (default=1.0)
Maximum proportion of number of features to consider when defining the
local region (1.0 by default).
local_region_iterations : int, optional (default=20)
Number of iteration of the local region generation process.
generate_train_label_type : str, optional (default='iforest')
The method of genetating training pseudo labels.
contamination : 'auto' or float, optional (default=0.05)
The amount of contamination of the data set, i.e. the proportion
of outliers in the data set. Used when fitting to define the threshold
on the scores of the samples.
random_state : RandomState, optional (default=None)
A random number generator instance to define the state of the random
permutations generator.
References
----------
"""
if not isinstance(X_train, np.ndarray):
X_train = np.array(X_train)
if len(X_train.shape) == 1:
X_train = X_train.reshape(-1, 1)
check_array(X_train)
scaler = StandardScaler()
X_train_norm = scaler.fit_transform(X_train)
# Generate train pseudo ground truth process
if generate_train_label_type == 'iforest':
from sklearn.ensemble import IsolationForest
detector = IsolationForest(contamination=contamination)
detector.fit(X_train_norm)
decision_func = detector.decision_function(X_train_norm)
train_pseudo_labels = np.zeros_like(decision_func, dtype=int)
train_pseudo_labels[decision_func < 0] = 1
else:
raise ValueError(f'This type is not spported.')
if X_test is None:
return train_pseudo_labels.reshape(-1, 1), None
else:
if not isinstance(X_test, np.ndarray):
X_test = np.array(X_test)
# Generate test pseudo ground truth process
check_array(X_test)
random_state = check_random_state(random_state)
local_region_list = [[]] * X_test.shape[0]
final_local_region_list = [[]] * X_test.shape[0]
local_region_threshold = int(local_region_iterations / 2)
if len(X_test.shape) == 1:
X_test = X_test.reshape(-1, 1)
n_features = X_train.shape[1]
if local_max_features > 1.0:
local_max_features = 1.0
local_min_features = 0.5
if n_features * local_min_features < 1:
local_min_features = 1.0
X_test_norm = scaler.transform(X_test)
min_features = n_features * local_min_features
max_features = n_features * local_max_features
for _ in range(local_region_iterations):
if local_min_features == local_max_features:
feature_indices = range(0, n_features)
else:
random_n_features = random_state.randint(min_features, max_features)
feature_indices = sample_without_replacement(n_population=n_features,
n_samples=random_n_features, random_state=random_state)
# Bulid KDTree out of training subspace
tree = KDTree(X_train_norm[:, feature_indices])
# Find neighbors of each test instance
_, index_arr = tree.query(X_test_norm[:, feature_indices], k=local_region_size)
# Add neighbors to local region list
for i in range(X_test_norm.shape[0]):
local_region_list[i] = local_region_list[i] + index_arr[i, :].tolist()
# Keep nearby points which occur at least local_region_threshold times
for j in range(X_test_norm.shape[0]):
tmp = []
for item, count in collections.Counter(local_region_list[j]).items():
if count > local_region_threshold:
tmp.append(item)
decrease_value = 0
while len(tmp) < 2:
decrease_value += 1
assert decrease_value < local_region_threshold
tmp = []
for item, count in collections.Counter(local_region_list[j]).items():
if count > local_region_threshold - decrease_value:
tmp.append(item)
final_local_region_list[j] = tmp
# Generate test pseudo ground truth
test_pseudo_labels = np.zeros(shape=(X_test.shape[0], 1), dtype=int)
for k in range(X_test_norm.shape[0]):
train_local_pseudo_labels = train_pseudo_labels[final_local_region_list[k]]
if np.sum(train_local_pseudo_labels) > 1:
test_pseudo_labels[k] = 1
train_pseudo_labels = train_pseudo_labels.reshape(-1, 1)
test_pseudo_labels = test_pseudo_labels.reshape(-1, 1)
return train_pseudo_labels, test_pseudo_labels