|
|
|
@ -1,6 +1,6 @@
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
from os import environ
|
|
|
|
|
from tweepy import OAuthHandler, API, StreamListener, Stream
|
|
|
|
|
from tweepy import OAuth1UserHandler, API, Stream
|
|
|
|
|
from re import sub, findall
|
|
|
|
|
from random import choice
|
|
|
|
|
from datetime import datetime
|
|
|
|
@ -9,7 +9,7 @@ from queue import Queue
|
|
|
|
|
from json import loads
|
|
|
|
|
|
|
|
|
|
def load(variables) -> dict:
|
|
|
|
|
"""Load environment variables."""
|
|
|
|
|
"""Load environment variables"""
|
|
|
|
|
keys = {}
|
|
|
|
|
load_dotenv() # load .env file
|
|
|
|
|
for var in variables:
|
|
|
|
@ -40,58 +40,94 @@ def load(variables) -> dict:
|
|
|
|
|
return keys
|
|
|
|
|
|
|
|
|
|
def cleanTweet(tweet: str) -> str:
|
|
|
|
|
"""Remove all unwanted elements from the tweet."""
|
|
|
|
|
tweet = tweet.lower() # convert to lower case
|
|
|
|
|
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) # remove URLs
|
|
|
|
|
hashtagMatch = findall(r"#\S+", tweet) # check all hashtags
|
|
|
|
|
if len(hashtagMatch) < 3: # if less than 3
|
|
|
|
|
tweet = sub(r"#\S+", " ", tweet) # remove them
|
|
|
|
|
"""Remove all unwanted elements from the tweet"""
|
|
|
|
|
# Convert to lower case
|
|
|
|
|
tweet = tweet.lower()
|
|
|
|
|
# Remove URLs
|
|
|
|
|
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
|
|
|
|
|
# Check all hashtags
|
|
|
|
|
hashtagMatch = findall(r"#\S+", tweet)
|
|
|
|
|
# If less than 3
|
|
|
|
|
if len(hashtagMatch) < 3:
|
|
|
|
|
# Remove them
|
|
|
|
|
tweet = sub(r"#\S+", " ", tweet)
|
|
|
|
|
else:
|
|
|
|
|
return "" # too much hashtags, ignoring tweet
|
|
|
|
|
tweet = sub(r"@\S+", " ", tweet) # remove usernames
|
|
|
|
|
tweet = sub(r" *?[^\w\s]+", " ", tweet) # remove everything who is not a letter or a number or a space
|
|
|
|
|
tweet = sub(r"\S+(?=si|ci)", " ", tweet) # remove element of the word only if the last syllable can be matched (so more words will be answered without adding them manually)
|
|
|
|
|
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet) # remove key smashing in certains words
|
|
|
|
|
# Too much hashtags in the tweet -> so ignore it
|
|
|
|
|
return ""
|
|
|
|
|
# Remove usernames
|
|
|
|
|
tweet = sub(r"@\S+", " ", tweet)
|
|
|
|
|
# Remove everything who isn't a letter/number/space
|
|
|
|
|
tweet = sub(r" *?[^\w\s]+", " ", tweet)
|
|
|
|
|
# Remove element of the word only if the last syllable can be matched
|
|
|
|
|
# (so more words will be answered without adding them manually)
|
|
|
|
|
tweet = sub(r"\S+(?=si|ci)", " ", tweet)
|
|
|
|
|
|
|
|
|
|
# Remove key smashing in certains words
|
|
|
|
|
# uiii naaaan quoiiii noooon heiiin siiii
|
|
|
|
|
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
|
|
|
|
|
|
|
|
|
|
return tweet.strip()
|
|
|
|
|
|
|
|
|
|
class Listener(StreamListener):
|
|
|
|
|
"""Watch for tweets that match criteria in real-time."""
|
|
|
|
|
def __init__(self, api = None, users = None, forcelist = None, q = Queue()):
|
|
|
|
|
super(Listener, self).__init__()
|
|
|
|
|
class Listener(Stream):
|
|
|
|
|
"""Watch for tweets that match criteria in real-time"""
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
consumer_key,
|
|
|
|
|
consumer_secret,
|
|
|
|
|
access_token,
|
|
|
|
|
access_token_secret,
|
|
|
|
|
api: API = None,
|
|
|
|
|
users: list = None,
|
|
|
|
|
forcelist: list = None,
|
|
|
|
|
q = Queue()
|
|
|
|
|
):
|
|
|
|
|
super(Listener, self).__init__(consumer_key, consumer_secret, access_token, access_token_secret)
|
|
|
|
|
self.q = q
|
|
|
|
|
self.api = api
|
|
|
|
|
self.accounts = users
|
|
|
|
|
self.forcelist = forcelist
|
|
|
|
|
self.accounts = [users, forcelist]
|
|
|
|
|
self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist)
|
|
|
|
|
|
|
|
|
|
def on_connect(self):
|
|
|
|
|
print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts)} comme timeline et ces comptes : @{', @'.join(self.forcelist)}...")
|
|
|
|
|
if self.accounts[1] == []:
|
|
|
|
|
forcelist = "Aucun"
|
|
|
|
|
else:
|
|
|
|
|
forcelist = f"@{', @'.join(self.accounts[1])}"
|
|
|
|
|
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts[0])} et ces comptes en plus : {forcelist} comme timeline...")
|
|
|
|
|
|
|
|
|
|
def on_disconnect(notice):
|
|
|
|
|
def on_disconnect_message(notice):
|
|
|
|
|
notice = notice["disconnect"]
|
|
|
|
|
print(f"Déconnexion (code {notice['code']}).", end = " ")
|
|
|
|
|
if len(notice["reason"]) > 0:
|
|
|
|
|
print(f"Raison : {notice['reason']}")
|
|
|
|
|
|
|
|
|
|
def on_status(self, status):
|
|
|
|
|
if status._json["user"]["id"] in self.listOfFriendsID and status._json["user"]["screen_name"] not in keys["WHITELIST"]: # verification of the author of the tweet
|
|
|
|
|
if seniority(status._json["created_at"]): # verification of the age of the tweet
|
|
|
|
|
if not hasattr(status, "retweeted_status"): # ignore Retweet
|
|
|
|
|
if "extended_tweet" in status._json:
|
|
|
|
|
json = status._json
|
|
|
|
|
# Verify the author of the tweet
|
|
|
|
|
if json["user"]["id"] in self.listOfFriendsID and json["user"]["screen_name"] not in keys["WHITELIST"]:
|
|
|
|
|
# Verify the age of the tweet
|
|
|
|
|
if seniority(json["created_at"]):
|
|
|
|
|
# Verify if the tweet isn't a retweet
|
|
|
|
|
if not hasattr(status, "retweeted_status"):
|
|
|
|
|
# Fetch the tweet
|
|
|
|
|
if "extended_tweet" in json:
|
|
|
|
|
tweet = cleanTweet(status.extended_tweet["full_text"])
|
|
|
|
|
else:
|
|
|
|
|
tweet = cleanTweet(status.text)
|
|
|
|
|
# Fetch the last word of the tweet
|
|
|
|
|
lastWord = tweet.split()[-1:][0]
|
|
|
|
|
if keys["VERBOSE"]:
|
|
|
|
|
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
|
|
|
|
|
print(f"Tweet trouvé de {status._json['user']['screen_name']} ({infoLastWord})...", end = " ")
|
|
|
|
|
if lastWord in universalBase: # check if the last word found is a supported word
|
|
|
|
|
print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
|
|
|
|
|
# Check if the last word found is a supported word
|
|
|
|
|
if lastWord in universalBase:
|
|
|
|
|
answer = None
|
|
|
|
|
# Fetch an adequate response
|
|
|
|
|
for mot in base.items():
|
|
|
|
|
if lastWord in mot[1]:
|
|
|
|
|
# Handle specific case
|
|
|
|
|
if mot[0] == "bon":
|
|
|
|
|
if datetime.now().hour in range(7, 17): # between 7am and 5pm
|
|
|
|
|
# Between 7am and 5pm
|
|
|
|
|
if datetime.now().hour in range(7, 17):
|
|
|
|
|
answer = answers[mot[0]][0] # jour
|
|
|
|
|
else:
|
|
|
|
|
answer = answers[mot[0]][1] # soir
|
|
|
|
@ -100,14 +136,17 @@ class Listener(StreamListener):
|
|
|
|
|
if answer == None:
|
|
|
|
|
if keys["VERBOSE"]:
|
|
|
|
|
print(f"{errorMessage} Aucune réponse trouvée.")
|
|
|
|
|
# If an answer has been found
|
|
|
|
|
else:
|
|
|
|
|
if keys["VERBOSE"]:
|
|
|
|
|
print(f"Envoie d'un {answer[0]}...", end = " ")
|
|
|
|
|
try: # send answer
|
|
|
|
|
self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True)
|
|
|
|
|
print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !")
|
|
|
|
|
try:
|
|
|
|
|
# Send the tweet with the answer
|
|
|
|
|
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
|
|
|
|
|
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
|
|
|
|
|
except Exception as error:
|
|
|
|
|
error = loads(error.response.text)["errors"][0]
|
|
|
|
|
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
|
|
|
|
|
if error["code"] == 385:
|
|
|
|
|
error["message"] = "Tweet supprimé ou auteur en privé/bloqué."
|
|
|
|
|
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
|
|
|
|
@ -116,11 +155,12 @@ class Listener(StreamListener):
|
|
|
|
|
print("Annulation car le dernier mot n'est pas intéressant.")
|
|
|
|
|
|
|
|
|
|
def do_stuff(self):
|
|
|
|
|
"""Loop for the Listener"""
|
|
|
|
|
while True:
|
|
|
|
|
self.q.get()
|
|
|
|
|
self.q.task_done()
|
|
|
|
|
|
|
|
|
|
def on_error(self, status_code):
|
|
|
|
|
def on_request_error(self, status_code):
|
|
|
|
|
print(f"{errorMessage[:-2]} ({status_code}) !", end = " ")
|
|
|
|
|
if status_code == 413:
|
|
|
|
|
if keys["VERBOSE"]:
|
|
|
|
@ -132,57 +172,80 @@ class Listener(StreamListener):
|
|
|
|
|
print("\n")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def getFriendsID(api, users: list) -> list:
|
|
|
|
|
"""Get all friends of choosen users."""
|
|
|
|
|
def getFriendsID(api: API, users: list[str]) -> list:
|
|
|
|
|
"""Get all friends of choosen users"""
|
|
|
|
|
liste = []
|
|
|
|
|
# Get IDs of the user's friends
|
|
|
|
|
for user in users:
|
|
|
|
|
liste.extend(api.friends_ids(user))
|
|
|
|
|
liste.extend(api.get_friend_ids(screen_name=user))
|
|
|
|
|
return list(set(liste))
|
|
|
|
|
|
|
|
|
|
def getIDs(api, users: list) -> list:
|
|
|
|
|
def getIDs(api: API, users: list[str]) -> list:
|
|
|
|
|
"""Get all the ID of users"""
|
|
|
|
|
liste = []
|
|
|
|
|
# Get IDs of the users
|
|
|
|
|
for user in users:
|
|
|
|
|
liste.append(api.get_user(user)._json["id"])
|
|
|
|
|
liste.append(api.get_user(screen_name=user)._json["id"])
|
|
|
|
|
return list(set(liste))
|
|
|
|
|
|
|
|
|
|
def seniority(date: str) -> bool:
|
|
|
|
|
"""Return True only if the given string date is less than one day old."""
|
|
|
|
|
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") # convert String format to datetime format
|
|
|
|
|
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) # Twitter give us an UTC time
|
|
|
|
|
age = datetime.now(timezone("UTC")) - datetimeObject # time now in UTC minus the time we got to get the age of the date
|
|
|
|
|
return False if age.days >= 1 else True # False if older than a day
|
|
|
|
|
"""Return True only if the given string date is less than one day old"""
|
|
|
|
|
# Convert string format to datetime format
|
|
|
|
|
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
|
|
|
|
|
# Twitter give us an UTC time
|
|
|
|
|
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC"))
|
|
|
|
|
# time now in UTC minus the time we got to get the age of the date
|
|
|
|
|
age = datetime.now(timezone("UTC")) - datetimeObject
|
|
|
|
|
# False if older than a day, else True
|
|
|
|
|
return False if age.days >= 1 else True
|
|
|
|
|
|
|
|
|
|
def permute(array: list) -> list:
|
|
|
|
|
"""Retrieves all possible combinations for the given list and returns the result as a list."""
|
|
|
|
|
def generateWords(array: list[str]) -> list:
|
|
|
|
|
"""
|
|
|
|
|
Retrieves all possible combinations for the given list and returns the result as a list
|
|
|
|
|
|
|
|
|
|
This is used for the filter in the stream (before calling the Listener::on_status)
|
|
|
|
|
"""
|
|
|
|
|
quoiListe = []
|
|
|
|
|
|
|
|
|
|
for text in array: # all element of the list
|
|
|
|
|
for text in array:
|
|
|
|
|
# Add all combinations
|
|
|
|
|
# Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui']
|
|
|
|
|
#
|
|
|
|
|
# -> Depends on: from itertools import product
|
|
|
|
|
# -> Problem : Create a too long list (+1000 words, max is 400)
|
|
|
|
|
# -> Cf. https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
|
|
|
|
|
#
|
|
|
|
|
# quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower())))))
|
|
|
|
|
|
|
|
|
|
if text.lower() not in quoiListe:
|
|
|
|
|
quoiListe.append(text.lower()) # word fully in lowercase
|
|
|
|
|
# Word in lowercase
|
|
|
|
|
quoiListe.append(text.lower())
|
|
|
|
|
if text.upper() not in quoiListe:
|
|
|
|
|
quoiListe.append(text.upper()) # word fully in uppercase
|
|
|
|
|
# Word in uppercase
|
|
|
|
|
quoiListe.append(text.upper())
|
|
|
|
|
if text.capitalize() not in quoiListe:
|
|
|
|
|
quoiListe.append(text.capitalize()) # word with the first letter in uppercase
|
|
|
|
|
# Word capitalized
|
|
|
|
|
quoiListe.append(text.capitalize())
|
|
|
|
|
|
|
|
|
|
return quoiListe
|
|
|
|
|
|
|
|
|
|
def createBaseTrigger(lists) -> list:
|
|
|
|
|
"""Merges all given lists into one."""
|
|
|
|
|
def createBaseTrigger(lists: list[list]) -> list:
|
|
|
|
|
"""Merges all given lists into one"""
|
|
|
|
|
listing = []
|
|
|
|
|
for liste in lists:
|
|
|
|
|
listing.extend(liste)
|
|
|
|
|
return list(set(listing))
|
|
|
|
|
|
|
|
|
|
def createBaseAnswers(word) -> list:
|
|
|
|
|
"""Generates default answers for a given word."""
|
|
|
|
|
return [word, f"({word})", word.upper(), f"{word} lol"]
|
|
|
|
|
def createBaseAnswers(word: str) -> list:
|
|
|
|
|
"""Generates default answers for a given word"""
|
|
|
|
|
return [word, f"({word})", word.upper(), f"{word} lol", f"{word} 👀"]
|
|
|
|
|
|
|
|
|
|
def start():
|
|
|
|
|
"""Start the bot."""
|
|
|
|
|
auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
|
|
|
|
|
"""Start the bot"""
|
|
|
|
|
auth = OAuth1UserHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
|
|
|
|
|
auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"])
|
|
|
|
|
|
|
|
|
|
api = API(auth_handler = auth, wait_on_rate_limit = True)
|
|
|
|
|
api = API(auth)
|
|
|
|
|
|
|
|
|
|
if keys["VERBOSE"]:
|
|
|
|
|
try:
|
|
|
|
@ -191,7 +254,7 @@ def start():
|
|
|
|
|
except:
|
|
|
|
|
print("Erreur d'authentification.")
|
|
|
|
|
exit(1)
|
|
|
|
|
print(f"@{api.me()._json['screen_name']}.")
|
|
|
|
|
print(f"@{api.verify_credentials().screen_name}.")
|
|
|
|
|
|
|
|
|
|
if keys['WHITELIST'] == []:
|
|
|
|
|
whitelist = "Aucun"
|
|
|
|
@ -199,9 +262,16 @@ def start():
|
|
|
|
|
whitelist = f"@{', @'.join(keys['WHITELIST'])}"
|
|
|
|
|
print(f"Liste des comptes ignorés : {whitelist}.")
|
|
|
|
|
|
|
|
|
|
listener = Listener(api, keys["PSEUDOS"], keys["FORCELIST"])
|
|
|
|
|
stream = Stream(auth = api.auth, listener = listener)
|
|
|
|
|
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, is_async = True)
|
|
|
|
|
stream = Listener(
|
|
|
|
|
consumer_key=keys["CONSUMER_KEY"],
|
|
|
|
|
consumer_secret=keys["CONSUMER_SECRET"],
|
|
|
|
|
access_token=keys["TOKEN"],
|
|
|
|
|
access_token_secret=keys["TOKEN_SECRET"],
|
|
|
|
|
api=api,
|
|
|
|
|
users=keys["PSEUDOS"],
|
|
|
|
|
forcelist=keys["FORCELIST"]
|
|
|
|
|
)
|
|
|
|
|
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, threaded = True)
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
"""
|
|
|
|
@ -212,9 +282,11 @@ if __name__ == "__main__":
|
|
|
|
|
--
|
|
|
|
|
PSEUDO is the PSEUDO of the account you want to listen to snipe.
|
|
|
|
|
"""
|
|
|
|
|
errorMessage = "Une erreur survient !" # error message
|
|
|
|
|
# Error message
|
|
|
|
|
errorMessage = "Une erreur survient !"
|
|
|
|
|
|
|
|
|
|
base = { # words to detect in lowercase
|
|
|
|
|
# Words who trigger the bot (keys in lowercase)
|
|
|
|
|
base = {
|
|
|
|
|
"quoi": ["quoi", "koi", "quoient"],
|
|
|
|
|
"oui": ["oui", "ui", "wi"],
|
|
|
|
|
"non": ["non", "nn"],
|
|
|
|
@ -252,13 +324,14 @@ if __name__ == "__main__":
|
|
|
|
|
"vois": ["vois", "voit", "voie", "voi"],
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
answers = { # creation of answers
|
|
|
|
|
# Answers for all the triggers (keys in lowercase)
|
|
|
|
|
answers = {
|
|
|
|
|
"quoi": createBaseAnswers("feur")
|
|
|
|
|
+ createBaseAnswers("feuse")
|
|
|
|
|
+ [
|
|
|
|
|
"https://twitter.com/Myshawii/status/1423219640025722880/video/1",
|
|
|
|
|
"https://twitter.com/Myshawii/status/1423219684552417281/video/1",
|
|
|
|
|
f"{createBaseAnswers('feur')} (-isson -ictalope -diatre -uil)",
|
|
|
|
|
"feur (-isson -ictalope -diatre -uil)",
|
|
|
|
|
"https://twitter.com/Myshawii/status/1455469162202075138/video/1",
|
|
|
|
|
"https://twitter.com/Myshawii/status/1552026689101860865/video/1",
|
|
|
|
|
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
|
|
|
|
@ -294,12 +367,12 @@ if __name__ == "__main__":
|
|
|
|
|
|
|
|
|
|
"mais": createBaseAnswers("on")
|
|
|
|
|
+ [
|
|
|
|
|
f"{createBaseAnswers('on')} (-dulation)"
|
|
|
|
|
"on (-dulation)"
|
|
|
|
|
],
|
|
|
|
|
|
|
|
|
|
"fort": createBaseAnswers("boyard")
|
|
|
|
|
+ [
|
|
|
|
|
f"{createBaseAnswers('boyard')} (-ennes)"
|
|
|
|
|
"boyard (-ennes)"
|
|
|
|
|
],
|
|
|
|
|
|
|
|
|
|
"coup": createBaseAnswers("teau"),
|
|
|
|
@ -308,7 +381,7 @@ if __name__ == "__main__":
|
|
|
|
|
+ createBaseAnswers("von")
|
|
|
|
|
+ createBaseAnswers("pristi")
|
|
|
|
|
+ [
|
|
|
|
|
f"{createBaseAnswers('pristi')} (-gnasse)"
|
|
|
|
|
"pristi (-gnasse)"
|
|
|
|
|
],
|
|
|
|
|
|
|
|
|
|
"bon": [
|
|
|
|
@ -374,11 +447,14 @@ if __name__ == "__main__":
|
|
|
|
|
"vois": createBaseAnswers("ture"),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
universalBase = createBaseTrigger(list(base.values())) # creation of a list of all the words
|
|
|
|
|
# List of all the trigger words
|
|
|
|
|
universalBase = createBaseTrigger(list(base.values()))
|
|
|
|
|
|
|
|
|
|
triggerWords = permute(universalBase) # creation of a list of all the words (upper and lower case)
|
|
|
|
|
# List of all the triggers words's variations
|
|
|
|
|
triggerWords = generateWords(universalBase)
|
|
|
|
|
|
|
|
|
|
# Loading environment variables and launching the bot
|
|
|
|
|
# Loading environment variables
|
|
|
|
|
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
|
|
|
|
|
print("") # just a newline
|
|
|
|
|
|
|
|
|
|
# Start the bot
|
|
|
|
|
start()
|
|
|
|
|