Skip to content

Instantly share code, notes, and snippets.

@YikaiLL
Created May 12, 2018 10:11
Show Gist options
  • Save YikaiLL/c7508810cca84a46abff38475ba77061 to your computer and use it in GitHub Desktop.
Save YikaiLL/c7508810cca84a46abff38475ba77061 to your computer and use it in GitHub Desktop.
def knn_detect_outlier(self):
    """
    Score every row of ``self.Y`` by its distance to the reference set ``self.X``.

    The sequences in ``self.X`` (the "old" cluster) are treated as
    representative of normal sequences. For each row of ``self.Y`` the score
    is the mean distance to its ``self.K_NEIGHBOR`` nearest neighbours in
    ``self.X``, measured with ``self.mydist``. Two cut-offs are then taken as
    percentiles of those scores (``self.FILTER_PERCENTILE`` and
    ``self.FINAL_PERCENTILE``), logged through ``self.write_to_output``, and
    handed together with the scores to ``self.general_output_detect_outlier``.

    :return: None
    """
    # Function-scope import: scikit-learn is only loaded when this method runs.
    from sklearn.neighbors import NearestNeighbors

    neigh = NearestNeighbors(n_neighbors=self.K_NEIGHBOR, metric=self.mydist)
    neigh.fit(self.X)
    dist, index = neigh.kneighbors(self.Y)

    # One score per row of self.Y: the mean over its K nearest neighbours.
    # (The saved file name below still says "min_dis_"; it is kept unchanged
    # because it is a runtime string.)
    mean_distance = np.mean(dist, axis=1)
    filter_threshold = np.percentile(mean_distance, self.FILTER_PERCENTILE)
    cluster_threshold = np.percentile(mean_distance, self.FINAL_PERCENTILE)

    if self.DEBUG_PRINT:
        # Single pre-formatted string so the call prints the same text under
        # both the Python 2 print statement and the Python 3 print function.
        print("[INFO] cluster_threshold set to %s for %s %s"
              % (cluster_threshold, self.METRIC, self.METHOD))
        print("[INFO] filter_threshold set to %s for %s %s"
              % (filter_threshold, self.METRIC, self.METHOD))
    self.write_to_output("[INFO] threshold set to", cluster_threshold, "for", self.METRIC, self.METHOD)
    self.write_to_output("[INFO] filter_threshold set to", filter_threshold, "for", self.METRIC, self.METHOD)

    if self.DEBUG_PRINT:
        print("dist shape %s" % (dist.shape,))
        print("index shape %s" % (index.shape,))

    # NOTE(review): the original indentation was lost; this call is assumed to
    # run unconditionally (not only in debug mode) - confirm against the
    # original file.
    self.print_array_info(
        mean_distance,
        save_flag=True,
        savename="min_dis_" + self.METHOD + "_" + str(self.K_NEIGHBOR) + "_" + self.METRIC,
    )
    self.general_output_detect_outlier(mean_distance, filter_threshold, cluster_threshold)
    return
def mydist(self, x, y):
    """
    Return the distance between the two vectors ``x`` and ``y``.

    The distance is computed with ``cdist`` using ``self.METRIC``. When that
    metric is a member of ``BINARY_METRIC_SET`` the per-feature weights in
    ``self.weight_array`` are passed along as ``w``; otherwise the distance
    is unweighted.

    :param x: first vector
    :param y: second vector
    :return: the single entry of the 1x1 distance matrix
    """
    cdist_options = {"metric": self.METRIC}
    if self.METRIC in BINARY_METRIC_SET:
        # Only the binary metrics receive the weight array.
        cdist_options["w"] = self.weight_array

    # cdist works on collections of vectors, so wrap each one in a list and
    # pull the lone value back out of the resulting matrix.
    pairwise = cdist([x], [y], **cdist_options)
    return pairwise[0][0]
def general_output_detect_outlier(self, min_distance, filter_threshold, cluster_threshold):
    """
    The general output function.

    Selects the rows of ``self.Y`` whose distance exceeds ``filter_threshold``,
    looks up a weight for each of them, logs the sizes of ``self.X`` and
    ``self.Y``, and orders the selected rows (descending) by distance or by
    weight depending on ``self.SORT_WITH``.

    NOTE(review): in the visible source the function ends after the sorted
    index list is rebuilt; ``cluster_threshold``, ``X_index`` and
    ``full_weight_list`` are not consumed here. The definition looks
    truncated, so every local is kept under its original name.

    :param min_distance: numpy array with one distance per row of ``self.Y``.
    :param filter_threshold: rows with a distance strictly above this value
        are kept as outlier candidates.
    :param cluster_threshold: not used in the visible part of the function.
    :return: None
    :raises ValueError: if ``self.SORT_WITH`` is neither ``DISTANCE`` nor
        ``WEIGHT``.
    """
    temp_where = np.where(min_distance > filter_threshold)
    outlier_Xindex_list = temp_where[0]
    outlier_distance = min_distance[temp_where]
    # presumably returns a per-row weight computed from self.Y_unweight;
    # verify against calculate_weight_by_index.
    outlier_weight_list = [self.calculate_weight_by_index(i, self.Y_unweight)
                           for i in outlier_Xindex_list]

    self.write_to_output("Old sequence num", self.X.shape[0])
    self.write_to_output("New sequence num", self.Y.shape[0])

    # Bundle (index, distance, weight) so the three sequences are reordered
    # together. list(...) makes the result sortable in place on Python 3,
    # where zip() returns an iterator, and is a no-op copy on Python 2.
    another_list_tuple = list(zip(outlier_Xindex_list, outlier_distance, outlier_weight_list))

    X_index = range(self.Y.shape[0])
    full_weight_list = [self.calculate_weight_by_index(i, self.Y_unweight) for i in X_index]

    if self.DEBUG_PRINT:
        assert len(outlier_Xindex_list) == len(outlier_distance) == len(outlier_weight_list)

    if self.SORT_WITH == DISTANCE:
        # Largest distance first; weight breaks ties.
        another_list_tuple.sort(key=lambda x: (x[1], x[2]), reverse=True)
    elif self.SORT_WITH == WEIGHT:
        # Largest weight first; distance breaks ties.
        another_list_tuple.sort(key=lambda x: (x[2], x[1]), reverse=True)
    else:
        raise ValueError("Incorrect value for SORT_WITH: %r" % (self.SORT_WITH,))

    # Need to get the index after sorting
    outlier_Xindex_list = [item[0] for item in another_list_tuple]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment