import numpy as np
import pandas as pd
from sklearn.metrics import *
from hypernets.tabular import metrics
from hyperts.utils import consts as const
from hypernets.utils import logging
logger = logging.get_logger(__name__)
def check_is_array(y_true, y_pred):
    """Convert both inputs to NumPy arrays and turn 1-D arrays into columns.

    Parameters
    ----------
    y_true : array-like
        Ground truth values.
    y_pred : array-like
        Estimated values.

    Returns
    -------
    tuple of np.ndarray
        ``(y_true, y_pred)``. Inputs that are not already ``np.ndarray`` are
        converted with ``np.array``; 1-D arrays are reshaped to ``(n, 1)``.
        Arrays of any other dimensionality are returned as they are.
    """
    converted = []
    for values in (y_true, y_pred):
        if not isinstance(values, np.ndarray):
            values = np.array(values)
        if values.ndim == 1:
            values = values.reshape((-1, 1))
        converted.append(values)
    return converted[0], converted[1]
def mse(y_true, y_pred, axis=None):
    """Mean squared error.

    NaN entries in the squared residuals are skipped because the mean is
    taken with ``np.nanmean``.

    Parameters
    ----------
    y_true : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Ground truth (correct) target values.
    y_pred : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Estimated target values.
    axis : int or None, default None
        Axis forwarded to ``np.nanmean``; ``None`` averages over all entries.

    Returns
    -------
    loss : float or ndarray of floats
        A non-negative floating point value (the best value is 0.0), or an
        array of floating point values when ``axis`` is given.
    """
    truth, estimate = check_is_array(y_true, y_pred)
    residual = truth - estimate
    return np.nanmean(residual ** 2, axis=axis)
def mae(y_true, y_pred, axis=None):
    """Mean absolute error.

    NaN entries in the absolute residuals are skipped because the mean is
    taken with ``np.nanmean``.

    Parameters
    ----------
    y_true : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Ground truth (correct) target values.
    y_pred : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Estimated target values.
    axis : int or None, default None
        Axis forwarded to ``np.nanmean``; ``None`` averages over all entries.

    Returns
    -------
    loss : float or ndarray of floats
        A non-negative floating point value (the best value is 0.0), or an
        array of floating point values when ``axis`` is given.
    """
    truth, estimate = check_is_array(y_true, y_pred)
    absolute_error = np.absolute(estimate - truth)
    return np.nanmean(absolute_error, axis=axis)
def rmse(y_true, y_pred, axis=None):
    """Root mean squared error.

    Computed as the square root of :func:`mse`, so NaN entries are skipped
    in the same way.

    Parameters
    ----------
    y_true : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Ground truth (correct) target values.
    y_pred : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Estimated target values.
    axis : int or None, default None
        Axis forwarded to :func:`mse`. The default ``None`` averages over all
        entries, which is what this function did before the parameter
        existed.

    Returns
    -------
    loss : float or ndarray of floats
        A non-negative floating point value (the best value is 0.0), or an
        array of floating point values when ``axis`` is given.
    """
    # mse() already normalises its inputs through check_is_array, so no
    # separate conversion is needed here.
    return np.sqrt(mse(y_true, y_pred, axis=axis))
def mape(y_true, y_pred, epsihon=1e-06, mask=False, axis=None):
    """Mean absolute percentage error.

    NaN entries are skipped because the mean is taken with ``np.nanmean``.

    Parameters
    ----------
    y_true : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Ground truth (correct) target values.
    y_pred : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Estimated target values.
    epsihon : float, default 1e-06
        Lower bound applied to ``|y_true|`` to avoid division by zero.
    mask : bool, default False
        If True, entries where ``y_true == 0`` are excluded from the mean.
    axis : int or None, default None
        Axis forwarded to ``np.nanmean``; ``None`` averages over all entries.

    Returns
    -------
    loss : float or ndarray of floats
        A non-negative floating point value (the best value is 0.0), or an
        array of floating point values when ``axis`` is given.
    """
    y_true, y_pred = check_is_array(y_true, y_pred)
    denominator = np.clip(np.abs(y_true), epsihon, None)
    if mask:
        # Excluded entries are turned into NaN instead of being removed by
        # boolean indexing: indexing would flatten the arrays to 1-D and make
        # a per-axis mean impossible, whereas nanmean simply skips the NaNs.
        denominator = np.where(y_true != 0., denominator, np.nan)
    ratio = np.abs((y_pred - y_true) / denominator)
    return np.nanmean(ratio, axis=axis)
