2021-08-03 14:17:00 +02:00
|
|
|
from dotenv import load_dotenv
|
|
|
|
from os import environ
|
|
|
|
from tweepy import OAuthHandler, API, StreamListener, Stream
|
2021-08-05 18:29:32 +02:00
|
|
|
from re import sub, findall
|
2021-08-03 19:19:30 +02:00
|
|
|
from random import choice
|
2021-08-03 20:27:06 +02:00
|
|
|
from datetime import datetime
|
2021-08-03 19:59:32 +02:00
|
|
|
from pytz import timezone
|
2021-08-04 14:04:52 +02:00
|
|
|
from queue import Queue
|
2021-08-03 14:17:00 +02:00
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def load(variables) -> dict:
    """Load the requested environment variables.

    A `.env` file is read first (through `load_dotenv`), then every name in
    `variables` is looked up in the process environment.

    Special cases:
    - "VERBOSE" is optional and returned as a boolean. It is False when the
      variable is missing, empty, or one of "0", "false", "no", "off"
      (case-insensitive), and True for any other value.
    - "PSEUDOS" is split on commas and returned as a list, without blank
      entries or duplicates (first occurrence order is kept).

    Every other variable is mandatory and returned as the raw string.

    Raises:
        SystemExit: with status 1, after printing a message, when a mandatory
            variable is not defined.
    """
    disabledValues = {"", "0", "false", "no", "off"}
    keys = {}
    load_dotenv() # load .env file
    for var in variables:
        if var == "VERBOSE":
            # bool("False") and bool("0") are True: compare the text instead of casting it
            keys[var] = environ.get(var, "").strip().lower() not in disabledValues
            continue
        try:
            res = environ[var]
        except KeyError:
            print(f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
            raise SystemExit(1) from None
        if var == "PSEUDOS":
            # "a, b,,a" -> ["a", "b"]: trim each name, drop blanks and doubles
            pseudos = (pseudo.strip() for pseudo in res.split(","))
            res = list(dict.fromkeys(pseudo for pseudo in pseudos if pseudo))
        keys[var] = res
    return keys
|
|
|
|
|
2021-08-05 18:29:32 +02:00
|
|
|
def cleanTweet(tweet: str) -> str:
    """Remove all unwanted elements from the tweet.

    The text is lower-cased, then in this order:
    1. URLs (`http://`, `https://` or `www.` prefixed) are removed;
    2. hashtags are removed when the tweet holds fewer than three of them;
    3. `@mentions` are removed;
    4. every run of characters that is neither a word character nor a
       whitespace is replaced by a space;
    5. repeated trailing letters of a few trigger words are collapsed
       (e.g. "quoiii" -> "quoi", "nooon" -> "non").

    Returns the cleaned text with leading and trailing whitespace stripped;
    inner runs of spaces are left as they are.
    """
    tweet = tweet.lower() # convert to lower case
    # the dot after "www" is escaped: unescaped it matched any character, so
    # ordinary words starting with "www" were deleted as if they were links
    tweet = sub(r"(https?://\S+|www\.\S+)", " ", tweet) # remove URLs
    if len(findall(r"#\S+", tweet)) < 3: # fewer than 3 hashtags
        tweet = sub(r"#\S+", " ", tweet) # remove them
    tweet = sub(r"@\S+", " ", tweet) # remove usernames
    tweet = sub(r" *?[^\w\s]+", " ", tweet) # remove everything that is not a letter, a number or a space
    tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet) # remove key smashing in certain words

    return tweet.strip()
