2021-08-03 14:17:00 +02:00
from dotenv import load_dotenv
from os import environ
from tweepy import OAuthHandler , API , StreamListener , Stream
from re import sub
2021-08-03 19:19:30 +02:00
from random import choice
2021-08-03 20:27:06 +02:00
from datetime import datetime
2021-08-03 19:59:32 +02:00
from pytz import timezone
2021-08-04 14:04:52 +02:00
from queue import Queue
2021-08-03 14:17:00 +02:00
2021-08-04 13:42:14 +02:00
def load(variables) -> dict:
    """Load the requested environment variables.

    The ``.env`` file (if any) is read first through ``load_dotenv``; every
    name in *variables* is then looked up in the process environment.

    ``PSEUDOS`` is special-cased: its value is treated as a comma-separated
    list and returned as a ``list`` with surrounding whitespace stripped,
    blank entries dropped and duplicates removed. The first occurrence of a
    name wins, so the order written in the configuration is preserved.

    Args:
        variables: iterable of environment variable names to read.

    Returns:
        A dict mapping each requested name to its value (a ``str``, or a
        ``list`` of ``str`` for ``PSEUDOS``).

    Raises:
        SystemExit: with status 1, after printing a hint, when one of the
            requested variables is not set.
    """
    load_dotenv()  # load .env file
    keys = {}
    for var in variables:
        # Only the lookup itself can raise KeyError, so keep the try minimal.
        try:
            res = environ[var]
        except KeyError:
            print(f"Please set the environment variable {var} (.env file supported)")
            # `exit()` is injected by the `site` module and is not guaranteed
            # to exist; raising SystemExit is the portable equivalent.
            raise SystemExit(1) from None
        if var == "PSEUDOS":
            # "a, b,,a" -> ["a", "b"]: strip, drop blanks, de-duplicate in order.
            names = (name.strip() for name in res.split(","))
            res = list(dict.fromkeys(name for name in names if name))
        keys[var] = res
    return keys
class Listener(StreamListener):
    """Stream listener that replies to recent tweets ending with a trigger word.

    Only tweets written by an account followed by one of the configured users
    are considered (see ``listOfFriendsID``).

    ``on_status`` reads the module-level names ``universalBase``, ``quoiBase``,
    ``ouiBase``, ``nonBase``, ``feur``, ``stiti``, ``bril`` and
    ``errorMessage``. In this file they are only assigned inside the
    ``if __name__ == '__main__'`` section, so they must exist before the
    stream delivers its first status.
    """

    # Everything matched by this pattern is replaced by a space before the
    # last word is extracted: URLs, "?" and "!" (with the spaces in front of
    # them), "-", "~", extra "i" following "ui", @mentions and runs of dots.
    _NOISE = r"https?:\/\/\S+| +?\?|\?| +?\!| ?\!|-|~|(?<=ui)i+|@\S+|\.+"

    def __init__(self, api=None, users=None, q=None):
        """Store the API client and resolve the accounts to watch.

        Args:
            api: client used to look up friends and to post the replies.
            users: accounts whose friends are watched; ``None`` means none.
            q: queue consumed by ``do_stuff``. A fresh ``Queue`` is created
                per instance when omitted (a ``Queue()`` default in the
                signature would be shared by every listener).
        """
        super(Listener, self).__init__()
        self.q = Queue() if q is None else q
        self.api = api
        self.listOfFriendsID = getFriendsID(api, users if users is not None else [])

    def on_status(self, status):
        """Answer to tweets.

        A reply is sent when the author is in ``listOfFriendsID``, the tweet
        passes ``seniority`` and its last remaining word, once cleaned with
        ``_NOISE``, is one of the supported trigger words. Any error raised
        while posting is printed and swallowed so the stream keeps running.
        """
        tweet = status._json
        if tweet["user"]["id"] not in self.listOfFriendsID:  # verification of the author of the tweet
            return
        if not seniority(tweet["created_at"]):  # verification of the age of the tweet
            return

        # recovery of the last "usable" word of the tweet
        words = sub(self._NOISE, " ", tweet["text"].lower()).split()
        if not words:
            # Nothing usable is left (e.g. the tweet was only a link or a
            # mention); indexing the last word would raise IndexError.
            return
        lastWord = words[-1]
        print(f"Tweet trouvé (dernier mot: \"{lastWord}\")...", end=" ")

        if lastWord not in universalBase:  # check if the last word found is a supported word
            return
        if lastWord in quoiBase:
            answer = feur
        elif lastWord in ouiBase:
            answer = stiti
        elif lastWord in nonBase:
            answer = bril
        else:
            # universalBase contains a word none of the three lists knows:
            # there is no answer to pick from.
            return

        print(f"Envoie de d'un {answer[0]}...", end=" ")
        try:  # send answer
            self.api.update_status(
                status=choice(answer),
                in_reply_to_status_id=tweet["id"],
                auto_populate_reply_metadata=True,
            )
            print(f"{tweet['user']['screen_name']} s'est fait {answer[0]} !")
        except Exception as error:
            # Deliberately broad: report the failure and keep listening.
            print(f"\n{errorMessage} {error}")

    def do_stuff(self):
        """Take items off ``self.q`` forever, acknowledging each one immediately.

        This loop never returns; each item is discarded once taken.
        """
        while True:
            self.q.get()
            self.q.task_done()
