Source code for hyperts.framework.stats.iforest

# -*- coding:utf-8 -*-
"""

"""
import numpy as np
from sklearn.ensemble import IsolationForest
from hyperts.framework.wrappers import BaseAnomalyDetectorWrapper


class TSIsolationForest(BaseAnomalyDetectorWrapper):
    """Isolation Forest for anomaly detection.

    Thin wrapper that delegates to :class:`sklearn.ensemble.IsolationForest`
    and negates its decision function, so that in this class a *larger*
    score is compared against ``threshold_`` to flag an outlier.

    Parameters
    ----------
    n_estimators : int, default=100
        The number of base estimators in the ensemble.
    max_samples : "auto", int or float, default="auto"
        The number of samples to draw from X to train each base estimator.

        - If int, then draw `max_samples` samples.
        - If float, then draw `max_samples * X.shape[0]` samples.
        - If "auto", then `max_samples=min(256, n_samples)`.

        If max_samples is larger than the number of samples provided,
        all samples will be used for all trees (no sampling).
    contamination : float, default=0.05
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Used when fitting to define the
        threshold on the scores of the samples.

        - If 'auto', the threshold is determined as in the original paper.
        - If float, the contamination should be in the range (0, 0.5].
    max_features : int or float, default=1.0
        The number of features to draw from X to train each base estimator.

        - If int, then draw `max_features` features.
        - If float, then draw `max_features * X.shape[1]` features.
    bootstrap : bool, default=False
        If True, individual trees are fit on random subsets of the training
        data sampled with replacement. If False, sampling without
        replacement is performed.
    n_jobs : int, default=None
        The number of jobs to run in parallel for both :meth:`fit` and
        :meth:`predict`. ``None`` means 1 unless in a
        :obj:`joblib.parallel_backend` context. ``-1`` means using all
        processors. See :term:`Glossary <n_jobs>` for more details.
    random_state : int, RandomState instance or None, default=None
        Controls the pseudo-randomness of the selection of the feature
        and split values for each branching step and each tree in the
        forest. Pass an int for reproducible results across multiple
        function calls. See :term:`Glossary <random_state>`.
    verbose : int, default=0
        Controls the verbosity of the tree building process.
    name : str, default='isolation_forest'
        Name forwarded to the base wrapper's constructor.
    """

    def __init__(self,
                 n_estimators=100,
                 max_samples="auto",
                 contamination=0.05,
                 max_features=1.0,
                 bootstrap=False,
                 n_jobs=None,
                 random_state=None,
                 verbose=0,
                 name='isolation_forest'):
        super(TSIsolationForest, self).__init__(name=name, contamination=contamination)

        # The underlying scikit-learn estimator that does the actual work.
        self.model = IsolationForest(
            n_estimators=n_estimators,
            max_samples=max_samples,
            contamination=contamination,
            max_features=max_features,
            bootstrap=bootstrap,
            n_jobs=n_jobs,
            random_state=random_state,
            verbose=verbose)

    def _fit(self, X, y=None, **kwargs):
        """Fit the wrapped forest on X and store the training anomaly scores.

        ``y`` is accepted for interface compatibility but is not used; an
        optional ``sample_weight`` keyword is forwarded to the estimator.
        """
        self.model.fit(X=X, y=None, sample_weight=kwargs.get('sample_weight'))
        # Negate the scikit-learn scores so that higher means more anomalous.
        self.decision_scores_ = self.model.decision_function(X) * -1
        # Defined in the base wrapper; presumably derives ``threshold_`` from
        # ``decision_scores_`` -- confirm against BaseAnomalyDetectorWrapper.
        self._get_decision_attributes()

    def _predict(self, X, **kwargs):
        """Return a 0/1 integer array; 1 where the score exceeds ``threshold_``."""
        decision_func = self.decision_function(X)
        is_outlier = np.zeros_like(decision_func, dtype=int)
        is_outlier[decision_func > self.threshold_] = 1
        return is_outlier

    def decision_function(self, X):
        """Predict anomaly scores for sequences in X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features) or (n_samples,).
            Objects without a ``shape`` attribute (e.g. lists) are converted
            to a numpy array. One-dimensional input is treated as a single
            feature and reshaped to ``(n_samples, 1)``.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples (the negated
            scikit-learn decision function).
        """
        self._check_is_fitted()

        # Only coerce objects that are not already array-like. Inputs that
        # expose ``shape`` (ndarray, DataFrame) are passed through untouched.
        if not hasattr(X, 'shape'):
            X = np.asarray(X)
        if len(X.shape) == 1:
            # Go through ``np.asarray`` because 1-D containers such as a
            # pandas Series do not provide ``reshape``.
            X = np.asarray(X).reshape(-1, 1)

        return self.model.decision_function(X) * -1