update dependencies #2

Merged
Anri merged 7 commits from update/4.x.x into main 2022-08-06 02:56:06 +02:00
4 changed files with 151 additions and 129 deletions

View file

@ -1,54 +0,0 @@
###############################################################
# Setting I use for cleaning up image tags #
# - Running cleanup every week #
# - Keeping 1 tag per image name matching : (?:v.\d+|dev) #
# - Removing tags older than 7 days matching the default : .* #
###############################################################
image: docker:stable
stages:
- build
- push
services:
- docker:dind
before_script:
- echo -n $CI_JOB_TOKEN | docker login -u gitlab-ci-token --password-stdin $CI_REGISTRY
Build:
stage: build
script:
- docker pull $CI_REGISTRY_IMAGE:latest || true
- >
docker build
--pull
--build-arg VCS_REF=$CI_COMMIT_SHA
--build-arg VCS_URL=$CI_PROJECT_URL
--cache-from $CI_REGISTRY_IMAGE:latest
--tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
.
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
Push latest:
variables:
GIT_STRATEGY: none
stage: push
only:
- main
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:latest
- docker push $CI_REGISTRY_IMAGE:latest
Push tag:
variables:
GIT_STRATEGY: none
stage: push
only:
- tags
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME

View file

@ -1,4 +1,4 @@
FROM python:3.9.5-slim
FROM python:3.10.4-slim
COPY . .
RUN pip install -r requirements.txt

218
main.py
View file