2021-08-03 19:59:32 +02:00
2021-08-04 13:42:14 +02:00
def getFriendsID(api, users: list) -> list:
    """Get the ids of all friends of the chosen users.

    Args:
        api: object exposing ``friends_ids(user)``, called once per user.
        users: accounts to inspect; ``None`` or an empty list yields ``[]``.

    Returns:
        A list of the collected ids without duplicates, in no particular
        order.

    NOTE(review): only what a single ``friends_ids`` call returns per user is
    collected; confirm against the tweepy pagination rules whether that is
    enough for accounts following many people.
    """
    ids = set()
    for user in users or ():  # tolerate users=None (default of Listener)
        ids.update(api.friends_ids(user))
    return list(ids)
2021-08-04 00:04:38 +02:00
2021-08-04 13:42:14 +02:00
def seniority(date: str) -> bool:
    """Return True only if the given string date is less than one day old.

    Args:
        date: timestamp such as ``"Wed Aug 04 14:00:00 +0000 2021"``
            (weekday, month, day, time, UTC offset, year).

    Returns:
        ``True`` when fewer than one full day separates *date* from now,
        ``False`` otherwise.

    Raises:
        ValueError: if *date* does not match the expected layout.
    """
    # %z parses the offset itself and yields an aware datetime, so the "+0000"
    # literal and the manual tzinfo override are no longer needed.
    created = datetime.strptime(date, "%a %b %d %H:%M:%S %z %Y")
    # Both operands are aware, so the subtraction is offset-correct.
    age = datetime.now(created.tzinfo) - created
    return age.days < 1
2021-08-03 14:17:00 +02:00
2021-08-04 13:42:14 +02:00
def permute(array: list) -> list:
    """Return every upper/lower-case spelling of each string in *array*.

    Each string is lower-cased first, then every character that has a
    distinct upper-case form is emitted in both cases. Characters without one
    (spaces, digits, punctuation) are emitted once, so they no longer
    generate duplicate spellings.

    The result keeps the order of *array*; within one word the first
    character toggles fastest (``"ab"`` -> ``ab, Ab, aB, AB``).

    Args:
        array: strings to expand.

    Returns:
        A flat list holding ``2 ** k`` spellings per input string, where
        ``k`` is the number of its characters that have an upper-case form.
    """
    variants = []
    for text in array:  # all element of the list
        forms = [""]
        for char in text.lower():
            upper = char.upper()
            choices = (char,) if upper == char else (char, upper)
            # Appending to every prefix, choice by choice, makes the earliest
            # characters vary fastest.
            forms = [prefix + choice for choice in choices for prefix in forms]
        variants.extend(forms)
    return variants
