Compare commits

...

2 commits

Author SHA1 Message Date
52525cab51
follow pep8 2022-08-07 21:55:48 +02:00
36792c1949
order imports 2022-08-07 21:52:40 +02:00

179
main.py
View file

@ -1,47 +1,52 @@
from dotenv import load_dotenv
from os import environ
from tweepy import StreamingClient, Client, StreamRule, Tweet
from re import sub, findall
from random import choice
from datetime import datetime
from pytz import timezone
from json import loads
from os import environ
from random import choice
from re import findall, sub
from dotenv import load_dotenv
from pytz import timezone
from tweepy import Client, StreamingClient, StreamRule, Tweet
def load(variables) -> dict:
"""Load environment variables"""
keys = {}
load_dotenv() # load .env file
load_dotenv() # load .env file
for var in variables:
try:
if var == "VERBOSE": # check is VERBOSE is set
if var == "VERBOSE": # check is VERBOSE is set
try:
res = bool(environ[var])
except:
res = False # if not its False
elif var == "WHITELIST": # check if WHITELIST is set
res = False # if not its False
elif var == "WHITELIST": # check if WHITELIST is set
try:
res = list(set(environ[var].split(",")) - {""})
except:
res = [] # if not its an empty list
elif var == "FORCELIST": # check if FORCELIST is set
res = [] # if not its an empty list
elif var == "FORCELIST": # check if FORCELIST is set
try:
res = list(set(environ[var].split(",")) - {""})
except:
res = [] # if not its an empty list
res = [] # if not its an empty list
else:
res = environ[var]
if var == "PSEUDOS":
res = list(set(res.split(",")) - {""}) # create a list for the channels and remove blank channels and doubles
# create a list for the channels and remove blank channels and doubles
res = list(set(res.split(",")) - {""})
keys[var] = res
except KeyError:
print(f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
print(
f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
exit(1)
return keys
def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet"""
# Convert to lower case
tweet = tweet.lower()
tweet = tweet.lower()
# Remove URLs
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
# Check all hashtags
@ -63,12 +68,15 @@ def cleanTweet(tweet: str) -> str:
# Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
tweet = sub(
r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
return tweet.strip()
class Listener(StreamingClient):
"""Watch for tweets that match criteria in real-time"""
def __init__(
self,
bearer_token,
@ -90,11 +98,14 @@ class Listener(StreamingClient):
# Verify the age of the tweet
if seniority(tweet.created_at):
# Fetch the last word of the tweet
tweet.username: str = self.client.get_user(id=tweet.author_id, user_auth=True).data.username
tweet.username: str = self.client.get_user(
id=tweet.author_id, user_auth=True).data.username
lastWord = cleanTweet(tweet.text).split()[-1:][0]
if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ")
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(
lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(
f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ")
# Check if the last word found is a supported word
if lastWord in universalBase:
answer = None
@ -112,9 +123,9 @@ class Listener(StreamingClient):
if mot[0] == "bon":
# Between 7am and 5pm
if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour
answer = answers[mot[0]][0] # jour
else:
answer = answers[mot[0]][1] # soir
answer = answers[mot[0]][1] # soir
else:
# Normal answer
answer = answers[mot[0]]
@ -130,7 +141,8 @@ class Listener(StreamingClient):
print(f"Envoie d'un {answer[0]}...", end=" ")
try:
# Send the tweet with the answer
self.client.create_tweet(in_reply_to_tweet_id=tweet.id, text=choice(answer))
self.client.create_tweet(
in_reply_to_tweet_id=tweet.id, text=choice(answer))
print(f"{tweet.username} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
@ -142,7 +154,8 @@ class Listener(StreamingClient):
# Show error only if relevant, always in verbose
if show_error or keys["VERBOSE"]:
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
print(
f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
else:
if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.")
@ -156,6 +169,7 @@ class Listener(StreamingClient):
print("\n")
return False
def repeater(word: str) -> str:
"""Formating a word who need to be repeated"""
# Remove first letter if the first letter is a "S" or a "T"
@ -168,15 +182,18 @@ def repeater(word: str) -> str:
# Random format from the base answer
return createBaseAnswers(word)
def getFriends(client: Client, users: list[str]) -> list:
"""Get all friends of choosen users"""
friends_list = []
# Get IDs of the user's friends
for user in users:
user_id = client.get_user(username=user, user_auth=True).data.id
friends_list.extend(client.get_users_following(id=user_id, user_auth=True))
friends_list.extend(client.get_users_following(
id=user_id, user_auth=True))
return friends_list[0]
def seniority(date: datetime) -> bool:
"""Return True only if the given string date is less than one day old"""
# Convert string format to datetime format
@ -188,6 +205,7 @@ def seniority(date: datetime) -> bool:
# False if older than a day, else True
return False if age.days >= 1 else True
def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one"""
listing = []
@ -195,6 +213,7 @@ def createBaseTrigger(lists: list[list]) -> list:
listing.extend(liste)
return list(set(listing))
def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word"""
irritating_word = [
@ -211,6 +230,7 @@ def createBaseAnswers(word: str) -> list:
f"{word}...",
]
def createClient(consumer_key, consumer_secret, access_token, access_token_secret) -> Client:
"""Create a client for the Twitter API v2"""
client = Client(
@ -223,7 +243,8 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre
if keys["VERBOSE"]:
try:
client.get_me().data.username
print(f"Authentification réussie en tant que @{client.get_me().data.username}.\n")
print(
f"Authentification réussie en tant que @{client.get_me().data.username}.\n")
# Compte ignorés
if keys['WHITELIST'] == []:
@ -244,15 +265,18 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre
pseudos = "Aucun"
else:
pseudos = f"@{', @'.join(keys['PSEUDOS'])}"
print(f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n")
print(
f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n")
print("Notez que si un compte est dans la whitelist, il sera dans tout les cas ignoré.\n")
print(
"Notez que si un compte est dans la whitelist, il sera dans tout les cas ignoré.\n")
except:
print("Erreur d'authentification.")
exit(1)
return client
def create_rules(tracked_users: list[str]) -> list[str]:
"""Create rules for tracking users, by respecting the twitter API policies"""
rules = []
@ -278,6 +302,7 @@ def create_rules(tracked_users: list[str]) -> list[str]:
return rules
def start():
"""Start the bot"""
client = createClient(
@ -301,6 +326,7 @@ def start():
stream.add_rules(StreamRule(rule))
stream.filter(threaded=True)
if __name__ == "__main__":
"""
TOKEN is the Access Token available in the Authentication Tokens section under the Access Token and Secret sub-heading
@ -361,62 +387,62 @@ if __name__ == "__main__":
# Answers for all the triggers (keys in lowercase)
answers = {
"quoi": createBaseAnswers("feur")
+ createBaseAnswers("feuse")
+ [
"https://twitter.com/Myshawii/status/1423219640025722880/video/1",
"https://twitter.com/Myshawii/status/1423219684552417281/video/1",
"feur (-isson -ictalope -diatre -uil)",
"https://twitter.com/Myshawii/status/1455469162202075138/video/1",
"https://twitter.com/Myshawii/status/1552026689101860865/video/1",
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
],
+ createBaseAnswers("feuse")
+ [
"https://twitter.com/Myshawii/status/1423219640025722880/video/1",
"https://twitter.com/Myshawii/status/1423219684552417281/video/1",
"feur (-isson -ictalope -diatre -uil)",
"https://twitter.com/Myshawii/status/1455469162202075138/video/1",
"https://twitter.com/Myshawii/status/1552026689101860865/video/1",
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
],
"oui": createBaseAnswers("stiti")
+ createBaseAnswers("fi"),
+ createBaseAnswers("fi"),
"non": createBaseAnswers("bril"),
"nan": createBaseAnswers("cy"),
"hein": createBaseAnswers("deux")
+ createBaseAnswers("bécile")
+ [
"2"
],
+ createBaseAnswers("bécile")
+ [
"2"
],
"ci": createBaseAnswers("tron")
+ createBaseAnswers("prine"),
+ createBaseAnswers("prine"),
"con": createBaseAnswers("combre")
+ createBaseAnswers("gelé")
+ createBaseAnswers("pas"),
+ createBaseAnswers("gelé")
+ createBaseAnswers("pas"),
"ok": createBaseAnswers("sur glace"),
"ouais": createBaseAnswers("stern"),
"comment": createBaseAnswers("tateur")
+ createBaseAnswers("tatrice")
+ createBaseAnswers("dant Cousteau"),
+ createBaseAnswers("tatrice")
+ createBaseAnswers("dant Cousteau"),
"mais": createBaseAnswers("on")
+ [
"on (-dulation)"
],
+ [
"on (-dulation)"
],
"fort": createBaseAnswers("boyard")
+ [
"boyard (-ennes)"
],
+ [
"boyard (-ennes)"
],
"coup": createBaseAnswers("teau"),
"ça": createBaseAnswers("perlipopette")
+ createBaseAnswers("von")
+ createBaseAnswers("pristi")
+ [
"pristi (-gnasse)"
],
+ createBaseAnswers("von")
+ createBaseAnswers("pristi")
+ [
"pristi (-gnasse)"
],
"bon": [
createBaseAnswers("jour"),
@ -424,38 +450,38 @@ if __name__ == "__main__":
],
"qui": createBaseAnswers("wi")
+ createBaseAnswers("mono"),
+ createBaseAnswers("mono"),
"sur": createBaseAnswers("prise"),
"pas": createBaseAnswers("nini")
+ createBaseAnswers("steur")
+ createBaseAnswers("trimoine")
+ createBaseAnswers("")
+ createBaseAnswers("stis"),
+ createBaseAnswers("steur")
+ createBaseAnswers("trimoine")
+ createBaseAnswers("")
+ createBaseAnswers("stis"),
"ka": createBaseAnswers("pitaine")
+ createBaseAnswers("pitulation"),
+ createBaseAnswers("pitulation"),
"fais": createBaseAnswers("rtile"),
"tant": createBaseAnswers("gente")
+ createBaseAnswers("tation"),
+ createBaseAnswers("tation"),
"et": createBaseAnswers("eint")
+ createBaseAnswers("ain"),
+ createBaseAnswers("ain"),
"la": createBaseAnswers("vabo")
+ createBaseAnswers("vande"),
+ createBaseAnswers("vande"),
"tki": createBaseAnswers("la"),
"moi": createBaseAnswers("tié")
+ createBaseAnswers("sson")
+ createBaseAnswers("sissure"),
+ createBaseAnswers("sson")
+ createBaseAnswers("sissure"),
"toi": createBaseAnswers("lette")
+ createBaseAnswers("ture"),
+ createBaseAnswers("ture"),
"top": createBaseAnswers("inambour"),
@ -464,17 +490,17 @@ if __name__ == "__main__":
"ya": createBaseAnswers("hourt"),
"yo": createBaseAnswers("ghourt")
+ createBaseAnswers("yo"),
+ createBaseAnswers("yo"),
"ni": createBaseAnswers("cotine"),
"re": createBaseAnswers("pas")
+ createBaseAnswers("veil")
+ createBaseAnswers("tourne"),
+ createBaseAnswers("veil")
+ createBaseAnswers("tourne"),
"quand": createBaseAnswers("dide")
+ createBaseAnswers("tal")
+ createBaseAnswers("didat"),
+ createBaseAnswers("tal")
+ createBaseAnswers("didat"),
"sol": createBaseAnswers("itaire"),
@ -487,7 +513,8 @@ if __name__ == "__main__":
universalBase = createBaseTrigger(list(base.values()))
# Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET",
"BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
# Start the bot
start()