This repository has been archived on 2022-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
feurBot/main.py

444 lines
16 KiB
Python
Raw Normal View History

2021-08-03 14:17:00 +02:00
from dotenv import load_dotenv
from os import environ
from tweepy import OAuthHandler, API, StreamListener, Stream
from re import sub, findall
2021-08-03 19:19:30 +02:00
from random import choice
2021-08-03 20:27:06 +02:00
from datetime import datetime
2021-08-03 19:59:32 +02:00
from pytz import timezone
2021-08-04 14:04:52 +02:00
from queue import Queue
2021-08-07 03:12:51 +02:00
from json import loads
2021-08-03 14:17:00 +02:00
def load(variables) -> dict:
"""Load environment variables"""
2021-08-03 14:17:00 +02:00
keys = {}
load_dotenv() # load .env file
for var in variables:
try:
2021-08-05 20:26:25 +02:00
if var == "VERBOSE": # check is VERBOSE is set
try:
res = bool(environ[var])
except:
2021-08-05 20:26:25 +02:00
res = False # if not its False
elif var == "WHITELIST": # check if WHITELIST is set
try:
res = list(set(environ[var].split(",")) - {""})
except:
res = [] # if not its an empty list
elif var == "FORCELIST": # check if FORCELIST is set
try:
res = list(set(environ[var].split(",")) - {""})
except:
res = [] # if not its an empty list
else:
res = environ[var]
if var == "PSEUDOS":
2021-08-05 18:43:13 +02:00
res = list(set(res.split(",")) - {""}) # create a list for the channels and remove blank channels and doubles
keys[var] = res
2021-08-03 14:17:00 +02:00
except KeyError:
2021-08-04 18:30:49 +02:00
print(f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
2021-08-03 14:17:00 +02:00
exit(1)
return keys
def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet"""
# Convert to lower case
tweet = tweet.lower()
# Remove URLs
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
# Check all hashtags
hashtagMatch = findall(r"#\S+", tweet)
# If less than 3
if len(hashtagMatch) < 3:
# Remove them
tweet = sub(r"#\S+", " ", tweet)
2021-08-05 19:52:54 +02:00
else:
# Too much hashtags in the tweet -> so ignore it
return ""
# Remove usernames
tweet = sub(r"@\S+", " ", tweet)
# Remove everything who isn't a letter/number/space
tweet = sub(r" *?[^\w\s]+", " ", tweet)
# Remove element of the word only if the last syllable can be matched
# (so more words will be answered without adding them manually)
tweet = sub(r"\S+(?=si|ci)", " ", tweet)
# Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
2021-11-02 10:39:51 +01:00
return tweet.strip()
2021-08-03 14:17:00 +02:00
class Listener(StreamListener):
"""Watch for tweets that match criteria in real-time"""
def __init__(self, api: API = None, users: list = None, forcelist: list = None, q = Queue()):
2021-08-03 14:17:00 +02:00
super(Listener, self).__init__()
2021-08-04 14:04:52 +02:00
self.q = q
2021-08-03 14:17:00 +02:00
self.api = api
self.accounts = [users, forcelist]
2022-08-06 00:24:24 +02:00
self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist)
2021-08-05 18:51:14 +02:00
def on_connect(self):
if self.accounts[1] == []:
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(self.accounts[1])}"
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts[0])} et ces comptes en plus : {forcelist} comme timeline...")
2021-11-02 10:39:51 +01:00
2021-08-05 18:41:30 +02:00
def on_disconnect(notice):
notice = notice["disconnect"]
print(f"Déconnexion (code {notice['code']}).", end = " ")
2021-08-05 18:43:13 +02:00
if len(notice["reason"]) > 0:
2021-08-05 18:41:30 +02:00
print(f"Raison : {notice['reason']}")
2021-08-03 14:17:00 +02:00
def on_status(self, status):
json = status._json
# Verify the author of the tweet
if json["user"]["id"] in self.listOfFriendsID and json["user"]["screen_name"] not in keys["WHITELIST"]:
# Verify the age of the tweet
if seniority(json["created_at"]):
# Verify if the tweet isn't a retweet
if not hasattr(status, "retweeted_status"):
# Fetch the tweet
if "extended_tweet" in json:
tweet = cleanTweet(status.extended_tweet["full_text"])
else:
tweet = cleanTweet(status.text)
# Fetch the last word of the tweet
lastWord = tweet.split()[-1:][0]
if keys["VERBOSE"]:
2021-08-05 19:52:54 +02:00
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
# Check if the last word found is a supported word
if lastWord in universalBase:
answer = None
# Fetch an adequate response
for mot in base.items():
if lastWord in mot[1]:
# Handle specific case
2021-08-06 01:53:33 +02:00
if mot[0] == "bon":
# Between 7am and 5pm
if datetime.now().hour in range(7, 17):
2021-08-06 01:53:33 +02:00
answer = answers[mot[0]][0] # jour
else:
answer = answers[mot[0]][1] # soir
else:
answer = answers[mot[0]]
if answer == None:
if keys["VERBOSE"]:
print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
else:
if keys["VERBOSE"]:
print(f"Envoie d'un {answer[0]}...", end = " ")
try:
# Send the tweet with the answer
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error:
2021-08-07 03:12:51 +02:00
error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
2021-08-07 03:12:51 +02:00
if error["code"] == 385:
error["message"] = "Tweet supprimé ou auteur en privé/bloqué."
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
2021-08-04 18:30:49 +02:00
else:
if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.")
2021-08-04 14:05:20 +02:00
2021-08-04 14:04:52 +02:00
def do_stuff(self):
"""Loop for the Listener"""
2021-08-04 14:04:52 +02:00
while True:
self.q.get()
self.q.task_done()
2021-08-03 19:59:32 +02:00
def on_error(self, status_code):
print(f"{errorMessage[:-2]} ({status_code}) !", end = " ")
if status_code == 413:
2021-08-05 18:30:43 +02:00
if keys["VERBOSE"]:
print("La liste des mots est trop longue (triggerWords).")
elif status_code == 420:
2021-08-05 18:30:43 +02:00
if keys["VERBOSE"]:
print("Déconnecter du flux.")
else:
print("\n")
return False
2022-08-06 01:50:53 +02:00
def getFriendsID(api: API, users: list[str]) -> list:
"""Get all friends of choosen users"""
liste = []
# Get IDs of the user's friends
for user in users:
liste.extend(api.friends_ids(user))
2021-08-04 01:06:16 +02:00
return list(set(liste))
2022-08-06 01:50:53 +02:00
def getIDs(api: API, users: list[str]) -> list:
2022-08-06 00:24:24 +02:00
"""Get all the ID of users"""
liste = []
# Get IDs of the users
2022-08-06 00:24:24 +02:00
for user in users:
liste.append(api.get_user(user)._json["id"])
return list(set(liste))
def seniority(date: str) -> bool:
"""Return True only if the given string date is less than one day old"""
# Convert string format to datetime format
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
# Twitter give us an UTC time
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC"))
# time now in UTC minus the time we got to get the age of the date
age = datetime.now(timezone("UTC")) - datetimeObject
# False if older than a day, else True
return False if age.days >= 1 else True
2022-08-06 01:50:53 +02:00
def generateWords(array: list[str]) -> list:
"""
Retrieves all possible combinations for the given list and returns the result as a list
2022-08-06 01:50:53 +02:00
This is used for the filter in the stream (before calling the Listener::on_status)
"""
2021-08-03 23:33:53 +02:00
quoiListe = []
for text in array:
# Add all combinations
# Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui']
2022-08-06 01:50:53 +02:00
#
# -> Depends on: from itertools import product
# -> Problem : Create a too long list (+1000 words, max is 400)
# -> Cf. https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
#
# quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower())))))
if text.lower() not in quoiListe:
# Word in lowercase
quoiListe.append(text.lower())
if text.upper() not in quoiListe:
# Word in uppercase
quoiListe.append(text.upper())
if text.capitalize() not in quoiListe:
# Word capitalized
quoiListe.append(text.capitalize())
2021-08-03 23:33:53 +02:00
return quoiListe
2022-08-06 01:50:53 +02:00
def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one"""
listing = []
for liste in lists:
listing.extend(liste)
return list(set(listing))
def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word"""
return [word, f"({word})", word.upper(), f"{word} lol", f"{word} 👀"]
def start():
"""Start the bot"""
auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"])
2021-11-02 10:39:51 +01:00
2021-08-03 23:11:17 +02:00
api = API(auth_handler = auth, wait_on_rate_limit = True)
2021-08-03 14:17:00 +02:00
2021-08-05 17:04:20 +02:00
if keys["VERBOSE"]:
try:
api.verify_credentials()
print(f"Authentification réussie en tant que", end = " ")
except:
print("Erreur d'authentification.")
exit(1)
print(f"@{api.me()._json['screen_name']}.")
2021-11-02 10:39:51 +01:00
2021-08-05 20:26:25 +02:00
if keys['WHITELIST'] == []:
whitelist = "Aucun"
else:
whitelist = f"@{', @'.join(keys['WHITELIST'])}"
2021-08-05 21:47:44 +02:00
print(f"Liste des comptes ignorés : {whitelist}.")
2021-08-05 17:04:20 +02:00
2022-08-05 23:26:06 +02:00
listener = Listener(api, keys["PSEUDOS"], keys["FORCELIST"])
2021-08-03 19:19:30 +02:00
stream = Stream(auth = api.auth, listener = listener)
2021-08-04 14:08:10 +02:00
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, is_async = True)
2021-08-03 14:17:00 +02:00
2021-08-05 18:43:13 +02:00
if __name__ == "__main__":
2021-08-03 14:17:00 +02:00
"""
2021-08-03 19:19:30 +02:00
TOKEN is the Access Token available in the Authentication Tokens section under Access Token and Secret sub-heading.
TOKEN_SECRET is the Access Token Secret available in the Authentication Tokens section under Access Token and Secret sub-heading.
CONSUMER_KEY is the API Key available in the Consumer Keys section.
CONSUMER_SECRET is the API Secret Key available in the Consumer Keys section.
2021-08-03 14:17:00 +02:00
--
2021-08-03 23:11:17 +02:00
PSEUDO is the PSEUDO of the account you want to listen to snipe.
2021-08-03 14:17:00 +02:00
"""
# Error message
errorMessage = "Une erreur survient !"
# Words who trigger the bot (keys in lowercase)
base = {
2021-08-04 19:40:03 +02:00
"quoi": ["quoi", "koi", "quoient"],
2021-08-06 01:57:53 +02:00
"oui": ["oui", "ui", "wi"],
2021-08-04 18:33:55 +02:00
"non": ["non", "nn"],
2021-08-05 12:32:05 +02:00
"nan": ["nan"],
2022-05-17 22:17:33 +02:00
"hein": ["hein", "1", "un"],
2021-08-06 02:51:36 +02:00
"ci": ["ci", "si"],
2021-08-05 12:32:05 +02:00
"con": ["con"],
"ok": ["ok", "okay", "oké", "k"],
2021-08-05 12:32:05 +02:00
"ouais": ["ouais", "oué"],
"comment": ["comment"],
2021-08-05 20:35:09 +02:00
"mais": ["mais", ""],
2021-08-05 22:16:29 +02:00
"fort": ["fort"],
2021-08-06 01:44:12 +02:00
"coup": ["coup", "cou"],
2021-08-07 01:25:31 +02:00
"ça": ["ça", "sa"],
2021-08-06 01:57:53 +02:00
"bon": ["bon"],
2021-08-07 01:25:31 +02:00
"qui": ["qui", "ki"],
"sur": ["sur", "sûr"],
"pas": ["pas", "pa"],
2021-08-07 10:39:38 +02:00
"ka": ["ka", "kha"],
2022-01-26 10:09:38 +01:00
"fais": ["fais", "fait"],
2022-05-12 15:29:19 +02:00
"tant": ["tant", "temps", "tend", "tends"],
"et": ["et"],
"la": ["la", ""],
"tki": ["tki"],
"moi": ["moi", "mwa"],
2022-07-20 01:53:52 +02:00
"toi": ["toi", "toit"],
"top": ["top"],
2022-07-23 00:59:50 +02:00
"jour": ["jour", "bonjour"],
"ya": ["ya", "y'a"],
2022-08-02 20:52:24 +02:00
"yo": ["yo"],
"ni": ["ni"],
2022-08-02 21:33:46 +02:00
"re": ["re", "reu", "reuh"],
2022-08-05 02:33:23 +02:00
"quand": ["quand", "kan", "qand", "quan"],
"sol": ["sol"],
2022-08-05 02:42:19 +02:00
"vois": ["vois", "voit", "voie", "voi"],
2021-08-04 18:30:49 +02:00
}
# Answers for all the triggers (keys in lowercase)
answers = {
"quoi": createBaseAnswers("feur")
+ createBaseAnswers("feuse")
+ [
"https://twitter.com/Myshawii/status/1423219640025722880/video/1",
"https://twitter.com/Myshawii/status/1423219684552417281/video/1",
2022-08-05 02:33:23 +02:00
f"{createBaseAnswers('feur')} (-isson -ictalope -diatre -uil)",
"https://twitter.com/Myshawii/status/1455469162202075138/video/1",
"https://twitter.com/Myshawii/status/1552026689101860865/video/1",
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
],
"oui": createBaseAnswers("stiti")
+ createBaseAnswers("fi"),
2021-08-04 18:33:55 +02:00
"non": createBaseAnswers("bril"),
2021-08-05 12:32:05 +02:00
"nan": createBaseAnswers("cy"),
"hein": createBaseAnswers("deux")
+ createBaseAnswers("bécile")
+ [
"2"
],
"ci": createBaseAnswers("tron")
+ createBaseAnswers("prine"),
"con": createBaseAnswers("combre")
+ createBaseAnswers("gelé")
+ createBaseAnswers("pas"),
2021-08-05 12:32:05 +02:00
"ok": createBaseAnswers("sur glace"),
2021-08-05 12:32:05 +02:00
"ouais": createBaseAnswers("stern"),
"comment": createBaseAnswers("tateur")
+ createBaseAnswers("tatrice")
+ createBaseAnswers("dant Cousteau"),
2022-08-05 02:33:23 +02:00
"mais": createBaseAnswers("on")
+ [
f"{createBaseAnswers('on')} (-dulation)"
],
2022-08-05 02:33:23 +02:00
"fort": createBaseAnswers("boyard")
+ [
f"{createBaseAnswers('boyard')} (-ennes)"
],
2021-08-05 22:16:29 +02:00
"coup": createBaseAnswers("teau"),
2022-08-05 02:33:23 +02:00
"ça": createBaseAnswers("perlipopette")
+ createBaseAnswers("von")
+ createBaseAnswers("pristi")
+ [
f"{createBaseAnswers('pristi')} (-gnasse)"
],
"bon": [
createBaseAnswers("jour"),
createBaseAnswers("soir")
],
"qui": createBaseAnswers("wi")
+ createBaseAnswers("mono"),
2021-08-07 01:25:31 +02:00
"sur": createBaseAnswers("prise"),
"pas": createBaseAnswers("nini")
+ createBaseAnswers("steur")
+ createBaseAnswers("trimoine")
+ createBaseAnswers("")
+ createBaseAnswers("stis"),
"ka": createBaseAnswers("pitaine")
+ createBaseAnswers("pitulation"),
2022-01-26 10:09:38 +01:00
"fais": createBaseAnswers("rtile"),
"tant": createBaseAnswers("gente")
+ createBaseAnswers("tation"),
"et": createBaseAnswers("eint")
+ createBaseAnswers("ain"),
"la": createBaseAnswers("vabo")
+ createBaseAnswers("vande"),
"tki": createBaseAnswers("la"),
"moi": createBaseAnswers("tié")
+ createBaseAnswers("sson")
+ createBaseAnswers("sissure"),
"toi": createBaseAnswers("lette")
+ createBaseAnswers("ture"),
2022-07-20 01:53:52 +02:00
"top": createBaseAnswers("inambour"),
2022-07-23 00:59:50 +02:00
"jour": createBaseAnswers("nal"),
2022-07-23 00:59:50 +02:00
"ya": createBaseAnswers("hourt"),
"yo": createBaseAnswers("ghourt")
+ createBaseAnswers("yo"),
2022-08-02 20:52:24 +02:00
"ni": createBaseAnswers("cotine"),
"re": createBaseAnswers("pas")
+ createBaseAnswers("veil")
+ createBaseAnswers("tourne"),
"quand": createBaseAnswers("dide")
+ createBaseAnswers("tal")
2022-08-05 02:33:23 +02:00
+ createBaseAnswers("didat"),
2022-08-05 02:42:19 +02:00
"sol": createBaseAnswers("itaire"),
"vois": createBaseAnswers("ture"),
2021-08-04 18:30:49 +02:00
}
2021-08-04 19:40:03 +02:00
# List of all the trigger words
universalBase = createBaseTrigger(list(base.values()))
2021-08-04 19:40:03 +02:00
# List of all the triggers words's variations
triggerWords = generateWords(universalBase)
2021-08-04 19:40:03 +02:00
# Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
# Start the bot
start()