from sys import argv
from os import environ
from csv import writer
from re import findall, sub

from dotenv import load_dotenv
from cloudscraper import CloudScraper, create_scraper

# HTML line break as it may appear in the fetched pages. The exact spelling
# (`<br>`, `<br/>` or `<br />`) is not visible in this file, so the pattern
# accepts all three.
_HTML_BREAK = r"<br ?/?>"


class Scraper:
    """Log in to the Mobilism forum, search for an app and collect details.

    The typical entry point is :meth:`work`, which chains :meth:`connect`,
    :meth:`search` and :meth:`prettyPrint`.
    """

    def __init__(self, pseudo: str, password: str, app: str, debug: bool = False):
        """Store the credentials, the search keywords and the debug flag."""
        self.debug = debug
        self.url = "https://forum.mobilism.org"
        self.requested_app = app
        self.loginData = {
            "username": pseudo,
            "password": password,
            "login": "Login"
        }

    @staticmethod
    def _firstMatch(pattern: str, text: str):
        """Return the first `findall` result for *pattern*, or None if none."""
        matches = findall(pattern, text)
        return matches[0] if matches else None

    def errorFormat(self, code: int = None, message: str = "") -> str:
        """Build an error message shaped like ``[code] message.``.

        The ``[code]`` prefix (and the space after it) is omitted when *code*
        is falsy.
        """
        prefix = f"[{code}]" if code else ""
        separator = " " if message and code else ""
        return f"{prefix}{separator}{message}."

    def connect(self) -> CloudScraper:
        """Create a scraper session and post the login form.

        Raises:
            SystemError: the session could not be created.
            ConnectionRefusedError: the login request did not answer 200.
        """
        # Cloudflare-bypassing session presenting itself as Chrome on Windows.
        session = create_scraper(browser={"browser": "chrome", "platform": "windows"})
        if not session:
            raise SystemError(self.errorFormat(message="The creation of the session failed"))

        if self.debug:
            print("Connection attempt...")
        # `mode=login` is passed explicitly in case the forum default changes.
        reponse = session.post(f"{self.url}/ucp.php", data=self.loginData, params={"mode": "login"})
        if reponse.status_code != 200:
            raise ConnectionRefusedError(self.errorFormat(code=reponse.status_code, message="Unable to connect"))

        return session

    def search(self, session: CloudScraper) -> tuple[list[dict], list[dict]]:
        """Run the topic search and return ``(topics, topics_details)``.

        The parsed topics are also written to disk through :meth:`save`.

        Raises:
            ConnectionError: the forum refused the search (login failed) or
                the request did not answer 200.
        """
        if self.debug:
            print("Going to search page and check connection...", end=" ")
        reponse = session.get(
            f"{self.url}/search.php",
            params={"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"},
        )

        # Only the first sentence of the forum message is tested: the markup
        # separating it from the following sentence is not reliable here.
        if "Sorry but you are not permitted to use the search system." in reponse.text:
            raise ConnectionError(self.errorFormat(message="Connection failed, check credentials"))
        if reponse.status_code != 200:
            raise ConnectionError(self.errorFormat(code=reponse.status_code, message="Impossible to make the search"))

        if self.debug:
            print("Connected.")
            print(f"Fetching results for {self.requested_app}...", end=" ")

        topics = self.parse(reponse.text)
        self.save(topics)

        return topics, self.getInfos(session, topics)

    def parse(self, htmlPage: str) -> list[dict]:
        """Turn the search result page into a list of topic dictionaries.

        Each dictionary has the keys ``title``, ``author``, ``date``, ``link``
        and ``linkParams``; any field that cannot be extracted is None.
        """
        if "No suitable matches were found." in htmlPage:
            return []

        # NOTE(review): the separators and several patterns below look like
        # their HTML markup was stripped from this file — confirm them against
        # a real result page.
        chunks = htmlPage.split("\n")[1:]
        if not chunks:
            return []
        chunks[-1] = chunks[-1].split("\n")[0]

        topics = []
        for chunk in chunks:
            title = self._firstMatch(r"class=\"topictitle\">(.*)<\/a>", chunk)
            if title is not None:
                title = sub(r" ?& ?", " ", title)

            authorMatch = self._firstMatch(rf"({_HTML_BREAK}|)\n\n? by (.*)", chunk)
            author = authorMatch[-1] if authorMatch is not None else None

            linkMatch = self._firstMatch(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", chunk)
            if linkMatch is not None:
                linkParams = {"f": linkMatch[0], "t": linkMatch[1]}
                link = f"{self.url}/viewtopic.php?f={linkParams['f']}&t={linkParams['t']}"
            else:
                # No topic link found: keep the entry instead of crashing.
                linkParams = None
                link = None

            date = self._firstMatch(r" (.*)", chunk)

            if self.debug:
                print("\n" + chunk + "\n")

            topics.append({"title": title, "author": author, "date": date, "link": link, "linkParams": linkParams})

        return topics

    def getInfos(self, session: CloudScraper, elements: list) -> list:
        """Fetch the topic page of the first results and parse their details.

        Raises:
            ConnectionError: one of the topic pages did not answer 200.
        """
        page = 3
        if self.debug:
            print(f"Going to the {page} first pages...", end=" ")

        responses = []
        for i, element in enumerate(elements[:page]):
            reponse = session.get(f"{self.url}/viewtopic.php", params=element["linkParams"])
            if reponse.status_code != 200:
                raise ConnectionError(self.errorFormat(code=reponse.status_code, message=f"Error while doing the search n°{i}"))
            responses.append(reponse)
        if self.debug:
            print("Done.")

        if self.debug:
            print("Parsing results page...", end=" ")
        results = self.parsingInfos(responses)
        if self.debug:
            print("Done.")

        return results

    def parsingInfos(self, elements: list) -> list[dict]:
        """Replace each response in *elements* by its parsed details.

        Every item must expose a ``text`` attribute. The list is updated in
        place and also returned; each entry becomes a dictionary with the keys
        ``changelogs`` and ``downloadLinks``.
        """
        for i, response in enumerate(elements):
            elements[i] = self._parsePost(response.text)

        return elements

    def _parsePost(self, html: str) -> dict:
        """Extract the changelog and the download links from one topic page."""
        if "Download Instructions" not in html:
            return {"changelogs": None, "downloadLinks": None}

        # NOTE(review): the original fallback changelog pattern was textually
        # identical to this one in this file, so it is not repeated.
        changelogs = self._firstMatch(rf"What's New: ?{_HTML_BREAK}(.*){_HTML_BREAK}{_HTML_BREAK}T", html)
        if changelogs is None:
            changelogs = "No changelog found."

        # NOTE(review): the two statements that extracted the links were
        # merged together in this file; the primary pattern is rebuilt from the
        # visible fragments — confirm against a real topic page.
        downloadLinks = self._firstMatch(
            rf"Download Instructions: ?{_HTML_BREAK}(.*|[\s\S]*){_HTML_BREAK}{_HTML_BREAK}Trouble downloading",
            html,
        )
        if downloadLinks is None or len(downloadLinks) < 2:
            # Nothing usable: try the looser pattern.
            downloadLinks = self._firstMatch(rf"Download Instructions: ?{_HTML_BREAK}(.*|[\s\S]*)", html)

        if downloadLinks is not None:
            # Remove html garbage.
            # NOTE(review): the empty alternatives look like stripped HTML
            # tags; as written they make the last alternative unreachable.
            downloadLinks = sub(r"\n|||\">(\S*)", "", downloadLinks)
            # Convert HTML line breaks to "\n".
            downloadLinks = sub(rf"{_HTML_BREAK}\n?", "\n", downloadLinks)
            # Add the missing ":" after "Mirror"/"Mirrors".
            downloadLinks = sub(r"Mirrors(?!:)|Mirror(?!s)(?!:)", "Mirror:", downloadLinks)

        return {"changelogs": changelogs, "downloadLinks": downloadLinks}

    def prettyPrint(self, topics: tuple[list[dict], list[dict]]) -> list:
        """Print every detailed topic and return them as a list of dicts.

        *topics* is the ``(topics, topics_details)`` pair built by
        :meth:`search`.
        """
        foundTopics, topicsInfos = topics
        print("\n")
        result = []
        for topic, infos in zip(foundTopics, topicsInfos):
            entry = {
                "title": topic["title"],
                "author": topic["author"],
                "date": topic["date"],
                "changelogs": sub(_HTML_BREAK, "\n", str(infos["changelogs"])),
                "downloadLinks": infos["downloadLinks"]
            }
            result.append(entry)
            print(f"Title: {entry['title']}\n")
            print(f"Author: {entry['author']}\n")
            print(f"Date of release: {entry['date']}\n")
            print(f"Changelogs: \n{entry['changelogs']}\n")
            print(f"Download links: \n{entry['downloadLinks']}")
            print("\n\n---\n")

        return result

    def work(self) -> list:
        """Connect, search, then print and return the detailed results."""
        return self.prettyPrint(self.search(self.connect()))

    def save(self, elements: list) -> None:
        """Write the parsed topics to ``results.csv`` (``;`` separated).

        The ``linkParams`` field is left out of the file. Nothing is written
        when *elements* is empty.
        """
        taille = len(elements)
        if taille == 0:
            print("No elements were found with the search.")
            return

        filename = "results.csv"
        columns = [key for key in elements[0] if key != "linkParams"]
        with open(filename, "w", newline="", encoding="utf-8") as f:
            # The csv writer quotes values containing ";" so they cannot
            # shift the columns.
            csvFile = writer(f, delimiter=";", lineterminator="\n")
            csvFile.writerow(columns)
            csvFile.writerows([str(element[key]) for key in columns] for element in elements)
        print(f"{taille} elements have been registered in the {filename} file.")


def main() -> None:
    """Read the settings from the environment and the CLI, then run a search.

    The credentials come from ``PSEUDO_MOBILISM`` / ``PASSWORD_MOBILISM``
    (a ``.env`` file is loaded first); when they are missing, the first two
    CLI arguments are used instead. The remaining arguments are the app name.
    """
    args = argv[1:]
    if len(args) < 1:  # no args
        print("No App to retrieve.")
        raise SystemExit(1)

    load_dotenv()  # load .env file
    debug = environ.get("DEBUG_MOBILISM", "").lower() in ("yes", "true", "1")

    try:
        # Try to fetch the credentials from the environment first.
        pseudoMobilism = environ["PSEUDO_MOBILISM"]
        passwordMobilism = environ["PASSWORD_MOBILISM"]
    except KeyError:
        # Otherwise they must be the first two CLI arguments.
        if len(args) < 3:
            print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.')
            return
        pseudoMobilism, passwordMobilism, *args = args

    Scraper(pseudoMobilism, passwordMobilism, " ".join(args), debug).work()


if __name__ == "__main__":
    main()