def smape(y_true, y_pred, axis=None):
    """Symmetric mean absolute percentage error.

    Each term is ``|y_pred - y_true| / (|y_pred| + |y_true|)``; the mean of
    the terms is taken with ``np.nanmean`` (so NaN terms are skipped) and
    multiplied by 2.

    Parameters
    ----------
    y_true : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Ground truth (correct) target values.
    y_pred : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Estimated target values.
    axis : int or None, default None
        Axis forwarded to ``np.nanmean``; ``None`` averages over all entries.

    Returns
    -------
    loss : float or ndarray of floats
        A non-negative floating point value (the best value is 0.0), or an
        array of floating point values when ``axis`` is given.
    """
    truth, estimate = check_is_array(y_true, y_pred)
    numerator = np.abs(estimate - truth)
    denominator = np.abs(estimate) + np.abs(truth)
    return 2.0 * np.nanmean(numerator / denominator, axis=axis)
def msle(y_true, y_pred, epsihon=1e-06, axis=None):
    """Mean squared logarithmic error regression loss.

    The result is :func:`mse` applied to ``log1p`` of both inputs, so NaN
    entries are skipped. Negative inputs are tolerated: an array containing
    any negative value is clipped first (see ``epsihon``).

    Parameters
    ----------
    y_true : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Ground truth (correct) target values.
    y_pred : pd.DataFrame or array-like of shape (n_samples,) or (n_samples, n_outputs)
        Estimated target values.
    epsihon : float, default 1e-06
        Lower clip bound used when an array contains negative values. The
        upper bound is the element-wise absolute value, so with NumPy's
        ``minimum(a_max, maximum(a, a_min))`` clip rule non-negative entries
        stay as they are and negative entries become
        ``min(epsihon, |value|)``.
    axis : int or None, default None
        Axis forwarded to :func:`mse`.

    Returns
    -------
    loss : float or ndarray of floats
        A non-negative floating point value (the best value is 0.0), or an
        array of floating point values when ``axis`` is given.
    """
    truth, estimate = check_is_array(y_true, y_pred)
    if np.any(truth < 0):
        truth = np.clip(truth, a_min=epsihon, a_max=np.abs(truth))
    if np.any(estimate < 0):
        estimate = np.clip(estimate, a_min=epsihon, a_max=np.abs(estimate))
    log_truth = np.log1p(truth)
    log_estimate = np.log1p(estimate)
    return mse(log_truth, log_estimate, axis)
def auc(y_true, y_score, average="macro", sample_weight=None,
        max_fpr=None, multi_class="raise", labels=None):
    """Area under the ROC curve computed from prediction scores.

    Thin wrapper that forwards every argument unchanged to
    ``sklearn.metrics.roc_auc_score``; the binary, multiclass and multilabel
    restrictions documented there apply here as well.
    """
    forwarded = dict(
        average=average,
        sample_weight=sample_weight,
        max_fpr=max_fpr,
        multi_class=multi_class,
        labels=labels,
    )
    return roc_auc_score(y_true, y_score, **forwarded)
def _task_to_average(task):
if 'binary' in task or task in const.TASK_LIST_DETECTION:
average = 'binary'
elif 'multiclass' in task:
average = 'macro'
else:
average = None
return average
def _infer_pos_label(y):
if isinstance(y, np.ndarray):
y = y.tolist()
elif isinstance(y, pd.Series):
y = y.to_list()
elif isinstance(y, pd.DataFrame):
y = y.values.squeeze().tolist()
else:
raise RuntimeError('Unknown data type.')
y_count_dict = {k: y.count(k) for k in set(y)}
pos_label = sorted(y_count_dict.items(), key=lambda x: x[1])[0][0]
return pos_label
def _flatten_single_column(values):
    """Return ``values`` flattened to 1-D when its shape is ``(n, 1)``.

    ``None`` and inputs of any other shape are returned unchanged.
    """
    if values is None:
        return None
    shape = getattr(values, 'shape', None)
    if shape is not None and len(shape) == 2 and shape[-1] == 1:
        # np.asarray leaves ndarrays untouched and lets containers without a
        # ``reshape`` method (e.g. a one-column DataFrame) take the same path.
        return np.asarray(values).reshape(-1)
    return values


