This repository has been archived on 2022-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
feurBot/main.py
2021-08-04 17:38:15 +02:00

144 lines
6.1 KiB
Python

from dotenv import load_dotenv
from os import environ
from tweepy import OAuthHandler, API, StreamListener, Stream
from re import sub
from random import choice
from datetime import datetime
from pytz import timezone
from queue import Queue
def load(variables) -> dict:
    """Load the requested environment variables.

    The ``.env`` file (if present) is loaded first through ``load_dotenv``.

    Args:
        variables: iterable of environment variable names to read.

    Returns:
        A dict mapping each requested name to its value. The value of
        ``PSEUDOS`` is split on commas into a list of unique, non-empty
        names, kept in order of first appearance.

    Raises:
        SystemExit: with status 1 when one of the variables is not set.
    """
    load_dotenv()  # load .env file
    keys = {}
    for var in variables:
        try:
            res = environ[var]
        except KeyError:
            print(f"Please set the environment variable {var} (.env file supported)")
            # `exit()` is a helper injected by the `site` module for interactive
            # sessions; raising SystemExit works in every execution context.
            raise SystemExit(1)
        if var == "PSEUDOS":
            # Split the comma-separated names, drop blanks and duplicates.
            # dict.fromkeys keeps first-seen order, unlike a set.
            names = (name.strip() for name in res.split(","))
            res = list(dict.fromkeys(name for name in names if name))
        keys[var] = res
    return keys
class Listener(StreamListener):
    """Stream listener that replies to tweets ending with a trigger word.

    Only tweets whose author ID is in ``listOfFriendsID`` are answered.
    The trigger and answer lists (``universalBase``, ``quoiBase``,
    ``ouiBase``, ``nonBase``, ``feur``, ``stiti``, ``bril``) and
    ``errorMessage`` are read from module globals.
    """

    def __init__(self, api=None, users=None, q=None):
        """Store the API handle and collect the IDs of the accounts to watch.

        Args:
            api: API object used to look up friends and to post replies.
            users: names of the users whose friends are watched.
            q: queue drained by ``do_stuff``; a fresh ``Queue`` is created
                when omitted.
        """
        super(Listener, self).__init__()
        # A `Queue()` default in the signature would be built once and shared
        # by every instance, so the default is created per instance instead.
        self.q = Queue() if q is None else q
        self.api = api
        # `users` defaults to None; treat that as "nobody to watch".
        self.listOfFriendsID = getFriendsID(api, users or [])

    def on_status(self, status):
        """Answer to a tweet when its last usable word is a trigger word."""
        tweet = status._json
        if tweet["user"]["id"] not in self.listOfFriendsID:  # verification of the author of the tweet
            return
        if not seniority(tweet["created_at"]):  # verification of the age of the tweet
            return

        # recovery of the last "usable" word of the tweet
        tweetText = sub(r"https?:\/\/\S+| +?\?|\?| +?\!| ?\!|-|~|(?<=ui)i+|@\S+", "", tweet["text"].lower())
        words = tweetText.split()
        if not words:
            # Nothing is left once links, mentions and punctuation are removed
            # (e.g. a tweet made only of a URL), so there is no word to test.
            return
        lastWord = words[-1]

        if lastWord not in universalBase:  # check if the last word found is a supported word
            return
        if lastWord in quoiBase:
            answer = feur
        elif lastWord in ouiBase:
            answer = stiti
        elif lastWord in nonBase:
            answer = bril
        else:
            return

        try:  # send answer
            self.api.update_status(status = choice(answer), in_reply_to_status_id = tweet["id"], auto_populate_reply_metadata = True)
        except Exception as error:
            # Best effort: a failed reply must not stop the stream.
            print(f"{errorMessage} {error}")
        else:
            print(f"{tweet['user']['screen_name']} s'est fait {answer[0]} !")

    def do_stuff(self):
        """Drain the queue forever, marking every item as done."""
        while True:
            self.q.get()
            self.q.task_done()
def getFriendsID(api, users: list) -> list:
    """Get the unique friend IDs of all the chosen users.

    Args:
        api: object exposing ``friends_ids(user)``.
        users: users to look up; ``None`` or an empty list yields ``[]``.

    Returns:
        A de-duplicated list of IDs (order is not guaranteed).
    """
    if not users:
        # Nothing to look up; also covers the `users=None` default of Listener.
        return []
    uniqueIDs = set()
    for user in users:
        # NOTE(review): presumably a single call returns only one page of
        # IDs - confirm against the tweepy documentation for large accounts.
        uniqueIDs.update(api.friends_ids(user))
    return list(uniqueIDs)
