-
Notifications
You must be signed in to change notification settings - Fork 1
/
DBCV.py
310 lines (263 loc) · 12.1 KB
/
DBCV.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
Implementation of Density-Based Clustering Validation "DBCV"
Citation:
Moulavi, Davoud, et al. "Density-based clustering validation."
Proceedings of the 2014 SIAM International Conference on Data Mining.
Society for Industrial and Applied Mathematics, 2014.
"""
import numpy as np
from scipy.spatial.distance import euclidean, cdist
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.sparse import csgraph
def DBCV(fpath, labelpath, weightpath, dist_function=euclidean):
    """
    Density Based clustering validation for weighted data stored in text files.

    The three inputs are read with np.loadtxt as comma-delimited text; lines
    starting with "#" are treated as comments.

    Args:
        fpath: file (path or anything np.loadtxt accepts) holding the data,
            one sample per row and one feature per column
        labelpath: file holding the clustering assignment of every sample
        weightpath: file holding the weight of every sample
        dist_function (func): function to determine distance between objects
            func args must be [np.array, np.array] where each array is a point

    Returns: cluster_validity (float)
        score in range[-1, 1] indicating validity of clustering assignments

    Raises:
        ValueError: if the label or weight file does not hold exactly one
            entry per data row
    """
    # ndmin keeps the data two-dimensional even for a single feature column,
    # and keeps labels/weights one-dimensional even for a single entry.
    X = np.loadtxt(fpath, comments="#", delimiter=",", ndmin=2)
    labels = np.loadtxt(labelpath, comments="#", delimiter=",", ndmin=1)
    weights = np.loadtxt(weightpath, comments="#", delimiter=",", ndmin=1)

    n_samples = X.shape[0]
    if len(labels) != n_samples or len(weights) != n_samples:
        raise ValueError(
            "data, labels and weights must describe the same number of "
            "samples (got %d, %d and %d)"
            % (n_samples, len(labels), len(weights))
        )

    graph = _mutual_reach_dist_graph(X, labels, weights, dist_function)
    # The csgraph routines work on numeric matrices, so hand over plain floats
    # rather than an object array.
    graph = np.asarray(graph, dtype=float)
    mst = _mutual_reach_dist_MST(graph)
    cluster_validity = _clustering_validity_index(mst, labels)
    return cluster_validity
def _core_dist(point, weight, neighbors, neighborweights, dist_function):
    """
    Computes the weighted core distance of a point.
    Core distance is the inverse density of an object.

    Args:
        point (np.array): array of dimensions (n_features,)
            point to compute core distance of
        weight (float): weight of the point itself
        neighbors (np.ndarray): array of dimensions (n_neighbors, n_features):
            points of the object class the distances are measured to;
            entries at distance zero from the point contribute nothing
        neighborweights (np.array): one weight per row of neighbors
        dist_function (func): function to determine distance between objects
            func args must be [np.array, np.array] where each array is a point

    Returns: core_dist (float)
        inverse density of point
    """
    n_features = np.shape(point)[0]
    # Weighted neighbour count used as the normaliser: total neighbour weight
    # plus the point's own weight, minus one.
    n_neighbors = np.sum(neighborweights) + weight - 1

    # Honour the caller's metric. The default keeps cdist's compiled Euclidean
    # kernel instead of calling back into Python for every pair.
    if dist_function is None or dist_function is euclidean:
        metric = "euclidean"
    else:
        metric = dist_function
    distances = cdist(point.reshape(1, -1), neighbors, metric=metric)[0]

    # (1 / d) ** n_features for every neighbour at non-zero distance; zero
    # distances (the point itself or exact duplicates) are left at 0.
    inverse_powers = np.zeros_like(distances)
    nonzero = distances != 0
    inverse_powers[nonzero] = (1 / distances[nonzero]) ** n_features

    numerator = np.dot(inverse_powers, np.ravel(neighborweights))
    core_dist = (numerator / n_neighbors) ** (-1 / n_features)
    # Return a plain scalar so callers can compare it with other distances.
    return float(core_dist)
def _mutual_reachability_dist(point_i, point_j, neighbors_i,
                              neighbors_j, weight_i, weight_j,
                              memberweights_i, memberweights_j, dist_function):
    """
    Computes the mutual reachability distance between points

    Args:
        point_i (np.array): array of dimensions (n_features,)
            point i to compare to point j
        point_j (np.array): array of dimensions (n_features,)
            point j to compare to point i
        neighbors_i (np.ndarray): array of dims (n_neighbors, n_features):
            points in the object class of point i
        neighbors_j (np.ndarray): array of dims (n_neighbors, n_features):
            points in the object class of point j
        weight_i (float): weight of point i
        weight_j (float): weight of point j
        memberweights_i (np.array): weights of the rows of neighbors_i
        memberweights_j (np.array): weights of the rows of neighbors_j
        dist_function (func): function to determine distance between objects
            func args must be [np.array, np.array] where each array is a point

    Returns: mutual_reachability (float)
        mutual reachability between points i and j: the largest of the two
        core distances and the pairwise distance scaled by weight_i / weight_j
    """
    core_dist_i = _core_dist(point_i, weight_i, neighbors_i,
                             memberweights_i, dist_function)
    core_dist_j = _core_dist(point_j, weight_j, neighbors_j,
                             memberweights_j, dist_function)
    dist = dist_function(point_i, point_j) * (weight_i / weight_j)

    # A core distance may arrive as a one-element array. Reduce every
    # candidate to a scalar first, because np.max over a ragged list raises
    # ValueError on recent NumPy releases.
    candidates = np.array(
        [np.ravel(value)[0] for value in (core_dist_i, core_dist_j, dist)],
        dtype=float,
    )
    mutual_reachability = float(np.max(candidates))
    return mutual_reachability
def _mutual_reach_dist_graph(X, labels, weights, dist_function):
    """
    Computes the mutual reach distance complete graph.
    Graph of all pair-wise mutual reachability distances between points

    Args:
        X (np.ndarray): ndarray with dimensions [n_samples, n_features]
            data to check validity of clustering
        labels (np.array): clustering assignments for data X
        weights (np.array): weight of every sample in X
        dist_function (func): function to determine distance between objects
            func args must be [np.array, np.array] where each array is a point

    Returns: graph (np.ndarray)
        array of dimensions (n_samples, n_samples)
        Graph of all pair-wise mutual reachability distances between points.
    """
    n_samples = np.shape(X)[0]

    # The members of a cluster (and their weights) depend only on the label,
    # so collect them once per distinct label instead of once per pair.
    cluster_ids, cluster_of = np.unique(labels, return_inverse=True)
    members = [_get_label_members(X, labels, cluster_id)
               for cluster_id in cluster_ids]
    memberweights = [_get_label_members(weights, labels, cluster_id)
                     for cluster_id in cluster_ids]

    graph = np.empty((n_samples, n_samples), dtype=float)
    for row in range(n_samples):
        cluster_i = cluster_of[row]
        for col in range(n_samples):
            cluster_j = cluster_of[col]
            dist = _mutual_reachability_dist(
                X[row], X[col],
                members[cluster_i], members[cluster_j],
                weights[row], weights[col],
                memberweights[cluster_i], memberweights[cluster_j],
                dist_function)
            # np.ravel(...)[0] also accepts a one-element array result.
            graph[row, col] = np.ravel(dist)[0]
    return graph
def _mutual_reach_dist_MST(dist_tree):
    """
    Computes minimum spanning tree of the mutual reach distance complete graph

    Args:
        dist_tree (np.ndarray): array of dimensions (n_samples, n_samples)
            Graph of all pair-wise mutual reachability distances
            between points.

    Returns: minimum_spanning_tree (np.ndarray)
        array of dimensions (n_samples, n_samples)
        minimum spanning tree of all pair-wise mutual reachability
        distances between points, stored symmetrically.
    """
    # Coerce to a float matrix first so that object-dtype input holding
    # numbers is accepted as well.
    dense_graph = np.asarray(dist_tree, dtype=float)
    mst = minimum_spanning_tree(dense_graph).toarray()
    # Adding the transpose mirrors every tree edge so that both [i, j] and
    # [j, i] carry the edge weight.
    return mst + np.transpose(mst)
def _cluster_density_sparseness(MST, labels, cluster):
    """
    Computes the cluster density sparseness: the largest entry of the
    spanning tree restricted to the points of one cluster.

    Args:
        MST (np.ndarray): minimum spanning tree of all pair-wise
            mutual reachability distances between points.
        labels (np.array): clustering assignments for data X
        cluster (int): cluster of interest

    Returns: cluster_density_sparseness (float)
        maximum value found among the tree entries whose row and column
        both belong to the cluster
    """
    member_idx = np.nonzero(labels == cluster)[0]
    # Keep only the rows and columns of the cluster's own points.
    within_cluster = MST[np.ix_(member_idx, member_idx)]
    return np.max(within_cluster)
def _cluster_density_separation(MST, labels, cluster_i, cluster_j):
    """
    Computes the density separation between two clusters: the shortest
    path length through the spanning tree from any point of cluster i to
    any point of cluster j.

    Args:
        MST (np.ndarray): minimum spanning tree of all pair-wise
            mutual reachability distances between points.
        labels (np.array): clustering assignments for data X
        cluster_i (int): cluster i of interest
        cluster_j (int): cluster j of interest

    Returns: density_separation (float):
        smallest Dijkstra path length between a point of cluster i and a
        point of cluster j
    """
    sources = np.nonzero(labels == cluster_i)[0]
    targets = np.nonzero(labels == cluster_j)[0]
    # One row of path lengths per source point; keep the target columns only.
    path_lengths = csgraph.dijkstra(MST, indices=sources)
    return np.min(path_lengths[:, targets])
def _cluster_validity_index(MST, labels, cluster):
    """
    Computes the validity of a cluster (validity of assignments)

    The score compares the separation to the nearest other cluster with the
    sparseness of the cluster itself, normalised by the larger of the two.

    Args:
        MST (np.ndarray): minimum spanning tree of all pair-wise
            mutual reachability distances between points.
        labels (np.array): clustering assignments for data X
        cluster (int): cluster of interest

    Returns: cluster_validity (float)
        value corresponding to the validity of cluster assignments
    """
    # Start from infinity so that the result stays infinite when there is no
    # other cluster to compare against.
    separations = [np.inf]
    separations.extend(
        _cluster_density_separation(MST, labels, cluster, other)
        for other in np.unique(labels)
        if other != cluster
    )
    nearest_separation = min(separations)

    sparseness = _cluster_density_sparseness(MST, labels, cluster)

    difference = nearest_separation - sparseness
    scale = np.max([nearest_separation, sparseness])
    return difference / scale
def _clustering_validity_index(MST, labels):
    """
    Computes the validity of all clustering assignments for a
    clustering algorithm

    Every cluster contributes its own validity weighted by the share of
    samples carrying its label.

    Args:
        MST (np.ndarray): minimum spanning tree of all pair-wise
            mutual reachability distances between points.
        labels (np.array): clustering assignments for data X

    Returns: validity_index (float):
        score in range[-1, 1] indicating validity of clustering assignments
    """
    total = float(len(labels))
    score = 0
    for cluster in np.unique(labels):
        # Fraction of all samples that were assigned to this cluster.
        share = np.sum(labels == cluster) / total
        score += share * _cluster_validity_index(MST, labels, cluster)
    return score
def _get_label_members(X, labels, cluster):
    """
    Helper function to get samples of a specified cluster.

    Args:
        X (np.ndarray): array indexed by sample along its first axis,
            e.g. data with dimensions [n_samples, n_features]
        labels (np.array): clustering assignments for data X
        cluster (int): cluster of interest

    Returns: members (np.ndarray)
        the entries of X whose label equals the specified cluster,
        in their original order.
    """
    in_cluster = np.nonzero(labels == cluster)[0]
    return X[in_cluster]
def _get_label_weights(X, labels, cluster):
    """
    Helper function to get the entries belonging to a specified cluster.

    Performs the same selection as _get_label_members; the name suggests it
    is meant for per-sample weights.

    Args:
        X (np.ndarray): array indexed by sample along its first axis
        labels (np.array): clustering assignments, one per entry of X
        cluster (int): cluster of interest

    Returns: members (np.ndarray)
        the entries of X whose label equals the specified cluster.
    """
    selected = np.nonzero(labels == cluster)[0]
    cluster_entries = X[selected]
    return cluster_entries
#print("time to start")
#fpath = "C:/Users/dhanu/Documents/DhanujG/Projects/Unsupervised Clustering Analysis for Competitive Species Traits/R_Python_C/New_Eco_Data_and_Validation_Suite/temp.txt"
#cpath = "C:/Users/dhanu/Documents/DhanujG/Projects/Unsupervised Clustering Analysis for Competitive Species Traits/R_Python_C/New_Eco_Data_and_Validation_Suite/clusters.txt"
#Y = np.loadtxt(cpath)
#tempDBCV = DBCV(fpath, Y)
#print(tempDBCV)
#exit()