This repository has been archived on 2022-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
feurBot/main.py

537 lines
17 KiB
Python
Raw Normal View History

2022-08-07 21:52:40 +02:00
from datetime import datetime
from json import loads
2021-08-03 14:17:00 +02:00
from os import environ
2021-08-03 19:19:30 +02:00
from random import choice
2022-08-07 21:52:40 +02:00
from re import findall, sub
from dotenv import load_dotenv
2022-08-08 09:04:17 +02:00
from tweepy import Client, StreamingClient, StreamRule, Tweet, errors
2022-08-07 21:52:40 +02:00
2021-08-03 14:17:00 +02:00
def load(variables) -> dict:
"""Load environment variables"""
2021-08-03 14:17:00 +02:00
keys = {}
2022-08-07 21:55:48 +02:00
load_dotenv() # load .env file
2021-08-03 14:17:00 +02:00
for var in variables:
try:
2022-08-07 21:55:48 +02:00
if var == "VERBOSE": # check is VERBOSE is set
try:
res = bool(environ[var])
except:
2022-08-07 21:55:48 +02:00
res = False # if not its False
elif var == "WHITELIST": # check if WHITELIST is set
2021-08-05 20:26:25 +02:00
try:
res = list(set(environ[var].split(",")) - {""})
except:
2022-08-07 21:55:48 +02:00
res = [] # if not its an empty list
elif var == "FORCELIST": # check if FORCELIST is set
try:
res = list(set(environ[var].split(",")) - {""})
except:
2022-08-07 21:55:48 +02:00
res = [] # if not its an empty list
else:
res = environ[var]
if var == "PSEUDOS":
2022-08-07 21:55:48 +02:00
# create a list for the channels and remove blank channels and doubles
res = list(set(res.split(",")) - {""})
keys[var] = res
2021-08-03 14:17:00 +02:00
except KeyError:
2022-08-07 21:55:48 +02:00
print(
f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
2021-08-03 14:17:00 +02:00
exit(1)
return keys
2022-08-07 21:55:48 +02:00
2022-08-08 09:36:10 +02:00
def cleanTweet(tweet: str) -> str | None:
"""Remove all unwanted elements from the tweet"""
# Convert to lower case
2022-08-07 21:55:48 +02:00
tweet = tweet.lower()
# Remove URLs
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
# Check all hashtags
hashtagMatch = findall(r"#\S+", tweet)
# If less than 3
if len(hashtagMatch) < 3:
# Remove them
tweet = sub(r"#\S+", " ", tweet)
2021-08-05 19:52:54 +02:00
else:
# Too much hashtags in the tweet -> so ignore it
2022-08-08 09:36:10 +02:00
return None
# Remove usernames
tweet = sub(r"@\S+", " ", tweet)
# Remove everything who isn't a letter/number/space
tweet = sub(r" *?[^\w\s]+", " ", tweet)
# Remove element of the word only if the last syllable can be matched
# (so more words will be answered without adding them manually)
tweet = sub(r"\S+(?=si|ci)", " ", tweet)
# Remove key smashing in certains words
2022-08-08 02:51:58 +02:00
# uiii naaaan quoiiii noooon heiiin siiii
2022-08-07 21:55:48 +02:00
tweet = sub(
r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
2021-11-02 10:39:51 +01:00
return tweet.strip()
2022-08-07 21:55:48 +02:00
class Listener(StreamingClient):
"""Watch for tweets that match criteria in real-time"""
2022-08-07 21:55:48 +02:00
2022-08-06 02:28:18 +02:00
def __init__(
self,
bearer_token,
client: Client,
2022-08-06 02:28:18 +02:00
):
2022-08-07 21:50:59 +02:00
super(Listener, self).__init__(bearer_token, wait_on_rate_limit=True)
self.client = client
2022-08-08 01:20:35 +02:00
self.cache = {}
2021-08-05 18:51:14 +02:00
def on_connect(self):
2022-08-07 20:16:07 +02:00
print(f"Début du scroll sur Twitter...")
2021-11-02 10:39:51 +01:00
2022-08-08 01:20:35 +02:00
def get_user(self, uid: int) -> str:
"""Return username by ID, with cache support"""
# If not cached
if not uid in self.cache:
# Fetch from Twitter
self.cache[uid] = self.client.get_user(
id=uid, user_auth=True).data.username
# Return the username
return self.cache[uid]
2022-08-07 21:14:23 +02:00
def on_tweet(self, tweet: Tweet):
# Check if the tweet is not a retweet
if not tweet.text.startswith("RT @"):
2022-08-08 01:20:35 +02:00
username = self.get_user(tweet.author_id)
# Clean the tweet
lastWord = cleanTweet(tweet.text)
# Log
2022-08-07 21:50:59 +02:00
if keys["VERBOSE"]:
2022-08-08 01:20:35 +02:00
infoLastWord = "dernier mot : "
2022-08-08 09:36:10 +02:00
newline = "\n"
match lastWord:
case None:
infoLastWord += "tweet ignoré car trop de hashtags"
case w if len(w) == 0:
infoLastWord += "tweet pas intéressant"
case _:
infoLastWord += f"dernier mot : {lastWord.split()[-1:][0]}"
newline = ""
2022-08-07 21:55:48 +02:00
print(
2022-08-08 09:36:10 +02:00
f"Tweet trouvé de {username} ({infoLastWord})...{newline}", end=" ")
2022-08-08 01:20:35 +02:00
2022-08-08 09:36:10 +02:00
# Ignore a tweet
if lastWord == None or len(lastWord) == 0:
2022-08-08 01:20:35 +02:00
return
# Fetch the last word of the tweet
lastWord = lastWord.split()[-1:][0]
2022-08-07 21:50:59 +02:00
# Check if the last word found is a supported word
if lastWord in universalBase:
answer = None
# Check repetition
repetition = findall(r"di(\S+)", lastWord)
if(len(repetition) > 0):
# We need to repeat something...
answer = repeater(repetition[0])
# Fetch an other adequate (better) response
for mot in base.items():
if lastWord in mot[1]:
# Handle specific case
if mot[0] == "bon":
# Between 7am and 5pm
if datetime.now().hour in range(7, 17):
2022-08-07 21:55:48 +02:00
answer = answers[mot[0]][0] # jour
2022-08-07 20:16:07 +02:00
else:
2022-08-07 21:55:48 +02:00
answer = answers[mot[0]][1] # soir
2022-08-07 21:50:59 +02:00
else:
# Normal answer
answer = answers[mot[0]]
# If no answer has been found
if answer == None:
if keys["VERBOSE"]:
print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
2022-08-07 20:16:07 +02:00
else:
if keys["VERBOSE"]:
2022-08-07 21:50:59 +02:00
print(f"Envoie d'un {answer[0]}...", end=" ")
try:
# Send the tweet with the answer
2022-08-08 02:32:28 +02:00
self.client.create_tweet(
in_reply_to_tweet_id=tweet.id, text=choice(answer))
2022-08-08 01:20:35 +02:00
print(f"{username} s'est fait {answer[0]} !")
2022-08-08 09:04:17 +02:00
except errors.Forbidden:
if keys["VERBOSE"]:
2022-08-07 21:55:48 +02:00
print(
2022-08-08 09:04:17 +02:00
f"{errorMessage[:-2]} ! Tweet supprimé ou auteur ({username}) en privé/bloqué.")
2022-08-07 21:50:59 +02:00
else:
if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.")
2021-08-04 14:05:20 +02:00
2022-08-08 09:09:21 +02:00
def on_request_error(self, status_code):
print(f"{errorMessage[:-2]} ({status_code}) !", end=" ")
match status_code:
case 420:
if keys["VERBOSE"]:
print("Déconnecter du flux.")
case 429:
if keys["VERBOSE"]:
print("En attente de reconnexion...")
case _:
print("\n")
return False
2022-08-07 21:55:48 +02:00
2022-08-06 03:27:15 +02:00
def repeater(word: str) -> str:
"""Formating a word who need to be repeated"""
# Remove first letter if the first letter is a "S" or a "T"
# Explanation: Trigger word for the repeater is "di" and sometimes it is
# "dis", sometimes its "dit", that's why we need to remove this 2 letters
# from the final answer
if word[0] == 's' or word[0] == 't':
word = word[1:]
# Random format from the base answer
return createBaseAnswers(word)
2022-08-07 21:55:48 +02:00
2022-08-07 17:24:42 +02:00
def getFriends(client: Client, users: list[str]) -> list:
"""Get all friends of choosen users"""
2022-08-07 17:24:42 +02:00
friends_list = []
# Get IDs of the user's friends
for user in users:
2022-08-07 17:24:42 +02:00
user_id = client.get_user(username=user, user_auth=True).data.id
2022-08-07 21:55:48 +02:00
friends_list.extend(client.get_users_following(
id=user_id, user_auth=True))
2022-08-07 17:24:42 +02:00
return friends_list[0]
2022-08-07 21:55:48 +02:00
2022-08-06 01:50:53 +02:00
def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one"""
listing = []
for liste in lists:
listing.extend(liste)
return list(set(listing))
2022-08-07 21:55:48 +02:00
def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word"""
2022-08-07 15:42:03 +02:00
irritating_word = [
"lol",
"👀",
"XD",
]
return [
2022-08-08 09:59:20 +02:00
# Assuming the first element of this list is always word, don't change it
2022-08-07 15:42:03 +02:00
word,
f"({word})",
word.upper(),
f"{word} {choice(irritating_word)}",
f"{word}...",
]
2022-08-07 21:55:48 +02:00
def createClient(consumer_key, consumer_secret, access_token, access_token_secret) -> Client:
"""Create a client for the Twitter API v2"""
client = Client(
consumer_key=consumer_key,
consumer_secret=consumer_secret,
access_token=access_token,
access_token_secret=access_token_secret
)
2021-08-03 14:17:00 +02:00
2021-08-05 17:04:20 +02:00
if keys["VERBOSE"]:
try:
client.get_me().data.username
2022-08-07 21:55:48 +02:00
print(
f"Authentification réussie en tant que @{client.get_me().data.username}.\n")
2022-08-07 20:16:07 +02:00
# Compte ignorés
if keys['WHITELIST'] == []:
whitelist = "Aucun"
else:
whitelist = f"@{', @'.join(keys['WHITELIST'])}"
print(f"Liste des comptes ignorés : {whitelist}.")
# Compte forcés
if keys['FORCELIST'] == []:
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(keys['FORCELIST'])}"
print(f"Liste des comptes forcés : {forcelist}.")
# Compte aux following suivis
if keys['PSEUDOS'] == []:
pseudos = "Aucun"
else:
pseudos = f"@{', @'.join(keys['PSEUDOS'])}"
2022-08-07 21:55:48 +02:00
print(
f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n")
2022-08-07 20:16:07 +02:00
2022-08-07 21:55:48 +02:00
print(
"Notez que si un compte est dans la whitelist, il sera dans tout les cas ignoré.\n")
2021-08-05 17:04:20 +02:00
except:
print("Erreur d'authentification.")
exit(1)
2021-11-02 10:39:51 +01:00
return client
2022-08-07 21:55:48 +02:00
2022-08-07 21:14:23 +02:00
def create_rules(tracked_users: list[str]) -> list[str]:
"""Create rules for tracking users, by respecting the twitter API policies"""
rules = []
2022-08-08 09:41:09 +02:00
# Repeating rules
2022-08-08 09:58:23 +02:00
repeat = "-is:retweet ("
2022-08-08 09:41:09 +02:00
# Buffer
buffer = repeat
2022-08-07 21:50:59 +02:00
# Track users
2022-08-07 21:14:23 +02:00
for user in tracked_users:
# Check if the rule don't exceeds the maximum length of a rule (512)
# 5 is len of "from:"
2022-08-08 09:58:23 +02:00
# 1 is len for closing parenthesis
if len(buffer) + len(user) + 5 + 1 > 512:
rules.append(buffer[:-4] + ")")
2022-08-08 09:41:09 +02:00
buffer = repeat
2022-08-08 09:58:23 +02:00
buffer += f"from:{user} OR "
2022-08-07 21:14:23 +02:00
if len(buffer) > 0:
2022-08-08 09:58:23 +02:00
rules.append(buffer[:-4] + ")")
2022-08-07 21:50:59 +02:00
if len(rules) > 25:
raise BufferError("Too much rules.")
2022-08-07 21:14:23 +02:00
return rules
2022-08-07 21:55:48 +02:00
def start():
"""Start the bot"""
2022-08-07 20:16:07 +02:00
client = createClient(
keys["CONSUMER_KEY"],
keys["CONSUMER_SECRET"],
keys["TOKEN"],
keys["TOKEN_SECRET"],
)
# Only track specifics users
# Including users in forcelist and removing users in whitelist
tracked_users = [
i for i in [
user.data["username"] for user in getFriends(client, keys["PSEUDOS"])
] + keys["FORCELIST"] if i not in keys["WHITELIST"]
]
2022-08-07 21:14:23 +02:00
2022-08-07 21:50:59 +02:00
stream = Listener(keys["BEARER_TOKEN"], client)
2022-08-07 21:14:23 +02:00
2022-08-08 09:41:09 +02:00
# Clean rules
2022-08-08 09:48:25 +02:00
old_rules = stream.get_rules()
if (old_rules.data):
2022-08-08 09:58:23 +02:00
stream.delete_rules([rule.id for rule in old_rules.data])
2022-08-08 09:41:09 +02:00
# Add new rules
2022-08-08 09:48:25 +02:00
stream.add_rules([StreamRule(rule)
for rule in create_rules(tracked_users)])
2022-08-08 01:20:35 +02:00
stream.filter(threaded=True, tweet_fields=['author_id'])
2021-08-03 14:17:00 +02:00
2022-08-07 21:55:48 +02:00
2021-08-05 18:43:13 +02:00
if __name__ == "__main__":
2021-08-03 14:17:00 +02:00
"""
2022-08-07 17:05:47 +02:00
TOKEN is the Access Token available in the Authentication Tokens section under the Access Token and Secret sub-heading
TOKEN_SECRET is the Access Token Secret available in the Authentication Tokens section under the Access Token and Secret sub-heading
CONSUMER_KEY is the API Key available in the Consumer Keys section under the API Key and Secret sub-heading
CONSUMER_SECRET is the API Secret Key available in the Consumer Keys section under the API Key and Secret sub-heading
BEARER_TOKEN is the Bearer Token available in the Authentication Tokens section under the Bearer Token sub-heading
2021-08-03 14:17:00 +02:00
--
2022-08-07 17:05:47 +02:00
PSEUDOS is a list of account you want to listen, all of his·er following (guys followed by PSEUDO) will be sniped
WHITELSIT is a list of account who are protected from the bot
FORCELIST is a list of account who are targeted by the bot, if user is in the whitelist, he·r will be ignored
---
VERBOSE enable some debugs log
2021-08-03 14:17:00 +02:00
"""
# Error message
errorMessage = "Une erreur survient !"
# Words who trigger the bot (keys in lowercase)
base = {
2021-08-04 19:40:03 +02:00
"quoi": ["quoi", "koi", "quoient"],
2021-08-06 01:57:53 +02:00
"oui": ["oui", "ui", "wi"],
2021-08-04 18:33:55 +02:00
"non": ["non", "nn"],
2021-08-05 12:32:05 +02:00
"nan": ["nan"],
2022-05-17 22:17:33 +02:00
"hein": ["hein", "1", "un"],
2021-08-06 02:51:36 +02:00
"ci": ["ci", "si"],
2021-08-05 12:32:05 +02:00
"con": ["con"],
"ok": ["ok", "okay", "oké", "k"],
2021-08-05 12:32:05 +02:00
"ouais": ["ouais", "oué"],
"comment": ["comment"],
2021-08-05 20:35:09 +02:00
"mais": ["mais", ""],
2021-08-05 22:16:29 +02:00
"fort": ["fort"],
2021-08-06 01:44:12 +02:00
"coup": ["coup", "cou"],
2021-08-07 01:25:31 +02:00
"ça": ["ça", "sa"],
2021-08-06 01:57:53 +02:00
"bon": ["bon"],
2021-08-07 01:25:31 +02:00
"qui": ["qui", "ki"],
"sur": ["sur", "sûr"],
"pas": ["pas", "pa"],
2021-08-07 10:39:38 +02:00
"ka": ["ka", "kha"],
2022-01-26 10:09:38 +01:00
"fais": ["fais", "fait"],
2022-05-12 15:29:19 +02:00
"tant": ["tant", "temps", "tend", "tends"],
"et": ["et"],
"la": ["la", ""],
"tki": ["tki"],
"moi": ["moi", "mwa"],
2022-07-20 01:53:52 +02:00
"toi": ["toi", "toit"],
"top": ["top"],
2022-07-23 00:59:50 +02:00
"jour": ["jour", "bonjour"],
"ya": ["ya", "y'a"],
2022-08-02 20:52:24 +02:00
"yo": ["yo"],
"ni": ["ni"],
2022-08-02 21:33:46 +02:00
"re": ["re", "reu", "reuh"],
2022-08-05 02:33:23 +02:00
"quand": ["quand", "kan", "qand", "quan"],
"sol": ["sol"],
2022-08-05 02:42:19 +02:00
"vois": ["vois", "voit", "voie", "voi"],
2022-08-07 15:39:38 +02:00
"akhy": ["akhy", "aquis", "aquit"],
2021-08-04 18:30:49 +02:00
}
# Answers for all the triggers (keys in lowercase)
answers = {
"quoi": createBaseAnswers("feur")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("feuse")
+ [
"https://twitter.com/Myshawii/status/1423219640025722880/video/1",
"https://twitter.com/Myshawii/status/1423219684552417281/video/1",
"feur (-isson -ictalope -diatre -uil)",
"https://twitter.com/Myshawii/status/1455469162202075138/video/1",
"https://twitter.com/Myshawii/status/1552026689101860865/video/1",
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
],
"oui": createBaseAnswers("stiti")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("fi"),
2021-08-04 18:33:55 +02:00
"non": createBaseAnswers("bril"),
2021-08-05 12:32:05 +02:00
"nan": createBaseAnswers("cy"),
"hein": createBaseAnswers("deux")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("bécile")
+ [
"2"
],
"ci": createBaseAnswers("tron")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("prine"),
"con": createBaseAnswers("combre")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("gelé")
+ createBaseAnswers("pas"),
2021-08-05 12:32:05 +02:00
"ok": createBaseAnswers("sur glace"),
2021-08-05 12:32:05 +02:00
"ouais": createBaseAnswers("stern"),
"comment": createBaseAnswers("tateur")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("tatrice")
+ createBaseAnswers("dant Cousteau"),
2022-08-05 02:33:23 +02:00
"mais": createBaseAnswers("on")
2022-08-07 21:55:48 +02:00
+ [
"on (-dulation)"
],
2022-08-05 02:33:23 +02:00
"fort": createBaseAnswers("boyard")
2022-08-07 21:55:48 +02:00
+ [
"boyard (-ennes)"
],
2021-08-05 22:16:29 +02:00
"coup": createBaseAnswers("teau"),
2022-08-05 02:33:23 +02:00
"ça": createBaseAnswers("perlipopette")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("von")
+ createBaseAnswers("pristi")
+ [
"pristi (-gnasse)"
],
"bon": [
createBaseAnswers("jour"),
createBaseAnswers("soir")
],
"qui": createBaseAnswers("wi")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("mono"),
2021-08-07 01:25:31 +02:00
"sur": createBaseAnswers("prise"),
"pas": createBaseAnswers("nini")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("steur")
+ createBaseAnswers("trimoine")
+ createBaseAnswers("")
+ createBaseAnswers("stis"),
"ka": createBaseAnswers("pitaine")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("pitulation"),
2022-01-26 10:09:38 +01:00
"fais": createBaseAnswers("rtile"),
"tant": createBaseAnswers("gente")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("tation"),
"et": createBaseAnswers("eint")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("ain"),
"la": createBaseAnswers("vabo")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("vande"),
"tki": createBaseAnswers("la"),
"moi": createBaseAnswers("tié")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("sson")
+ createBaseAnswers("sissure"),
"toi": createBaseAnswers("lette")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("ture"),
2022-07-20 01:53:52 +02:00
"top": createBaseAnswers("inambour"),
2022-07-23 00:59:50 +02:00
"jour": createBaseAnswers("nal"),
2022-07-23 00:59:50 +02:00
"ya": createBaseAnswers("hourt"),
"yo": createBaseAnswers("ghourt")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("yo"),
2022-08-02 20:52:24 +02:00
"ni": createBaseAnswers("cotine"),
"re": createBaseAnswers("pas")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("veil")
+ createBaseAnswers("tourne"),
"quand": createBaseAnswers("dide")
2022-08-07 21:55:48 +02:00
+ createBaseAnswers("tal")
+ createBaseAnswers("didat"),
2022-08-05 02:33:23 +02:00
2022-08-05 02:42:19 +02:00
"sol": createBaseAnswers("itaire"),
"vois": createBaseAnswers("ture"),
2022-08-07 15:39:38 +02:00
"akhy": createBaseAnswers("nator"),
2021-08-04 18:30:49 +02:00
}
2021-08-04 19:40:03 +02:00
# List of all the trigger words
universalBase = createBaseTrigger(list(base.values()))
2021-08-04 19:40:03 +02:00
# Loading environment variables
2022-08-07 21:55:48 +02:00
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET",
"BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
# Start the bot
start()