Compare commits

...

2 commits

Author SHA1 Message Date
37e4e5e874
* fix missings words for the listener-filter
* add types
* update comments
2022-08-06 01:35:32 +02:00
7f1f0fad8b
update python 2022-08-06 01:32:35 +02:00
2 changed files with 103 additions and 58 deletions

View file

@ -1,4 +1,4 @@
FROM python:3.9.5-slim
FROM python:3.10.4-slim
COPY . .
RUN pip install -r requirements.txt

159
main.py
View file

@ -7,9 +7,10 @@ from datetime import datetime
from pytz import timezone
from queue import Queue
from json import loads
from itertools import product
def load(variables) -> dict:
"""Load environment variables."""
"""Load environment variables"""
keys = {}
load_dotenv() # load .env file
for var in variables:
@ -40,33 +41,49 @@ def load(variables) -> dict:
return keys
def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet."""
tweet = tweet.lower() # convert to lower case
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) # remove URLs
hashtagMatch = findall(r"#\S+", tweet) # check all hashtags
if len(hashtagMatch) < 3: # if less than 3
tweet = sub(r"#\S+", " ", tweet) # remove them
"""Remove all unwanted elements from the tweet"""
# Convert to lower case
tweet = tweet.lower()
# Remove URLs
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
# Check all hashtags
hashtagMatch = findall(r"#\S+", tweet)
# If less than 3
if len(hashtagMatch) < 3:
# Remove them
tweet = sub(r"#\S+", " ", tweet)
else:
return "" # too much hashtags, ignoring tweet
tweet = sub(r"@\S+", " ", tweet) # remove usernames
tweet = sub(r" *?[^\w\s]+", " ", tweet) # remove everything who is not a letter or a number or a space
tweet = sub(r"\S+(?=si|ci)", " ", tweet) # remove element of the word only if the last syllable can be matched (so more words will be answered without adding them manually)
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet) # remove key smashing in certains words
# Too much hashtags in the tweet -> so ignore it
return ""
# Remove usernames
tweet = sub(r"@\S+", " ", tweet)
# Remove everything who isn't a letter/number/space
tweet = sub(r" *?[^\w\s]+", " ", tweet)
# Remove element of the word only if the last syllable can be matched
# (so more words will be answered without adding them manually)
tweet = sub(r"\S+(?=si|ci)", " ", tweet)
# Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
return tweet.strip()
class Listener(StreamListener):
"""Watch for tweets that match criteria in real-time."""
def __init__(self, api = None, users = None, forcelist = None, q = Queue()):
"""Watch for tweets that match criteria in real-time"""
def __init__(self, api: API = None, users: list = None, forcelist: list = None, q = Queue()):
super(Listener, self).__init__()
self.q = q
self.api = api
self.accounts = users
self.forcelist = forcelist
self.accounts = [users, forcelist]
self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist)
def on_connect(self):
print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts)} comme timeline et ces comptes : @{', @'.join(self.forcelist)}...")
if self.accounts[1] == []:
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(self.accounts[1])}"
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts[0])} et ces comptes en plus : {forcelist} comme timeline...")
def on_disconnect(notice):
notice = notice["disconnect"]
@ -75,23 +92,33 @@ class Listener(StreamListener):
print(f"Raison : {notice['reason']}")
def on_status(self, status):
if status._json["user"]["id"] in self.listOfFriendsID and status._json["user"]["screen_name"] not in keys["WHITELIST"]: # verification of the author of the tweet
if seniority(status._json["created_at"]): # verification of the age of the tweet
if not hasattr(status, "retweeted_status"): # ignore Retweet
if "extended_tweet" in status._json:
json = status._json
# Verify the author of the tweet
if json["user"]["id"] in self.listOfFriendsID and json["user"]["screen_name"] not in keys["WHITELIST"]:
# Verify the age of the tweet
if seniority(json["created_at"]):
# Verify if the tweet isn't a retweet
if not hasattr(status, "retweeted_status"):
# Fetch the tweet
if "extended_tweet" in json:
tweet = cleanTweet(status.extended_tweet["full_text"])
else:
tweet = cleanTweet(status.text)
# Fetch the last word of the tweet
lastWord = tweet.split()[-1:][0]
if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {status._json['user']['screen_name']} ({infoLastWord})...", end = " ")
if lastWord in universalBase: # check if the last word found is a supported word
print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
# Check if the last word found is a supported word
if lastWord in universalBase:
answer = None
# Fetch an adequate response
for mot in base.items():
if lastWord in mot[1]:
# Handle specific case
if mot[0] == "bon":
if datetime.now().hour in range(7, 17): # between 7am and 5pm
# Between 7am and 5pm
if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour
else:
answer = answers[mot[0]][1] # soir
@ -100,14 +127,17 @@ class Listener(StreamListener):
if answer == None:
if keys["VERBOSE"]:
print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
else:
if keys["VERBOSE"]:
print(f"Envoie d'un {answer[0]}...", end = " ")
try: # send answer
self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True)
print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !")
try:
# Send the tweet with the answer
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
if error["code"] == 385:
error["message"] = "Tweet supprimé ou auteur en privé/bloqué."
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
@ -116,6 +146,7 @@ class Listener(StreamListener):
print("Annulation car le dernier mot n'est pas intéressant.")
def do_stuff(self):
"""Loop for the Listener"""
while True:
self.q.get()
self.q.task_done()
@ -132,53 +163,61 @@ class Listener(StreamListener):
print("\n")
return False
def getFriendsID(api, users: list) -> list:
"""Get all friends of choosen users."""
def getFriendsID(api: API, users: list) -> list:
"""Get all friends of choosen users"""
liste = []
# Get IDs of the user's friends
for user in users:
liste.extend(api.friends_ids(user))
return list(set(liste))
def getIDs(api, users: list) -> list:
def getIDs(api: API, users: list) -> list:
"""Get all the ID of users"""
liste = []
# Get IDs of the users
for user in users:
liste.append(api.get_user(user)._json["id"])
return list(set(liste))
def seniority(date: str) -> bool:
"""Return True only if the given string date is less than one day old."""
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") # convert String format to datetime format
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) # Twitter give us an UTC time
age = datetime.now(timezone("UTC")) - datetimeObject # time now in UTC minus the time we got to get the age of the date
return False if age.days >= 1 else True # False if older than a day
"""Return True only if the given string date is less than one day old"""
# Convert string format to datetime format
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
# Twitter give us an UTC time
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC"))
# time now in UTC minus the time we got to get the age of the date
age = datetime.now(timezone("UTC")) - datetimeObject
# False if older than a day, else True
return False if age.days >= 1 else True
def permute(array: list) -> list:
"""Retrieves all possible combinations for the given list and returns the result as a list."""
def generateWords(array: list) -> list:
"""
Retrieves all possible combinations for the given list and returns the result as a list
This is used for the filter in the stream (before calling the Listener)
"""
quoiListe = []
for text in array: # all element of the list
if text.lower() not in quoiListe:
quoiListe.append(text.lower()) # word fully in lowercase
if text.upper() not in quoiListe:
quoiListe.append(text.upper()) # word fully in uppercase
if text.capitalize() not in quoiListe:
quoiListe.append(text.capitalize()) # word with the first letter in uppercase
for text in array:
# Add all combinations
# Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui']
quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower())))))
return quoiListe
def createBaseTrigger(lists) -> list:
"""Merges all given lists into one."""
def createBaseTrigger(lists: list) -> list:
"""Merges all given lists into one"""
listing = []
for liste in lists:
listing.extend(liste)
return list(set(listing))
def createBaseAnswers(word) -> list:
"""Generates default answers for a given word."""
return [word, f"({word})", word.upper(), f"{word} lol"]
def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word"""
return [word, f"({word})", word.upper(), f"{word} lol", f"{word} 👀"]
def start():
"""Start the bot."""
"""Start the bot"""
auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"])
@ -212,9 +251,11 @@ if __name__ == "__main__":
--
PSEUDO is the PSEUDO of the account you want to listen to snipe.
"""
errorMessage = "Une erreur survient !" # error message
# Error message
errorMessage = "Une erreur survient !"
base = { # words to detect in lowercase
# Words who trigger the bot (keys in lowercase)
base = {
"quoi": ["quoi", "koi", "quoient"],
"oui": ["oui", "ui", "wi"],
"non": ["non", "nn"],
@ -252,7 +293,8 @@ if __name__ == "__main__":
"vois": ["vois", "voit", "voie", "voi"],
}
answers = { # creation of answers
# Answers for all the triggers (keys in lowercase)
answers = {
"quoi": createBaseAnswers("feur")
+ createBaseAnswers("feuse")
+ [
@ -374,11 +416,14 @@ if __name__ == "__main__":
"vois": createBaseAnswers("ture"),
}
universalBase = createBaseTrigger(list(base.values())) # creation of a list of all the words
# List of all the trigger words
universalBase = createBaseTrigger(list(base.values()))
triggerWords = permute(universalBase) # creation of a list of all the words (upper and lower case)
# List of all the triggers words's variations
triggerWords = generateWords(universalBase)
# Loading environment variables and launching the bot
# Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
print("") # just a newline
# Start the bot
start()