Compare commits

..

No commits in common. "52525cab51a3ac8c524c47252880e339f8f3b486" and "46037106e6d09a4aeb7d5c1309934172e728dd4c" have entirely different histories.

65
main.py
View file

@ -1,13 +1,11 @@
from datetime import datetime
from json import loads
from os import environ
from random import choice
from re import findall, sub
from dotenv import load_dotenv
from os import environ
from tweepy import StreamingClient, Client, StreamRule, Tweet
from re import sub, findall
from random import choice
from datetime import datetime
from pytz import timezone
from tweepy import Client, StreamingClient, StreamRule, Tweet
from json import loads
def load(variables) -> dict:
"""Load environment variables"""
@ -33,16 +31,13 @@ def load(variables) -> dict:
else:
res = environ[var]
if var == "PSEUDOS":
# create a list for the channels and remove blank channels and doubles
res = list(set(res.split(",")) - {""})
res = list(set(res.split(",")) - {""}) # create a list for the channels and remove blank channels and doubles
keys[var] = res
except KeyError:
print(
f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
print(f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
exit(1)
return keys
def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet"""
# Convert to lower case
@ -68,15 +63,12 @@ def cleanTweet(tweet: str) -> str:
# Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(
r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
return tweet.strip()
class Listener(StreamingClient):
"""Watch for tweets that match criteria in real-time"""
def __init__(
self,
bearer_token,
@ -98,14 +90,11 @@ class Listener(StreamingClient):
# Verify the age of the tweet
if seniority(tweet.created_at):
# Fetch the last word of the tweet
tweet.username: str = self.client.get_user(
id=tweet.author_id, user_auth=True).data.username
tweet.username: str = self.client.get_user(id=tweet.author_id, user_auth=True).data.username
lastWord = cleanTweet(tweet.text).split()[-1:][0]
if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(
lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(
f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ")
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {tweet.username} ({infoLastWord})...", end=" ")
# Check if the last word found is a supported word
if lastWord in universalBase:
answer = None
@ -141,8 +130,7 @@ class Listener(StreamingClient):
print(f"Envoie d'un {answer[0]}...", end=" ")
try:
# Send the tweet with the answer
self.client.create_tweet(
in_reply_to_tweet_id=tweet.id, text=choice(answer))
self.client.create_tweet(in_reply_to_tweet_id=tweet.id, text=choice(answer))
print(f"{tweet.username} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
@ -154,8 +142,7 @@ class Listener(StreamingClient):
# Show error only if relevant, always in verbose
if show_error or keys["VERBOSE"]:
print(
f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
else:
if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.")
@ -169,7 +156,6 @@ class Listener(StreamingClient):
print("\n")
return False
def repeater(word: str) -> str:
"""Formating a word who need to be repeated"""
# Remove first letter if the first letter is a "S" or a "T"
@ -182,18 +168,15 @@ def repeater(word: str) -> str:
# Random format from the base answer
return createBaseAnswers(word)
def getFriends(client: Client, users: list[str]) -> list:
"""Get all friends of choosen users"""
friends_list = []
# Get IDs of the user's friends
for user in users:
user_id = client.get_user(username=user, user_auth=True).data.id
friends_list.extend(client.get_users_following(
id=user_id, user_auth=True))
friends_list.extend(client.get_users_following(id=user_id, user_auth=True))
return friends_list[0]
def seniority(date: datetime) -> bool:
"""Return True only if the given string date is less than one day old"""
# Convert string format to datetime format
@ -205,7 +188,6 @@ def seniority(date: datetime) -> bool:
# False if older than a day, else True
return False if age.days >= 1 else True
def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one"""
listing = []
@ -213,7 +195,6 @@ def createBaseTrigger(lists: list[list]) -> list:
listing.extend(liste)
return list(set(listing))
def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word"""
irritating_word = [
@ -230,7 +211,6 @@ def createBaseAnswers(word: str) -> list:
f"{word}...",
]
def createClient(consumer_key, consumer_secret, access_token, access_token_secret) -> Client:
"""Create a client for the Twitter API v2"""
client = Client(
@ -243,8 +223,7 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre
if keys["VERBOSE"]:
try:
client.get_me().data.username
print(
f"Authentification réussie en tant que @{client.get_me().data.username}.\n")
print(f"Authentification réussie en tant que @{client.get_me().data.username}.\n")
# Compte ignorés
if keys['WHITELIST'] == []:
@ -265,18 +244,15 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre
pseudos = "Aucun"
else:
pseudos = f"@{', @'.join(keys['PSEUDOS'])}"
print(
f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n")
print(f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n")
print(
"Notez que si un compte est dans la whitelist, il sera dans tout les cas ignoré.\n")
print("Notez que si un compte est dans la whitelist, il sera dans tout les cas ignoré.\n")
except:
print("Erreur d'authentification.")
exit(1)
return client
def create_rules(tracked_users: list[str]) -> list[str]:
"""Create rules for tracking users, by respecting the twitter API policies"""
rules = []
@ -302,7 +278,6 @@ def create_rules(tracked_users: list[str]) -> list[str]:
return rules
def start():
"""Start the bot"""
client = createClient(
@ -326,7 +301,6 @@ def start():
stream.add_rules(StreamRule(rule))
stream.filter(threaded=True)
if __name__ == "__main__":
"""
TOKEN is the Access Token available in the Authentication Tokens section under the Access Token and Secret sub-heading
@ -513,8 +487,7 @@ if __name__ == "__main__":
universalBase = createBaseTrigger(list(base.values()))
# Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET",
"BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
# Start the bot
start()