@ -1,6 +1,6 @@
from dotenv import load_dotenv
from os import environ
from tweepy import OAuthHandler, API, StreamListener, Stream
from tweepy import OAuth1UserHandler, API, Stream
from re import sub, findall
from random import choice
from datetime import datetime
@ -9,7 +9,7 @@ from queue import Queue
from json import loads
def load(variables) -> dict:
"""Load environment variables."""
"""Load environment variables"""
keys = {}
load_dotenv() # load .env file
for var in variables:
@ -40,58 +40,94 @@ def load(variables) -> dict:
return keys
def cleanTweet(tweet: str) -> str:
"""Remove all unwanted elements from the tweet."""
tweet = tweet.lower() # convert to lower case
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet) # remove URLs
hashtagMatch = findall(r"#\S+", tweet) # check all hashtags
if len(hashtagMatch) < 3: # if less than 3
tweet = sub(r"#\S+", " ", tweet) # remove them
"""Remove all unwanted elements from the tweet"""
# Convert to lower case
tweet = tweet.lower()
# Remove URLs
tweet = sub(r"(https?:\/\/\S+|www.\S+)", " ", tweet)
# Check all hashtags
hashtagMatch = findall(r"#\S+", tweet)
# If less than 3
if len(hashtagMatch) < 3:
# Remove them
tweet = sub(r"#\S+", " ", tweet)
else:
return "" # too much hashtags, ignoring tweet
tweet = sub(r"@\S+", " ", tweet) # remove usernames
tweet = sub(r" *?[^\w\s]+", " ", tweet) # remove everything who is not a letter or a number or a space
tweet = sub(r"\S+(?=si|ci)", " ", tweet) # remove element of the word only if the last syllable can be matched (so more words will be answered without adding them manually)
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet) # remove key smashing in certains words
# Too much hashtags in the tweet -> so ignore it
return ""
# Remove usernames
tweet = sub(r"@\S+", " ", tweet)
# Remove everything who isn't a letter/number/space
tweet = sub(r" *?[^\w\s]+", " ", tweet)
# Remove element of the word only if the last syllable can be matched
# (so more words will be answered without adding them manually)
tweet = sub(r"\S+(?=si|ci)", " ", tweet)
# Remove key smashing in certains words
# uiii naaaan quoiiii noooon heiiin siiii
tweet = sub(r"(?<=ui)i+|(?<=na)a+(?<!n)|(?<=quoi)i+|(?<=no)o+(?<!n)|(?<=hei)i+(?<!n)|(?<=si)i+", "", tweet)
return tweet.strip()
class Listener(StreamListener):
"""Watch for tweets that match criteria in real-time."""
def __init__(self, api = None, users = None, forcelist = None, q = Queue()):
super(Listener, self).__init__()
class Listener(Stream):
"""Watch for tweets that match criteria in real-time"""
def __init__(
self,
consumer_key,
consumer_secret,
access_token,
access_token_secret,
api: API = None,
users: list = None,
forcelist: list = None,
q = Queue()
):
super(Listener, self).__init__(consumer_key, consumer_secret, access_token, access_token_secret)
self.q = q
self.api = api
self.accounts = users
self.forcelist = forcelist
self.accounts = [users, forcelist]
self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist)
def on_connect(self):
print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts)} comme timeline et ces comptes : @{', @'.join(self.forcelist)}...")
if self.accounts[1] == []:
forcelist = "Aucun"
else:
forcelist = f"@{', @'.join(self.accounts[1])}"
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts[0])} et ces comptes en plus : {forcelist} comme timeline...")
def on_disconnect(notice):
def on_disconnect_message(notice):
notice = notice["disconnect"]
print(f"Déconnexion (code {notice['code']}).", end = " ")
if len(notice["reason"]) > 0:
print(f"Raison : {notice['reason']}")
def on_status(self, status):
if status._json["user"]["id"] in self.listOfFriendsID and status._json["user"]["screen_name"] not in keys["WHITELIST"]: # verification of the author of the tweet
if seniority(status._json["created_at"]): # verification of the age of the tweet
if not hasattr(status, "retweeted_status"): # ignore Retweet
if "extended_tweet" in status._json:
json = status._json
# Verify the author of the tweet
if json["user"]["id"] in self.listOfFriendsID and json["user"]["screen_name"] not in keys["WHITELIST"]:
# Verify the age of the tweet
if seniority(json["created_at"]):
# Verify if the tweet isn't a retweet
if not hasattr(status, "retweeted_status"):
# Fetch the tweet
if "extended_tweet" in json:
tweet = cleanTweet(status.extended_tweet["full_text"])
else:
tweet = cleanTweet(status.text)
# Fetch the last word of the tweet
lastWord = tweet.split()[-1:][0]
if keys["VERBOSE"]:
infoLastWord = f"dernier mot : \"{lastWord}\"" if len(lastWord) > 0 else "tweet ignoré car trop de hashtags"
print(f"Tweet trouvé de {status._json['user']['screen_name']} ({infoLastWord})...", end = " ")
if lastWord in universalBase: # check if the last word found is a supported word
print(f"Tweet trouvé de {json['user']['screen_name']} ({infoLastWord})...", end = " ")
# Check if the last word found is a supported word
if lastWord in universalBase:
answer = None
# Fetch an adequate response
for mot in base.items():
if lastWord in mot[1]:
# Handle specific case
if mot[0] == "bon":
if datetime.now().hour in range(7, 17): # between 7am and 5pm
# Between 7am and 5pm
if datetime.now().hour in range(7, 17):
answer = answers[mot[0]][0] # jour
else:
answer = answers[mot[0]][1] # soir
@ -100,14 +136,17 @@ class Listener(StreamListener):
if answer == None:
if keys["VERBOSE"]:
print(f"{errorMessage} Aucune réponse trouvée.")
# If an answer has been found
else:
if keys["VERBOSE"]:
print(f"Envoie d'un {answer[0]}...", end = " ")
try: # send answer
self.api.update_status(status = choice(answer), in_reply_to_status_id = status._json["id"], auto_populate_reply_metadata = True)
print(f"{status._json['user']['screen_name']} s'est fait {answer[0]} !")
try:
# Send the tweet with the answer
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error:
error = loads(error.response.text)["errors"][0]
# https://developer.twitter.com/en/support/twitter-api/error-troubleshooting
if error["code"] == 385:
error["message"] = "Tweet supprimé ou auteur en privé/bloqué."
print(f"{errorMessage[:-2]} ({error['code']}) ! {error['message']}")
@ -116,11 +155,12 @@ class Listener(StreamListener):
print("Annulation car le dernier mot n'est pas intéressant.")
def do_stuff(self):
"""Loop for the Listener"""
while True:
self.q.get()
self.q.task_done()
def on_error(self, status_code):
def on_request_error(self, status_code):
print(f"{errorMessage[:-2]} ({status_code}) !", end = " ")
if status_code == 413:
if keys["VERBOSE"]:
@ -132,57 +172,80 @@ class Listener(StreamListener):
print("\n")
return False
def getFriendsID(api, users: list) -> list:
"""Get all friends of choosen users."""
def getFriendsID(api: API, users: list[str]) -> list:
"""Get all friends of choosen users"""
liste = []
# Get IDs of the user's friends
for user in users:
liste.extend(api.friends_ids(user))
liste.extend(api.get_friend_ids(screen_name=user))
return list(set(liste))
def getIDs(api, users: list) -> list:
def getIDs(api: API, users: list[str]) -> list:
"""Get all the ID of users"""
liste = []
# Get IDs of the users
for user in users:
liste.append(api.get_user(user)._json["id"])
liste.append(api.get_user(screen_name=user)._json["id"])
return list(set(liste))
def seniority(date: str) -> bool:
"""Return True only if the given string date is less than one day old."""
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") # convert String format to datetime format
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC")) # Twitter give us an UTC time
age = datetime.now(timezone("UTC")) - datetimeObject # time now in UTC minus the time we got to get the age of the date
return False if age.days >= 1 else True # False if older than a day
"""Return True only if the given string date is less than one day old"""
# Convert string format to datetime format
datetimeObject = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
# Twitter give us an UTC time
datetimeObject = datetimeObject.replace(tzinfo = timezone("UTC"))
# time now in UTC minus the time we got to get the age of the date
age = datetime.now(timezone("UTC")) - datetimeObject
# False if older than a day, else True
return False if age.days >= 1 else True
def permute(array: list) -> list:
"""Retrieves all possible combinations for the given list and returns the result as a list."""
def generateWords(array: list[str]) -> list:
"""
Retrieves all possible combinations for the given list and returns the result as a list
This is used for the filter in the stream (before calling the Listener::on_status)
"""
quoiListe = []
for text in array: # all element of the list
for text in array:
# Add all combinations
# Example for 'oui': ['OUI', 'OUi', 'OuI', 'Oui', 'oUI', 'oUi', 'ouI', 'oui']
#
# -> Depends on: from itertools import product
# -> Problem : Create a too long list (+1000 words, max is 400)
# -> Cf. https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
#
# quoiListe.extend(list(map(''.join, product(*zip(text.upper(), text.lower())))))
if text.lower() not in quoiListe:
quoiListe.append(text.lower()) # word fully in lowercase
# Word in lowercase
quoiListe.append(text.lower())
if text.upper() not in quoiListe:
quoiListe.append(text.upper()) # word fully in uppercase
# Word in uppercase
quoiListe.append(text.upper())
if text.capitalize() not in quoiListe:
quoiListe.append(text.capitalize()) # word with the first letter in uppercase
# Word capitalized
quoiListe.append(text.capitalize())
return quoiListe
def createBaseTrigger(lists) -> list:
"""Merges all given lists into one."""
def createBaseTrigger(lists: list[list]) -> list:
"""Merges all given lists into one"""
listing = []
for liste in lists:
listing.extend(liste)
return list(set(listing))
def createBaseAnswers(word) -> list:
"""Generates default answers for a given word."""
return [word, f"({word})", word.upper(), f"{word} lol"]
def createBaseAnswers(word: str) -> list:
"""Generates default answers for a given word"""
return [word, f"({word})", word.upper(), f"{word} lol", f"{word} 👀"]
def start():
"""Start the bot."""
auth = OAuthHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
"""Start the bot"""
auth = OAuth1UserHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"])
auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"])
api = API(auth_handler = auth, wait_on_rate_limit = True)
api = API(auth)
if keys["VERBOSE"]:
try:
@ -191,7 +254,7 @@ def start():
except:
print("Erreur d'authentification.")
exit(1)
print(f"@{api.me()._json['screen_name']}.")
print(f"@{api.verify_credentials().screen_name}.")
if keys['WHITELIST'] == []:
whitelist = "Aucun"
@ -199,9 +262,16 @@ def start():
whitelist = f"@{', @'.join(keys['WHITELIST'])}"
print(f"Liste des comptes ignorés : {whitelist}.")
listener = Listener(api, keys["PSEUDOS"], keys["FORCELIST"])
stream = Stream(auth = api.auth, listener = listener)
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, is_async = True)
stream = Listener(
consumer_key=keys["CONSUMER_KEY"],
consumer_secret=keys["CONSUMER_SECRET"],
access_token=keys["TOKEN"],
access_token_secret=keys["TOKEN_SECRET"],
api=api,
users=keys["PSEUDOS"],
forcelist=keys["FORCELIST"]
)
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, threaded = True)
if __name__ == "__main__":
"""
@ -212,9 +282,11 @@ if __name__ == "__main__":
--
PSEUDO is the PSEUDO of the account you want to listen to snipe.
"""
errorMessage = "Une erreur survient !" # error message
# Error message
errorMessage = "Une erreur survient !"
base = { # words to detect in lowercase
# Words who trigger the bot (keys in lowercase)
base = {
"quoi": ["quoi", "koi", "quoient"],
"oui": ["oui", "ui", "wi"],
"non": ["non", "nn"],
@ -252,13 +324,14 @@ if __name__ == "__main__":
"vois": ["vois", "voit", "voie", "voi"],
}
answers = { # creation of answers
# Answers for all the triggers (keys in lowercase)
answers = {
"quoi": createBaseAnswers("feur")
+ createBaseAnswers("feuse")
+ [
"https://twitter.com/Myshawii/status/1423219640025722880/video/1",
"https://twitter.com/Myshawii/status/1423219684552417281/video/1",
f"{createBaseAnswers('feur')} (-isson -ictalope -diatre -uil)",
"feur (-isson -ictalope -diatre -uil)",
"https://twitter.com/Myshawii/status/1455469162202075138/video/1",
"https://twitter.com/Myshawii/status/1552026689101860865/video/1",
"https://twitter.com/Myshawii/status/1553112547678720001/photo/1"
@ -294,12 +367,12 @@ if __name__ == "__main__":
"mais": createBaseAnswers("on")
+ [
f"{createBaseAnswers('on')} (-dulation)"
"on (-dulation)"
],
"fort": createBaseAnswers("boyard")
+ [
f"{createBaseAnswers('boyard')} (-ennes)"
"boyard (-ennes)"
],
"coup": createBaseAnswers("teau"),
@ -308,7 +381,7 @@ if __name__ == "__main__":
+ createBaseAnswers("von")
+ createBaseAnswers("pristi")
+ [
f"{createBaseAnswers('pristi')} (-gnasse)"
"pristi (-gnasse)"
],
"bon": [
@ -374,11 +447,14 @@ if __name__ == "__main__":
"vois": createBaseAnswers("ture"),
}
universalBase = createBaseTrigger(list(base.values())) # creation of a list of all the words
# List of all the trigger words
universalBase = createBaseTrigger(list(base.values()))
triggerWords = permute(universalBase) # creation of a list of all the words (upper and lower case)
# List of all the triggers words's variations
triggerWords = generateWords(universalBase)
# Loading environment variables and launching the bot
# Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"])
print("") # just a newline
# Start the bot
start()

View file

@ -1,3 +1,3 @@
python-dotenv==0.18.0
tweepy==3.10.0
pytz==2021.1
python-dotenv==0.20.0
tweepy==4.10.0
pytz==2022.1