from os import environ
from re import findall, sub
from sys import argv

from cloudscraper import CloudScraper, create_scraper
from dotenv import load_dotenv
class Scraper:
    """Scraper for the Mobilism forum.

    Logs in with the given credentials, searches for an app, parses the
    result topics, saves them to a CSV file and pretty-prints the
    changelogs / download links of the first few topics.
    """

    def __init__(self, pseudo: str, password: str, app: str, debug: bool = False):
        """Store the credentials, the requested app and the debug flag.

        :param pseudo: forum username
        :param password: forum password
        :param app: name of the app to search for
        :param debug: print progress messages when True
        """
        self.debug = debug
        self.url = "https://forum.mobilism.org"
        self.requested_app = app
        # phpBB login form fields; the "login" key must be present for the POST to be accepted
        self.loginData = {
            "username": pseudo,
            "password": password,
            "login": "Login"
        }

    def errorFormat(self, code: int = None, message: str = "") -> str:
        """Pretty error message: "[code] message.", omitting whichever part is absent."""
        return f"{f'[{code}]' if code else ''}{' ' if len(message) > 0 and code else ''}{message}."

    def connect(self) -> CloudScraper:
        """Login to the forum using credentials and return the authenticated session."""
        # connect with cloudflare bypasser, presenting a chrome-on-windows fingerprint
        session = create_scraper(browser={"browser": "chrome", "platform": "windows"})
        if not session:
            raise SystemError(self.errorFormat(message="The creation of the session failed"))  # only if session creation failed

        if self.debug: print("Connection attempt...")
        # connect to the forum using credentials - params are set by default but kept in case the forum changes them
        reponse = session.post(f"{self.url}/ucp.php", data=self.loginData, params={"mode": "login"})
        if reponse.status_code != 200:
            raise ConnectionRefusedError(self.errorFormat(code=reponse.status_code, message="Unable to connect"))  # only if status code isn't 200
        return session

    def search(self, session: CloudScraper) -> tuple[list[dict], list[dict]]:
        """Do the research: fetch the results page, parse, save and collect infos."""
        if self.debug: print("Going to search page and check connection...", end=" ")
        # fetch results page (title-only topic search)
        reponse = session.get(f"{self.url}/search.php", params={"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"})
        if "Sorry but you are not permitted to use the search system. If you're not logged in please" in reponse.text:
            raise ConnectionError(self.errorFormat(message="Connection failed, check credentials"))  # only if login failed
        if reponse.status_code != 200:
            raise ConnectionError(self.errorFormat(code=reponse.status_code, message="Impossible to make the search"))  # only if status code isn't 200
        if self.debug: print("Connected.")

        if self.debug: print(f"Fetching results for {self.requested_app}...", end=" ")

        topics = self.parse(reponse.text)

        self.save(topics)
        return topics, self.getInfos(session, topics)

    def parse(self, htmlPage: str) -> list[dict]:
        """Parse the search-results HTML into a clean list of topic dicts."""
        if "No suitable matches were found." in htmlPage:
            return []
        elements = htmlPage.split("<tr>\n<td>")[1:]
        if not elements:  # expected table markup absent: nothing to parse (was an IndexError on elements[-1])
            return []
        elements[-1] = elements[-1].split("</td>\n</tr>")[0]
        for i in range(0, len(elements)):
            # each findall(...)[0] raises IndexError when the pattern does not match
            try:
                _title = findall(r"class=\"topictitle\">(.*)<\/a>", elements[i])[0]
                _title = sub(r" ?& ?", " ", _title)
            except IndexError:
                _title = None
            try:
                _author = findall(r"(<br />|</strong>)\n\n?<i class=\"icon-user\"></i> by <a href=\"\./memberlist\.php\?mode=viewprofile&u=\d+\"( style=\"color: #.*;\" class=\"username-coloured\")?>(.*)</a>", elements[i])[0][-1]
            except IndexError:
                _author = None
            try:
                _link = findall(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", elements[i])[0]
                _link = {"f": _link[0], "t": _link[1]}
            except IndexError:
                _link = None
            try:
                _date = findall(r"</a> <i class=\"icon-time\"></i> <small>(.*)</small>", elements[i])[0]
            except IndexError:
                _date = None
                # dump the unparsable element for inspection (was printed unconditionally)
                if self.debug: print("\n" + elements[i] + "\n")
            # _link may be None when the topic-link regex failed; guard the URL build (was an unguarded _link['f'] -> TypeError)
            _url = f"{self.url}/viewtopic.php?f={_link['f']}&t={_link['t']}" if _link else None
            elements[i] = {"title": _title, "author": _author, "date": _date, "link": _url, "linkParams": _link}
        return elements

    def getInfos(self, session: CloudScraper, elements: list) -> list:
        """Go to the first n topic pages and collect their raw responses, then parse them."""
        size = len(elements)
        if size == 0:
            return []
        page = 3  # number of topic pages to fetch
        if self.debug: print(f"Going to the {page} first pages...", end=" ")
        results = []
        # only the first `page` topics are fetched (was a full-range loop skipping i >= page)
        for i in range(min(page, size)):
            reponse = session.get(f"{self.url}/viewtopic.php", params=elements[i]["linkParams"])  # fetch topic page
            results.append(reponse)
            if reponse.status_code != 200:
                raise ConnectionError(self.errorFormat(code=reponse.status_code, message=f"Error while doing the search n°{i}"))  # only if status code isn't 200
        if self.debug: print("Done.")
        if self.debug: print("Parsing results page...", end=" ")
        results = self.parsingInfos(results)
        if self.debug: print("Done.")
        return results

    def parsingInfos(self, elements: list) -> list[dict]:
        """Parse changelogs and download links out of each fetched topic page."""
        for i in range(0, len(elements)):
            elements[i] = elements[i].text  # keep only the HTML body of the response
            if "Download Instructions" not in elements[i]:
                elements[i] = {"changelogs": None, "downloadLinks": None}
                continue
            try:
                _changelogs = findall(r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"c|font-weight: bold\">T", elements[i])[0]
                if len(_changelogs) < 2:  # if result empty, try the stricter pattern
                    _changelogs = findall(r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"font-weight: bold\">T", elements[i])[0]
            except IndexError:
                _changelogs = "No changelog found."
            try:
                elements[i] = sub(r"Download Instructions:</span>(.*)?<br /><s", "Download Instructions:</span><br /><s", elements[i])
                _downloadLinks = findall(r"Download Instructions:</span> ?<br />(.*|[\s\S]*)<br /><br />Trouble downloading|</a></div>", elements[i])[0]
                if len(_downloadLinks) < 2:  # if result empty, try the looser pattern
                    _downloadLinks = findall(r"Download Instructions:</span> ?<br />(.*|[\s\S]*)</a></div>", elements[i])[0]
            except IndexError:
                _downloadLinks = None
            if _downloadLinks is not None:  # was unguarded: sub() on None raised TypeError
                _downloadLinks = sub(r"\n|<a class=\"postlink\" href=\"|\(Closed Filehost\) ?|<span style=\"font-weight: bold\">|</span>|\">(\S*)</a>", "", _downloadLinks)  # remove html garbage
                _downloadLinks = sub(r"<br />\n?", "\n", _downloadLinks)  # convert html newline to \n
                _downloadLinks = sub(r"Mirrors(?!:)|Mirror(?!s)(?!:)", "Mirror:", _downloadLinks)  # add ":"
                _downloadLinks = _downloadLinks.split('">')[0]
            elements[i] = {"changelogs": _changelogs, "downloadLinks": _downloadLinks}
        return elements

    def prettyPrint(self, topics: tuple[list[dict], list[dict]]) -> list:
        """Show a pretty message with all the specialized infos; return the merged records."""
        topics, topicsInfos = topics
        if len(topics) == 0:
            return []
        print("\n")
        result = []
        for i in range(0, len(topicsInfos)):
            result.append({
                "title": topics[i]["title"],
                "author": topics[i]["author"],
                "date": topics[i]["date"],
                "changelogs": str(topicsInfos[i]["changelogs"]).replace("<br />", "\n"),
                "downloadLinks": topicsInfos[i]["downloadLinks"]
            })
            print(f"Title: {result[i]['title']}\n")
            print(f"Author: {result[i]['author']}\n")
            print(f"Date of release: {result[i]['date']}\n")
            print(f"Changelogs:\n{result[i]['changelogs']}\n")
            print(f"Download links:\n{result[i]['downloadLinks']}")
            print("\n\n---\n")
        return result

    def work(self) -> list:
        """Call all the other methods: connect, search, then pretty-print."""
        return self.prettyPrint(self.search(self.connect()))

    def save(self, elements: list) -> None:
        """Save all the parsed results to a CSV file (semicolon-separated)."""
        size = len(elements)
        if size == 0:
            print("No elements were found with the search.")
            return
        filename = "results.csv"
        with open(filename, "w") as f:
            # header: every key except the internal linkParams dict
            # (was a dict-vs-string comparison that was always true, plus a blind [:-1] slice)
            header = [k for k in elements[0].keys() if k != "linkParams"]
            f.write(";".join(header))
            f.write("\n")
            for element in elements:
                # write the values in header order so columns always line up
                f.write(";".join(str(element[k]) for k in header))
                f.write("\n")
        print(f"{size} elements have been registered in the {filename} file.")
2021-08-23 21:04:31 +02:00
2021-08-23 12:03:00 +02:00
if __name__ == "__main__":
    argv = argv[1:]  # drop the script name
    if len(argv) < 1:  # no args
        print("No App to retrieve.")
        exit(1)
    load_dotenv()  # load .env file
    try:
        try:  # optional debug flag from the environment
            debug = environ["DEBUG_MOBILISM"].lower() in ("yes", "true", "1")
        except KeyError:
            debug = False
        try:  # try to fetch credentials from the .env first
            pseudoMobilism = environ["PSEUDO_MOBILISM"]
            passwordMobilism = environ["PASSWORD_MOBILISM"]
        except KeyError:  # if that failed, try to get them from the cli
            if len(argv) >= 3:
                pseudoMobilism = argv[0]
                passwordMobilism = argv[1]
                # keep everything AFTER the two credential args as the app name
                # (was argv[-2:], which with exactly 3 args kept the password in the app name)
                argv = argv[2:]
            else:  # if that failed again there is a problem
                raise KeyError
        Scraper(pseudoMobilism, passwordMobilism, " ".join(argv), debug).work()  # call the work() function
    except KeyError:
        print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.')