|
|
|
|
|
2021-08-03 14:17:00 +02:00
|
|
|
class Listener(StreamListener):
    """Watch for tweets that match criteria in real-time.

    A tweet is answered only when it was written by an account followed by
    one of `users`, is less than a day old, is not a retweet, and its last
    word (after `cleanTweet`) is a supported trigger word.

    The methods read the module-level names `keys`, `base`, `answers`,
    `universalBase` and `errorMessage`, which this file only defines when it
    is run as a script.
    """

    def __init__(self, api = None, users = None, q = None):
        """Store the API handle and fetch the IDs followed by `users`.

        Args:
            api: tweepy API object used to list friends and to send replies.
            users: screen names whose followed accounts are watched.
            q: queue used by `do_stuff`; a new `Queue` is created when omitted.
        """
        super(Listener, self).__init__()
        # a Queue() built in the signature would be one object shared by every instance
        self.q = Queue() if q is None else q
        self.api = api
        self.users = [] if users is None else users
        # only membership tests are done on it: a set keeps them O(1)
        self.listOfFriendsID = set(getFriendsID(api, self.users))

    def on_connect(self):
        """Print which accounts' followings are used as the timeline."""
        print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(self.users)} comme timeline...")

    def on_disconnect(self, notice):
        """Print the code and, when present, the reason of a disconnect notice.

        `notice` may be the whole message or only its "disconnect" payload
        (which one tweepy passes depends on its version — both are accepted).
        """
        notice = notice.get("disconnect", notice)
        print(f"Déconnexion (code {notice['code']}).", end = " ")
        if notice.get("reason"):
            print(f"Raison : {notice['reason']}")

    def on_status(self, status):
        """Reply to `status` when it passes every filter and ends with a trigger word."""
        if status._json["user"]["id"] not in self.listOfFriendsID: # verification of the author of the tweet
            return
        if not seniority(status._json["created_at"]): # verification of the age of the tweet
            return
        if hasattr(status, "retweeted_status"): # ignore Retweet
            return

        if "extended_tweet" in status._json:
            tweet = cleanTweet(status.extended_tweet["full_text"])
        else:
            tweet = cleanTweet(status.text)

        words = tweet.split()
        if not words: # nothing left after cleaning (e.g. the tweet was only a link)
            return
        lastWord = words[-1]
        if keys["VERBOSE"]:
            print(f"Tweet trouvé de {status._json['user']['screen_name']} (dernier mot : \"{lastWord}\")...", end = " ")

        if lastWord not in universalBase: # check if the last word found is a supported word
            if keys["VERBOSE"]:
                print("Annulation car le dernier mot n'est pas intéressant.")
            return

        # the last matching entry of `base` wins, as no break is used
        answer = None
        for mot, variantes in base.items():
            if lastWord in variantes:
                answer = answers[mot]

        if answer is None:
            if keys["VERBOSE"]:
                print(f"{errorMessage} Aucune réponse trouvée.")
            return

        if keys["VERBOSE"]:
            print(f"Envoie d'un {answer[0]}...", end = " ")
        try: # send answer
            self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True)
            print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !")
        except Exception as error:
            # a failed reply must not stop the stream: report it and go on
            print(f"\n{errorMessage} {error}")

    def do_stuff(self):
        """Drain the queue forever: each item is taken and immediately marked as done."""
        while True:
            self.q.get()
            self.q.task_done()

    def on_error(self, status_code):
        """Print the stream error code (with a hint for 413 and 420 in verbose mode).

        Always returns False (NOTE(review): in tweepy 3.x this asks the stream
        to disconnect — confirm against the installed version).
        """
        print(f"{errorMessage[:-2]} ({status_code}) !", end = " ")
        if status_code == 413:
            if keys["VERBOSE"]:
                print("La liste des mots est trop longue (triggerWords).")
        elif status_code == 420:
            if keys["VERBOSE"]:
                print("Déconnecter du flux.")
        else:
            print("\n")
        return False
|
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def getFriendsID(api, users: list) -> list:
    """Return the IDs followed by at least one of the given users.

    `api.friends_ids` is called once per entry of `users`; the results are
    merged and duplicates are dropped. The order of the returned list is
    unspecified.
    """
    collected = [friendId for pseudo in users for friendId in api.friends_ids(pseudo)]
    return list(set(collected))
|
2021-08-04 00:04:38 +02:00
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def seniority(date: str) -> bool:
    """Tell whether a date is recent enough.

    `date` must look like "Wed Aug 04 12:00:00 +0000 2021" and is read as
    UTC. The result is True when fewer than one full day separates it from
    the current UTC time (a date in the future also gives True), False
    otherwise.
    """
    utc = timezone("UTC")
    # parse the text, then mark the naive result as UTC
    postedAt = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y").replace(tzinfo = utc)
    elapsed = datetime.now(utc) - postedAt
    return elapsed.days < 1
|
2021-08-03 14:17:00 +02:00
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def permute(array: list) -> list:
    """Return the case variants of every word of the given list.

    Each word contributes, in this order, its lower-case, upper-case and
    capitalized forms. A form already present in the result is not added
    again, so the output holds no duplicates and keeps first-seen order.
    """
    variants = (
        form
        for word in array # all elements of the list
        for form in (word.lower(), word.upper(), word.capitalize())
    )
    # dict keys keep insertion order and silently ignore repeated forms
    return list(dict.fromkeys(variants))
|
2021-08-03 21:19:22 +02:00
|
|
|
|
2021-08-04 18:30:49 +02:00
|
|
|
def createBaseTrigger(lists) -> list:
    """Flatten the given lists into a single list without duplicates.

    The order of the returned elements is unspecified.
    """
    merged = [word for group in lists for word in group]
    return list(set(merged))
