2021-08-03 14:17:00 +02:00
|
|
|
from dotenv import load_dotenv
|
|
|
|
from os import environ
|
|
|
|
from tweepy import OAuthHandler, API, StreamListener, Stream
|
|
|
|
from re import sub
|
2021-08-03 19:19:30 +02:00
|
|
|
from random import choice
|
2021-08-03 20:27:06 +02:00
|
|
|
from datetime import datetime
|
2021-08-03 19:59:32 +02:00
|
|
|
from pytz import timezone
|
2021-08-04 14:04:52 +02:00
|
|
|
from queue import Queue
|
2021-08-03 14:17:00 +02:00
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def load(variables) -> dict:
|
|
|
|
"""Load environment variables."""
|
2021-08-03 14:17:00 +02:00
|
|
|
keys = {}
|
|
|
|
load_dotenv() # load .env file
|
|
|
|
for var in variables:
|
|
|
|
try:
|
2021-08-04 21:01:30 +02:00
|
|
|
if var == "VERBOSE":
|
|
|
|
try:
|
|
|
|
res = bool(environ[var])
|
|
|
|
except:
|
|
|
|
res = False
|
|
|
|
else:
|
|
|
|
res = environ[var]
|
2021-08-04 00:04:38 +02:00
|
|
|
if var == "PSEUDOS":
|
|
|
|
res = list(set(res.split(',')) - {""}) # create a list for the channels and remove blank channels and doubles
|
|
|
|
keys[var] = res
|
2021-08-03 14:17:00 +02:00
|
|
|
except KeyError:
|
2021-08-04 18:30:49 +02:00
|
|
|
print(f"Veuillez définir la variable d'environnement {var} (fichier .env supporté)")
|
2021-08-03 14:17:00 +02:00
|
|
|
exit(1)
|
|
|
|
return keys
|
|
|
|
|
|
|
|
class Listener(StreamListener):
|
2021-08-04 14:04:52 +02:00
|
|
|
def __init__(self, api = None, users = None, q = Queue()):
|
2021-08-03 14:17:00 +02:00
|
|
|
super(Listener, self).__init__()
|
2021-08-04 14:04:52 +02:00
|
|
|
self.q = q
|
2021-08-03 14:17:00 +02:00
|
|
|
self.api = api
|
2021-08-04 00:04:38 +02:00
|
|
|
self.listOfFriendsID = getFriendsID(api, users)
|
2021-08-03 14:17:00 +02:00
|
|
|
|
|
|
|
def on_status(self, status):
|
2021-08-03 19:59:32 +02:00
|
|
|
"""Answer to tweets."""
|
2021-08-04 16:00:05 +02:00
|
|
|
if status._json["user"]["id"] in self.listOfFriendsID: # verification of the author of the tweet
|
|
|
|
if seniority(status._json["created_at"]): # verification of the age of the tweet
|
2021-08-04 20:24:22 +02:00
|
|
|
if not hasattr(status, "retweeted_status"): # ignore Retweet
|
|
|
|
try: # retrieve the entire tweet
|
|
|
|
tweet = status.extended_tweet["full_text"]
|
|
|
|
except AttributeError:
|
|
|
|
tweet = status.text
|
|
|
|
# recovery of the last "usable" word of the tweet
|
2021-08-04 21:02:59 +02:00
|
|
|
regex = r"https?:\/\/\S+| +?\?|\?| +?\!| ?\!|-|~|(?<=ui)i+|@\S+|\.+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|…|\^+"
|
2021-08-04 22:07:04 +02:00
|
|
|
tweetText = sub(regex, " ", tweet.lower())
|
2021-08-04 20:24:22 +02:00
|
|
|
lastWord = tweetText.split()[-1:][0]
|
2021-08-04 21:01:30 +02:00
|
|
|
if keys["VERBOSE"]:
|
|
|
|
print(f"Tweet trouvé de {status._json['user']['screen_name']} (dernier mot : \"{lastWord}\")...", end = " ")
|
2021-08-04 20:24:22 +02:00
|
|
|
if lastWord in universalBase: # check if the last word found is a supported word
|
|
|
|
answer = None
|
|
|
|
for mot in base.items():
|
|
|
|
if lastWord in mot[1]:
|
|
|
|
answer = answers[mot[0]]
|
|
|
|
if answer == None:
|
2021-08-04 21:01:30 +02:00
|
|
|
if keys["VERBOSE"]:
|
|
|
|
print(f"{errorMessage} Aucune réponse trouvée.")
|
2021-08-04 20:24:22 +02:00
|
|
|
else:
|
2021-08-04 21:01:30 +02:00
|
|
|
if keys["VERBOSE"]:
|
|
|
|
print(f"Envoie d'un {answer[0]}...", end = " ")
|
2021-08-04 20:24:22 +02:00
|
|
|
try: # send answer
|
|
|
|
self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True)
|
|
|
|
print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !")
|
|
|
|
except Exception as error:
|
|
|
|
print(f"\n{errorMessage} {error}")
|
2021-08-04 18:30:49 +02:00
|
|
|
else:
|
2021-08-04 21:01:30 +02:00
|
|
|
if keys["VERBOSE"]:
|
|
|
|
print("Annulation car le dernier mot n'est pas intéressant.")
|
2021-08-04 14:05:20 +02:00
|
|
|
|
2021-08-04 14:04:52 +02:00
|
|
|
def do_stuff(self):
|
|
|
|
while True:
|
|
|
|
self.q.get()
|
|
|
|
self.q.task_done()
|
2021-08-03 19:59:32 +02:00
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def getFriendsID(api, users: list) -> list:
|
|
|
|
"""Get all friends of choosen users."""
|
2021-08-04 00:04:38 +02:00
|
|
|
liste = []
|
|
|
|
for user in users:
|
|
|
|
liste.extend(api.friends_ids(user))
|
2021-08-04 01:06:16 +02:00
|
|
|
return list(set(liste))
|
2021-08-04 00:04:38 +02:00
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def seniority(date: str) -> bool:
|
|
|
|
"""Return True only if the given string date is less than one day old."""
|
2021-08-03 19:59:32 +02:00
|
|
|
datetimeObject = datetime.strptime(date, '%a %b %d %H:%M:%S +0000 %Y') # Convert String format to datetime format
|
|
|
|
datetimeObject = datetimeObject.replace(tzinfo = timezone('UTC')) # Twitter give us an UTC time
|
|
|
|
age = datetime.now(timezone('UTC')) - datetimeObject # Time now in UTC minus the time we got to get the age of the date
|
|
|
|
return False if age.days >= 1 else True # False if older than a day
|
2021-08-03 14:17:00 +02:00
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
def permute(array: list) -> list:
|
|
|
|
"""Retrieves all possible combinations for the given list and returns the result as a list."""
|
2021-08-03 23:33:53 +02:00
|
|
|
quoiListe = []
|
2021-08-03 21:19:22 +02:00
|
|
|
|
|
|
|
for text in array: # all element of the list
|
|
|
|
n = len(text)
|
|
|
|
mx = 1 << n # Number of permutations is 2^n
|
|
|
|
text = text.lower() # Converting string to lower case
|
|
|
|
|
|
|
|
for i in range(mx): # Using all subsequences and permuting them
|
|
|
|
combination = [k for k in text]
|
|
|
|
for j in range(n):
|
|
|
|
if (((i >> j) & 1) == 1): # If j-th bit is set, we convert it to upper case
|
|
|
|
combination[j] = text[j].upper()
|
|
|
|
|
|
|
|
temp = ""
|
|
|
|
for i in combination:
|
|
|
|
temp += i
|
2021-08-03 23:33:53 +02:00
|
|
|
quoiListe.append(temp)
|
|
|
|
return quoiListe
|
2021-08-03 21:19:22 +02:00
|
|
|
|
2021-08-04 18:30:49 +02:00
|
|
|
def createBaseTrigger(lists) -> list:
|
2021-08-04 16:00:05 +02:00
|
|
|
"""Merges all given lists into one."""
|
2021-08-04 13:42:14 +02:00
|
|
|
listing = []
|
|
|
|
for liste in lists:
|
|
|
|
listing.extend(liste)
|
|
|
|
return list(set(listing))
|
|
|
|
|
|
|
|
def createBaseAnswers(word) -> list:
|
2021-08-04 16:00:05 +02:00
|
|
|
"""Generates default answers for a given word."""
|
2021-08-04 15:45:11 +02:00
|
|
|
return [word, f"({word})", word.upper(), f"{word} lol"]
|
2021-08-04 13:42:14 +02:00
|
|
|
|
2021-08-04 00:04:38 +02:00
|
|
|
def main(accessToken: str, accessTokenSecret: str, consumerKey: str, consumerSecret: str, users: list):
|
2021-08-03 14:17:00 +02:00
|
|
|
"""Main method."""
|
|
|
|
auth = OAuthHandler(consumerKey, consumerSecret)
|
|
|
|
auth.set_access_token(accessToken, accessTokenSecret)
|
|
|
|
|
2021-08-03 23:11:17 +02:00
|
|
|
api = API(auth_handler = auth, wait_on_rate_limit = True)
|
2021-08-03 14:17:00 +02:00
|
|
|
|
2021-08-04 00:04:38 +02:00
|
|
|
listener = Listener(api, users)
|
2021-08-03 19:19:30 +02:00
|
|
|
stream = Stream(auth = api.auth, listener = listener)
|
|
|
|
|
2021-08-04 00:04:38 +02:00
|
|
|
print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(users)}...")
|
2021-08-04 14:08:10 +02:00
|
|
|
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, is_async = True)
|
2021-08-03 14:17:00 +02:00
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
"""
|
2021-08-03 19:19:30 +02:00
|
|
|
TOKEN is the Access Token available in the Authentication Tokens section under Access Token and Secret sub-heading.
|
|
|
|
TOKEN_SECRET is the Access Token Secret available in the Authentication Tokens section under Access Token and Secret sub-heading.
|
|
|
|
CONSUMER_KEY is the API Key available in the Consumer Keys section.
|
|
|
|
CONSUMER_SECRET is the API Secret Key available in the Consumer Keys section.
|
2021-08-03 14:17:00 +02:00
|
|
|
--
|
2021-08-03 23:11:17 +02:00
|
|
|
PSEUDO is the PSEUDO of the account you want to listen to snipe.
|
2021-08-03 14:17:00 +02:00
|
|
|
"""
|
2021-08-04 18:30:49 +02:00
|
|
|
errorMessage = "Une erreur survient !" # error message
|
2021-08-04 13:42:14 +02:00
|
|
|
|
2021-08-04 19:40:03 +02:00
|
|
|
# words to detect in lowercase
|
2021-08-04 18:30:49 +02:00
|
|
|
base = {
|
2021-08-04 19:40:03 +02:00
|
|
|
"quoi": ["quoi", "koi", "quoient"],
|
2021-08-04 18:30:49 +02:00
|
|
|
"oui": ["oui", "ui"],
|
2021-08-04 18:33:55 +02:00
|
|
|
"non": ["non", "nn"],
|
|
|
|
"nan": ["nan"]
|
2021-08-04 18:30:49 +02:00
|
|
|
}
|
2021-08-04 13:42:14 +02:00
|
|
|
|
|
|
|
# creation of answers
|
2021-08-04 18:30:49 +02:00
|
|
|
answers = {
|
|
|
|
"quoi": createBaseAnswers("feur") + [
|
|
|
|
"https://twitter.com/shukuzi62/status/1422611919538724868/video/1",
|
|
|
|
"feur (-isson)",
|
|
|
|
"https://twitter.com/antoinelae/status/1422943594403581957/video/1"
|
|
|
|
],
|
|
|
|
"oui": createBaseAnswers("stiti"),
|
2021-08-04 18:33:55 +02:00
|
|
|
"non": createBaseAnswers("bril"),
|
|
|
|
"nan": createBaseAnswers("cy")
|
2021-08-04 18:30:49 +02:00
|
|
|
}
|
2021-08-04 19:40:03 +02:00
|
|
|
|
|
|
|
# creation of a list of all the words (only lowercase)
|
|
|
|
universalBase = createBaseTrigger(list(base.values()))
|
|
|
|
|
|
|
|
# creation of a list of all the words (upper and lower case)
|
|
|
|
triggerWords = permute(universalBase)
|
|
|
|
|
2021-08-04 13:42:14 +02:00
|
|
|
# loading environment variables and launching the bot
|
2021-08-04 21:01:30 +02:00
|
|
|
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE"])
|
2021-08-04 00:04:38 +02:00
|
|
|
main(keys["TOKEN"], keys["TOKEN_SECRET"], keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"], keys["PSEUDOS"])
|