2021-08-03 21:19:22 +02:00
2021-08-04 13:42:14 +02:00
def createBaseTrigger(*lists) -> list:
    """Merges all given lists into one, without duplicates.

    The first occurrence of each element is kept and the order in which the
    elements were given is preserved, so the result is identical from one run
    to the next (going through a ``set`` made the order depend on string
    hashing).

    Args:
        *lists: any number of iterables of hashable elements.

    Returns:
        A new list with every distinct element, in first-seen order.
    """
    return list(dict.fromkeys(word for liste in lists for word in liste))
def createBaseAnswers(word) -> list:
    """Build the default replies for *word*.

    Returns, in this order: the word itself, the word between parentheses,
    the word in upper case and the word followed by " lol".
    """
    parenthesised = "({})".format(word)
    shouted = word.upper()
    with_lol = "{} lol".format(word)
    return [word, parenthesised, shouted, with_lol]
2021-08-04 13:42:14 +02:00
2021-08-04 00:04:38 +02:00
def main(accessToken: str, accessTokenSecret: str, consumerKey: str, consumerSecret: str, users: list):
    """Authenticate, build the listener and start the filtered stream.

    Args:
        accessToken: access token handed to ``set_access_token``.
        accessTokenSecret: access token secret handed to ``set_access_token``.
        consumerKey: consumer key handed to ``OAuthHandler``.
        consumerSecret: consumer secret handed to ``OAuthHandler``.
        users: account names given to ``Listener`` and shown in the start-up
            message.

    The stream tracks the module-level ``triggerWords``, restricted to the
    "fr" language, and is started with ``is_async=True``.
    """
    handler = OAuthHandler(consumerKey, consumerSecret)
    handler.set_access_token(accessToken, accessTokenSecret)
    client = API(auth_handler=handler, wait_on_rate_limit=True)

    watcher = Listener(client, users)
    stream = Stream(auth=client.auth, listener=watcher)

    mentions = ", @".join(users)
    print(f"Scroll sur Twitter avec les abonnements de @{mentions}...")
    stream.filter(
        track=triggerWords,
        languages=["fr"],
        stall_warnings=True,
        is_async=True,
    )
2021-08-03 14:17:00 +02:00
if __name__ == '__main__':
    # Environment variables read below (a .env file is supported):
    #   TOKEN           Access Token, from the "Authentication Tokens" section
    #                   under the "Access Token and Secret" sub-heading.
    #   TOKEN_SECRET    Access Token Secret, from the same sub-heading.
    #   CONSUMER_KEY    API Key, from the "Consumer Keys" section.
    #   CONSUMER_SECRET API Secret Key, from the "Consumer Keys" section.
    #   --
    #   PSEUDOS         the account(s) whose subscriptions you want to snipe.

    # prefix printed in front of any error raised while answering
    errorMessage = "Error happens!"

    # words to detect, one list per kind of answer
    quoiBase = ["quoi", "koi", "quoient", "q u o i"]
    ouiBase = ["oui", "ui"]
    nonBase = ["non", "nn"]
    universalBase = createBaseTrigger(quoiBase, ouiBase, nonBase)

    # every upper/lower case alternative of the words above
    triggerWords = permute(universalBase)

    # answers: the defaults for each word, plus a few extras for "feur"
    feur = createBaseAnswers("feur")
    feur += [
        "https://twitter.com/shukuzi62/status/1422611919538724868/video/1",
        "feur (-isson)",
        "https://twitter.com/antoinelae/status/1422943594403581957/video/1",
    ]
    stiti = createBaseAnswers("stiti")
    bril = createBaseAnswers("bril")

    # loading environment variables and launching the bot
    keys = load(["TOKEN", "TOKEN_SECRET", "CONSUMER_KEY", "CONSUMER_SECRET", "PSEUDOS"])
    main(
        accessToken=keys["TOKEN"],
        accessTokenSecret=keys["TOKEN_SECRET"],
        consumerKey=keys["CONSUMER_KEY"],
        consumerSecret=keys["CONSUMER_SECRET"],
        users=keys["PSEUDOS"],
    )