* fix missings words for the listener-filter

* add types
* update comments
This commit is contained in:
Mylloon 2022-08-06 01:35:32 +02:00
parent 7f1f0fad8b
commit 37e4e5e874
Signed by: Anri
GPG key ID: A82D63DFF8D1317F

159
main.py
View file

@ -7,9 +7,10 @@ from datetime import datetime
from pytz import timezone from pytz import timezone
from queue import Queue from queue import Queue
from json import loads from json import loads
from itertools import product
def load(variables) -> dict: def load(variables) -> dict:
"""Load environment variables.""" """Load environment variables"""
keys = {} keys = {}
load_dotenv() # load .env file load_dotenv() # load .env file
for var in variables: for var in variables:
@ -40,33 +41,49 @@ def load(variables) -> dict:
return keys return keys
def cleanTweet(tweet: str) -> str: def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet.""" """Remove all unwanted elements from the tweet"""
tweet = tweet.lower() # convert to lower case # Convert to lower case
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) # remove URLs tweet = tweet.lower()
hashtagMatch = findall(r"#\S+", tweet) # check all hashtags # Remove URLs
if len(hashtagMatch) < 3: # if less than 3 tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
tweet = sub(r"#\S+", " ", tweet) # remove them # Check all hashtags
hashtagMatch = findall(r"#\S+", tweet)
# If less than 3
if len(hashtagMatch) < 3:
# Remove them
tweet = sub(r"#\S+", " ", tweet)
else: else:
return "" # too much hashtags, ignoring tweet # Too much hashtags in the tweet -> so ignore it
tweet = sub(r"@\S+", " ", tweet) # remove usernames return ""
tweet = sub(r" *?[^\w\s]+", " ", tweet) # remove everything who is not a letter or a number or a space # Remove usernames
tweet = sub(r"\S+(?=si|ci)", " ", tweet) # remove element of the word only if the last syllable can be matched (so more words will be answered without adding them manually) tweet = sub(r"@\S+", " ", tweet)
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet) # remove key smashing in certains words # Remove everything who isn't a letter/number/space
tweet = sub(r" *?[^\w\s]+", " ", tweet)
# Remove element of the word only if the last syllable can be matched
# (so more words will be answered without adding them manually)
tweet = sub(r"\S+(?=si|ci)", " ", tweet)
# Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
return tweet.strip() return tweet.strip()
class Listener(StreamListener): class Listener(StreamListener):
"""Watch for tweets that match criteria in real-time.""" """Watch for tweets that match criteria in real-time"""
def __init__(self, api = None, users = None, forcelist = None, q = Queue()): def __init__(self, api: API = None, users: list = None, forcelist: list = None, q = Queue()):
super(Listener, self).__init__() super(Listener, self).__init__()
self.q = q self.q = q
self.api = api self.api = api
self.accounts = users self.accounts = [users, forcelist]
self.forcelist = forcelist
self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist) self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist)
def on_connect(self): def on_connect(self):
print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts)} comme timeline et ces comptes : @{', @'.join(self.forcelist)}...") if self.accounts[1] == []:
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(self.accounts[1])}"
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts[0])} et ces comptes en plus : {forcelist} comme timeline...")
def on_disconnect(notice): def on_disconnect(notice):
notice = notice["disconnect"] notice = notice["disconnect"]
@ -75,23 +92,33 @@ class Listener(StreamListener):
print(f"Raison : {notice['reason']}") print(f"Raison : {notice['reason']}")
def on_status(self, status): def on_status(self, status):
if status._json["user"]["id"] in self.listOfFriendsID and status._json["user"]["screen_name"] not in keys["WHITELIST"]: # verification of the author of the tweet json = status._json
if seniority(status._json["created_at"]): # verification of the age of the tweet # Verify the author of the tweet
if not hasattr(status, "retweeted_status"): # ignore Retweet if json["user"]["id"] in self.listOfFriendsID and json["user"]["screen_name"] not in keys["WHITELIST"]:
if "extended_tweet" in status._json: # Verify the age of the tweet
if seniority(json["created_at"]):
# Verify if the tweet isn't a retweet
if not hasattr(status, "retweeted_status"):
# Fetch the tweet
if "extended_tweet" in json:
tweet = cleanTweet(status.extended_tweet["full_text"]) tweet = cleanTweet(status.extended_tweet["full_text"])
else: else:
tweet = cleanTweet(status.text) tweet = cleanTweet(status.text)
# Fetch the last word of the tweet
lastWord = tweet.split()[-1:][0] lastWord = tweet.split()[-1:][0]
if keys["VERBOSE"]: if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {status._json['user']['screen_name']} ({infoLastWord})...", end = " ") print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
if lastWord in universalBase: # check if the last word found is a supported word # Check if the last word found is a supported word
if lastWord in universalBase:
answer = None answer = None
# Fetch an adequate response
for mot in base.items(): for mot in base.items():
if lastWord in mot[1]: if lastWord in mot[1]:
# Handle specific case
if mot[0] == "bon": if mot[0] == "bon":
if datetime.now().hour in range(7, 17): # between 7am and 5pm # Between 7am and 5pm
if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour answer = answers[mot[0]][0] # jour
else: else:
answer = answers[mot[0]][1] # soir answer = answers[mot[0]][1] # soir
@ -100,14 +127,17 @@ class Listener(StreamListener):
if answer == None: if answer == None:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print(f"{errorMessage} Aucune réponse trouvée.") print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
else: else:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print(f"Envoie d'un {answer[0]}...", end = " ") print(f"Envoie d'un {answer[0]}...", end = " ")
try: # send answer try:
self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True) # Send the tweet with the answer
print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !") self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error: except Exception as error:
error = loads(error.response.text)["errors"][0] error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
if error["code"] == 385: if error["code"] == 385:
error["message"] = "Tweet supprimé ou auteur en privé/bloqué." error["message"] = "Tweet supprimé ou auteur en privé/bloqué."
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
@ -116,6 +146,7 @@ class Listener(StreamListener):
print("Annulation car le dernier mot n'est pas intéressant.") print("Annulation car le dernier mot n'est pas intéressant.")
def do_stuff(self): def do_stuff(self):
"""Loop for the Listener"""
while True: while True:
self.q.get() self.q.get()
self.q.task_done() self.q.task_done()
@ -132,53 +163,61 @@ class Listener(StreamListener):
print("\n") print("\n")
return False return False
def getFriendsID(api, users: list) -> list: def getFriendsID(api: API, users: list) -> list:
"""Get all friends of choosen users.""" """Get all friends of choosen users"""
liste = [] liste = []
# Get IDs of the user's friends
for user in users: for user in users:
liste.extend(api.friends_ids(user)) liste.extend(api.friends_ids(user))
return list(set(liste)) return list(set(liste))
def getIDs(api, users: list) -> list: def getIDs(api: API, users: list) -> list:
"""Get all the ID of users""" """Get all the ID of users"""
liste = [] liste = []
# Get IDs of the users
for user in users: for user in users:
liste.append(api.get_user(user)._json["id"]) liste.append(api.get_user(user)._json["id"])
return list(set(liste)) return list(set(liste))
def seniority(date: str) -> bool: def seniority(date: str) -> bool:
"""Return True only if the given string date is less than one day old.""" """Return True only if the given string date is less than one day old"""
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") # convert String format to datetime format # Convert string format to datetime format
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) # Twitter give us an UTC time datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
age = datetime.now(timezone("UTC")) - datetimeObject # time now in UTC minus the time we got to get the age of the date # Twitter give us an UTC time
return False if age.days >= 1 else True # False if older than a day datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC"))
# time now in UTC minus the time we got to get the age of the date
age = datetime.now(timezone("UTC")) - datetimeObject
# False if older than a day, else True
return False if age.days >= 1 else True
def permute(array: list) -> list: def generateWords(array: list) -> list:
"""Retrieves all possible combinations for the given list and returns the result as a list.""" """
Retrieves all possible combinations for the given list and returns the result as a list
This is used for the filter in the stream (before calling the Listener)
"""
quoiListe = [] quoiListe = []
for text in array: # all element of the list for text in array:
if text.lower() not in quoiListe: # Add all combinations
quoiListe.append(text.lower()) # word fully in lowercase # Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui']
if text.upper() not in quoiListe: quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower())))))
quoiListe.append(text.upper()) # word fully in uppercase
if text.capitalize() not in quoiListe:
quoiListe.append(text.capitalize()) # word with the first letter in uppercase
return quoiListe return quoiListe
def createBaseTrigger(lists) -> list: def createBaseTrigger(lists: list) -> list:
"""Merges all given lists into one.""" """Merges all given lists into one"""
listing = [] listing = []
for liste in lists: for liste in lists:
listing.extend(liste) listing.extend(liste)
return list(set(listing)) return list(set(listing))
def createBaseAnswers(word) -> list: def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word.""" """Generates default answers for a given word"""
return [word, f"({word})", word.upper(), f"{word} lol"] return [word, f"({word})", word.upper(), f"{word} lol", f"{word} 👀"]
def start(): def start():
"""Start the bot.""" """Start the bot"""
auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"]) auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"]) auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"])
@ -212,9 +251,11 @@ if __name__ == "__main__":
-- --
PSEUDO is the PSEUDO of the account you want to listen to snipe. PSEUDO is the PSEUDO of the account you want to listen to snipe.
""" """
errorMessage = "Une erreur survient !" # error message # Error message
errorMessage = "Une erreur survient !"
base = { # words to detect in lowercase # Words who trigger the bot (keys in lowercase)
base = {
"quoi": ["quoi", "koi", "quoient"], "quoi": ["quoi", "koi", "quoient"],
"oui": ["oui", "ui", "wi"], "oui": ["oui", "ui", "wi"],
"non": ["non", "nn"], "non": ["non", "nn"],
@ -252,7 +293,8 @@ if __name__ == "__main__":
"vois": ["vois", "voit", "voie", "voi"], "vois": ["vois", "voit", "voie", "voi"],
} }
answers = { # creation of answers # Answers for all the triggers (keys in lowercase)
answers = {
"quoi": createBaseAnswers("feur") "quoi": createBaseAnswers("feur")
+ createBaseAnswers("feuse") + createBaseAnswers("feuse")
+ [ + [
@ -374,11 +416,14 @@ if __name__ == "__main__":
"vois": createBaseAnswers("ture"), "vois": createBaseAnswers("ture"),
} }
universalBase = createBaseTrigger(list(base.values())) # creation of a list of all the words # List of all the trigger words
universalBase = createBaseTrigger(list(base.values()))
triggerWords = permute(universalBase) # creation of a list of all the words (upper and lower case) # List of all the triggers words's variations
triggerWords = generateWords(universalBase)
# Loading environment variables and launching the bot # Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"]) keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
print("") # just a newline
# Start the bot
start() start()