Compare commits

...

2 commits

Author SHA1 Message Date
52525cab51
follow pep8 2022-08-07 21:55:48 +02:00
36792c1949
order imports 2022-08-07 21:52:40 +02:00

179
main.py
View file

@ -1,47 +1,52 @@
from dotenv import load_dotenv
from os import environ
from tweepy import StreamingClient, Client, StreamRule, Tweet
from re import sub, findall
from random import choice
from datetime import datetime from datetime import datetime
from pytz import timezone
from json import loads from json import loads
from os import environ
from random import choice
from re import findall, sub
from dotenv import load_dotenv
from pytz import timezone
from tweepy import Client, StreamingClient, StreamRule, Tweet
def load(variables) -> dict: def load(variables) -> dict:
"""Load environment variables""" """Load environment variables"""
keys = {} keys = {}
load_dotenv() # load .env file load_dotenv() # load .env file
for var in variables: for var in variables:
try: try:
if var == "VERBOSE": # check is VERBOSE is set if var == "VERBOSE": # check is VERBOSE is set
try: try:
res = bool(environ[var]) res = bool(environ[var])
except: except:
res = False # if not its False res = False # if not its False
elif var == "WHITELIST": # check if WHITELIST is set elif var == "WHITELIST": # check if WHITELIST is set
try: try:
res = list(set(environ[var].split(",")) - {""}) res = list(set(environ[var].split(",")) - {""})
except: except:
res = [] # if not its an empty list res = [] # if not its an empty list
elif var == "FORCELIST": # check if FORCELIST is set elif var == "FORCELIST": # check if FORCELIST is set
try: try:
res = list(set(environ[var].split(",")) - {""}) res = list(set(environ[var].split(",")) - {""})
except: except:
res = [] # if not its an empty list res = [] # if not its an empty list
else: else:
res = environ[var] res = environ[var]
if var == "PSEUDOS": if var == "PSEUDOS":
res = list(set(res.split(",")) - {""}) # create a list for the channels and remove blank channels and doubles # create a list for the channels and remove blank channels and doubles
res = list(set(res.split(",")) - {""})
keys[var] = res keys[var] = res
except KeyError: except KeyError:
print(f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)") print(
f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
exit(1) exit(1)
return keys return keys
def cleanTweet(tweet: str) -> str: def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet""" """Remove all unwanted elements from the tweet"""
# Convert to lower case # Convert to lower case
tweet = tweet.lower() tweet = tweet.lower()
# Remove URLs # Remove URLs
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
# Check all hashtags # Check all hashtags
@ -63,12 +68,15 @@ def cleanTweet(tweet: str) -> str:
# Remove key smashing in certains words # Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii # uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet) tweet = sub(
r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
return tweet.strip() return tweet.strip()
class Listener(StreamingClient): class Listener(StreamingClient):
"""Watch for tweets that match criteria in real-time""" """Watch for tweets that match criteria in real-time"""
def __init__( def __init__(
self, self,
bearer_token, bearer_token,
@ -90,11 +98,14 @@ class Listener(StreamingClient):
# Verify the age of the tweet # Verify the age of the tweet
if seniority(tweet.created_at): if seniority(tweet.created_at):
# Fetch the last word of the tweet # Fetch the last word of the tweet
tweet.username: str = self.client.get_user(id=tweet.author_id, user_auth=True).data.username tweet.username: str = self.client.get_user(
id=tweet.author_id, user_auth=True).data.username
lastWord = cleanTweet(tweet.text).split()[-1:][0] lastWord = cleanTweet(tweet.text).split()[-1:][0]
if keys["VERBOSE"]: if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" infoLastWord = f"dernier mot : \"{lastWord}\"" if len(
print(f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ") lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(
f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ")
# Check if the last word found is a supported word # Check if the last word found is a supported word
if lastWord in universalBase: if lastWord in universalBase:
answer = None answer = None
@ -112,9 +123,9 @@ class Listener(StreamingClient):
if mot[0] == "bon": if mot[0] == "bon":
# Between 7am and 5pm # Between 7am and 5pm
if datetime.now().hour in range(7, 17): if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour answer = answers[mot[0]][0] # jour
else: else:
answer = answers[mot[0]][1] # soir answer = answers[mot[0]][1] # soir
else: else:
# Normal answer # Normal answer
answer = answers[mot[0]] answer = answers[mot[0]]
@ -130,7 +141,8 @@ class Listener(StreamingClient):
print(f"Envoie d'un {answer[0]}...", end=" ") print(f"Envoie d'un {answer[0]}...", end=" ")
try: try:
# Send the tweet with the answer # Send the tweet with the answer
self.client.create_tweet(in_reply_to_tweet_id=tweet.id, text=choice(answer)) self.client.create_tweet(
in_reply_to_tweet_id=tweet.id, text=choice(answer))
print(f"{tweet.username} s'est fait {answer[0]} !") print(f"{tweet.username} s'est fait {answer[0]} !")
except Exception as error: except Exception as error:
error = loads(error.response.text)["errors"][0] error = loads(error.response.text)["errors"][0]
@ -142,7 +154,8 @@ class Listener(StreamingClient):
# Show error only if relevant, always in verbose # Show error only if relevant, always in verbose
if show_error or keys["VERBOSE"]: if show_error or keys["VERBOSE"]:
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") print(
f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
else: else:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.") print("Annulation car le dernier mot n'est pas intéressant.")
@ -156,6 +169,7 @@ class Listener(StreamingClient):
print("\n") print("\n")
return False return False
def repeater(word: str) -> str: def repeater(word: str) -> str:
"""Formating a word who need to be repeated""" """Formating a word who need to be repeated"""
# Remove first letter if the first letter is a "S" or a "T" # Remove first letter if the first letter is a "S" or a "T"
@ -168,15 +182,18 @@ def repeater(word: str) -> str:
# Random format from the base answer # Random format from the base answer
return createBaseAnswers(word) return createBaseAnswers(word)
def getFriends(client: Client, users: list[str]) -> list: def getFriends(client: Client, users: list[str]) -> list:
"""Get all friends of choosen users""" """Get all friends of choosen users"""
friends_list = [] friends_list = []
# Get IDs of the user's friends # Get IDs of the user's friends
for user in users: for user in users:
user_id = client.get_user(username=user, user_auth=True).data.id user_id = client.get_user(username=user, user_auth=True).data.id
friends_list.extend(client.get_users_following(id=user_id, user_auth=True)) friends_list.extend(client.get_users_following(
id=user_id, user_auth=True))
return friends_list[0] return friends_list[0]
def seniority(date: datetime) -> bool: def seniority(date: datetime) -> bool:
"""Return True only if the given string date is less than one day old""" """Return True only if the given string date is less than one day old"""
# Convert string format to datetime format # Convert string format to datetime format
@ -188,6 +205,7 @@ def seniority(date: datetime) -> bool:
# False if older than a day, else True # False if older than a day, else True
return False if age.days >= 1 else True return False if age.days >= 1 else True
def createBaseTrigger(lists: list[list]) -> list: def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one""" """Merges all given lists into one"""
listing = [] listing = []
@ -195,6 +213,7 @@ def createBaseTrigger(lists: list[list]) -> list:
listing.extend(liste) listing.extend(liste)
return list(set(listing)) return list(set(listing))
def createBaseAnswers(word: str) -> list: def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word""" """Generates default answers for a given word"""
irritating_word = [ irritating_word = [
@ -211,6 +230,7 @@ def createBaseAnswers(word: str) -> list:
f"{word}...", f"{word}...",
] ]
def createClient(consumer_key, consumer_secret, access_token, access_token_secret) -> Client: def createClient(consumer_key, consumer_secret, access_token, access_token_secret) -> Client:
"""Create a client for the Twitter API v2""" """Create a client for the Twitter API v2"""
client = Client( client = Client(
@ -223,7 +243,8 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre
if keys["VERBOSE"]: if keys["VERBOSE"]:
try: try:
client.get_me().data.username client.get_me().data.username
print(f"Authentification réussie en tant que @{client.get_me().data.username}.\n") print(
f"Authentification réussie en tant que @{client.get_me().data.username}.\n")
# Compte ignorés # Compte ignorés
if keys['WHITELIST'] == []: if keys['WHITELIST'] == []:
@ -244,15 +265,18 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre
pseudos = "Aucun" pseudos = "Aucun"
else: else:
pseudos = f"@{', @'.join(keys['PSEUDOS'])}" pseudos = f"@{', @'.join(keys['PSEUDOS'])}"
print(f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n") print(
f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n")
print("Notez que si un compte est dans la whitelist, il sera dans tout les cas ignoré.\n") print(
"Notez que si un compte est dans la whitelist, il sera dans tout les cas ignoré.\n")
except: except:
print("Erreur d'authentification.") print("Erreur d'authentification.")
exit(1) exit(1)
return client return client
def create_rules(tracked_users: list[str]) -> list[str]: def create_rules(tracked_users: list[str]) -> list[str]:
"""Create rules for tracking users, by respecting the twitter API policies""" """Create rules for tracking users, by respecting the twitter API policies"""
rules = [] rules = []
@ -278,6 +302,7 @@ def create_rules(tracked_users: list[str]) -> list[str]:
return rules return rules
def start(): def start():
"""Start the bot""" """Start the bot"""
client = createClient( client = createClient(
@ -301,6 +326,7 @@ def start():
stream.add_rules(StreamRule(rule)) stream.add_rules(StreamRule(rule))
stream.filter(threaded=True) stream.filter(threaded=True)
if __name__ == "__main__": if __name__ == "__main__":
""" """
TOKEN is the Access Token available in the Authentication Tokens section under the Access Token and Secret sub-heading TOKEN is the Access Token available in the Authentication Tokens section under the Access Token and Secret sub-heading
@ -361,62 +387,62 @@ if __name__ == "__main__":
# Answers for all the triggers (keys in lowercase) # Answers for all the triggers (keys in lowercase)
answers = { answers = {
"quoi": createBaseAnswers("feur") "quoi": createBaseAnswers("feur")
+ createBaseAnswers("feuse") + createBaseAnswers("feuse")
+ [ + [
"https://twitter.com/Myshawii/status/1423219640025722880/video/1", "https://twitter.com/Myshawii/status/1423219640025722880/video/1",
"https://twitter.com/Myshawii/status/1423219684552417281/video/1", "https://twitter.com/Myshawii/status/1423219684552417281/video/1",
"feur (-isson -ictalope -diatre -uil)", "feur (-isson -ictalope -diatre -uil)",
"https://twitter.com/Myshawii/status/1455469162202075138/video/1", "https://twitter.com/Myshawii/status/1455469162202075138/video/1",
"https://twitter.com/Myshawii/status/1552026689101860865/video/1", "https://twitter.com/Myshawii/status/1552026689101860865/video/1",
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1" "https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
], ],
"oui": createBaseAnswers("stiti") "oui": createBaseAnswers("stiti")
+ createBaseAnswers("fi"), + createBaseAnswers("fi"),
"non": createBaseAnswers("bril"), "non": createBaseAnswers("bril"),
"nan": createBaseAnswers("cy"), "nan": createBaseAnswers("cy"),
"hein": createBaseAnswers("deux") "hein": createBaseAnswers("deux")
+ createBaseAnswers("bécile") + createBaseAnswers("bécile")
+ [ + [
"2" "2"
], ],
"ci": createBaseAnswers("tron") "ci": createBaseAnswers("tron")
+ createBaseAnswers("prine"), + createBaseAnswers("prine"),
"con": createBaseAnswers("combre") "con": createBaseAnswers("combre")
+ createBaseAnswers("gelé") + createBaseAnswers("gelé")
+ createBaseAnswers("pas"), + createBaseAnswers("pas"),
"ok": createBaseAnswers("sur glace"), "ok": createBaseAnswers("sur glace"),
"ouais": createBaseAnswers("stern"), "ouais": createBaseAnswers("stern"),
"comment": createBaseAnswers("tateur") "comment": createBaseAnswers("tateur")
+ createBaseAnswers("tatrice") + createBaseAnswers("tatrice")
+ createBaseAnswers("dant Cousteau"), + createBaseAnswers("dant Cousteau"),
"mais": createBaseAnswers("on") "mais": createBaseAnswers("on")
+ [ + [
"on (-dulation)" "on (-dulation)"
], ],
"fort": createBaseAnswers("boyard") "fort": createBaseAnswers("boyard")
+ [ + [
"boyard (-ennes)" "boyard (-ennes)"
], ],
"coup": createBaseAnswers("teau"), "coup": createBaseAnswers("teau"),
"ça": createBaseAnswers("perlipopette") "ça": createBaseAnswers("perlipopette")
+ createBaseAnswers("von") + createBaseAnswers("von")
+ createBaseAnswers("pristi") + createBaseAnswers("pristi")
+ [ + [
"pristi (-gnasse)" "pristi (-gnasse)"
], ],
"bon": [ "bon": [
createBaseAnswers("jour"), createBaseAnswers("jour"),
@ -424,38 +450,38 @@ if __name__ == "__main__":
], ],
"qui": createBaseAnswers("wi") "qui": createBaseAnswers("wi")
+ createBaseAnswers("mono"), + createBaseAnswers("mono"),
"sur": createBaseAnswers("prise"), "sur": createBaseAnswers("prise"),
"pas": createBaseAnswers("nini") "pas": createBaseAnswers("nini")
+ createBaseAnswers("steur") + createBaseAnswers("steur")
+ createBaseAnswers("trimoine") + createBaseAnswers("trimoine")
+ createBaseAnswers("") + createBaseAnswers("")
+ createBaseAnswers("stis"), + createBaseAnswers("stis"),
"ka": createBaseAnswers("pitaine") "ka": createBaseAnswers("pitaine")
+ createBaseAnswers("pitulation"), + createBaseAnswers("pitulation"),
"fais": createBaseAnswers("rtile"), "fais": createBaseAnswers("rtile"),
"tant": createBaseAnswers("gente") "tant": createBaseAnswers("gente")
+ createBaseAnswers("tation"), + createBaseAnswers("tation"),
"et": createBaseAnswers("eint") "et": createBaseAnswers("eint")
+ createBaseAnswers("ain"), + createBaseAnswers("ain"),
"la": createBaseAnswers("vabo") "la": createBaseAnswers("vabo")
+ createBaseAnswers("vande"), + createBaseAnswers("vande"),
"tki": createBaseAnswers("la"), "tki": createBaseAnswers("la"),
"moi": createBaseAnswers("tié") "moi": createBaseAnswers("tié")
+ createBaseAnswers("sson") + createBaseAnswers("sson")
+ createBaseAnswers("sissure"), + createBaseAnswers("sissure"),
"toi": createBaseAnswers("lette") "toi": createBaseAnswers("lette")
+ createBaseAnswers("ture"), + createBaseAnswers("ture"),
"top": createBaseAnswers("inambour"), "top": createBaseAnswers("inambour"),
@ -464,17 +490,17 @@ if __name__ == "__main__":
"ya": createBaseAnswers("hourt"), "ya": createBaseAnswers("hourt"),
"yo": createBaseAnswers("ghourt") "yo": createBaseAnswers("ghourt")
+ createBaseAnswers("yo"), + createBaseAnswers("yo"),
"ni": createBaseAnswers("cotine"), "ni": createBaseAnswers("cotine"),
"re": createBaseAnswers("pas") "re": createBaseAnswers("pas")
+ createBaseAnswers("veil") + createBaseAnswers("veil")
+ createBaseAnswers("tourne"), + createBaseAnswers("tourne"),
"quand": createBaseAnswers("dide") "quand": createBaseAnswers("dide")
+ createBaseAnswers("tal") + createBaseAnswers("tal")
+ createBaseAnswers("didat"), + createBaseAnswers("didat"),
"sol": createBaseAnswers("itaire"), "sol": createBaseAnswers("itaire"),
@ -487,7 +513,8 @@ if __name__ == "__main__":
universalBase = createBaseTrigger(list(base.values())) universalBase = createBaseTrigger(list(base.values()))
# Loading environment variables # Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"]) keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET",
"BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
# Start the bot # Start the bot
start() start()