# Source code for evoclusterstream.cluster.EvoDBSCAN

"""
From the paper:
Evolutionary Clustering and 
Community Detection Algorithms for Social Media Health Surveillance

Kyle Spurlock, Tanner Bogart, Heba Elgazzar
2020

Version 1.2 of an Evolutionary Adaptation of the DBSCAN clustering algorithm.

Example
-------
    df = pd.read_csv(r"encoded_twitter_dataset.csv")
    X = df.iloc[:1200,[3,4,5]] 
    t_labels = np.unique(X['Time'])
    
    evo1 = EvoDBSCAN(min_samples = 2)
    evo1.callSTATIC(X, beta=1)
    
    # Evolutionary clustering with alpha = 0.8
    evo2 = EvoDBSCAN(min_samples = 2)
    evo2.callDBSCAN(X, t_labels, alpha = 0.8, beta=1)  # returns None; per-generation noise counts are in evo2.noise_gen_
"""

import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn.metrics import pairwise_distances


class EvoDBSCAN(DBSCAN):
    """Implementation of Evolutionary DBSCAN with dynamic radius measure.

    Notes
    -----
    Acts as a wrapper for the scikit-learn DBSCAN class. The radius ``eps``
    is not meant to be tuned by the caller: ``callDBSCAN`` and ``callSTATIC``
    overwrite it with a value derived from the median of the distinct
    pairwise distances before every fit.

    Attributes
    ----------
    eps : float
        Radius measure for finding neighbourhood of core point
    min_samples : int
        Minimum number of neighbours to form a core point
    clusters_gen_ : list
        Stores cluster count per time generation
    noise_gen_ : list
        Stores noise count per time generation
    eps_gen_ : list
        Stores eps parameter per time generation
    metric : str
        Distance metric (manhattan, euclidean, etc.)
    metric_params : dict
        Additional arguments for metric function (inherited from DBSCAN;
        not exposed by this constructor)
    algorithm : str
        Algorithm used to compute pointwise distances
    leaf_size : int
        Specific to BallTree or cKDTree algorithm
    p : int
        Power of Minkowski metric
    n_jobs : int
        Number of parallel jobs
    """

    def __init__(self, min_samples=5, *, eps=0, metric='euclidean',
                 algorithm='auto', leaf_size=30, p=None, n_jobs=None):
        super().__init__(min_samples=min_samples, eps=eps, metric=metric,
                         algorithm=algorithm, leaf_size=leaf_size, p=p,
                         n_jobs=n_jobs)
        # Per-generation history, appended to by callDBSCAN.
        self.clusters_gen_ = []
        self.noise_gen_ = []
        self.eps_gen_ = []

    @staticmethod
    def _median_unique_distance(points):
        """Return the median of the distinct pairwise distances of `points`."""
        return np.median(np.unique(pairwise_distances(points)))

    def showPlot(self, current_gen, time, labels, noise, alpha,
                 save_plot=None):
        """Performs plotting of clusters at generation.

        Parameters
        ----------
        current_gen : np.ndarray
            2-D array; columns 0 and 1 are used as the x and y coordinates
        time : object
            Generation identifier, only used in the plot title
        labels : np.ndarray
            Cluster label per row of `current_gen`; -1 marks noise
        noise : int
            Noise count, only used in the plot title
        alpha : float or None
            Alpha value, only used in the plot title
        save_plot : str, optional
            Path prefix; when given the figure is saved as
            '<save_plot><title>.png' before being shown
        """
        # Plot clusters
        plt.scatter(current_gen[:, 0], current_gen[:, 1],
                    c=labels, cmap='tab20')

        # Plot noise on top in black so it is not confused with a cluster
        noise_mask = labels == -1
        plt.scatter(current_gen[noise_mask, 0], current_gen[noise_mask, 1],
                    c='black')

        # Create graph title with params
        title = 'DBSCAN Generation-{t} Alpha-{a} Noise-{n}'.format(
            t=time, a=alpha, n=noise)
        plt.title(title)

        if save_plot is not None:
            # Save fig as PNG (must happen before show(), which may clear it)
            plt.savefig('{p}{t}.png'.format(p=save_plot, t=title))

        plt.show()  # Show fig

    def callDBSCAN(self, X, times, alpha, beta=1, show_eps=False,
                   plot_gens=None, save_plot=None):
        """Perform evolutionary DBSCAN clustering.

        For every distinct time step the samples seen so far are clustered
        with a radius that blends the current snapshot with the previous
        generation. Cluster count, noise count and radius of each generation
        are appended to `clusters_gen_`, `noise_gen_` and `eps_gen_`.

        Parameters
        ----------
        X : 'pd.DataFrame'
            Dataframe of tabular data samples. Must contain a 'Time' column;
            the first two columns are used as clustering features.
        times : list
            List containing times for X samples
        alpha : float
            Parameter used to modulate epsilon by snapshot vs. history
        beta : float, optional
            Optional scaling param for radius
        show_eps : bool
            Verbose for computed epsilon value
        plot_gens : list, optional
            Generations to show as plots
        save_plot : str, optional
            Path to save plots

        Returns
        -------
        None
        """
        # Median distance of the previous generation; None until one exists.
        # Only the scalar is kept, not the full distance matrix.
        history_median = None
        seen_times = []  # Accumulates the seen time steps

        t_intervals = np.unique(times)

        if plot_gens is None:
            # Default generations to plot
            plot_gens = [int(np.median(t_intervals) / 2),  # Quarter 1
                         int(np.median(t_intervals)),      # Middle
                         t_intervals[-1]]                  # End

        # Loop through each time step
        for time in t_intervals:
            seen_times.append(time)

            # Everything observed up to and including this time step
            current_gen = X.loc[X['Time'].isin(seen_times)]
            current_gen = current_gen.iloc[:, [0, 1]].values

            # NOTE(review): the snapshot keeps every column of X (including
            # 'Time'), whereas current_gen keeps only the first two columns.
            current_snap = X.loc[X['Time'] == time]
            snap_median = self._median_unique_distance(current_snap)

            # Calculating epsilon based on current snapshot or history
            if history_median is None:
                # First generation: no history cost to consider
                self.eps = snap_median / beta
            else:
                # Determine radius based on history and current snapshot
                self.eps = (((1 - alpha) * snap_median) / beta
                            + (alpha * history_median) / beta)

            # If there is currently no valid distances, make radius very small
            if self.eps == 0:
                self.eps = 1e-5
            self.eps_gen_.append(self.eps)

            if show_eps:
                print('Generation {g} epsilon: {e}'
                      .format(g=time, e=self.eps))

            # Determine cluster labels and noise
            labels = self.fit_predict(current_gen)
            noise = int(np.count_nonzero(labels == -1))

            # Save noise and cluster count for each generation
            # (the noise label -1 is not counted as a cluster)
            self.clusters_gen_.append(
                len(np.unique(labels)) - (1 if noise else 0))
            self.noise_gen_.append(noise)

            # Saving previous generation as the history
            history_median = self._median_unique_distance(current_gen)

            # Plotting
            if time in plot_gens:
                self.showPlot(current_gen, time, labels, noise, alpha,
                              save_plot=save_plot)

        return None

    def callSTATIC(self, X, beta, save_plot=None):
        """Normal DBSCAN implementation.

        Clusters the first two columns of `X` in one pass and plots the
        result.

        Parameters
        ----------
        X : 'pd.DataFrame'
            Dataframe of tabular data samples; the first two columns are used
        beta : float
            Scaling param for radius
        save_plot : str, optional
            Path to save plot
        """
        X = X.iloc[:, [0, 1]].values

        # Compute radius measure from the pairwise distances
        self.eps = self._median_unique_distance(X) / beta

        # Same fallback as callDBSCAN: keep the radius strictly positive
        if self.eps == 0:
            self.eps = 1e-5

        # Determine cluster labels and noise
        labels = self.fit_predict(X)
        noise = int(np.count_nonzero(labels == -1))

        # Plotting
        self.showPlot(X, time=None, labels=labels, noise=noise, alpha=None,
                      save_plot=save_plot)