update dependencies #2

Merged
Anri merged 7 commits from update/4.x.x into main 2022-08-06 02:56:06 +02:00
4 changed files with 151 additions and 129 deletions

View file

@ -1,54 +0,0 @@
###############################################################
# Setting I use for cleaning up image tags #
# - Running cleanup every week #
# - Keeping 1 tag per image name matching : (?:v.\d+|dev) #
# - Removing tags older than 7 days matching the default : .* #
###############################################################
image: docker:stable
stages:
- build
- push
services:
- docker:dind
before_script:
- echo -n $CI_JOB_TOKEN | docker login -u gitlab-ci-token --password-stdin $CI_REGISTRY
Build:
stage: build
script:
- docker pull $CI_REGISTRY_IMAGE:latest || true
- >
docker build
--pull
--build-arg VCS_REF=$CI_COMMIT_SHA
--build-arg VCS_URL=$CI_PROJECT_URL
--cache-from $CI_REGISTRY_IMAGE:latest
--tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
.
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
Push latest:
variables:
GIT_STRATEGY: none
stage: push
only:
- main
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:latest
- docker push $CI_REGISTRY_IMAGE:latest
Push tag:
variables:
GIT_STRATEGY: none
stage: push
only:
- tags
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME

View file

@ -1,4 +1,4 @@
FROM python:3.9.5-slim FROM python:3.10.4-slim
COPY . . COPY . .
RUN pip install -r requirements.txt RUN pip install -r requirements.txt

218
main.py
View file

