diff --git a/examples/cof_example.py b/examples/cof_example.py new file mode 100644 index 0000000000000000000000000000000000000000..57cb17558c8138299abd0ba503c70fd160aae10f --- /dev/null +++ b/examples/cof_example.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +"""Example of using COF for outlier detection +""" +# Author: Yahya Almardeny <almardeny@gmail.com> +# License: MIT + +from __future__ import division +from __future__ import print_function + +import os +import sys + +# temporary solution for relative imports in case pyod is not installed +# if pyod is installed, no need to use the following line +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname("__file__"), '..'))) + +import matplotlib.pyplot as plt + +from pyod.models.cof import COF +from pyod.utils.data import generate_data +from pyod.utils.data import get_outliers_inliers +from pyod.utils.data import check_consistent_shape +from pyod.utils.data import evaluate_print + + +def visualize(clf_name, X_train, y_train, X_test, y_test, y_train_pred, + y_test_pred, show_figure=True, save_figure=False): + """Utility function for visualizing the results in examples. + Internal use only. + + Parameters + ---------- + clf_name : str + The name of the detector. + + X_train : numpy array of shape (n_samples, n_features) + The training samples. + + y_train : list or array of shape (n_samples,) + The ground truth of training samples. + + X_test : numpy array of shape (n_samples, n_features) + The test samples. + + y_test : list or array of shape (n_samples,) + The ground truth of test samples. + + y_train_pred : numpy array of shape (n_samples, n_features) + The predicted binary labels of the training samples. + + y_test_pred : numpy array of shape (n_samples, n_features) + The predicted binary labels of the test samples. + + show_figure : bool, optional (default=True) + If set to True, show the figure. + + save_figure : bool, optional (default=False) + If set to True, save the figure to the local. + + """ + + def _add_sub_plot(X_inliers, X_outliers, sub_plot_title, + inlier_color='blue', outlier_color='orange'): + """Internal method to add subplot of inliers and outliers. + + Parameters + ---------- + X_inliers : numpy array of shape (n_samples, n_features) + Outliers. + + X_outliers : numpy array of shape (n_samples, n_features) + Inliers. + + sub_plot_title : str + Subplot title. + + inlier_color : str, optional (default='blue') + The color of inliers. + + outlier_color : str, optional (default='orange') + The color of outliers. + + """ + plt.axis("equal") + plt.scatter(X_inliers[:, 0], X_inliers[:, 1], label='inliers', + color=inlier_color, s=40) + plt.scatter(X_outliers[:, 0], X_outliers[:, 1], + label='outliers', color=outlier_color, s=50, marker='^') + plt.title(sub_plot_title, fontsize=15) + plt.xticks([]) + plt.yticks([]) + plt.legend(loc=3, prop={'size': 10}) + return + + # check input data shapes are consistent + X_train, y_train, X_test, y_test, y_train_pred, y_test_pred = \ + check_consistent_shape(X_train, y_train, X_test, y_test, y_train_pred, + y_test_pred) + + if X_train.shape[1] != 2: + raise ValueError("Input data has to be 2-d for visualization. The " + "input data has {shape}.".format(shape=X_train.shape)) + + X_train_outliers, X_train_inliers = get_outliers_inliers(X_train, y_train) + X_train_outliers_pred, X_train_inliers_pred = get_outliers_inliers( + X_train, y_train_pred) + + X_test_outliers, X_test_inliers = get_outliers_inliers(X_test, y_test) + X_test_outliers_pred, X_test_inliers_pred = get_outliers_inliers( + X_test, y_test_pred) + + # plot ground truth vs. predicted results + fig = plt.figure(figsize=(12, 10)) + plt.suptitle("Demo of {clf_name} Detector".format(clf_name=clf_name), + fontsize=15) + + fig.add_subplot(221) + _add_sub_plot(X_train_inliers, X_train_outliers, 'Train Set Ground Truth', + inlier_color='blue', outlier_color='orange') + + fig.add_subplot(222) + _add_sub_plot(X_train_inliers_pred, X_train_outliers_pred, + 'Train Set Prediction', inlier_color='blue', + outlier_color='orange') + + fig.add_subplot(223) + _add_sub_plot(X_test_inliers, X_test_outliers, 'Test Set Ground Truth', + inlier_color='green', outlier_color='red') + + fig.add_subplot(224) + _add_sub_plot(X_test_inliers_pred, X_test_outliers_pred, + 'Test Set Prediction', inlier_color='green', + outlier_color='red') + + if save_figure: + plt.savefig('{clf_name}.png'.format(clf_name=clf_name), dpi=300) + + if show_figure: + plt.show() + + return + + +if __name__ == "__main__": + contamination = 0.1 # percentage of outliers + n_train = 200 # number of training points + n_test = 100 # number of testing points + + # Generate sample data + X_train, y_train, X_test, y_test = \ + generate_data(n_train=n_train, + n_test=n_test, + n_features=2, + contamination=contamination, + random_state=42) + + # train kNN detector + clf_name = 'COF' + clf = COF(n_neighbors=30) + clf.fit(X_train) + + # get the prediction labels and outlier scores of the training data + y_train_pred = clf.labels_ # binary labels (0: inliers, 1: outliers) + y_train_scores = clf.decision_scores_ # raw outlier scores + + # get the prediction on the test data + y_test_pred = clf.predict(X_test) # outlier labels (0 or 1) + y_test_scores = clf.decision_function(X_test) # outlier scores + + # evaluate and print the results + print("\nOn Training Data:") + evaluate_print(clf_name, y_train, y_train_scores) + print("\nOn Test Data:") + evaluate_print(clf_name, y_test, y_test_scores) + + # visualize the results + visualize(clf_name, X_train, y_train, X_test, y_test, y_train_pred, + y_test_pred, show_figure=True, save_figure=False) + diff --git a/pyod/models/cof.py b/pyod/models/cof.py new file mode 100644 index 0000000000000000000000000000000000000000..5572fe3c49d0df9548f01a77679802c131a3ff52 --- /dev/null +++ b/pyod/models/cof.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +"""Connectivity-Based Outlier Factor (COF) Algorithm +""" +# Author: Yahya Almardeny <almardeny@gmail.com> +# License: MIT +from operator import itemgetter +import numpy as np +from scipy.spatial import distance_matrix +from sklearn.utils import check_array +from pyod.utils import check_parameter +from .base import BaseDetector + + +class COF(BaseDetector): + """ + Algorithm to calculate the Connectivity-Based Outlier Factor (COF) + as an outlier score for observations. + The implementation is based on the work of: + Tang, J., Chen, Z., Fu, A. W. C., & Cheung, D. W. (2002). + Enhancing Effectiveness of Outlier Detections for Low Density Patterns. + In Pacific-Asia Conf. on Knowledge Discovery and Data Mining (PAKDD). + Taipei. pp. 535-548. DOI: 10.1007/3-540-47887-6_53 + + Parameters + ---------- + contamination : float in (0., 0.5), optional (default=0.1) + The amount of contamination of the data set, i.e. + the proportion of outliers in the data set. Used when fitting to + define the threshold on the decision function. + + n_neighbors : int, optional (default=20) + Number of neighbors to use by default for k neighbors queries. + Note that n_neighbors should be less than the number of samples. + If n_neighbors is larger than the number of samples provided, + all samples will be used. + + Attributes + ---------- + decision_scores_ : numpy array of shape (n_samples,) + The outlier scores of the training data. + The higher, the more abnormal. Outliers tend to have higher + scores. This value is available once the detector is + fitted. + + threshold_ : float + The threshold is based on ``contamination``. It is the + ``n_samples * contamination`` most abnormal samples in + ``decision_scores_``. The threshold is calculated for generating + binary outlier labels. + + labels_ : int, either 0 or 1 + The binary labels of the training data. 0 stands for inliers + and 1 for outliers/anomalies. It is generated by applying + ``threshold_`` on ``decision_scores_``. + + n_neighbors_: int + Number of neighbors to use by default for k neighbors queries. + """ + def __init__(self, contamination=0.1, n_neighbors=20): + super(COF, self).__init__(contamination=contamination) + if isinstance(n_neighbors, int): + check_parameter(n_neighbors, + low=1, + param_name='n_neighbors') + else: + raise TypeError("n_neighbors should be int. Got %s" % type(n_neighbors)) + self.n_neighbors_ = n_neighbors + self.decision_scores_ = None + + def fit(self, X, y=None): + """Fit detector. y is optional for unsupervised methods. + + Parameters + ---------- + X : numpy array of shape (n_samples, n_features) + The input samples. + + y : numpy array of shape (n_samples,), optional (default=None) + The ground truth of the input samples (labels). + """ + X = check_array(X) + if self.n_neighbors_ >= X.shape[0]: + self.n_neighbors_ = X.shape[0] - 1 + self._set_n_classes(y) + self.decision_scores_ = self.decision_function(X) + self._process_decision_scores() + + return self + + def decision_function(self, X): + """Predict raw anomaly score of X using the fitted detector. + The anomaly score of an input sample is computed based on different + detector algorithms. For consistency, outliers are assigned with + larger anomaly scores. + + Parameters + ---------- + X : numpy array of shape (n_samples, n_features) + The training input samples. Sparse matrices are accepted only + if they are supported by the base estimator. + + Returns + ------- + anomaly_scores : numpy array of shape (n_samples,) + The anomaly score of the input samples. + """ + return self._cof(X) + + def _cof(self, X): + """ + Connectivity-Based Outlier Factor (COF) Algorithm + This function is called internally to calculate the + Connectivity-Based Outlier Factor (COF) as an outlier + score for observations. + :return: numpy array containing COF scores for observations. + The greater the COF, the greater the outlierness. + """ + dist_matrix = np.array(distance_matrix(X, X)) + sbn_path_index, ac_dist, cof_ = [], [], [] + for i in range(X.shape[0]): + sbn_path = sorted(range(len(dist_matrix[i])), + key=dist_matrix[i].__getitem__) + sbn_path_index.append(sbn_path[1: self.n_neighbors_ + 1]) + cost_desc = [] + for j in range(self.n_neighbors_): + cost_desc.append(np.min(dist_matrix[sbn_path[j + 1]][sbn_path][:j + 1])) + acd = [] + for _h, cost_ in enumerate(cost_desc): + acd.append(((2. * (self.n_neighbors_ + 1 - (_h + 1))) / + ((self.n_neighbors_ + 1) * self.n_neighbors_)) * cost_) + ac_dist.append(np.sum(acd)) + for _g in range(X.shape[0]): + cof_.append((ac_dist[_g] * self.n_neighbors_) / + np.sum(itemgetter(*sbn_path_index[_g])(ac_dist))) + return np.nan_to_num(cof_) diff --git a/pyod/test/test_cof.py b/pyod/test/test_cof.py new file mode 100644 index 0000000000000000000000000000000000000000..c84d704d46add0b8e674ed268ac14565953e5d92 --- /dev/null +++ b/pyod/test/test_cof.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +from __future__ import division +from __future__ import print_function + +import os +import sys + +import unittest +# noinspection PyProtectedMember +from sklearn.utils.testing import assert_allclose +from sklearn.utils.testing import assert_array_less +from sklearn.utils.testing import assert_equal +from sklearn.utils.testing import assert_greater +from sklearn.utils.testing import assert_greater_equal +from sklearn.utils.testing import assert_less_equal +from sklearn.utils.testing import assert_raises +from sklearn.utils.testing import assert_true +from sklearn.utils.estimator_checks import check_estimator + +from sklearn.metrics import roc_auc_score +from scipy.stats import rankdata + +# temporary solution for relative imports in case pyod is not installed +# if pyod is installed, no need to use the following line +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from pyod.models.cof import COF +from pyod.utils.data import generate_data + + +class TestCOF(unittest.TestCase): + def setUp(self): + self.n_train = 100 + self.n_test = 50 + self.contamination = 0.1 + self.roc_floor = 0.6 + self.X_train, self.y_train, self.X_test, self.y_test = generate_data( + n_train=self.n_train, n_test=self.n_test, + contamination=self.contamination, random_state=42) + + self.clf = COF(contamination=self.contamination) + self.clf.fit(self.X_train) + + def test_sklearn_estimator(self): + #check_estimator(self.clf) + pass + + def test_parameters(self): + assert_true(hasattr(self.clf, 'decision_scores_') and + self.clf.decision_scores_ is not None) + assert_true(hasattr(self.clf, 'labels_') and + self.clf.labels_ is not None) + assert_true(hasattr(self.clf, 'threshold_') and + self.clf.threshold_ is not None) + assert_true(hasattr(self.clf, 'n_neighbors_') and + self.clf.n_neighbors_ is not None) + + def test_train_scores(self): + assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0]) + + def test_prediction_scores(self): + pred_scores = self.clf.decision_function(self.X_test) + + # check score shapes + assert_equal(pred_scores.shape[0], self.X_test.shape[0]) + + # check performance + assert_greater(roc_auc_score(self.y_test, pred_scores), self.roc_floor) + + def test_prediction_labels(self): + pred_labels = self.clf.predict(self.X_test) + assert_equal(pred_labels.shape, self.y_test.shape) + + def test_prediction_proba(self): + pred_proba = self.clf.predict_proba(self.X_test) + assert_greater_equal(pred_proba.min(), 0) + assert_less_equal(pred_proba.max(), 1) + + def test_prediction_proba_linear(self): + pred_proba = self.clf.predict_proba(self.X_test, method='linear') + assert_greater_equal(pred_proba.min(), 0) + assert_less_equal(pred_proba.max(), 1) + + def test_prediction_proba_unify(self): + pred_proba = self.clf.predict_proba(self.X_test, method='unify') + assert_greater_equal(pred_proba.min(), 0) + assert_less_equal(pred_proba.max(), 1) + + def test_prediction_proba_parameter(self): + with assert_raises(ValueError): + self.clf.predict_proba(self.X_test, method='something') + + def test_fit_predict(self): + pred_labels = self.clf.fit_predict(self.X_train) + assert_equal(pred_labels.shape, self.y_train.shape) + + def test_fit_predict_score(self): + self.clf.fit_predict_score(self.X_test, self.y_test) + self.clf.fit_predict_score(self.X_test, self.y_test, + scoring='roc_auc_score') + self.clf.fit_predict_score(self.X_test, self.y_test, + scoring='prc_n_score') + with assert_raises(NotImplementedError): + self.clf.fit_predict_score(self.X_test, self.y_test, + scoring='something') + + def test_predict_rank(self): + pred_scores = self.clf.decision_function(self.X_test) + pred_ranks = self.clf._predict_rank(self.X_test) + print(pred_ranks) + + # assert the order is reserved + assert_allclose(rankdata(pred_ranks), rankdata(pred_scores), atol=2) + assert_array_less(pred_ranks, self.X_train.shape[0] + 1) + assert_array_less(-0.1, pred_ranks) + + def test_predict_rank_normalized(self): + pred_socres = self.clf.decision_function(self.X_test) + pred_ranks = self.clf._predict_rank(self.X_test, normalized=True) + + # assert the order is reserved + assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=2) + assert_array_less(pred_ranks, 1.01) + assert_array_less(-0.1, pred_ranks) + + def test_check_parameters(self): + with assert_raises(ValueError): + COF(contamination=0.1, n_neighbors=-1) + with assert_raises(ValueError): + COF(contamination=10., n_neighbors=5) + with assert_raises(TypeError): + COF(contamination=0.1, n_neighbors='not int') + with assert_raises(TypeError): + COF(contamination='not float', n_neighbors=5) + cof_ = COF(contamination=0.1, n_neighbors=10000) + cof_.fit(self.X_train) + assert self.X_train.shape[0] > cof_.n_neighbors_ + + def tearDown(self): + pass + + +if __name__ == '__main__': + unittest.main()