def _resolve_pos_label(y_true, task, pos_label):
    """Choose the ``pos_label`` handed to recall/precision/f1.

    For tasks other than binary/multiclass the result is ``None``. Otherwise
    a caller-supplied ``pos_label`` is used when it occurs in ``y_true``;
    failing that, the first of ``1``, ``'yes'``, ``'true'`` found in
    ``y_true`` is used (only when no ``pos_label`` was given), and as a last
    resort the label is inferred with ``_infer_pos_label``.
    """
    if task not in [const.TASK_BINARY, const.TASK_MULTICLASS]:
        return None

    # Membership must be tested against the label *values*: ``x in series``
    # looks at a pandas Series' index (and a DataFrame's columns) instead.
    present = set(np.asarray(y_true).ravel().tolist())

    if pos_label is None:
        for candidate in (1, 'yes', 'true'):
            if candidate in present:
                resolved = candidate
                break
        else:
            resolved = _infer_pos_label(y_true)
        logger.info(f"pos_label is not specified and defaults to {resolved}.")
        return resolved

    if pos_label in present:
        return pos_label
    resolved = _infer_pos_label(y_true)
    logger.warning(f"pos_label is incorrect and defaults to {resolved}.")
    return resolved


def _roc_auc_from_proba(score_func, y_true, y_proba, task, classes):
    """Call an AUC-style ``score_func`` with the right slice of ``y_proba``.

    2-D probabilities are passed whole with ``multi_class='ovo'`` for
    multiclass tasks; otherwise only column 1 is used. Inputs that are not
    2-D are passed through as they are.
    """
    if len(y_proba.shape) != 2:
        return score_func(y_true, y_proba)
    if 'multiclass' in task:
        return score_func(y_true, y_proba, multi_class='ovo', labels=classes)
    return score_func(y_true, y_proba[:, 1])