@ -1,6 +1,6 @@
from dotenv import load_dotenv from dotenv import load_dotenv
from os import environ from os import environ
from tweepy import OAuthHandler, API, StreamListener, Stream from tweepy import OAuth1UserHandler, API, Stream
from re import sub, findall from re import sub, findall
from random import choice from random import choice
from datetime import datetime from datetime import datetime
@ -9,7 +9,7 @@ from queue import Queue
from json import loads from json import loads
def load(variables) -> dict: def load(variables) -> dict:
"""Load environment variables.""" """Load environment variables"""
keys = {} keys = {}
load_dotenv() # load .env file load_dotenv() # load .env file
for var in variables: for var in variables:
@ -40,58 +40,94 @@ def load(variables) -> dict:
return keys return keys
def cleanTweet(tweet: str) -> str: def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet.""" """Remove all unwanted elements from the tweet"""
tweet = tweet.lower() # convert to lower case # Convert to lower case
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) # remove URLs tweet = tweet.lower()
hashtagMatch = findall(r"#\S+", tweet) # check all hashtags # Remove URLs
if len(hashtagMatch) < 3: # if less than 3 tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
tweet = sub(r"#\S+", " ", tweet) # remove them # Check all hashtags
hashtagMatch = findall(r"#\S+", tweet)
# If less than 3
if len(hashtagMatch) < 3:
# Remove them
tweet = sub(r"#\S+", " ", tweet)
else: else:
return "" # too much hashtags, ignoring tweet # Too much hashtags in the tweet -> so ignore it
tweet = sub(r"@\S+", " ", tweet) # remove usernames return ""
tweet = sub(r" *?[^\w\s]+", " ", tweet) # remove everything who is not a letter or a number or a space # Remove usernames
tweet = sub(r"\S+(?=si|ci)", " ", tweet) # remove element of the word only if the last syllable can be matched (so more words will be answered without adding them manually) tweet = sub(r"@\S+", " ", tweet)
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet) # remove key smashing in certains words # Remove everything who isn't a letter/number/space
tweet = sub(r" *?[^\w\s]+", " ", tweet)
# Remove element of the word only if the last syllable can be matched
# (so more words will be answered without adding them manually)
tweet = sub(r"\S+(?=si|ci)", " ", tweet)
# Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
return tweet.strip() return tweet.strip()
class Listener(StreamListener): class Listener(Stream):
"""Watch for tweets that match criteria in real-time.""" """Watch for tweets that match criteria in real-time"""
def __init__(self, api = None, users = None, forcelist = None, q = Queue()): def __init__(
super(Listener, self).__init__() self,
consumer_key,
consumer_secret,
access_token,
access_token_secret,
api: API = None,
users: list = None,
forcelist: list = None,
q = Queue()
):
super(Listener, self).__init__(consumer_key, consumer_secret, access_token, access_token_secret)
self.q = q self.q = q
self.api = api self.api = api
self.accounts = users self.accounts = [users, forcelist]
self.forcelist = forcelist
self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist) self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist)
def on_connect(self): def on_connect(self):
print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts)} comme timeline et ces comptes : @{', @'.join(self.forcelist)}...") if self.accounts[1] == []:
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(self.accounts[1])}"
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts[0])} et ces comptes en plus : {forcelist} comme timeline...")
def on_disconnect(notice): def on_disconnect_message(notice):
notice = notice["disconnect"] notice = notice["disconnect"]
print(f"Déconnexion (code {notice['code']}).", end = " ") print(f"Déconnexion (code {notice['code']}).", end = " ")
if len(notice["reason"]) > 0: if len(notice["reason"]) > 0:
print(f"Raison : {notice['reason']}") print(f"Raison : {notice['reason']}")
def on_status(self, status): def on_status(self, status):
if status._json["user"]["id"] in self.listOfFriendsID and status._json["user"]["screen_name"] not in keys["WHITELIST"]: # verification of the author of the tweet json = status._json
if seniority(status._json["created_at"]): # verification of the age of the tweet # Verify the author of the tweet
if not hasattr(status, "retweeted_status"): # ignore Retweet if json["user"]["id"] in self.listOfFriendsID and json["user"]["screen_name"] not in keys["WHITELIST"]:
if "extended_tweet" in status._json: # Verify the age of the tweet
if seniority(json["created_at"]):
# Verify if the tweet isn't a retweet
if not hasattr(status, "retweeted_status"):
# Fetch the tweet
if "extended_tweet" in json:
tweet = cleanTweet(status.extended_tweet["full_text"]) tweet = cleanTweet(status.extended_tweet["full_text"])
else: else:
tweet = cleanTweet(status.text) tweet = cleanTweet(status.text)
# Fetch the last word of the tweet
lastWord = tweet.split()[-1:][0] lastWord = tweet.split()[-1:][0]
if keys["VERBOSE"]: if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags" infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {status._json['user']['screen_name']} ({infoLastWord})...", end = " ") print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
if lastWord in universalBase: # check if the last word found is a supported word # Check if the last word found is a supported word
if lastWord in universalBase:
answer = None answer = None
# Fetch an adequate response
for mot in base.items(): for mot in base.items():
if lastWord in mot[1]: if lastWord in mot[1]:
# Handle specific case
if mot[0] == "bon": if mot[0] == "bon":
if datetime.now().hour in range(7, 17): # between 7am and 5pm # Between 7am and 5pm
if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour answer = answers[mot[0]][0] # jour
else: else:
answer = answers[mot[0]][1] # soir answer = answers[mot[0]][1] # soir
@ -100,14 +136,17 @@ class Listener(StreamListener):
if answer == None: if answer == None:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print(f"{errorMessage} Aucune réponse trouvée.") print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
else: else:
if keys["VERBOSE"]: if keys["VERBOSE"]:
print(f"Envoie d'un {answer[0]}...", end = " ") print(f"Envoie d'un {answer[0]}...", end = " ")
try: # send answer try:
self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True) # Send the tweet with the answer
print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !") self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error: except Exception as error:
error = loads(error.response.text)["errors"][0] error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
if error["code"] == 385: if error["code"] == 385:
error["message"] = "Tweet supprimé ou auteur en privé/bloqué." error["message"] = "Tweet supprimé ou auteur en privé/bloqué."
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}") print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
@ -116,11 +155,12 @@ class Listener(StreamListener):
print("Annulation car le dernier mot n'est pas intéressant.") print("Annulation car le dernier mot n'est pas intéressant.")
def do_stuff(self): def do_stuff(self):
"""Loop for the Listener"""
while True: while True:
self.q.get() self.q.get()
self.q.task_done() self.q.task_done()
def on_error(self, status_code): def on_request_error(self, status_code):
print(f"{errorMessage[:-2]} ({status_code}) !", end = " ") print(f"{errorMessage[:-2]} ({status_code}) !", end = " ")
if status_code == 413: if status_code == 413:
if keys["VERBOSE"]: if keys["VERBOSE"]:
@ -132,57 +172,80 @@ class Listener(StreamListener):
print("\n") print("\n")
return False return False
def getFriendsID(api, users: list) -> list: def getFriendsID(api: API, users: list[str]) -> list:
"""Get all friends of choosen users.""" """Get all friends of choosen users"""
liste = [] liste = []
# Get IDs of the user's friends
for user in users: for user in users:
liste.extend(api.friends_ids(user)) liste.extend(api.get_friend_ids(screen_name=user))
return list(set(liste)) return list(set(liste))
def getIDs(api, users: list) -> list: def getIDs(api: API, users: list[str]) -> list:
"""Get all the ID of users""" """Get all the ID of users"""
liste = [] liste = []
# Get IDs of the users
for user in users: for user in users:
liste.append(api.get_user(user)._json["id"]) liste.append(api.get_user(screen_name=user)._json["id"])
return list(set(liste)) return list(set(liste))
def seniority(date: str) -> bool: def seniority(date: str) -> bool:
"""Return True only if the given string date is less than one day old.""" """Return True only if the given string date is less than one day old"""
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") # convert String format to datetime format # Convert string format to datetime format
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) # Twitter give us an UTC time datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
age = datetime.now(timezone("UTC")) - datetimeObject # time now in UTC minus the time we got to get the age of the date # Twitter give us an UTC time
return False if age.days >= 1 else True # False if older than a day datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC"))
# time now in UTC minus the time we got to get the age of the date
age = datetime.now(timezone("UTC")) - datetimeObject
# False if older than a day, else True
return False if age.days >= 1 else True
def permute(array: list) -> list: def generateWords(array: list[str]) -> list:
"""Retrieves all possible combinations for the given list and returns the result as a list.""" """
Retrieves all possible combinations for the given list and returns the result as a list
This is used for the filter in the stream (before calling the Listener::on_status)
"""
quoiListe = [] quoiListe = []
for text in array: # all element of the list for text in array:
# Add all combinations
# Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui']
#
# -> Depends on: from itertools import product
# -> Problem : Create a too long list (+1000 words, max is 400)
# -> Cf. https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
#
# quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower())))))
if text.lower() not in quoiListe: if text.lower() not in quoiListe:
quoiListe.append(text.lower()) # word fully in lowercase # Word in lowercase
quoiListe.append(text.lower())
if text.upper() not in quoiListe: if text.upper() not in quoiListe:
quoiListe.append(text.upper()) # word fully in uppercase # Word in uppercase
quoiListe.append(text.upper())
if text.capitalize() not in quoiListe: if text.capitalize() not in quoiListe:
quoiListe.append(text.capitalize()) # word with the first letter in uppercase # Word capitalized
quoiListe.append(text.capitalize())
return quoiListe return quoiListe
def createBaseTrigger(lists) -> list: def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one.""" """Merges all given lists into one"""
listing = [] listing = []
for liste in lists: for liste in lists:
listing.extend(liste) listing.extend(liste)
return list(set(listing)) return list(set(listing))
def createBaseAnswers(word) -> list: def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word.""" """Generates default answers for a given word"""
return [word, f"({word})", word.upper(), f"{word} lol"] return [word, f"({word})", word.upper(), f"{word} lol", f"{word} 👀"]
def start(): def start():
"""Start the bot.""" """Start the bot"""
auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"]) auth = OAuth1UserHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"]) auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"])
api = API(auth_handler = auth, wait_on_rate_limit = True) api = API(auth)
if keys["VERBOSE"]: if keys["VERBOSE"]:
try: try:
@ -191,7 +254,7 @@ def start():
except: except:
print("Erreur d'authentification.") print("Erreur d'authentification.")
exit(1) exit(1)
print(f"@{api.me()._json['screen_name']}.") print(f"@{api.verify_credentials().screen_name}.")
if keys['WHITELIST'] == []: if keys['WHITELIST'] == []:
whitelist = "Aucun" whitelist = "Aucun"
@ -199,9 +262,16 @@ def start():
whitelist = f"@{', @'.join(keys['WHITELIST'])}" whitelist = f"@{', @'.join(keys['WHITELIST'])}"
print(f"Liste des comptes ignorés : {whitelist}.") print(f"Liste des comptes ignorés : {whitelist}.")
listener = Listener(api, keys["PSEUDOS"], keys["FORCELIST"]) stream = Listener(
stream = Stream(auth = api.auth, listener = listener) consumer_key=keys["CONSUMER_KEY"],
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, is_async = True) consumer_secret=keys["CONSUMER_SECRET"],
access_token=keys["TOKEN"],
access_token_secret=keys["TOKEN_SECRET"],
api=api,
users=keys["PSEUDOS"],
forcelist=keys["FORCELIST"]
)
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, threaded = True)
if __name__ == "__main__": if __name__ == "__main__":
""" """
@ -212,9 +282,11 @@ if __name__ == "__main__":
-- --
PSEUDO is the PSEUDO of the account you want to listen to snipe. PSEUDO is the PSEUDO of the account you want to listen to snipe.
""" """
errorMessage = "Une erreur survient !" # error message # Error message
errorMessage = "Une erreur survient !"
base = { # words to detect in lowercase # Words who trigger the bot (keys in lowercase)
base = {
"quoi": ["quoi", "koi", "quoient"], "quoi": ["quoi", "koi", "quoient"],
"oui": ["oui", "ui", "wi"], "oui": ["oui", "ui", "wi"],
"non": ["non", "nn"], "non": ["non", "nn"],
@ -252,13 +324,14 @@ if __name__ == "__main__":
"vois": ["vois", "voit", "voie", "voi"], "vois": ["vois", "voit", "voie", "voi"],
} }
answers = { # creation of answers # Answers for all the triggers (keys in lowercase)
answers = {
"quoi": createBaseAnswers("feur") "quoi": createBaseAnswers("feur")
+ createBaseAnswers("feuse") + createBaseAnswers("feuse")
+ [ + [
"https://twitter.com/Myshawii/status/1423219640025722880/video/1", "https://twitter.com/Myshawii/status/1423219640025722880/video/1",
"https://twitter.com/Myshawii/status/1423219684552417281/video/1", "https://twitter.com/Myshawii/status/1423219684552417281/video/1",
f"{createBaseAnswers('feur')} (-isson -ictalope -diatre -uil)", "feur (-isson -ictalope -diatre -uil)",
"https://twitter.com/Myshawii/status/1455469162202075138/video/1", "https://twitter.com/Myshawii/status/1455469162202075138/video/1",
"https://twitter.com/Myshawii/status/1552026689101860865/video/1", "https://twitter.com/Myshawii/status/1552026689101860865/video/1",
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1" "https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
@ -294,12 +367,12 @@ if __name__ == "__main__":
"mais": createBaseAnswers("on") "mais": createBaseAnswers("on")
+ [ + [
f"{createBaseAnswers('on')} (-dulation)" "on (-dulation)"
], ],
"fort": createBaseAnswers("boyard") "fort": createBaseAnswers("boyard")
+ [ + [
f"{createBaseAnswers('boyard')} (-ennes)" "boyard (-ennes)"
], ],
"coup": createBaseAnswers("teau"), "coup": createBaseAnswers("teau"),
@ -308,7 +381,7 @@ if __name__ == "__main__":
+ createBaseAnswers("von") + createBaseAnswers("von")
+ createBaseAnswers("pristi") + createBaseAnswers("pristi")
+ [ + [
f"{createBaseAnswers('pristi')} (-gnasse)" "pristi (-gnasse)"
], ],
"bon": [ "bon": [
@ -374,11 +447,14 @@ if __name__ == "__main__":
"vois": createBaseAnswers("ture"), "vois": createBaseAnswers("ture"),
} }
universalBase = createBaseTrigger(list(base.values())) # creation of a list of all the words # List of all the trigger words
universalBase = createBaseTrigger(list(base.values()))
triggerWords = permute(universalBase) # creation of a list of all the words (upper and lower case) # List of all the triggers words's variations
triggerWords = generateWords(universalBase)
# Loading environment variables and launching the bot # Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"]) keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
print("") # just a newline
# Start the bot
start() start()

View file

@ -1,3 +1,3 @@
python-dotenv==0.18.0 python-dotenv==0.20.0
tweepy==3.10.0 tweepy==4.10.0
pytz==2021.1 pytz==2022.1