"""
From the paper:
Evolutionary Clustering and
Community Detection Algorithms for Social Media Health Surveillance
Kyle Spurlock, Tanner Bogart, Heba Elgazzar
2020
Version 1.2 of an Evolutionary Aadaptation of DBSCAN clustering algorithm.
Example
-------
df = pd.read_csv(r"encoded_twitter_dataset.csv")
X = df.iloc[:1200,[3,4,5]]
t_labels = np.unique(X['Time'])
evo1 = EvoDBSCAN(min_samples = 2)
evo1.callSTATIC(X, beta)
# Evolutionary a = 0
evo2 = EvoDBSCAN(min_samples = 2)
noise0 = evo2.callDBSCAN(X, t_labels, alpha = 0.8, beta=1)
"""
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn.metrics import pairwise_distances
[docs]class EvoDBSCAN(DBSCAN):
"""Implementation of Evolutionary DBSCAN with dynamic radius measure.
Notes
-----
Acts as a wrapper for sci-kit learn DBSCAN class.
Attributes
----------
eps : float
Radius measure for finding neighbourhood of core point
min_samples : int
Minimum number of neighbours to form a core point
clusters_gen : list
Stores cluster count per generation
noise_gen : list
Stores noise count per time generation
eps_gen : list
Stores eps parameter per time generation
metric : str
Distance metric (manhattan, euclidean, etc.)
metric_params : dict
Additional arguments for metric function
algorithm : str
Algorithm used to compute pointwise distances
leaf_size: int
Specific to BallTree or cKDTree algorithm
p : int
Power of Minkowski metric
n_jobs : int
Number of parallel jobs
"""
def __init__(self, min_samples=5,*,
eps=0,
metric='euclidean',
algorithm='auto',
leaf_size=30,
p=None,
n_jobs=None):
super().__init__(min_samples=min_samples,
eps=eps,
metric=metric,
algorithm=algorithm,
leaf_size=leaf_size,
p = p,
n_jobs=n_jobs)
self.clusters_gen_ = []
self.noise_gen_ = []
self.eps_gen_ = []
[docs] def showPlot(self, current_gen, time, labels, noise, alpha,
save_plot=None):
"""Performs plotting of clusters at generation"""
# Plot clusters
plt.scatter(current_gen[:, 0], current_gen[:, 1],
c = labels, cmap = 'tab20')
# Plot noise
plt.scatter(current_gen[labels == -1, 0], current_gen[labels == -1, 1],
c = 'black')
# Create graph title with params
title = 'DBSCAN Generation-{t} Alpha-{a} Noise-{n}'.format(t= time,
a= alpha,
n= noise)
plt.title(title)
if not save_plot == None:
# Save fig as PNG
plt.savefig('{p}{t}.png'.format(p=save_plot, t = title))
plt.show() # Show fig
[docs] def callDBSCAN(self, X, times, alpha, beta=1, show_eps=False,
plot_gens=None, save_plot=None):
"""Perform evolutionary DBSCAN clustering.
Parameters
----------
X : 'pd.DataFrame'
Dataframe of tabular data samples
times : list
List containing times for X samples
alpha : float
Parameter used to modulate epsilon by snapshot vs. history
beta : float, optional
Optional scaling param for radius
show_eps : bool
Verbose for comptued epsilon value
plot : list, optional
Generations to show as plots
save_plot : str, optional
Path to save plots
Returns
-------
None
"""
previous_gen = None # Holds the previous generations distance matrix
seen_times = [] # Accumulates the seen time steps
t_intervals = np.unique(times)
if plot_gens == None:
# Default generations to plot
plot_gens = [int(np.median(t_intervals)/2), # Quarter 1
int(np.median(t_intervals)), # Middle
t_intervals[-1]] # End
# Loop through each time step
for time in t_intervals:
seen_times.append(time)
current_gen = X.loc[X['Time'].isin(seen_times)]
current_gen = current_gen.iloc[:,[0,1]].values
current_snap = X.loc[X['Time'] == time]
# Compute pairwise distances
snap_distances = pairwise_distances(current_snap)
# Calculating episolon based on current snapshot or history
if time == 0:
# No history cost to consider
self.eps = (np.median(np.unique(snap_distances))/beta)
else:
# Determine radius based on history and current snapshot
self.eps = ((((1-alpha)*
np.median(np.unique(snap_distances)))/beta)
+ (alpha*
np.median(np.unique(previous_gen)))/beta)
# If there is currently no valid distances, make radius very small
if self.eps == 0:
self.eps = 1e-5
if show_eps:
print('Generation {g} epsilon: {e}'
.format(g = self.current_time, e = self.eps))
# Determine cluster labels and noise
labels = self.fit_predict(current_gen)
noise = list(labels).count(-1)
# Save noise and cluster count for each generation
self.clusters_gen_.append(len(set(labels))
- (1 if -1 in labels else 0))
self.noise_gen_.append(noise)
# Saving previous generation as the history
previous_gen = pairwise_distances(current_gen)
# Plotting
if time in plot_gens:
self.showPlot(current_gen, time, labels, noise, alpha,
save_plot=save_plot)
return None
[docs] def callSTATIC(self, X, beta, save_plot = None):
""""Normal DBSCAN implementation"""
X = X.iloc[:,[0,1]].values
# Compute pairwise distances
dist_cg = pairwise_distances(X)
# Compute radius measure
self.eps = np.median(np.unique(dist_cg))/beta
# Determine cluster labels and noise
labels = self.fit_predict(X)
noise = list(labels).count(-1)
#Plotting
self.showPlot(X, time=None, labels=labels, noise=noise, alpha=None,
save_plot=save_plot)