convert stream entirely to the API v2

This commit is contained in:
Mylloon 2022-08-07 21:50:59 +02:00
parent ed6e72270c
commit 46037106e6
Signed by: Anri
GPG key ID: A82D63DFF8D1317F

145
main.py
View file

@ -5,7 +5,6 @@ from re import sub, findall
from random import choice from random import choice
from datetime import datetime from datetime import datetime
from pytz import timezone from pytz import timezone
from queue import Queue
from json import loads from json import loads
def load(variables) -> dict: def load(variables) -> dict:
@ -74,10 +73,8 @@ class Listener(StreamingClient):
self, self,
bearer_token, bearer_token,
client: Client, client: Client,
q = Queue()
): ):
super(Listener, self).__init__(bearer_token) super(Listener, self).__init__(bearer_token, wait_on_rate_limit=True)
self.q = q
self.client = client self.client = client
def on_connect(self): def on_connect(self):
@ -85,80 +82,73 @@ class Listener(StreamingClient):
def on_exception(): def on_exception():
notice = notice["disconnect"] notice = notice["disconnect"]
print(f"Erreur (code {notice['code']}).", end = " ") print(f"Erreur (code {notice['code']}).", end=" ")
if len(notice["reason"]) > 0: if len(notice["reason"]) > 0:
print(f"Raison : {notice['reason']}") print(f"Raison : {notice['reason']}")
def on_tweet(self, tweet: Tweet): def on_tweet(self, tweet: Tweet):
print(tweet)
exit(0)
json = status._json
# Verify the age of the tweet # Verify the age of the tweet
if seniority(json["created_at"]): if seniority(tweet.created_at):
# Verify if the tweet isn't a retweet # Fetch the last word of the tweet
if not hasattr(status, "retweeted_status"): tweet.username: str = self.client.get_user(id=tweet.author_id, user_auth=True).data.username
# Fetch the tweet lastWord = cleanTweet(tweet.text).split()[-1:][0]
if "extended_tweet" in json: if keys["VERBOSE"]:
tweet = cleanTweet(status.extended_tweet["full_text"]) infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
else: print(f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ")
tweet = cleanTweet(status.text) # Check if the last word found is a supported word
# Fetch the last word of the tweet if lastWord in universalBase:
lastWord = tweet.split()[-1:][0] answer = None
if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
# Check if the last word found is a supported word
if lastWord in universalBase:
answer = None
# Check repetition # Check repetition
repetition = findall(r"di(\S+)", lastWord) repetition = findall(r"di(\S+)", lastWord)
if(len(repetition) > 0): if(len(repetition) > 0):
# We need to repeat something... # We need to repeat something...
answer = repeater(repetition[0]) answer = repeater(repetition[0])
# Fetch an other adequate (better) response # Fetch an other adequate (better) response
for mot in base.items(): for mot in base.items():
if lastWord in mot[1]: if lastWord in mot[1]:
# Handle specific case # Handle specific case
if mot[0] == "bon": if mot[0] == "bon":
# Between 7am and 5pm # Between 7am and 5pm
if datetime.now().hour in range(7, 17): if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour answer = answers[mot[0]][0] # jour
else:
answer = answers[mot[0]][1] # soir
else: else:
# Normal answer answer = answers[mot[0]][1] # soir
answer = answers[mot[0]] else:
if answer == None: # Normal answer
if keys["VERBOSE"]: answer = answers[mot[0]]
print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
else:
if keys["VERBOSE"]:
print(f"Envoie d'un {answer[0]}...", end = " ")
try:
# Send the tweet with the answer
# TODO: Update Twitter API V2
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
show_error = True
if error["code"] == 385:
error["message"] = f"Tweet supprimé ou auteur ({json['user']['screen_name']}) en privé/bloqué."
show_error = False
# Show error only if relevant, always in verbose # If no answer has been found
if show_error or keys["VERBOSE"]: if answer == None:
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") if keys["VERBOSE"]:
print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
else: else:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.") print(f"Envoie d'un {answer[0]}...", end=" ")
try:
# Send the tweet with the answer
self.client.create_tweet(in_reply_to_tweet_id=tweet.id, text=choice(answer))
print(f"{tweet.username} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
show_error = True
if error["code"] == 385:
error["message"] = f"Tweet supprimé ou auteur ({tweet.username}) en privé/bloqué."
show_error = False
# Show error only if relevant, always in verbose
if show_error or keys["VERBOSE"]:
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
else:
if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.")
def on_request_error(self, status_code): def on_request_error(self, status_code):
print(f"{errorMessage[:-2]} ({status_code}) !", end = " ") print(f"{errorMessage[:-2]} ({status_code}) !", end=" ")
if status_code == 420: if status_code == 420:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print("Déconnecter du flux.") print("Déconnecter du flux.")
@ -187,20 +177,12 @@ def getFriends(client: Client, users: list[str]) -> list:
friends_list.extend(client.get_users_following(id=user_id, user_auth=True)) friends_list.extend(client.get_users_following(id=user_id, user_auth=True))
return friends_list[0] return friends_list[0]
def getIDs(client: Client, users: list[str]) -> list: def seniority(date: datetime) -> bool:
"""Get all the ID of users"""
users_list = []
# Get IDs of the users
for user in users:
users_list.append(client.get_user(username=user, user_auth=True).data)
return users_list
def seniority(date: str) -> bool:
"""Return True only if the given string date is less than one day old""" """Return True only if the given string date is less than one day old"""
# Convert string format to datetime format # Convert string format to datetime format
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
# Twitter give us an UTC time # Twitter give us an UTC time
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) datetimeObject = datetimeObject.replace(tzinfo=timezone("UTC"))
# time now in UTC minus the time we got to get the age of the date # time now in UTC minus the time we got to get the age of the date
age = datetime.now(timezone("UTC")) - datetimeObject age = datetime.now(timezone("UTC")) - datetimeObject
# False if older than a day, else True # False if older than a day, else True
@ -275,6 +257,8 @@ def create_rules(tracked_users: list[str]) -> list[str]:
"""Create rules for tracking users, by respecting the twitter API policies""" """Create rules for tracking users, by respecting the twitter API policies"""
rules = [] rules = []
buffer = "" buffer = ""
# Track users
for user in tracked_users: for user in tracked_users:
# Check if the rule don't exceeds the maximum length of a rule (512) # Check if the rule don't exceeds the maximum length of a rule (512)
# 5 is len of "from:" # 5 is len of "from:"
@ -286,6 +270,12 @@ def create_rules(tracked_users: list[str]) -> list[str]:
if len(buffer) > 0: if len(buffer) > 0:
rules.append(buffer[:-4]) rules.append(buffer[:-4])
# Ignore retweets
rules.append("-is:retweet")
if len(rules) > 25:
raise BufferError("Too much rules.")
return rules return rules
def start(): def start():
@ -305,10 +295,7 @@ def start():
] + keys["FORCELIST"] if i not in keys["WHITELIST"] ] + keys["FORCELIST"] if i not in keys["WHITELIST"]
] ]
stream = Listener( stream = Listener(keys["BEARER_TOKEN"], client)
bearer_token=keys["BEARER_TOKEN"],
client=client,
)
for rule in create_rules(tracked_users): for rule in create_rules(tracked_users):
stream.add_rules(StreamRule(rule)) stream.add_rules(StreamRule(rule))