def seniority(date: str) -> bool:
    """Return True only if the given string date is less than one day old.

    Args:
        date: timestamp formatted like ``Wed Aug 04 15:38:15 +0000 2021``.

    Returns:
        True when less than one full day has elapsed since ``date``,
        False otherwise.

    Raises:
        ValueError: if ``date`` does not match the expected format.
    """
    # %z parses the UTC offset, so the result is timezone-aware without
    # hard-coding "+0000" in the format or attaching a timezone afterwards.
    createdAt = datetime.strptime(date, '%a %b %d %H:%M:%S %z %Y')
    # Aware datetimes subtract correctly whatever their offsets are.
    age = datetime.now(createdAt.tzinfo) - createdAt
    return age.days < 1
def permute(array: list) -> list:
    """Build every upper/lower-case spelling of each word in ``array``.

    For a word of length n, 2^n spellings are produced. They are listed by
    increasing bit mask, where bit j set means the j-th character is
    upper-cased. All spellings of the first word come first, then those of
    the second word, and so on.
    """
    variants = []
    for word in array:
        size = len(word)
        lowered = word.lower()
        for mask in range(1 << size):  # one mask per spelling
            spelling = "".join(
                char.upper() if (mask >> pos) & 1 else char
                for pos, char in enumerate(lowered)
            )
            variants.append(spelling)
    return variants
def createBaseTrigger(*lists) -> list:
    """Combine every given list into a single list without duplicates.

    The order of the returned elements is not guaranteed.
    """
    unique = {item for group in lists for item in group}
    return list(unique)
def createBaseAnswers(word) -> list:
    """Return the four stock replies derived from ``word``.

    In order: the word itself, the word in parentheses, the word in upper
    case, and the word followed by " lol".
    """
    bracketed = f"({word})"
    shouted = word.upper()
    casual = f"{word} lol"
    return [word, bracketed, shouted, casual]
def main(accessToken: str, accessTokenSecret: str, consumerKey: str, consumerSecret: str, users: list):
    """Authenticate, build the listener and start filtering the stream.

    The stream is filtered on the module-level ``triggerWords`` for French
    tweets and is started with ``is_async = True``.
    """
    handler = OAuthHandler(consumerKey, consumerSecret)
    handler.set_access_token(accessToken, accessTokenSecret)
    client = API(auth_handler = handler, wait_on_rate_limit = True)

    # The listener is built before the stream, as it looks up the friends of `users`.
    tweetListener = Listener(client, users)
    tweetStream = Stream(auth = client.auth, listener = tweetListener)

    print(f"Scroll sur Twitter avec les abonnements de @{', @'.join(users)}...")
    tweetStream.filter(
        track = triggerWords,
        languages = ["fr"],
        stall_warnings = True,
        is_async = True,
    )
if __name__ == '__main__':
    # Required environment variables (a .env file is supported):
    #   TOKEN            Access Token, available in the Authentication Tokens
    #                    section under the Access Token and Secret sub-heading.
    #   TOKEN_SECRET     Access Token Secret, available in the same section.
    #   CONSUMER_KEY     API Key, available in the Consumer Keys section.
    #   CONSUMER_SECRET  API Secret Key, available in the Consumer Keys section.
    #   PSEUDOS          comma-separated names of the accounts whose friends
    #                    the bot listens to.
    #
    # The names defined below are module globals read by Listener.on_status
    # and main.

    errorMessage = "Error happens!"  # error message

    # words to detect
    quoiBase = ["quoi", "koi", "quoient"]
    ouiBase = ["oui", "ui"]
    nonBase = ["non", "nn"]
    universalBase = createBaseTrigger(quoiBase, ouiBase, nonBase)

    # creation of the list with all alternatives (upper/lower case)
    triggerWords = permute(universalBase)

    # creation of answers
    feur = createBaseAnswers("feur")
    feur.extend(["https://twitter.com/shukuzi62/status/1422611919538724868/video/1", "feur (-isson)"])  # add a message in addition to the default answers
    stiti = createBaseAnswers("stiti")
    bril = createBaseAnswers("bril")

    # loading environment variables and launching the bot
    keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS"])
    main(keys["TOKEN"], keys["TOKEN_SECRET"], keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"], keys["PSEUDOS"])