From 37e4e5e874cb7d26d7e12e54a6b033d82ae21158 Mon Sep 17 00:00:00 2001 From: Mylloon Date: Sat, 6 Aug 2022 01:35:32 +0200 Subject: [PATCH] * fix missings words for the listener-filter * add types * update comments --- main.py | 159 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 102 insertions(+), 57 deletions(-) diff --git a/main.py b/main.py index 42d3d78..a50d9bf 100644 --- a/main.py +++ b/main.py @@ -7,9 +7,10 @@ from datetime import datetime from pytz import timezone from queue import Queue from json import loads +from itertools import product def load(variables) -> dict: - """Load environment variables.""" + """Load environment variables""" keys = {} load_dotenv() # load .env file for var in variables: @@ -40,33 +41,49 @@ def load(variables) -> dict: return keys def cleanTweet(tweet: str) -> str: - """Remove all unwanted elements from the tweet.""" - tweet = tweet.lower() # convert to lower case - tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) # remove URLs - hashtagMatch = findall(r"#\S+", tweet) # check all hashtags - if len(hashtagMatch) < 3: # if less than 3 - tweet = sub(r"#\S+", " ", tweet) # remove them + """Remove all unwanted elements from the tweet""" + # Convert to lower case + tweet = tweet.lower() + # Remove URLs + tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) + # Check all hashtags + hashtagMatch = findall(r"#\S+", tweet) + # If less than 3 + if len(hashtagMatch) < 3: + # Remove them + tweet = sub(r"#\S+", " ", tweet) else: - return "" # too much hashtags, ignoring tweet - tweet = sub(r"@\S+", " ", tweet) # remove usernames - tweet = sub(r" *?[^\w\s]+", " ", tweet) # remove everything who is not a letter or a number or a space - tweet = sub(r"\S+(?=si|ci)", " ", tweet) # remove element of the word only if the last syllable can be matched (so more words will be answered without adding them manually) - tweet = sub(r"(?<=ui)i+|(?<=na)a+(? so ignore it + return "" + # Remove usernames + tweet = sub(r"@\S+", " ", tweet) + # Remove everything who isn't a letter/number/space + tweet = sub(r" *?[^\w\s]+", " ", tweet) + # Remove element of the word only if the last syllable can be matched + # (so more words will be answered without adding them manually) + tweet = sub(r"\S+(?=si|ci)", " ", tweet) + + # Remove key smashing in certains words + # uiii naaaan quoiiii noooon heiiin siiii + tweet = sub(r"(?<=ui)i+|(?<=na)a+(? 0 else "tweet ignoré car trop de hashtags" - print(f"Tweet trouvé de {status._json['user']['screen_name']} ({infoLastWord})...", end = " ") - if lastWord in universalBase: # check if the last word found is a supported word + print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ") + # Check if the last word found is a supported word + if lastWord in universalBase: answer = None + # Fetch an adequate response for mot in base.items(): if lastWord in mot[1]: + # Handle specific case if mot[0] == "bon": - if datetime.now().hour in range(7, 17): # between 7am and 5pm + # Between 7am and 5pm + if datetime.now().hour in range(7, 17): answer = answers[mot[0]][0] # jour else: answer = answers[mot[0]][1] # soir @@ -100,14 +127,17 @@ class Listener(StreamListener): if answer == None: if keys["VERBOSE"]: print(f"{errorMessage} Aucune réponse trouvée.") + # If an answer has been found else: if keys["VERBOSE"]: print(f"Envoie d'un {answer[0]}...", end = " ") - try: # send answer - self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True) - print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !") + try: + # Send the tweet with the answer + self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True) + print(f"{json['user']['screen_name']} s'est fait {answer[0]} !") except Exception as error: error = loads(error.response.text)["errors"][0] + # https://developer.twitter.com/en/support/twitter-api/error-troubleshooting if error["code"] == 385: error["message"] = "Tweet supprimé ou auteur en privé/bloqué." print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") @@ -116,6 +146,7 @@ class Listener(StreamListener): print("Annulation car le dernier mot n'est pas intéressant.") def do_stuff(self): + """Loop for the Listener""" while True: self.q.get() self.q.task_done() @@ -132,53 +163,61 @@ class Listener(StreamListener): print("\n") return False -def getFriendsID(api, users: list) -> list: - """Get all friends of choosen users.""" +def getFriendsID(api: API, users: list) -> list: + """Get all friends of choosen users""" liste = [] + # Get IDs of the user's friends for user in users: liste.extend(api.friends_ids(user)) return list(set(liste)) -def getIDs(api, users: list) -> list: +def getIDs(api: API, users: list) -> list: """Get all the ID of users""" liste = [] + # Get IDs of the users for user in users: liste.append(api.get_user(user)._json["id"]) return list(set(liste)) def seniority(date: str) -> bool: - """Return True only if the given string date is less than one day old.""" - datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") # convert String format to datetime format - datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) # Twitter give us an UTC time - age = datetime.now(timezone("UTC")) - datetimeObject # time now in UTC minus the time we got to get the age of the date - return False if age.days >= 1 else True # False if older than a day + """Return True only if the given string date is less than one day old""" + # Convert string format to datetime format + datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") + # Twitter give us an UTC time + datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) + # time now in UTC minus the time we got to get the age of the date + age = datetime.now(timezone("UTC")) - datetimeObject + # False if older than a day, else True + return False if age.days >= 1 else True -def permute(array: list) -> list: - """Retrieves all possible combinations for the given list and returns the result as a list.""" +def generateWords(array: list) -> list: + """ + Retrieves all possible combinations for the given list and returns the result as a list + + This is used for the filter in the stream (before calling the Listener) + """ quoiListe = [] - for text in array: # all element of the list - if text.lower() not in quoiListe: - quoiListe.append(text.lower()) # word fully in lowercase - if text.upper() not in quoiListe: - quoiListe.append(text.upper()) # word fully in uppercase - if text.capitalize() not in quoiListe: - quoiListe.append(text.capitalize()) # word with the first letter in uppercase + for text in array: + # Add all combinations + # Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui'] + quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower()))))) + return quoiListe -def createBaseTrigger(lists) -> list: - """Merges all given lists into one.""" +def createBaseTrigger(lists: list) -> list: + """Merges all given lists into one""" listing = [] for liste in lists: listing.extend(liste) return list(set(listing)) -def createBaseAnswers(word) -> list: - """Generates default answers for a given word.""" - return [word, f"({word})", word.upper(), f"{word} lol"] +def createBaseAnswers(word: str) -> list: + """Generates default answers for a given word""" + return [word, f"({word})", word.upper(), f"{word} lol", f"{word} 👀"] def start(): - """Start the bot.""" + """Start the bot""" auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"]) auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"]) @@ -212,9 +251,11 @@ if __name__ == "__main__": -- PSEUDO is the PSEUDO of the account you want to listen to snipe. """ - errorMessage = "Une erreur survient !" # error message + # Error message + errorMessage = "Une erreur survient !" - base = { # words to detect in lowercase + # Words who trigger the bot (keys in lowercase) + base = { "quoi": ["quoi", "koi", "quoient"], "oui": ["oui", "ui", "wi"], "non": ["non", "nn"], @@ -252,7 +293,8 @@ if __name__ == "__main__": "vois": ["vois", "voit", "voie", "voi"], } - answers = { # creation of answers + # Answers for all the triggers (keys in lowercase) + answers = { "quoi": createBaseAnswers("feur") + createBaseAnswers("feuse") + [ @@ -374,11 +416,14 @@ if __name__ == "__main__": "vois": createBaseAnswers("ture"), } - universalBase = createBaseTrigger(list(base.values())) # creation of a list of all the words + # List of all the trigger words + universalBase = createBaseTrigger(list(base.values())) - triggerWords = permute(universalBase) # creation of a list of all the words (upper and lower case) + # List of all the triggers words's variations + triggerWords = generateWords(universalBase) - # Loading environment variables and launching the bot + # Loading environment variables keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"]) - print("") # just a newline + + # Start the bot start()