Source code for evoclusterstream.stream.TweepyStreamer

"""
From the paper:
Evolutionary Clustering and 
Community Detection Algorithms for Social Media Health Surveillance

Kyle Spurlock, Tanner Bogart, Heba Elgazzar
2020

Notes
-----
    Requires Twitter API keys and tokens
    Retrieve from here https://developer.twitter.com

    Current configuration collects user geographical location based on tweet
    keywords.
    
Attributes
----------
    data : :obj:stream
        Data stream of uscities csv file
    uscities : pd.DataFrame
        DataFrame of uscities csv file

Example
-------
    search_terms = ['tweet1', 'tweet2']
    
    
    consumer_key = "your_consumer_key"
    consumer_secret_key = "your_consumer_secret_key"
    
    access_token = "your_access_token"
    access_token_secret = "your_access_token_secret"
    
    Streamer = TweepyStreamer(consumer_key, consumer_secret_key, access_token,
                              access_token_secret)
    
    user_info = Streamer.stream_tweets(search_terms, n_samples=100)
"""

import pandas as pd
from sklearn.preprocessing import LabelEncoder
import tweepy
import numpy as np
import pkg_resources

# Need this for conversion of user location
# Load the bundled US cities table once at import time. Four columns are kept
# by position (0, 3, 6, 7); the rest of this module reads them by the names
# 'city', 'state_name', 'lat' and 'lng'.
# `uscities` is pre-bound so the name always exists, even when the CSV is
# missing; in that case it stays None.
uscities = None
try:
    data = pkg_resources.resource_stream(__name__, 'data/uscities.csv')
    try:
        uscities = pd.read_csv(data).iloc[:,[0,3,6,7]]
    finally:
        # pandas does not close handles it did not open itself.
        data.close()
except FileNotFoundError as e:
    print(e)
    print("Could not include uscities.csv for TweepyStream module.")
# (2020). United States Cities Database.
# SimpleMaps. https://simplemaps.com/data/us-cities.

def location_verify(location, uscities):
    """Check whether a user location string mentions a known US city.

    Parameters
    ----------
        location : str
            Free-text location taken from a user profile.
        uscities : pd.DataFrame
            Table of cities; only its 'city' column is read.

    Returns
    -------
        bool
            True if at least one value of ``uscities['city']`` occurs as a
            (case-sensitive) substring of `location`, otherwise False.
    """
    # Iterate the column directly: ``Series.iteritems`` no longer exists in
    # pandas >= 2.0, and the index labels are not needed for a yes/no answer.
    return any(city in location for city in uscities['city'])
def location_change(location, uscities):
    """Expand a user location string into city, state and coordinates.

    The rows of `uscities` are scanned in order and the first row whose city
    name occurs as a (case-sensitive) substring of `location` is used.

    Parameters
    ----------
        location : str
            Free-text location taken from a user profile.
        uscities : pd.DataFrame
            Table with 'city', 'state_name', 'lat' and 'lng' columns.

    Returns
    -------
        list of str or None
            ``[city, state_name, lat, lng]`` of the first matching row, every
            element converted with ``str``; None when no city matches.
    """
    fields = ('city', 'state_name', 'lat', 'lng')
    # ``Series.items`` replaces ``Series.iteritems``, which was removed in
    # pandas 2.0.
    for index, city in uscities['city'].items():
        if city in location:
            return [str(uscities[field][index]) for field in fields]
    return None
def full_preprocess(df):
    """Anonymise users, rank timestamps and return the rows in time order.

    Nothing is written to disk and the input frame is left untouched.

    Parameters
    ----------
        df : pd.DataFrame
            Six-column frame containing at least 'User' and 'Timestamp'
            columns. 'Timestamp' values must be parseable by
            ``pd.to_datetime``.

    Returns
    -------
        pd.DataFrame
            New frame with a fresh 0..n-1 index and columns
            ['User','City','State','Lat','Long','Time'] (assigned by
            position). 'User' holds an integer code per distinct user and
            'Time' the chronological rank of the tweet's timestamp (equal
            timestamps share a rank). Rows are sorted by 'Time'; ties keep
            their input order.
    """
    encoded = df.copy()  # never mutate the caller's frame

    # Encoding user values for anonymity. The inverse index of the sorted
    # unique values is the same integer coding a label encoder produces.
    _, user_codes = np.unique(encoded['User'].to_numpy(), return_inverse=True)
    encoded['User'] = user_codes

    # Rank the complete timestamp (date, hour, minute and second) so that
    # tweets from different hours or days are ordered correctly.
    stamps = pd.to_datetime(encoded['Timestamp'])
    _, time_codes = np.unique(stamps.to_numpy(), return_inverse=True)
    encoded['Timestamp'] = time_codes

    # Stable sort keeps the output deterministic when timestamps tie.
    ordered = encoded.sort_values(by=['Timestamp'], ascending=True,
                                  kind='mergesort')
    return pd.DataFrame(ordered.values,
                        columns=['User','City','State','Lat','Long','Time'])
