track on user and not on keyword basis

This commit is contained in:
Mylloon 2022-08-07 20:16:07 +02:00
parent 4e437ad675
commit f99ab71369
Signed by: Anri
GPG key ID: A82D63DFF8D1317F

235
main.py
View file

@ -74,25 +74,14 @@ class Listener(StreamingClient):
self, self,
bearer_token, bearer_token,
client: Client, client: Client,
users: list = [],
forcelist: list = [],
q = Queue() q = Queue()
): ):
super(Listener, self).__init__(bearer_token) super(Listener, self).__init__(bearer_token)
self.q = q self.q = q
self.client = client self.client = client
self.accounts = {
"users": users,
"forcelist": forcelist
}
self.victim_list = getFriends(client, users) + getIDs(client, forcelist)
def on_connect(self): def on_connect(self):
if self.accounts['forcelist'] == []: print(f"Début du scroll sur Twitter...")
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(self.accounts['forcelist'])}"
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts['users'])} et ces comptes en plus : {forcelist} comme timeline...")
def on_disconnect_message(notice): def on_disconnect_message(notice):
notice = notice["disconnect"] notice = notice["disconnect"]
@ -100,75 +89,73 @@ class Listener(StreamingClient):
if len(notice["reason"]) > 0: if len(notice["reason"]) > 0:
print(f"Raison : {notice['reason']}") print(f"Raison : {notice['reason']}")
def on_status(self, status): def on_tweet(self, tweet):
print(status) print(tweet)
exit(0) exit(0)
json = status._json json = status._json
# Verify the author of the tweet # Verify the age of the tweet
if json["user"]["id"] in self.victim_list and json["user"]["screen_name"] not in keys["WHITELIST"]: if seniority(json["created_at"]):
# Verify the age of the tweet # Verify if the tweet isn't a retweet
if seniority(json["created_at"]): if not hasattr(status, "retweeted_status"):
# Verify if the tweet isn't a retweet # Fetch the tweet
if not hasattr(status, "retweeted_status"): if "extended_tweet" in json:
# Fetch the tweet tweet = cleanTweet(status.extended_tweet["full_text"])
if "extended_tweet" in json: else:
tweet = cleanTweet(status.extended_tweet["full_text"]) tweet = cleanTweet(status.text)
else: # Fetch the last word of the tweet
tweet = cleanTweet(status.text) lastWord = tweet.split()[-1:][0]
# Fetch the last word of the tweet if keys["VERBOSE"]:
lastWord = tweet.split()[-1:][0] infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
if keys["VERBOSE"]: print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" # Check if the last word found is a supported word
print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ") if lastWord in universalBase:
# Check if the last word found is a supported word answer = None
if lastWord in universalBase:
answer = None
# Check repetition # Check repetition
repetition = findall(r"di(\S+)", lastWord) repetition = findall(r"di(\S+)", lastWord)
if(len(repetition) > 0): if(len(repetition) > 0):
# We need to repeat something... # We need to repeat something...
answer = repeater(repetition[0]) answer = repeater(repetition[0])
# Fetch an other adequate (better) response # Fetch an other adequate (better) response
for mot in base.items(): for mot in base.items():
if lastWord in mot[1]: if lastWord in mot[1]:
# Handle specific case # Handle specific case
if mot[0] == "bon": if mot[0] == "bon":
# Between 7am and 5pm # Between 7am and 5pm
if datetime.now().hour in range(7, 17): if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour answer = answers[mot[0]][0] # jour
else:
answer = answers[mot[0]][1] # soir
else: else:
# Normal answer answer = answers[mot[0]][1] # soir
answer = answers[mot[0]] else:
if answer == None: # Normal answer
if keys["VERBOSE"]: answer = answers[mot[0]]
print(f"{errorMessage} Aucune réponse trouvée.") if answer == None:
# If an answer has been found if keys["VERBOSE"]:
else: print(f"{errorMessage} Aucune réponse trouvée.")
if keys["VERBOSE"]: # If an answer has been found
print(f"Envoie d'un {answer[0]}...", end = " ")
try:
# Send the tweet with the answer
# TODO: Update Twitter API V2
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
show_error = True
if error["code"] == 385:
error["message"] = f"Tweet supprimé ou auteur ({json['user']['screen_name']}) en privé/bloqué."
show_error = False
# Show error only if relevant, always in verbose
if show_error or keys["VERBOSE"]:
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
else: else:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.") print(f"Envoie d'un {answer[0]}...", end = " ")
try:
# Send the tweet with the answer
# TODO: Update Twitter API V2
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
show_error = True
if error["code"] == 385:
error["message"] = f"Tweet supprimé ou auteur ({json['user']['screen_name']}) en privé/bloqué."
show_error = False
# Show error only if relevant, always in verbose
if show_error or keys["VERBOSE"]:
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
else:
if keys["VERBOSE"]:
print("Annulation car le dernier mot n'est pas intéressant.")
def do_stuff(self): def do_stuff(self):
"""Loop for the Listener""" """Loop for the Listener"""
@ -178,10 +165,7 @@ class Listener(StreamingClient):
def on_request_error(self, status_code): def on_request_error(self, status_code):
print(f"{errorMessage[:-2]} ({status_code}) !", end = " ") print(f"{errorMessage[:-2]} ({status_code}) !", end = " ")
if status_code == 413: if status_code == 420:
if keys["VERBOSE"]:
print("La liste des mots est trop longue (triggerWords).")
elif status_code == 420:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print("Déconnecter du flux.") print("Déconnecter du flux.")
else: else:
@ -228,36 +212,6 @@ def seniority(date: str) -> bool:
# False if older than a day, else True # False if older than a day, else True
return False if age.days >= 1 else True return False if age.days >= 1 else True
def generateWords(array: list[str]) -> list:
"""
Retrieves all possible combinations for the given list and returns the result as a list
This is used for the filter in the stream (before calling the Listener::on_status)
"""
quoiListe = []
for text in array:
# Add all combinations
# Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui']
#
# -> Depends on: from itertools import product
# -> Problem : Create a too long list (+1000 words, max is 400)
# -> Cf. https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
#
# quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower())))))
if text.lower() not in quoiListe:
# Word in lowercase
quoiListe.append(text.lower())
if text.upper() not in quoiListe:
# Word in uppercase
quoiListe.append(text.upper())
if text.capitalize() not in quoiListe:
# Word capitalized
quoiListe.append(text.capitalize())
return quoiListe
def createBaseTrigger(lists: list[list]) -> list: def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one""" """Merges all given lists into one"""
listing = [] listing = []
@ -293,38 +247,58 @@ def createClient(consumer_key, consumer_secret, access_token, access_token_secre
if keys["VERBOSE"]: if keys["VERBOSE"]:
try: try:
client.get_me().data.username client.get_me().data.username
print(f"Authentification réussie en tant que @{client.get_me().data.username}.", end = " ") print(f"Authentification réussie en tant que @{client.get_me().data.username}.\n")
# Compte ignorés
if keys['WHITELIST'] == []:
whitelist = "Aucun"
else:
whitelist = f"@{', @'.join(keys['WHITELIST'])}"
print(f"Liste des comptes ignorés : {whitelist}.")
# Compte forcés
if keys['FORCELIST'] == []:
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(keys['FORCELIST'])}"
print(f"Liste des comptes forcés : {forcelist}.")
# Compte aux following suivis
if keys['PSEUDOS'] == []:
pseudos = "Aucun"
else:
pseudos = f"@{', @'.join(keys['PSEUDOS'])}"
print(f"Les comptes suivis par ces comptes sont traqués : {pseudos}.\n")
print("Notez que si un compte est dans la whiteliste, il sera dans tout les cas ignoré.\n")
except: except:
print("Erreur d'authentification.") print("Erreur d'authentification.")
exit(1) exit(1)
if keys['WHITELIST'] == []:
whitelist = "Aucun"
else:
whitelist = f"@{', @'.join(keys['WHITELIST'])}"
print(f"Liste des comptes ignorés : {whitelist}.")
return client return client
def start(): def start():
"""Start the bot""" """Start the bot"""
stream = Listener( client = createClient(
bearer_token=keys["BEARER_TOKEN"], keys["CONSUMER_KEY"],
client=createClient( keys["CONSUMER_SECRET"],
keys["CONSUMER_KEY"], keys["TOKEN"],
keys["CONSUMER_SECRET"], keys["TOKEN_SECRET"],
keys["TOKEN"],
keys["TOKEN_SECRET"],
),
users=keys["PSEUDOS"],
forcelist=keys["FORCELIST"],
) )
# TODO: Break in v2 API : max 25 rule (https://docs.tweepy.org/en/stable/streamrule.html#tweepy.StreamRule) stream = Listener(
# Need research, is it possible to run the stream without filter and still catching everything we want? bearer_token=keys["BEARER_TOKEN"],
client=client,
)
# Only track specifics words # Only track specifics users
stream.add_rules([StreamRule(word) for word in triggerWords]) # Including users in forcelist and removing users in whitelist
tracked_users = [
i for i in [
user.data["username"] for user in getFriends(client, keys["PSEUDOS"])
] + keys["FORCELIST"] if i not in keys["WHITELIST"]
]
stream.add_rules(StreamRule(''.join([f'from:{word} OR ' for word in tracked_users])[:-3]))
stream.filter(threaded=True) stream.filter(threaded=True)
if __name__ == "__main__": if __name__ == "__main__":
@ -512,9 +486,6 @@ if __name__ == "__main__":
# List of all the trigger words # List of all the trigger words
universalBase = createBaseTrigger(list(base.values())) universalBase = createBaseTrigger(list(base.values()))
# List of all the triggers words's variations
triggerWords = generateWords(universalBase)
# Loading environment variables # Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"]) keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "BEARER_TOKEN", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])