from dotenv import load_dotenv from os import environ from tweepy import OAuthHandler, API, StreamListener, Stream from re import sub from random import choice from datetime import datetime from pytz import timezone def load(variables) -> dict: """Load environment variables.""" keys = {} load_dotenv() # load .env file for var in variables: try: res = environ[var] if var == "PSEUDOS": res = list(set(res.split(',')) - {""}) # create a list for the channels and remove blank channels and doubles keys[var] = res except KeyError: print(f"Please set the environment variable {var} (.env file supported)") exit(1) return keys class Listener(StreamListener): def __init__(self, api = None, users = None): super(Listener, self).__init__() self.api = api self.listOfFriendsID = getFriendsID(api, users) def on_status(self, status): """Answer to tweets.""" if seniority(status._json["created_at"]): tweetText = sub(r'https?:\/\/\S+| +?\?|\?| +?\!| ?\!|-|~', '', status._json["text"]) if status._json["user"]["id"] in self.listOfFriendsID: lastWord = tweetText.split()[-1:].lower() if lastWord in universalBase: try: if lastWord in quoiBase: answer = feur elif lastWord in ouiBase: answer = stiti elif lastWord in nonBase: answer = bril else: print(f"{errorMessage} I didn't know how to answer.") return self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True) print(f"{status._json['user']['screen_name']} est passé au coiffeur !") except Exception as error: print(f"{errorMessage} {error}") def getFriendsID(api, users: list) -> list: """Get all friends of choosen users.""" liste = [] for user in users: liste.extend(api.friends_ids(user)) return list(set(liste)) def seniority(date: str) -> bool: """Return True only if the given string date is less than one day old.""" datetimeObject = datetime.strptime(date, '%a %b %d %H:%M:%S +0000 %Y') # Convert String format to datetime format datetimeObject = datetimeObject.replace(tzinfo = timezone('UTC')) # Twitter give us an UTC time age = datetime.now(timezone('UTC')) - datetimeObject # Time now in UTC minus the time we got to get the age of the date return False if age.days >= 1 else True # False if older than a day def permute(array: list) -> list: """Retrieves all possible combinations for the given list and returns the result as a list.""" quoiListe = [] for text in array: # all element of the list n = len(text) mx = 1 << n # Number of permutations is 2^n text = text.lower() # Converting string to lower case for i in range(mx): # Using all subsequences and permuting them combination = [k for k in text] for j in range(n): if (((i >> j) & 1) == 1): # If j-th bit is set, we convert it to upper case combination[j] = text[j].upper() temp = "" for i in combination: temp += i quoiListe.append(temp) return quoiListe def createBaseTrigger(*lists) -> list: """Merges all given lists into one""" listing = [] for liste in lists: listing.extend(liste) return list(set(listing)) def createBaseAnswers(word) -> list: """Generates default answers for a given word""" return [word, f"({word})", word.capitalize(), f"{word} lol"] def main(accessToken: str, accessTokenSecret: str, consumerKey: str, consumerSecret: str, users: list): """Main method.""" auth = OAuthHandler(consumerKey, consumerSecret) auth.set_access_token(accessToken, accessTokenSecret) api = API(auth_handler = auth, wait_on_rate_limit = True) listener = Listener(api, users) stream = Stream(auth = api.auth, listener = listener) print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(users)}...") stream.filter(track = triggerWords, languages = ["fr"], is_async = True) if __name__ == '__main__': """ TOKEN is the Access Token available in the Authentication Tokens section under Access Token and Secret sub-heading. TOKEN_SECRET is the Access Token Secret available in the Authentication Tokens section under Access Token and Secret sub-heading. CONSUMER_KEY is the API Key available in the Consumer Keys section. CONSUMER_SECRET is the API Secret Key available in the Consumer Keys section. -- PSEUDO is the PSEUDO of the account you want to listen to snipe. """ errorMessage = "Error happens!" # error message # words to detect quoiBase = ["quoi", "koi"] ouiBase = ["oui", "ui"] nonBase = ["non", "nn"] universalBase = createBaseTrigger(quoiBase, ouiBase, nonBase) # creation of the list with all alternatives (upper/lower case) triggerWords = permute(universalBase) # creation of answers feur = createBaseAnswers("feur") feur.extend(["https://twitter.com/shukuzi62/status/1422611919538724868/video/1"]) # add a message in addition to the default answers stiti = createBaseAnswers("stiti") bril = createBaseAnswers("bril") # loading environment variables and launching the bot keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS"]) main(keys["TOKEN"], keys["TOKEN_SECRET"], keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"], keys["PSEUDOS"])