My goal is to collect about 10000 tweets and save them to a csv file. Since Twitter’s rate limit is 450 requests per 15 minutes, ideally I’d like to automate this process. The guides I’ve seen only use the tweepy module, but since I don’t know much about it, I’ve used the python example code given on Twitter:
import requests
import pandas as pd
import os
import json
# Set your bearer token here (or export BEARER_TOKEN in your terminal before running):
os.environ['BEARER_TOKEN'] = ''
def auth():
    return os.environ.get("BEARER_TOKEN")

def create_url():
    query = "has:images lang:en -is:retweet"
    tweet_fields = "tweet.fields=attachments,created_at,author_id"
    expansions = "expansions=attachments.media_keys"
    media_fields = "media.fields=media_key,preview_image_url,type,url"
    max_results = "max_results=100"
    url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
        query, tweet_fields, expansions, media_fields, max_results
    )
    return url

def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers

def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

def save_json(file_name, file_content):
    with open(file_name, 'w', encoding='utf-8') as write_file:
        json.dump(file_content, write_file, sort_keys=True, ensure_ascii=False, indent=4)

def main():
    bearer_token = auth()
    url = create_url()
    headers = create_headers(bearer_token)
    json_response = connect_to_endpoint(url, headers)
    # Save the data as a json file
    # save_json('collected_tweets.json', json_response)
    # Save tweets as csv
    # df = pd.json_normalize(data=json_response)
    df1 = pd.DataFrame(json_response['data'])
    df1.to_csv('tweets_data.csv', mode="a")
    df2 = pd.DataFrame(json_response['includes'])
    df2.to_csv('tweets_includes_media.csv', mode="a")
    print(json.dumps(json_response['meta'], sort_keys=True, indent=4))

if __name__ == "__main__":
    main()
How should I modify this code to loop within Twitter’s v2 rate limit?
Tweepy has not yet been updated to use the new version of the Twitter API (v2), so what you find in the Twitter documentation will often not match what Tweepy provides. Tweepy still works well with v1.1, but some of the tweet-matching features differ, so be careful.
Given the goals you mentioned, it's not clear that you want to use the recent search endpoint at all. For example, it might be easier to start a 1% stream using the sampled stream endpoint; here is Twitter's example code for that endpoint. The main benefit is that you can run it in the "background" (see note below) with a condition that kills the process once you have collected 10k tweets. That way you don't have to worry about hitting a request limit: Twitter caps you at roughly 1% of the tweets matching your query (in your case, "has:images lang:en -is:retweet"), and you simply collect those tweets in real time. If instead you want a full record of non-retweeted English tweets between two points in time, you will need to add those time points to your query and then manage the rate limit as described below. See start_time and end_time in the API reference docs.
Note: To run a script in the background, write your program, then execute it from the terminal with nohup python nameofstreamingcode.py > logfile.log 2>&1 &. Any normal terminal output (i.e. print lines and/or errors) is written to a new file called logfile.log, and the & at the very end of the command makes the process run in the background (so you can close your terminal and come back to it later).
To use the recent search endpoint instead, you need to add quite a bit to the connect_to_endpoint(url, headers) function.
from datetime import datetime  # needed for the rate-limit timestamps

def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)

    # Twitter returns (in the headers of the response object) how many
    # requests you have left. Let's use this to our advantage.
    remaining_requests = int(response.headers["x-rate-limit-remaining"])

    # If that number is one, we get the reset time
    # and wait until then, plus 15 seconds (you're welcome, Twitter).
    # The regular 429 exception is caught below as well,
    # however, we want to program defensively, where possible.
    if remaining_requests == 1:
        buffer_wait_time = 15
        resume_time = datetime.fromtimestamp(int(response.headers["x-rate-limit-reset"]) + buffer_wait_time)
        print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
        pause_until(resume_time)  ## Link to this code in above answer

    # We still may get some weird errors from Twitter.
    # We only care about the time-dependent errors (i.e. errors
    # that Twitter wants us to wait for).
    # Most of these errors can be solved simply by waiting
    # a little while and pinging Twitter again - so that's what we do.
    if response.status_code != 200:
        # Too many requests error
        if response.status_code == 429:
            buffer_wait_time = 15
            resume_time = datetime.fromtimestamp(int(response.headers["x-rate-limit-reset"]) + buffer_wait_time)
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer
        # Twitter internal server error
        elif response.status_code == 500:
            # Twitter needs a break, so we wait 30 seconds
            resume_time = datetime.now().timestamp() + 30
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer
        # Twitter service unavailable error
        elif response.status_code == 503:
            # Twitter needs a break, so we wait 30 seconds
            resume_time = datetime.now().timestamp() + 30
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer
        # Any other error means we've done something wrong and should exit
        else:
            raise Exception(
                "Request returned an error: {} {}".format(
                    response.status_code, response.text
                )
            )

    # Each time we get a 200 response, let's exit the function and return the response.json
    if response.ok:
        return response.json()
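The pause_until helper is linked from the answer referenced in those comments; if you don't have it handy, a minimal stand-in, assuming it accepts either a datetime object or a Unix timestamp, could look like this:

import time
from datetime import datetime

def pause_until(resume_time):
    # Accept either a datetime object or a Unix timestamp.
    if isinstance(resume_time, datetime):
        resume_time = resume_time.timestamp()
    # Sleep in short chunks so a Ctrl-C still works promptly.
    while True:
        remaining = resume_time - time.time()
        if remaining <= 0:
            break
        time.sleep(min(remaining, 10))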
Since the full query results will be much larger than the 100 tweets you get per request, you need to keep track of your location in the larger result set. This is done with the next_token.
Getting the next_token is actually quite easy: just grab it from the meta field in the response. For clarity, you can use the above function like this:
# Get response
response = connect_to_endpoint(url, headers)
# Get next_token
next_token = response["meta"]["next_token"]
This token then needs to be passed in the query details, which are included in the url created using the create_url() function. This means that you also need to update the create_url() function to something like
def create_url(pagination_token=None):
    query = "has:images lang:en -is:retweet"
    tweet_fields = "tweet.fields=attachments,created_at,author_id"
    expansions = "expansions=attachments.media_keys"
    media_fields = "media.fields=media_key,preview_image_url,type,url"
    max_results = "max_results=100"
    if pagination_token is None:
        url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
            query, tweet_fields, expansions, media_fields, max_results
        )
    else:
        # Pass the token from response["meta"]["next_token"] as the next_token parameter
        url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}&next_token={}".format(
            query, tweet_fields, expansions, media_fields, max_results, pagination_token
        )
    return url
After changing the above function, the code should flow as follows (a short sketch follows the list):
1. Make a request.
2. Get next_token from response["meta"]["next_token"].
3. Update the query parameters to include next_token with create_url().
4. Rinse and repeat until either:
   - you have collected your 10k tweets, or
   - the query stops returning a next_token (you have reached the end of the results).
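Put together, a minimal sketch of that loop might look like the following (create_url and connect_to_endpoint are the functions above; the 10,000 cutoff and the counter are just for illustration):

next_token = None
tweet_count = 0
TARGET = 10_000  # rough goal from the question

while tweet_count < TARGET:
    url = create_url(pagination_token=next_token)
    json_response = connect_to_endpoint(url, headers)
    tweet_count += len(json_response.get("data", []))
    # Grab the next page token; stop when Twitter stops returning one.
    next_token = json_response.get("meta", {}).get("next_token")
    if next_token is None:
        break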
One last note: I wouldn't try to write your file using pandas dataframes. I would create an empty list, append the results of each new query to that list, and then write the final list of dictionary objects to a json file (see this question for details). Raw tweets and pandas dataframes don't play nicely together; it's much better to get used to how json objects and dictionaries work.
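For example, sticking with plain dictionaries, the approach could look like this (the file name is arbitrary; the extend call belongs inside the paging loop above):

import json

all_tweets = []  # one dictionary per tweet, accumulated across requests

# Inside the paging loop, after each successful response:
#     all_tweets.extend(json_response.get("data", []))

# After the loop, write the whole list of dictionaries to a single JSON file.
with open("collected_tweets.json", "w", encoding="utf-8") as write_file:
    json.dump(all_tweets, write_file, ensure_ascii=False, indent=4)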
Try using the scheduler:
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
# Pass the arguments connect_to_endpoint needs, then start the scheduler.
scheduler.enter(delay=16 * 60, priority=1, action=connect_to_endpoint, argument=(url, headers))
scheduler.run()
delay is the amount of time between two events, and action is the method that gets executed each time (every 16 minutes in this example). This gives you precise timing and exact repetition of the method call.
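Note that sched fires a scheduled event only once; to repeat the call every 16 minutes, the action has to re-enter itself. A minimal sketch, where collect_batch is a hypothetical wrapper around connect_to_endpoint:

import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
INTERVAL = 16 * 60  # seconds between batches

def collect_batch():
    # Hypothetical wrapper: one round of requests, e.g.
    # json_response = connect_to_endpoint(create_url(), headers)
    print("collecting a batch of tweets...")
    # Re-enter the event so the action repeats every INTERVAL seconds.
    scheduler.enter(delay=INTERVAL, priority=1, action=collect_batch)

scheduler.enter(delay=0, priority=1, action=collect_batch)
scheduler.run()  # blocks, running collect_batch every 16 minutes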