From 46037106e6d09a4aeb7d5c1309934172e728dd4c Mon Sep 17 00:00:00 2001 From: Mylloon Date: Sun, 7 Aug 2022 21:50:59 +0200 Subject: [PATCH] convert stream entirely to the API v2 --- main.py | 145 ++++++++++++++++++++++++++------------------------------ 1 file changed, 66 insertions(+), 79 deletions(-) diff --git a/main.py b/main.py index 3bda179..23fbce2 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,6 @@ from re import sub, findall from random import choice from datetime import datetime from pytz import timezone -from queue import Queue from json import loads def load(variables) -> dict: @@ -74,10 +73,8 @@ class Listener(StreamingClient): self, bearer_token, client: Client, - q = Queue() ): - super(Listener, self).__init__(bearer_token) - self.q = q + super(Listener, self).__init__(bearer_token, wait_on_rate_limit=True) self.client = client def on_connect(self): @@ -85,80 +82,73 @@ class Listener(StreamingClient): def on_exception(): notice = notice["disconnect"] - print(f"Erreur (code {notice['code']}).", end = " ") + print(f"Erreur (code {notice['code']}).", end=" ") if len(notice["reason"]) > 0: print(f"Raison : {notice['reason']}") def on_tweet(self, tweet: Tweet): - print(tweet) - exit(0) - json = status._json # Verify the age of the tweet - if seniority(json["created_at"]): - # Verify if the tweet isn't a retweet - if not hasattr(status, "retweeted_status"): - # Fetch the tweet - if "extended_tweet" in json: - tweet = cleanTweet(status.extended_tweet["full_text"]) - else: - tweet = cleanTweet(status.text) - # Fetch the last word of the tweet - lastWord = tweet.split()[-1:][0] - if keys["VERBOSE"]: - infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" - print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ") - # Check if the last word found is a supported word - if lastWord in universalBase: - answer = None + if seniority(tweet.created_at): + # Fetch the last word of the tweet + tweet.username: str = self.client.get_user(id=tweet.author_id, user_auth=True).data.username + lastWord = cleanTweet(tweet.text).split()[-1:][0] + if keys["VERBOSE"]: + infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" + print(f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ") + # Check if the last word found is a supported word + if lastWord in universalBase: + answer = None - # Check repetition - repetition = findall(r"di(\S+)", lastWord) - if(len(repetition) > 0): - # We need to repeat something... - answer = repeater(repetition[0]) + # Check repetition + repetition = findall(r"di(\S+)", lastWord) + if(len(repetition) > 0): + # We need to repeat something... + answer = repeater(repetition[0]) - # Fetch an other adequate (better) response - for mot in base.items(): - if lastWord in mot[1]: - # Handle specific case - if mot[0] == "bon": - # Between 7am and 5pm - if datetime.now().hour in range(7, 17): - answer = answers[mot[0]][0] # jour - else: - answer = answers[mot[0]][1] # soir + # Fetch an other adequate (better) response + for mot in base.items(): + if lastWord in mot[1]: + # Handle specific case + if mot[0] == "bon": + # Between 7am and 5pm + if datetime.now().hour in range(7, 17): + answer = answers[mot[0]][0] # jour else: - # Normal answer - answer = answers[mot[0]] - if answer == None: - if keys["VERBOSE"]: - print(f"{errorMessage} Aucune réponse trouvée.") - # If an answer has been found - else: - if keys["VERBOSE"]: - print(f"Envoie d'un {answer[0]}...", end = " ") - try: - # Send the tweet with the answer - # TODO: Update Twitter API V2 - self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True) - print(f"{json['user']['screen_name']} s'est fait {answer[0]} !") - except Exception as error: - error = loads(error.response.text)["errors"][0] - # https://developer.twitter.com/en/support/twitter-api/error-troubleshooting - show_error = True - if error["code"] == 385: - error["message"] = f"Tweet supprimé ou auteur ({json['user']['screen_name']}) en privé/bloqué." - show_error = False + answer = answers[mot[0]][1] # soir + else: + # Normal answer + answer = answers[mot[0]] - # Show error only if relevant, always in verbose - if show_error or keys["VERBOSE"]: - print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") + # If no answer has been found + if answer == None: + if keys["VERBOSE"]: + print(f"{errorMessage} Aucune réponse trouvée.") + + # If an answer has been found else: if keys["VERBOSE"]: - print("Annulation car le dernier mot n'est pas intéressant.") + print(f"Envoie d'un {answer[0]}...", end=" ") + try: + # Send the tweet with the answer + self.client.create_tweet(in_reply_to_tweet_id=tweet.id, text=choice(answer)) + print(f"{tweet.username} s'est fait {answer[0]} !") + except Exception as error: + error = loads(error.response.text)["errors"][0] + # https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + show_error = True + if error["code"] == 385: + error["message"] = f"Tweet supprimé ou auteur ({tweet.username}) en privé/bloqué." + show_error = False + + # Show error only if relevant, always in verbose + if show_error or keys["VERBOSE"]: + print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") + else: + if keys["VERBOSE"]: + print("Annulation car le dernier mot n'est pas intéressant.") def on_request_error(self, status_code): - print(f"{errorMessage[:-2]} ({status_code}) !", end = " ") + print(f"{errorMessage[:-2]} ({status_code}) !", end=" ") if status_code == 420: if keys["VERBOSE"]: print("Déconnecter du flux.") @@ -187,20 +177,12 @@ def getFriends(client: Client, users: list[str]) -> list: friends_list.extend(client.get_users_following(id=user_id, user_auth=True)) return friends_list[0] -def getIDs(client: Client, users: list[str]) -> list: - """Get all the ID of users""" - users_list = [] - # Get IDs of the users - for user in users: - users_list.append(client.get_user(username=user, user_auth=True).data) - return users_list - -def seniority(date: str) -> bool: +def seniority(date: datetime) -> bool: """Return True only if the given string date is less than one day old""" # Convert string format to datetime format datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") # Twitter give us an UTC time - datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) + datetimeObject = datetimeObject.replace(tzinfo=timezone("UTC")) # time now in UTC minus the time we got to get the age of the date age = datetime.now(timezone("UTC")) - datetimeObject # False if older than a day, else True @@ -275,6 +257,8 @@ def create_rules(tracked_users: list[str]) -> list[str]: """Create rules for tracking users, by respecting the twitter API policies""" rules = [] buffer = "" + + # Track users for user in tracked_users: # Check if the rule don't exceeds the maximum length of a rule (512) # 5 is len of "from:" @@ -286,6 +270,12 @@ def create_rules(tracked_users: list[str]) -> list[str]: if len(buffer) > 0: rules.append(buffer[:-4]) + # Ignore retweets + rules.append("-is:retweet") + + if len(rules) > 25: + raise BufferError("Too much rules.") + return rules def start(): @@ -305,10 +295,7 @@ def start(): ] + keys["FORCELIST"] if i not in keys["WHITELIST"] ] - stream = Listener( - bearer_token=keys["BEARER_TOKEN"], - client=client, - ) + stream = Listener(keys["BEARER_TOKEN"], client) for rule in create_rules(tracked_users): stream.add_rules(StreamRule(rule))