class TweepyStreamer():
    """Class implementation for Twitter Streamer

    NOTE(review): ``API.search`` and ``wait_on_rate_limit_notify`` are used
    below; these names appear to belong to the tweepy 3.x API -- confirm the
    targeted tweepy version.

    Parameters
    ----------
        consumer_key, consumer_secret : str
            Twitter API consumer credentials.
        access_token, access_secret : str
            Twitter API access credentials.
        wait_on_rate_lim : bool, optional
            Specifies whether to sleep upon reaching max stream requests.
        wait_on_rate_lim_notify : bool, optional
            Verbose for wait_on_rate_lim

    Attributes
    ----------
        auth : :obj:Tweepy.OAuthHandler
            Tweepy OAuthHandler class, authorizes API with keys.
        api : :obj:Tweepy.API
            Provides access to RESTful Twitter API.
    """

    def __init__(self, consumer_key, consumer_secret, access_token,
                 access_secret, *, wait_on_rate_lim=True,
                 wait_on_rate_lim_notify=True):
        self.auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_secret)
        self.api = tweepy.API(
            self.auth,
            wait_on_rate_limit=wait_on_rate_lim,
            wait_on_rate_limit_notify=wait_on_rate_lim_notify)

    def stream_tweets(self, search_terms, n_samples, verbose = True):
        """Method for accessing Twitter search API using Tweepy Cursor

        For every keyword, English non-retweet results are scanned and a
        tweet is kept when its author has a profile location that matches a
        city in the module-level ``uscities`` table, has not been collected
        before in this call, and has a follower count strictly between 60
        and 10000.

        Parameters
        ----------
            search_terms : list
                Array of keywords to search for in tweets
            n_samples : int
                Approximate number of samples to collect in total. Each
                keyword contributes at most
                ``int(n_samples / len(search_terms)) + 1`` samples, so the
                total can slightly exceed `n_samples`, or fall short when a
                keyword runs out of results.
            verbose : bool, optional
                Controls console outputs

        Returns
        -------
            pd.DataFrame
                Result of ``full_preprocess`` applied to the collected rows
                (user id, city, state, latitude, longitude, creation time).

        Raises
        ------
            ZeroDivisionError
                If `search_terms` is empty.
        """
        if verbose:
            print('Starting stream.')

        cols = ['User','City','State','Lat','Long','Timestamp']
        # One [user id, city, state, lat, lng, created_at] row per kept tweet
        rows = []
        # Ids of users already collected, so the same user is not picked up
        # several times (a set gives O(1) membership tests)
        seen_users = set()
        # Per-keyword quota
        max_itr = int(n_samples/len(search_terms))+1

        for keyword in search_terms:
            itr = 0
            cursor = tweepy.Cursor(self.api.search,
                                   q='"{}" -filter:retweets'.format(keyword),
                                   count=10,
                                   lang='en')
            for tweet in cursor.items():
                user = tweet.user
                if not user.location or user.id_str in seen_users:
                    continue
                if not 60 < user.followers_count < 10000:
                    continue

                # A single pass over the city table: location_change returns
                # None when the profile location matches no known city.
                loc = location_change(user.location, uscities)
                if loc is None:
                    continue

                if verbose:
                    print('Found.')

                seen_users.add(user.id_str)
                rows.append([user.id_str, *loc, tweet.created_at])

                itr += 1
                # Stop right away instead of fetching one more tweet first.
                if itr >= max_itr:
                    break

        full_list = pd.DataFrame(data=rows, columns=cols)
        full_list = full_preprocess(full_list)

        if verbose:
            print("Finished.")

        return full_list