|
|
|
|
|
|
|
|
def createBaseAnswers(word) -> list:
    """Build the four default replies for a word.

    In order: the word itself, the word between parentheses, the word in
    upper case, and the word followed by " lol".
    """
    replies = [word]
    replies.append(f"({word})")
    replies.append(word.upper())
    replies.append(f"{word} lol")
    return replies
|
2021-08-04 13:42:14 +02:00
|
|
|
|
2021-08-04 00:04:38 +02:00
|
|
|
def main(accessToken: str, accessTokenSecret: str, consumerKey: str, consumerSecret: str, users: list):
    """Authenticate against Twitter and start the filtered stream.

    Args:
        accessToken: access token of the bot account.
        accessTokenSecret: secret paired with `accessToken`.
        consumerKey: API key of the application.
        consumerSecret: API secret key of the application.
        users: screen names whose followed accounts are watched.

    In verbose mode (`keys["VERBOSE"]`) the credentials are checked first and
    the authenticated screen name is printed.

    Raises:
        SystemExit: with status 1 when that credential check fails.
    """
    auth = OAuthHandler(consumerKey, consumerSecret)
    auth.set_access_token(accessToken, accessTokenSecret)

    api = API(auth_handler = auth, wait_on_rate_limit = True)

    if keys["VERBOSE"]:
        # depending on the tweepy version, bad credentials either raise or
        # come back as a falsy value: both are treated as a failure.
        # `except Exception` lets SystemExit / KeyboardInterrupt go through.
        try:
            authenticated = api.verify_credentials()
        except Exception:
            authenticated = False
        if not authenticated:
            print("Erreur d'authentification.")
            raise SystemExit(1)
        print("Authentification réussie en tant que", end = " ")
        print(f"@{api.me()._json['screen_name']}.")

    listener = Listener(api, users)
    stream = Stream(auth = api.auth, listener = listener)
    # only French tweets holding one of the trigger words are requested
    stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, is_async = True)
|
2021-08-03 14:17:00 +02:00
|
|
|
|
2021-08-05 18:43:13 +02:00
|
|
|
if __name__ == "__main__":
    # Environment variables read below (a .env file is supported):
    #
    # TOKEN is the Access Token available in the Authentication Tokens section under Access Token and Secret sub-heading.
    # TOKEN_SECRET is the Access Token Secret available in the Authentication Tokens section under Access Token and Secret sub-heading.
    # CONSUMER_KEY is the API Key available in the Consumer Keys section.
    # CONSUMER_SECRET is the API Secret Key available in the Consumer Keys section.
    # --
    # PSEUDOS is the comma-separated list of accounts whose followed accounts are watched.
    # VERBOSE (optional) turns on the extra console output.

    errorMessage = "Une erreur survient !" # error message

    # words to detect in lowercase: canonical word -> accepted spellings
    base = {
        "quoi": ["quoi", "koi", "quoient"],
        "oui": ["oui", "ui"],
        "non": ["non", "nn"],
        "nan": ["nan"],
        "hein": ["hein", "1"],
        "ci": ["ci", "si"],
        "con": ["con"],
        "ok": ["ok", "okay", "oké", "k"],
        "ouais": ["ouais", "oué"],
        "comment": ["comment"],
        "mais": ["mais", "mé"],
    }

    # creation of answers: canonical word -> replies to pick from
    answers = {
        "quoi": [
            *createBaseAnswers("feur"),
            "https://twitter.com/Myshawii/status/1423219640025722880/video/1",
            "feur (-isson)",
            "https://twitter.com/Myshawii/status/1423219684552417281/video/1",
            "feur (-issonictalopediatreuil)",
        ],
        "oui": createBaseAnswers("stiti"),
        "non": createBaseAnswers("bril"),
        "nan": createBaseAnswers("cy"),
        "hein": [*createBaseAnswers("deux"), "2"],
        "ci": createBaseAnswers("tron"),
        "con": createBaseAnswers("combre"),
        "ok": createBaseAnswers("sur glace"),
        "ouais": createBaseAnswers("stern"),
        "comment": createBaseAnswers("tateur"),
        "mais": createBaseAnswers("on"),
    }

    # creation of a list of all the words (only lowercase)
    universalBase = createBaseTrigger(list(base.values()))

    # creation of a list of all the words (upper and lower case)
    triggerWords = permute(universalBase)
    print(len(triggerWords))

    # loading environment variables and launching the bot
    keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE"])
    main(
        accessToken = keys["TOKEN"],
        accessTokenSecret = keys["TOKEN_SECRET"],
        consumerKey = keys["CONSUMER_KEY"],
        consumerSecret = keys["CONSUMER_SECRET"],
        users = keys["PSEUDOS"],
    )
|