From f99ab71369dfec054192ac98c713d4074c1a19a6 Mon Sep 17 00:00:00 2001 From: Mylloon Date: Sun, 7 Aug 2022 20:16:07 +0200 Subject: [PATCH] track on user and not on keyword basis --- main.py | 235 +++++++++++++++++++++++++------------------------------- 1 file changed, 103 insertions(+), 132 deletions(-) diff --git a/main.py b/main.py index 0ab9836..9dedb75 100644 --- a/main.py +++ b/main.py @@ -74,25 +74,14 @@ class Listener(StreamingClient): self, bearer_token, client: Client, - users: list = [], - forcelist: list = [], q = Queue() ): super(Listener, self).__init__(bearer_token) self.q = q self.client = client - self.accounts = { - "users": users, - "forcelist": forcelist - } - self.victim_list = getFriends(client, users) + getIDs(client, forcelist) def on_connect(self): - if self.accounts['forcelist'] == []: - forcelist = "Aucun" - else: - forcelist = f"@{', @'.join(self.accounts['forcelist'])}" - print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts['users'])} et ces comptes en plus : {forcelist} comme timeline...") + print(f"Début du scroll sur Twitter...") def on_disconnect_message(notice): notice = notice["disconnect"] @@ -100,75 +89,73 @@ class Listener(StreamingClient): if len(notice["reason"]) > 0: print(f"Raison : {notice['reason']}") - def on_status(self, status): - print(status) + def on_tweet(self, tweet): + print(tweet) exit(0) json = status._json - # Verify the author of the tweet - if json["user"]["id"] in self.victim_list and json["user"]["screen_name"] not in keys["WHITELIST"]: - # Verify the age of the tweet - if seniority(json["created_at"]): - # Verify if the tweet isn't a retweet - if not hasattr(status, "retweeted_status"): - # Fetch the tweet - if "extended_tweet" in json: - tweet = cleanTweet(status.extended_tweet["full_text"]) - else: - tweet = cleanTweet(status.text) - # Fetch the last word of the tweet - lastWord = tweet.split()[-1:][0] - if keys["VERBOSE"]: - infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" - print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ") - # Check if the last word found is a supported word - if lastWord in universalBase: - answer = None + # Verify the age of the tweet + if seniority(json["created_at"]): + # Verify if the tweet isn't a retweet + if not hasattr(status, "retweeted_status"): + # Fetch the tweet + if "extended_tweet" in json: + tweet = cleanTweet(status.extended_tweet["full_text"]) + else: + tweet = cleanTweet(status.text) + # Fetch the last word of the tweet + lastWord = tweet.split()[-1:][0] + if keys["VERBOSE"]: + infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" + print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ") + # Check if the last word found is a supported word + if lastWord in universalBase: + answer = None - # Check repetition - repetition = findall(r"di(\S+)", lastWord) - if(len(repetition) > 0): - # We need to repeat something... - answer = repeater(repetition[0]) + # Check repetition + repetition = findall(r"di(\S+)", lastWord) + if(len(repetition) > 0): + # We need to repeat something... + answer = repeater(repetition[0]) - # Fetch an other adequate (better) response - for mot in base.items(): - if lastWord in mot[1]: - # Handle specific case - if mot[0] == "bon": - # Between 7am and 5pm - if datetime.now().hour in range(7, 17): - answer = answers[mot[0]][0] # jour - else: - answer = answers[mot[0]][1] # soir + # Fetch an other adequate (better) response + for mot in base.items(): + if lastWord in mot[1]: + # Handle specific case + if mot[0] == "bon": + # Between 7am and 5pm + if datetime.now().hour in range(7, 17): + answer = answers[mot[0]][0] # jour else: - # Normal answer - answer = answers[mot[0]] - if answer == None: - if keys["VERBOSE"]: - print(f"{errorMessage} Aucune réponse trouvée.") - # If an answer has been found - else: - if keys["VERBOSE"]: - print(f"Envoie d'un {answer[0]}...", end = " ") - try: - # Send the tweet with the answer - # TODO: Update Twitter API V2 - self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True) - print(f"{json['user']['screen_name']} s'est fait {answer[0]} !") - except Exception as error: - error = loads(error.response.text)["errors"][0] - # https://developer.twitter.com/en/support/twitter-api/error-troubleshooting - show_error = True - if error["code"] == 385: - error["message"] = f"Tweet supprimé ou auteur ({json['user']['screen_name']}) en privé/bloqué." - show_error = False - - # Show error only if relevant, always in verbose - if show_error or keys["VERBOSE"]: - print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") + answer = answers[mot[0]][1] # soir + else: + # Normal answer + answer = answers[mot[0]] + if answer == None: + if keys["VERBOSE"]: + print(f"{errorMessage} Aucune réponse trouvée.") + # If an answer has been found else: if keys["VERBOSE"]: - print("Annulation car le dernier mot n'est pas intéressant.") + print(f"Envoie d'un {answer[0]}...", end = " ") + try: + # Send the tweet with the answer + # TODO: Update Twitter API V2 + self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True) + print(f"{json['user']['screen_name']} s'est fait {answer[0]} !") + except Exception as error: + error = loads(error.response.text)["errors"][0] + # https://developer.twitter.com/en/support/twitter-api/error-troubleshooting + show_error = True + if error["code"] == 385: + error["message"] = f"Tweet supprimé ou auteur ({json['user']['screen_name']}) en privé/bloqué." + show_error = False + + # Show error only if relevant, always in verbose + if show_error or keys["VERBOSE"]: + print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") + else: + if keys["VERBOSE"]: + print("Annulation car le dernier mot n'est pas intéressant.") def do_stuff(self): """Loop for the Listener""" @@ -178,10 +165,7 @@ class Listener(StreamingClient): def on_request_error(self, status_code): print(f"{errorMessage[:-2]} ({status_code}) !", end = " ") - if status_code == 413: - if keys["VERBOSE"]: - print("La liste des mots est trop longue (triggerWords).") - elif status_code == 420: + if status_code == 420: if keys["VERBOSE"]: print("Déconnecter du flux.") else: @@ -228,36 +212,6 @@ def seniority(date: str) -> bool: # False if older than a day, else True return False if age.days >= 1 else True -def generateWords(array: list[str]) -> list: - """ - Retrieves all possible combinations for the given list and returns the result as a list - - This is used for the filter in the stream (before calling the Listener::on_status) - """ - quoiListe = [] - - for text in array: - # Add all combinations - # Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui'] - # - # -> Depends on: from itertools import product - # -> Problem : Create a too long list (+1000 words, max is 400) - # -> Cf. https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview - # - # quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower()))))) - - if text.lower() not in quoiListe: - # Word in lowercase - quoiListe.append(text.lower()) - if text.upper() not in quoiListe: - # Word in uppercase - quoiListe.append(text.upper()) - if text.capitalize() not in quoiListe: - # Word capitalized - quoiListe.append(text.capitalize()) - - return quoiListe - def createBaseTrigger(lists: list[list]) -> list: """Merges all given lists into one""" listing = [] @@ -293,38 +247,58 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre if keys["VERBOSE"]: try: client.get_me().data.username - print(f"Authentification réussie en tant que @{client.get_me().data.username}.", end = " ") + print(f"Authentification réussie en tant que @{client.get_me().data.username}.\n") + + # Compte ignorés + if keys['WHITELIST'] == []: + whitelist = "Aucun" + else: + whitelist = f"@{', @'.join(keys['WHITELIST'])}" + print(f"Liste des comptes ignorés : {whitelist}.") + + # Compte forcés + if keys['FORCELIST'] == []: + forcelist = "Aucun" + else: + forcelist = f"@{', @'.join(keys['FORCELIST'])}" + print(f"Liste des comptes forcés : {forcelist}.") + + # Compte aux following suivis + if keys['PSEUDOS'] == []: + pseudos = "Aucun" + else: + pseudos = f"@{', @'.join(keys['PSEUDOS'])}" + print(f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n") + + print("Notez que si un compte est dans la whiteliste, il sera dans tout les cas ignoré.\n") except: print("Erreur d'authentification.") exit(1) - if keys['WHITELIST'] == []: - whitelist = "Aucun" - else: - whitelist = f"@{', @'.join(keys['WHITELIST'])}" - print(f"Liste des comptes ignorés : {whitelist}.") - return client def start(): """Start the bot""" - stream = Listener( - bearer_token=keys["BEARER_TOKEN"], - client=createClient( - keys["CONSUMER_KEY"], - keys["CONSUMER_SECRET"], - keys["TOKEN"], - keys["TOKEN_SECRET"], - ), - users=keys["PSEUDOS"], - forcelist=keys["FORCELIST"], + client = createClient( + keys["CONSUMER_KEY"], + keys["CONSUMER_SECRET"], + keys["TOKEN"], + keys["TOKEN_SECRET"], ) - # TODO: Break in v2 API : max 25 rule (https://docs.tweepy.org/en/stable/streamrule.html#tweepy.StreamRule) - # Need research, is it possible to run the stream without filter and still catching everything we want? + stream = Listener( + bearer_token=keys["BEARER_TOKEN"], + client=client, + ) - # Only track specifics words - stream.add_rules([StreamRule(word) for word in triggerWords]) + # Only track specifics users + # Including users in forcelist and removing users in whitelist + tracked_users = [ + i for i in [ + user.data["username"] for user in getFriends(client, keys["PSEUDOS"]) + ] + keys["FORCELIST"] if i not in keys["WHITELIST"] + ] + stream.add_rules(StreamRule(''.join([f'from:{word} OR ' for word in tracked_users])[:-3])) stream.filter(threaded=True) if __name__ == "__main__": @@ -512,9 +486,6 @@ if __name__ == "__main__": # List of all the trigger words universalBase = createBaseTrigger(list(base.values())) - # List of all the triggers words's variations - triggerWords = generateWords(universalBase) - # Loading environment variables keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])