def calc_score(y_true, y_preds, y_proba=None, metrics=('accuracy',), task=const.TASK_BINARY,
               pos_label=None, classes=None, average=None):
    """Evaluate one or more metrics and return them in a dict.

    Parameters
    ----------
    y_true : array-like
        Ground truth targets. A ``(n, 1)`` input is flattened to 1-D.
    y_preds : array-like or None
        Predicted targets. A ``(n, 1)`` input is flattened to 1-D. When it
        is ``None`` the ``'accuracy'`` metric is reported as 0.
    y_proba : array-like, optional
        Predicted probabilities/scores used by AUC and log-loss. Defaults to
        ``y_preds``.
    metrics : iterable of str or callable
        Metric names (matched case-insensitively) or metric functions.
    task : str
        Task name; it selects the default ``average`` and decides whether a
        positive label is resolved.
    pos_label : optional
        Positive label for recall/precision/f1. If missing or absent from
        ``y_true`` a replacement is chosen and logged.
    classes : optional
        Passed as ``labels`` to the metrics that accept it.
    average : str, optional
        Averaging mode for recall/precision/f1; derived from ``task`` when
        ``None``.

    Returns
    -------
    dict
        Maps each metric (the string as given, or the callable's
        ``__name__``) to its score. Unsupported metric names are logged and
        left out.
    """
    score = {}
    if y_proba is None:
        y_proba = y_preds
    y_true = _flatten_single_column(y_true)
    y_preds = _flatten_single_column(y_preds)
    y_proba = _flatten_single_column(y_proba)

    if average is None:
        average = _task_to_average(task)
    recall_options = dict(average=average, labels=classes)
    recall_options['pos_label'] = _resolve_pos_label(y_true, task, pos_label)

    for metric in metrics:
        if callable(metric):
            name = metric.__name__
            if name in ['auc', 'roc_auc_score']:
                score[name] = _roc_auc_from_proba(metric, y_true, y_proba, task, classes)
            else:
                # Try the plain two-argument call first; metrics that need
                # average/labels/pos_label get them on the second attempt.
                try:
                    score[name] = metric(y_true, y_preds)
                except Exception:
                    score[name] = metric(y_true, y_preds, **recall_options)
            continue

        metric_lower = metric.lower()
        if metric_lower in ['auc', 'roc_auc_score']:
            score[metric] = _roc_auc_from_proba(roc_auc_score, y_true, y_proba, task, classes)
        elif metric_lower == 'accuracy':
            score[metric] = 0 if y_preds is None else accuracy_score(y_true, y_preds)
        elif metric_lower in ['recall']:
            score[metric] = recall_score(y_true, y_preds, **recall_options)
        elif metric_lower in ['precision']:
            score[metric] = precision_score(y_true, y_preds, **recall_options)
        elif metric_lower in ['f1']:
            score[metric] = f1_score(y_true, y_preds, **recall_options)
        # For the regression metrics below the scikit-learn implementation is
        # tried first; if it raises for any reason (the sklearn name is looked
        # up inside the ``try`` so a missing function is covered too), the
        # NaN-tolerant implementation from this module is used instead.
        elif metric_lower in ['mse', 'mean_squared_error', 'neg_mean_squared_error']:
            try:
                score[metric] = mean_squared_error(y_true, y_preds)
            except Exception:
                score[metric] = mse(y_true, y_preds)
        elif metric_lower in ['mae', 'mean_absolute_error', 'neg_mean_absolute_error']:
            try:
                score[metric] = mean_absolute_error(y_true, y_preds)
            except Exception:
                score[metric] = mae(y_true, y_preds)
        elif metric_lower in ['msle', 'mean_squared_log_error', 'neg_mean_squared_log_error']:
            try:
                score[metric] = mean_squared_log_error(y_true, y_preds)
            except Exception:
                score[metric] = msle(y_true, y_preds)
        elif metric_lower in ['rmse', 'root_mean_squared_error', 'neg_root_mean_squared_error']:
            try:
                score[metric] = mean_squared_error(y_true, y_preds, squared=False)
            except Exception:
                score[metric] = rmse(y_true, y_preds)
        elif metric_lower in ['mape', 'mean_absolute_percentage_error']:
            try:
                score[metric] = mean_absolute_percentage_error(y_true, y_preds)
            except Exception:
                score[metric] = mape(y_true, y_preds)
        elif metric_lower in ['smape']:
            score[metric] = smape(y_true, y_preds)
        elif metric_lower in ['r2', 'r2_score']:
            score[metric] = r2_score(y_true, y_preds)
        elif metric_lower in ['logloss', 'log_loss']:
            score[metric] = log_loss(y_true, y_proba, labels=classes)
        else:
            logger.error(f'{metric_lower} is not supported. Therefore, reset reward_metric.')
    return score
