Created
May 12, 2018 10:11
-
-
Save YikaiLL/c7508810cca84a46abff38475ba77061 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def knn_detect_outlier(self):
    """
    Use k-nearest-neighbours to produce an outlier score for each sequence.

    A NearestNeighbors model is fitted on self.X with self.mydist as the
    metric. For every row of self.Y the distances to its self.K_NEIGHBOR
    nearest rows of self.X are averaged, and that mean is the row's score.
    The idea is that each sequence in the old cluster is representative of
    a normal sequence, so a large mean distance marks a row as unusual.

    Two cut-offs are taken as percentiles of the scores
    (self.FILTER_PERCENTILE and self.FINAL_PERCENTILE). The scores and both
    cut-offs are logged and then passed to
    self.general_output_detect_outlier.

    :return: None
    """
    # Kept as a function-local import, as in the original.
    from sklearn.neighbors import NearestNeighbors

    neigh = NearestNeighbors(n_neighbors=self.K_NEIGHBOR, metric=self.mydist)
    neigh.fit(self.X)
    dist, index = neigh.kneighbors(self.Y)

    # NOTE: despite its name this is the MEAN distance to the K neighbours,
    # not the minimum. The name is kept because it is passed on by keyword
    # position to the helpers below.
    min_distance = np.mean(dist, axis=1)
    filter_threshold = np.percentile(min_distance, self.FILTER_PERCENTILE)
    cluster_threshold = np.percentile(min_distance, self.FINAL_PERCENTILE)

    # Single-argument print(...) calls produce the same text on Python 2
    # and Python 3; the original print statements only parsed on Python 2.
    if self.DEBUG_PRINT:
        print("[INFO] cluster_threshold set to %s for %s %s"
              % (cluster_threshold, self.METRIC, self.METHOD))
        print("[INFO] filter_threshold set to %s for %s %s"
              % (filter_threshold, self.METRIC, self.METHOD))
    self.write_to_output("[INFO] threshold set to", cluster_threshold, "for", self.METRIC, self.METHOD)
    self.write_to_output("[INFO] filter_threshold set to", filter_threshold, "for", self.METRIC, self.METHOD)

    if self.DEBUG_PRINT:
        # The shapes are tuples, so wrap them to keep %-formatting happy.
        print("dist shape %s" % (dist.shape,))
        print("index shape %s" % (index.shape,))

    self.print_array_info(min_distance, save_flag=True,
                          savename="min_dis_" + self.METHOD + "_" + str(self.K_NEIGHBOR) + "_" + self.METRIC)
    self.general_output_detect_outlier(min_distance, filter_threshold, cluster_threshold)
def mydist(self, x, y):
    """
    Distance between the two vectors x and y under self.METRIC.

    The pair is handed to cdist as two one-row inputs and the single entry
    of the resulting matrix is returned. self.weight_array is supplied as
    the ``w`` argument only when self.METRIC belongs to BINARY_METRIC_SET.

    :param x: first vector.
    :param y: second vector.
    :return: the scalar distance between x and y.
    """
    # Only metrics in BINARY_METRIC_SET are given the weight array.
    extra_kwargs = {}
    if self.METRIC in BINARY_METRIC_SET:
        extra_kwargs["w"] = self.weight_array

    pairwise = cdist([x], [y], metric=self.METRIC, **extra_kwargs)
    return pairwise[0][0]
def general_output_detect_outlier(self, min_distance, filter_threshold, cluster_threshold):
    """
    The general output function.

    Picks the rows of self.Y whose score is strictly greater than
    filter_threshold, computes a weight for each of them with
    self.calculate_weight_by_index, and orders them in descending order by
    (distance, weight) or (weight, distance) depending on self.SORT_WITH.

    :param min_distance: the distance of each point in the self.Y matrix;
        it is compared and indexed as a numpy array.
    :param filter_threshold: rows scoring above this value are kept as
        outlier candidates.
    :param cluster_threshold: not used in the portion of the function
        visible here.
    :return: None
    :raises ValueError: if self.SORT_WITH is neither DISTANCE nor WEIGHT.
    """
    temp_where = np.where(min_distance > filter_threshold)
    outlier_Xindex_list = temp_where[0]
    outlier_distance = min_distance[temp_where]
    outlier_weight_list = [self.calculate_weight_by_index(i, self.Y_unweight)
                           for i in outlier_Xindex_list]

    self.write_to_output("Old sequence num", self.X.shape[0])
    self.write_to_output("New sequence num", self.Y.shape[0])

    # NOTE(review): full_weight_list is not consumed in the code shown here;
    # it is kept because calculate_weight_by_index may have side effects and
    # this excerpt may not be the whole function - confirm before removing.
    full_weight_list = [self.calculate_weight_by_index(i, self.Y_unweight)
                        for i in range(0, self.Y.shape[0])]

    if self.DEBUG_PRINT:
        assert len(outlier_Xindex_list) == len(outlier_distance) == len(outlier_weight_list)

    # Sort index, distance and weight together so they stay aligned.
    # sorted() is used instead of zip(...).sort(): on Python 3 zip returns
    # an iterator without a .sort method, while sorted() yields the same
    # stable ordering on both Python 2 and Python 3.
    candidates = zip(outlier_Xindex_list, outlier_distance, outlier_weight_list)
    if self.SORT_WITH == DISTANCE:
        another_list_tuple = sorted(candidates, key=lambda x: (x[1], x[2]), reverse=True)
    elif self.SORT_WITH == WEIGHT:
        another_list_tuple = sorted(candidates, key=lambda x: (x[2], x[1]), reverse=True)
    else:
        raise ValueError("Incorrect value for", self.SORT_WITH)

    # Need to get the index after sorting.
    # NOTE(review): the sorted index list is not used further in the code
    # shown here - presumably the rest of the function consumes it.
    outlier_Xindex_list = [item[0] for item in another_list_tuple]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment