now print the 3 first pages fetched
This commit is contained in:
parent
5c4dd816f6
commit
c5ea9e4f39
1 changed files with 95 additions and 26 deletions
121
main.py
121
main.py
|
@ -2,7 +2,7 @@ from sys import argv
|
||||||
from os import environ
|
from os import environ
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from cloudscraper import CloudScraper, create_scraper
|
from cloudscraper import CloudScraper, create_scraper
|
||||||
from re import findall
|
from re import findall, sub
|
||||||
|
|
||||||
class Scraper:
|
class Scraper:
|
||||||
def __init__(self, pseudo, password, app, debug = False):
|
def __init__(self, pseudo, password, app, debug = False):
|
||||||
|
@ -32,7 +32,7 @@ class Scraper:
|
||||||
|
|
||||||
return session
|
return session
|
||||||
|
|
||||||
def search(self, session) -> list:
|
def search(self, session) -> tuple[list[dict], list[dict]]:
|
||||||
"""Do the research."""
|
"""Do the research."""
|
||||||
if self.debug: print("Going to search page and check connection...", end = " ")
|
if self.debug: print("Going to search page and check connection...", end = " ")
|
||||||
reponse = session.get(f"{self.url}/search.php", params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"}) # fetch results page
|
reponse = session.get(f"{self.url}/search.php", params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"}) # fetch results page
|
||||||
|
@ -44,9 +44,13 @@ class Scraper:
|
||||||
|
|
||||||
if self.debug: print(f"Fetching results for {self.requested_app}...", end = " ")
|
if self.debug: print(f"Fetching results for {self.requested_app}...", end = " ")
|
||||||
|
|
||||||
return self.parse(reponse.text)
|
topics = self.parse(reponse.text)
|
||||||
|
|
||||||
def parse(self, htmlPage: str) -> list:
|
self.save(topics)
|
||||||
|
|
||||||
|
return topics, self.getInfos(session, topics)
|
||||||
|
|
||||||
|
def parse(self, htmlPage: str) -> list[dict]:
|
||||||
"""Parse HTML reponse to a clean list"""
|
"""Parse HTML reponse to a clean list"""
|
||||||
if "No suitable matches were found." in htmlPage:
|
if "No suitable matches were found." in htmlPage:
|
||||||
return []
|
return []
|
||||||
|
@ -55,6 +59,7 @@ class Scraper:
|
||||||
for i in range(0, len(elements)):
|
for i in range(0, len(elements)):
|
||||||
try:
|
try:
|
||||||
_title = findall(r"class=\"topictitle\">(.*)<\/a>", elements[i])[0]
|
_title = findall(r"class=\"topictitle\">(.*)<\/a>", elements[i])[0]
|
||||||
|
_title = sub(r" ?& ?", " ", _title)
|
||||||
except:
|
except:
|
||||||
_title = None
|
_title = None
|
||||||
try:
|
try:
|
||||||
|
@ -71,34 +76,98 @@ class Scraper:
|
||||||
except:
|
except:
|
||||||
_date = None
|
_date = None
|
||||||
print("\n" + elements[i] + "\n")
|
print("\n" + elements[i] + "\n")
|
||||||
elements[i] = {"title": _title, "author": _author, "date": _date, "link": f"https://forum.mobilism.org/viewtopic.php?f={_link['f']}&t={_link['t']}", "linkParams": _link}
|
elements[i] = {"title": _title, "author": _author, "date": _date, "link": f"{self.url}/viewtopic.php?f={_link['f']}&t={_link['t']}", "linkParams": _link}
|
||||||
|
|
||||||
return elements
|
return elements
|
||||||
|
|
||||||
|
def getInfos(self, session, elements: list) -> list:
|
||||||
|
"""Go to the first n pages and get a lot of infos"""
|
||||||
|
page = 3
|
||||||
|
if self.debug: print(f"Going to the {page} first pages...", end = " ")
|
||||||
|
results = []
|
||||||
|
for i in range(0, len(elements)):
|
||||||
|
if i < page:
|
||||||
|
reponse = session.get(f"{self.url}/viewtopic.php", params = elements[i]["linkParams"]) # fetch results page
|
||||||
|
results.append(reponse)
|
||||||
|
if reponse.status_code != 200:
|
||||||
|
raise ConnectionError(self.errorFormat(code = reponse.status_code, message = f"Error while doing the search n°{i}")) # called only status code isn't 200
|
||||||
|
if self.debug: print(f"Done.")
|
||||||
|
|
||||||
|
if self.debug: print(f"Parsing results page...", end = " ")
|
||||||
|
results = self.parsingInfos(results)
|
||||||
|
if self.debug: print(f"Done.")
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def parsingInfos(self, elements: list) -> list[dict]:
|
||||||
|
"""Parse infos from the page of the app"""
|
||||||
|
for i in range(0, len(elements)):
|
||||||
|
elements[i] = elements[i].text
|
||||||
|
if "Download Instructions" not in elements[i]:
|
||||||
|
elements[i] = {"changelogs": None, "downloadLinks": None}
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
_changelogs = findall(r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"c|font-weight: bold\">T", elements[i])[0]
|
||||||
|
if len(_changelogs) < 2: # if result none, trying other method
|
||||||
|
_changelogs = findall(r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"font-weight: bold\">T", elements[i])[0]
|
||||||
|
except:
|
||||||
|
_changelogs = "No changelog found."
|
||||||
|
try:
|
||||||
|
_downloadLinks = findall(r"Download Instructions:</span> ?<br />(.*|[\s\S]*)<br /><br />Trouble downloading|</a></div>", elements[i])[0]
|
||||||
|
if len(_downloadLinks) < 2:# if result none, trying other method
|
||||||
|
_downloadLinks = findall(r"Download Instructions:</span> ?<br />(.*|[\s\S]*)</a></div>", elements[i])[0]
|
||||||
|
except:
|
||||||
|
_downloadLinks = None
|
||||||
|
_downloadLinks = sub(r"\n|<a class=\"postlink\" href=\"|\(Closed Filehost\) ?|<span style=\"font-weight: bold\">|</span>|\">(\S*)</a>", "", _downloadLinks)
|
||||||
|
_downloadLinks = sub(r"<br />\n?", "\n", _downloadLinks)
|
||||||
|
_downloadLinks = sub(r"Mirrors(?!:)|Mirror(?!s)(?!:)", "Mirror:", _downloadLinks)
|
||||||
|
elements[i] = {"changelogs": _changelogs, "downloadLinks": _downloadLinks}
|
||||||
|
|
||||||
|
return elements
|
||||||
|
|
||||||
|
def prettyPrint(self, topics: tuple[list[dict], list[dict]]):
|
||||||
|
"""Show a pretty message with all the specialized infos"""
|
||||||
|
topics, topicsInfos = topics
|
||||||
|
print("\n")
|
||||||
|
result = []
|
||||||
|
for i in range(0, len(topicsInfos)):
|
||||||
|
result.append({
|
||||||
|
"title": topics[i]["title"],
|
||||||
|
"author": topics[i]["author"],
|
||||||
|
"date": topics[i]["date"],
|
||||||
|
"changelogs": str(topicsInfos[i]["changelogs"]).replace("<br />", "\n"),
|
||||||
|
"downloadLinks": topicsInfos[i]["downloadLinks"]
|
||||||
|
})
|
||||||
|
print(f"Title: {result[i]['title']}\n")
|
||||||
|
print(f"Author: {result[i]['author']}\n")
|
||||||
|
print(f"Date of release: {result[i]['date']}\n")
|
||||||
|
print(f"Changelogs: \n{result[i]['changelogs']}\n")
|
||||||
|
print(f"Download links: \n{result[i]['downloadLinks']}")
|
||||||
|
print("\n\n---\n")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
def work(self) -> str:
|
def work(self) -> str:
|
||||||
"""Call all the others methods."""
|
"""Call all the others methods."""
|
||||||
session = self.connect()
|
return self.prettyPrint(self.search(self.connect()))
|
||||||
topics = self.search(session)
|
|
||||||
|
|
||||||
return topics
|
def save(self, elements):
|
||||||
|
"""Save all the results parsed to a CSV file."""
|
||||||
def save(elements):
|
taille = len(elements)
|
||||||
"""Save all the results parsed to a CSV file."""
|
if taille == 0:
|
||||||
taille = len(elements)
|
print("Aucun élément n'a été trouvé avec la recherche.")
|
||||||
if taille == 0:
|
return
|
||||||
print("Aucun élément n'a été trouvé avec la recherche.")
|
filename = "results.csv"
|
||||||
return
|
with open(filename, "w") as f:
|
||||||
filename = "results.csv"
|
topCSV = list(elements[0].keys()) # create a copy of the first element keys
|
||||||
with open(filename, "w") as f:
|
topCSV.remove("linkParams") # remove linkParams
|
||||||
topCSV = list(elements[0].keys()) # create a copy of the first element keys
|
f.write(";".join(topCSV))
|
||||||
topCSV.remove("linkParams") # remove linkParams
|
|
||||||
f.write(";".join(topCSV))
|
|
||||||
f.write("\n")
|
|
||||||
for element in elements:
|
|
||||||
if element != "linkParams":
|
|
||||||
f.write(";".join(str(e) for e in list(element.values())[:-1]))
|
|
||||||
f.write("\n")
|
f.write("\n")
|
||||||
print(f"{taille} éléments ont étés enrengistés dans le fichier {filename}.")
|
for element in elements:
|
||||||
|
if element != "linkParams":
|
||||||
|
f.write(";".join(str(e) for e in list(element.values())[:-1]))
|
||||||
|
f.write("\n")
|
||||||
|
print(f"{taille} éléments ont étés enrengistés dans le fichier {filename}.")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
argv = argv[1:]
|
argv = argv[1:]
|
||||||
|
@ -121,6 +190,6 @@ if __name__ == "__main__":
|
||||||
argv = argv[-2:]
|
argv = argv[-2:]
|
||||||
else: # if it failed again there is a problem
|
else: # if it failed again there is a problem
|
||||||
raise KeyError
|
raise KeyError
|
||||||
save(Scraper(pseudoMobilism, passwordMobilism, " ".join([n for n in argv]), debug).work()) # call the work() function
|
Scraper(pseudoMobilism, passwordMobilism, " ".join([n for n in argv]), debug).work() # call the work() function
|
||||||
except KeyError:
|
except KeyError:
|
||||||
print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.')
|
print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.')
|
||||||
|
|
Reference in a new issue