# Maps a metric name to either a scikit-learn scorer name (resolved with
# ``get_scorer``) or a metric function from this module (wrapped with
# ``make_scorer``).
metric2scoring = {
    'auc': 'roc_auc_ovo',
    'roc_auc_score': 'roc_auc_ovo',
    'accuracy': 'accuracy',
    'accuracy_score': 'accuracy',
    'recall': 'recall',
    'recall_score': 'recall',
    'precision': 'precision',
    'precision_score': 'precision',
    'f1': 'f1',
    'f1_score': 'f1',
    'mse': 'neg_mean_squared_error',
    'neg_mean_squared_error': 'neg_mean_squared_error',
    'mean_squared_error': 'neg_mean_squared_error',
    'mae': 'neg_mean_absolute_error',
    'neg_mean_absolute_error': 'neg_mean_absolute_error',
    'mean_absolute_error': 'neg_mean_absolute_error',
    'neg_mean_squared_log_error': 'neg_mean_squared_log_error',
    'mean_squared_log_error': 'neg_mean_squared_log_error',
    'rmse': 'neg_root_mean_squared_error',
    'neg_root_mean_squared_error': 'neg_root_mean_squared_error',
    'root_mean_squared_error': 'neg_root_mean_squared_error',
    # scikit-learn registers this scorer only under its ``neg_`` name, like
    # the other error-based scorers above.
    'mean_absolute_percentage_error': 'neg_mean_absolute_percentage_error',
    'r2': 'r2',
    'r2_score': 'r2',
    'logloss': 'neg_log_loss',
    'log_loss': 'neg_log_loss',
    # Metrics implemented in this module rather than looked up by name.
    'mape': mape,
    'smape': smape,
    'msle': msle,
    # ...
}
# Optimisation direction per metric name: True when a larger value is
# better (scores), False when a smaller value is better (errors/losses).
greater_is_better = dict(
    # Metrics implemented in this module.
    mse=False,
    mae=False,
    rmse=False,
    mape=False,
    smape=False,
    msle=False,
    # Regression metric names.
    r2_score=True,
    explained_variance_score=True,
    max_error=False,
    mean_absolute_error=False,
    mean_squared_error=False,
    mean_squared_log_error=False,
    median_absolute_error=False,
    mean_absolute_percentage_error=False,
    mean_pinball_loss=False,
    mean_tweedie_deviance=False,
    mean_poisson_deviance=False,
    mean_gamma_deviance=False,
    # Classification metric names.
    accuracy_score=True,
    balanced_accuracy_score=True,
    top_k_accuracy=True,
    roc_auc=True,
    # ...
)
def metric_to_scorer(metric, task, pos_label=None, **options):
    """Build a scikit-learn scorer for ``metric``.

    Parameters
    ----------
    metric : str or callable
        A key of ``metric2scoring``, a callable whose ``__name__`` is such a
        key, or a custom metric function.
    task : str
        Task name, used to choose ``average`` for metrics listed in
        ``const.POSLABEL_REQUIRED``.
    pos_label : optional
        Positive label stored on the scorer when ``average`` is ``'binary'``.
    **options
        Extra keyword arguments for ``make_scorer``. ``optimize_direction``
        (``'max'`` or ``'min'``, case-insensitive) is consumed here and is
        required only for custom callables; it may be omitted otherwise.

    Returns
    -------
    A scorer produced by ``get_scorer`` or ``make_scorer``.

    Raises
    ------
    ValueError
        If ``metric`` is neither a known name nor a callable, or if a custom
        callable is given without ``optimize_direction``.
    """
    # A default keeps callers that do not pass optimize_direction working.
    optimize_direction = options.pop('optimize_direction', None)

    if isinstance(metric, str):
        metric_name = metric
    elif callable(metric):
        # Callables without __name__ (e.g. functools.partial) are treated as
        # custom metrics.
        metric_name = getattr(metric, '__name__', None)
    else:
        raise ValueError('The reward_metric definition might be wrong.')

    # .get() lets unknown names fall through to the explicit errors below
    # instead of surfacing as a KeyError.
    scoring = metric2scoring.get(metric_name)

    if isinstance(scoring, str):
        scorer = get_scorer(scoring)
    elif callable(scoring):
        options.update({'greater_is_better': greater_is_better[metric_name]})
        scorer = make_scorer(scoring, **options)
    elif callable(metric):
        if optimize_direction is None:
            raise ValueError('Note that custom reward_metric need to provide '
                             'optimize_direction.')
        options.update({'greater_is_better': optimize_direction.lower() == 'max'})
        scorer = make_scorer(metric, **options)
    else:
        raise ValueError('The reward_metric definition might be wrong.')

    if metric_name in const.POSLABEL_REQUIRED:
        average = _task_to_average(task)
        scorer._kwargs['average'] = average
        if average == 'binary':
            scorer._kwargs['pos_label'] = pos_label
            logger.info(f"pos_label is {pos_label}.")
    return scorer
class Metrics(metrics.Metrics):
    """Metrics facade exposing this module's ``calc_score`` and ``metric_to_scorer``."""

    # Wrapped in staticmethod so that access through an instance does not
    # bind the instance as the first positional argument; access through the
    # class behaves exactly as before.
    calc_score = staticmethod(calc_score)
    metric_to_scorer = staticmethod(metric_to_scorer)