from dotenv import load_dotenv from os import environ from tweepy import OAuthHandler, API, StreamListener, Stream from re import sub from random import choice from datetime import datetime from pytz import timezone from queue import Queue from utils.remove_unicode import emojis def load(variables) -> dict: """Load environment variables.""" keys = {} load_dotenv() # load .env file for var in variables: try: if var == "VERBOSE": try: res = bool(environ[var]) except: res = False else: res = environ[var] if var == "PSEUDOS": res = list(set(res.split(',')) - {""}) # create a list for the channels and remove blank channels and doubles keys[var] = res except KeyError: print(f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)") exit(1) return keys class Listener(StreamListener): def __init__(self, api = None, users = None, q = Queue()): super(Listener, self).__init__() self.q = q self.api = api self.listOfFriendsID = getFriendsID(api, users) def on_status(self, status): """Answer to tweets.""" if status._json["user"]["id"] in self.listOfFriendsID: # verification of the author of the tweet if seniority(status._json["created_at"]): # verification of the age of the tweet if not hasattr(status, "retweeted_status"): # ignore Retweet try: # retrieve the entire tweet tweet = status.extended_tweet["full_text"].lower() except AttributeError: tweet = status.text.lower() # recovery of the last "usable" word of the tweet tweetText = sub(r"https?:\/\/\S+| *\?+| *!+| *,+|-|~|\.+|…|\^+|@\S+" + f"|{emojis()}", " ", tweet) # deletion with space tweetText = sub(r"(?<=ui)i+|(?<=na)a+(? list: """Get all friends of choosen users.""" liste = [] for user in users: liste.extend(api.friends_ids(user)) return list(set(liste)) def seniority(date: str) -> bool: """Return True only if the given string date is less than one day old.""" datetimeObject = datetime.strptime(date, '%a %b %d %H:%M:%S +0000 %Y') # Convert String format to datetime format datetimeObject = datetimeObject.replace(tzinfo = timezone('UTC')) # Twitter give us an UTC time age = datetime.now(timezone('UTC')) - datetimeObject # Time now in UTC minus the time we got to get the age of the date return False if age.days >= 1 else True # False if older than a day def permute(array: list) -> list: """Retrieves all possible combinations for the given list and returns the result as a list.""" quoiListe = [] for text in array: # all element of the list n = len(text) mx = 1 << n # Number of permutations is 2^n text = text.lower() # Converting string to lower case for i in range(mx): # Using all subsequences and permuting them combination = [k for k in text] for j in range(n): if (((i >> j) & 1) == 1): # If j-th bit is set, we convert it to upper case combination[j] = text[j].upper() temp = "" for i in combination: temp += i quoiListe.append(temp) return quoiListe def createBaseTrigger(lists) -> list: """Merges all given lists into one.""" listing = [] for liste in lists: listing.extend(liste) return list(set(listing)) def createBaseAnswers(word) -> list: """Generates default answers for a given word.""" return [word, f"({word})", word.upper(), f"{word} lol"] def main(accessToken: str, accessTokenSecret: str, consumerKey: str, consumerSecret: str, users: list): """Main method.""" auth = OAuthHandler(consumerKey, consumerSecret) auth.set_access_token(accessToken, accessTokenSecret) api = API(auth_handler = auth, wait_on_rate_limit = True) listener = Listener(api, users) stream = Stream(auth = api.auth, listener = listener) print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(users)}...") stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, is_async = True) if __name__ == '__main__': """ TOKEN is the Access Token available in the Authentication Tokens section under Access Token and Secret sub-heading. TOKEN_SECRET is the Access Token Secret available in the Authentication Tokens section under Access Token and Secret sub-heading. CONSUMER_KEY is the API Key available in the Consumer Keys section. CONSUMER_SECRET is the API Secret Key available in the Consumer Keys section. -- PSEUDO is the PSEUDO of the account you want to listen to snipe. """ errorMessage = "Une erreur survient !" # error message # words to detect in lowercase base = { "quoi": ["quoi", "koi", "quoient"], "oui": ["oui", "ui"], "non": ["non", "nn"], "nan": ["nan"], "hein": ["hein"], "ci": ["ci", "si"], "con": ["con"], "ok": ["ok", "okay", "oké", "k"], "ouais": ["ouais", "oué"], "comment": ["comment"], "mais": ["mais", "mé"] } # creation of answers answers = { "quoi": createBaseAnswers("feur") + [ "https://twitter.com/Myshawii/status/1423219640025722880/video/1", "feur (-isson)", "https://twitter.com/Myshawii/status/1423219684552417281/video/1", "feur (-issonictalopediatreuil)" ], "oui": createBaseAnswers("stiti"), "non": createBaseAnswers("bril"), "nan": createBaseAnswers("cy"), "hein": createBaseAnswers("deux") + [ "2" ], "ci": createBaseAnswers("tron"), "con": createBaseAnswers("combre"), "ok": createBaseAnswers("sur glace"), "ouais": createBaseAnswers("stern"), "comment": createBaseAnswers("tateur"), "mais": createBaseAnswers("on") } # creation of a list of all the words (only lowercase) universalBase = createBaseTrigger(list(base.values())) # creation of a list of all the words (upper and lower case) triggerWords = permute(universalBase) # loading environment variables and launching the bot keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE"]) main(keys["TOKEN"], keys["TOKEN_SECRET"], keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"], keys["PSEUDOS"])