retrieval and pos tagging of tweets¶
This project can be found on GitHub as well: https://github.com/Alioun/twitter/
The following example shows a simple and easy way of how you can use core to realize a wide variety of tasks, in this case retrieving tweets from a particular place and then running a Part of Speech (POS) tagger over the retrieved tweets to build a base for further analyses on them.
- Third-party packages used in this example:
- tweepy: Tweepy is an easy-to-use, yet powerful way to use the various twitter APIs. In our case tweepy is only used to access the “Geo Search” API and the standard “Search” API. https://github.com/tweepy/tweepy
- nltk: NLTK is short for “Natural Language Toolkit” and is a library combining many different libraries used for NLP into one convenient package. In this example, we only use the TweetTokenizer provided by NLTK to try to clean up the tweets while also tokenizing them for the POS tagger. https://github.com/nltk/nltk , https://www.nltk.org/
- pattern: Another library for NLP and much more. We will only use the POS tagger from its German module. https://github.com/clips/pattern
Imports:
import tweepy
from core4.queue.job import CoreJob
from nltk.tokenize.casual import TweetTokenizer
from pattern.text.de import parsetree
The first part is to retrieve the tweets.
To be able to retrieve any tweets, you need to have Twitter Developer account and need to create an app within that account. If you don’t have these already, you can create them over at https://developer.twitter.com/ and https://developer.twitter.com/en/apps respectively. After you are done, go into the app’s “Details” and then to the “Keys and tokens” tab. Here, you can find your Consumer API keys. Note that you have to generate a access token and its secret if you have never generated it before and regenerate the two if you don’t have previously generated values saved anywhere.
Copy all four - consumer API key, consumer API secret key, access token and access token secret - into your local.yaml. You can find example twitter.yaml and local.yaml files below.
With the authorization requirements cleared, we will query the Geo Search API to get the place ID for a specific region (Germany in this case). Using that place ID, we can query Twitter for all tweets coming from that region.
The job is set up to stop execution and terminate when it encounters a rate limit error. Twitter’s rate limit timeframe is 15 minutes, so the job is scheduled to get as much data as it can until being rate-limited every 20 minutes.
All retrieved tweets are written into a MongoDB collection for easy access later on:
class TweetLoader(CoreJob):
"""
Loads all tweets coming from a given region or place and saves them into
a mongodb collection.
The tweets are retrieved via the Twitter Search API in no particular
order, this means the latest tweets are not guaranteed.
"""
author = 'adi'
schedule = '0,20,40 * * * *' # runs every 20 minutes
def execute(self, *args, **kwargs):
consumer_key = self.config.twitter.api.consumer_key
consumer_secret = self.config.twitter.api.consumer_secret
access_token = self.config.twitter.api.access_token
access_token_secret = self.config.twitter.api.access_token_secret
self.set_source('Twitter Search API')
tweet_coll = self.config.twitter.api.tweet_coll
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(
auth)
# this code was run once to get the place id, afterwards I saved it to
# prevent getting rate limited on a part that isn't going to change
# place = api.geo_search(query='Germany', granularity="country")
# place_id = place[0].id
place_id = 'fdcd221ac44fa326'
# get as many tweets as you can before being rate limited and then end
# the job
try:
while True:
tweets = api.search(q="place:%s" % place_id,
lang='de',
count='100',
tweet_mode='extended')
for tweet in tweets:
tweet_dict = tweet._json
tweet_dict['_id'] = tweet_dict['id_str']
tweet_coll.find_one_and_update({'_id': tweet_dict['_id']},
{'$setOnInsert': tweet_dict},
upsert=True)
except tweepy.RateLimitError:
pass # let the job end normally
For the POS tagging, the tweets found in the previous tweet collection are tokenized, POS tagged and then written into a seperate MongoDB collection to make them available for later analyses:
class TweetPOSTagger(CoreJob):
"""
Tokenizes and POS tags all available tweets in the tweets
collection and writes them to the mongo collection grouped by word
lemma, word type and chunk type.
The tokenizer strips the twitter handles.
The POS tagger used is for german.
"""
author = 'adi'
def execute(self, *args, **kwargs):
tweet_coll = self.config.twitter.api.tweet_coll
pos_processed_strip_coll = \
self.config.twitter.pos.pos_processed_strip_coll
self.set_source('POS Tagger')
cur = tweet_coll.find()
for i, doc in enumerate(cur):
tokenizer = TweetTokenizer(strip_handles=True)
tmp_string = tokenizer.tokenize(doc['full_text'])
s = [' '.join(tmp_string)]
tokenized_string = [isinstance(s, str) and s.split(" ") or s for s
in s]
if tokenized_string:
try:
sentence_list = parsetree(tokenized_string,
tokenize=False,
lemmata=True)
except TypeError:
continue
for sentence in sentence_list:
for chunk in sentence.chunks:
for word in chunk.words:
dic = {}
dic['_id'] = '{}_{}_{}'.format(word.lemma,
word.type,
chunk.type)
dic['word'] = word.string
dic['word_category'] = word.type
dic['word_lemma'] = word.lemma
dic['chunk_category'] = chunk.type
dic['chunk_lemmata'] = chunk.lemmata
pos_processed_strip_coll.update_one(
filter={'_id': dic['_id']},
update={'$setOnInsert': dic},
upsert=True)
pos_processed_strip_coll.update_one(
filter={'_id': dic['_id']},
update={'$inc': {'count': 1}},
upsert=True)
# TODO: re-enable once the bug is fixed.
# requests.append(UpdateOne(filter={'_id':
# pos_dic['_id']}, update={'$set': pos_dic},
# upsert=True))
# requests.append(UpdateOne(filter={'_id':
# pos_dic['_id']}, update={'$inc': {'count': 1}},
# upsert=True))
self.progress(i / cur.count())
# TODO: re-enable once the bug is fixed
# pos_unprocessed_coll.bulk_write(requests)
Example of the project yaml (twitter.yaml):
DEFAULT:
mongo_database: twitter
pos:
pos_processed_strip_coll: !connect mongodb://pos_processed_strip
api:
tweet_coll: !connect mongodb://tweets
consumer_key: #check local.yaml
consumer_secret: #check local.yaml
access_token: #check local.yaml
access_token_secret: #check local.yaml
Example of the local.yaml file:
DEFAULT:
mongo_url: mongodb://core:654321@localhost:27017
mongo_database: core4dev
logging:
mongodb: INFO
stderr: DEBUG
stdout: ~
worker:
min_free_ram: 32
api:
setting:
debug: True
cookie_secret: hello world
admin_password: hans
twitter:
api:
consumer_key: #insert your own
consumer_secret: #insert your own
access_token: #insert your own
access_token_secret: #insert your own