wip: add bearer token, migrate from Stream to StreamingClient, add some TODOs to complete

This commit is contained in:
Mylloon 2022-08-07 16:47:10 +02:00
parent 756016ae8d
commit 0874d5211d
Signed by: Anri
GPG key ID: A82D63DFF8D1317F

87
main.py
View file

@ -1,6 +1,6 @@
from dotenv import load_dotenv from dotenv import load_dotenv
from os import environ from os import environ
from tweepy import OAuth1UserHandler, API, Stream from tweepy import StreamingClient, Client, StreamRule
from re import sub, findall from re import sub, findall
from random import choice from random import choice
from datetime import datetime from datetime import datetime
@ -68,31 +68,31 @@ def cleanTweet(tweet: str) -> str:
return tweet.strip() return tweet.strip()
class Listener(Stream): class Listener(StreamingClient):
"""Watch for tweets that match criteria in real-time""" """Watch for tweets that match criteria in real-time"""
def __init__( def __init__(
self, self,
consumer_key, bearer_token,
consumer_secret, client: Client,
access_token, users: list = [],
access_token_secret, forcelist: list = [],
api: API = None,
users: list = None,
forcelist: list = None,
q = Queue() q = Queue()
): ):
super(Listener, self).__init__(consumer_key, consumer_secret, access_token, access_token_secret) super(Listener, self).__init__(bearer_token)
self.q = q self.q = q
self.api = api self.client = client
self.accounts = [users, forcelist] self.accounts = {
self.listOfFriendsID = getFriendsID(api, users) + getIDs(api, forcelist) "users": users,
"forcelist": forcelist
}
self.listOfFriendsID = getFriendsID(client, users) + getIDs(client, forcelist)
def on_connect(self): def on_connect(self):
if self.accounts[1] == []: if self.accounts['forcelist'] == []:
forcelist = "Aucun" forcelist = "Aucun"
else: else:
forcelist = f"@{', @'.join(self.accounts[1])}" forcelist = f"@{', @'.join(self.accounts['forcelist'])}"
print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts[0])} et ces comptes en plus : {forcelist} comme timeline...") print(f"Début du scroll sur Twitter avec les abonnements de @{', @'.join(self.accounts['users'])} et ces comptes en plus : {forcelist} comme timeline...")
def on_disconnect_message(notice): def on_disconnect_message(notice):
notice = notice["disconnect"] notice = notice["disconnect"]
@ -150,6 +150,7 @@ class Listener(Stream):
print(f"Envoie d'un {answer[0]}...", end = " ") print(f"Envoie d'un {answer[0]}...", end = " ")
try: try:
# Send the tweet with the answer # Send the tweet with the answer
# TODO: Update Twitter API V2
self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True) self.api.update_status(status = choice(answer), in_reply_to_status_id = json["id"], auto_populate_reply_metadata = True)
print(f"{json['user']['screen_name']} s'est fait {answer[0]} !") print(f"{json['user']['screen_name']} s'est fait {answer[0]} !")
except Exception as error: except Exception as error:
@ -197,20 +198,22 @@ def repeater(word: str) -> str:
# Random format from the base answer # Random format from the base answer
return createBaseAnswers(word) return createBaseAnswers(word)
def getFriendsID(api: API, users: list[str]) -> list: def getFriendsID(client: Client, users: list[str]) -> list:
"""Get all friends of choosen users""" """Get all friends of choosen users"""
liste = [] liste = []
# Get IDs of the user's friends # Get IDs of the user's friends
for user in users: for user in users:
liste.extend(api.get_friend_ids(screen_name=user)) # TODO: Update Twitter API V2
liste.extend(client.get_friend_ids(screen_name=user))
return list(set(liste)) return list(set(liste))
def getIDs(api: API, users: list[str]) -> list: def getIDs(client: Client, users: list[str]) -> list:
"""Get all the ID of users""" """Get all the ID of users"""
liste = [] liste = []
# Get IDs of the users # Get IDs of the users
for user in users: for user in users:
liste.append(api.get_user(screen_name=user)._json["id"]) # TODO: Update Twitter API V2
liste.append(client.get_user(screen_name=user)._json["id"])
return list(set(liste)) return list(set(liste))
def seniority(date: str) -> bool: def seniority(date: str) -> bool:
@ -277,21 +280,22 @@ def createBaseAnswers(word: str) -> list:
f"{word}...", f"{word}...",
] ]
def start(): def createClient(consumer_key, consumer_secret, access_token, access_token_secret) -> Client:
"""Start the bot""" """Create a client for the Twitter API v2"""
auth = OAuth1UserHandler(keys["CONSUMER_KEY"], keys["CONSUMER_SECRET"]) client = Client(
auth.set_access_token(keys["TOKEN"], keys["TOKEN_SECRET"]) consumer_key=consumer_key,
consumer_secret=consumer_secret,
api = API(auth) access_token=access_token,
access_token_secret=access_token_secret
)
if keys["VERBOSE"]: if keys["VERBOSE"]:
try: try:
api.verify_credentials() client.get_me().data.username
print(f"Authentification réussie en tant que", end = " ") print(f"Authentification réussie en tant que @{client.get_me().data.username}.", end = " ")
except: except:
print("Erreur d'authentification.") print("Erreur d'authentification.")
exit(1) exit(1)
print(f"@{api.verify_credentials().screen_name}.")
if keys['WHITELIST'] == []: if keys['WHITELIST'] == []:
whitelist = "Aucun" whitelist = "Aucun"
@ -299,16 +303,25 @@ def start():
whitelist = f"@{', @'.join(keys['WHITELIST'])}" whitelist = f"@{', @'.join(keys['WHITELIST'])}"
print(f"Liste des comptes ignorés : {whitelist}.") print(f"Liste des comptes ignorés : {whitelist}.")
return client
def start():
"""Start the bot"""
stream = Listener( stream = Listener(
consumer_key=keys["CONSUMER_KEY"], bearer_token=keys["BEARER_TOKEN"],
consumer_secret=keys["CONSUMER_SECRET"], client=createClient(
access_token=keys["TOKEN"], keys["CONSUMER_KEY"],
access_token_secret=keys["TOKEN_SECRET"], keys["CONSUMER_SECRET"],
api=api, keys["TOKEN"],
keys["TOKEN_SECRET"],
),
users=keys["PSEUDOS"], users=keys["PSEUDOS"],
forcelist=keys["FORCELIST"] forcelist=keys["FORCELIST"],
) )
stream.filter(track = triggerWords, languages = ["fr"], stall_warnings = True, threaded = True)
# Only track specifics words
stream.add_rules([StreamRule(word) for word in triggerWords])
stream.filter(threaded=True)
if __name__ == "__main__": if __name__ == "__main__":
""" """
@ -494,7 +507,7 @@ if __name__ == "__main__":
triggerWords = generateWords(universalBase) triggerWords = generateWords(universalBase)
# Loading environment variables # Loading environment variables
keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST"]) keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS", "VERBOSE", "WHITELIST", "FORCELIST", "BEARER_TOKEN"])
# Start the bot # Start the bot
start() start()