From adb4a3b6751033454759ada120f4bc9088158d15 Mon Sep 17 00:00:00 2001 From: Mylloon Date: Wed, 25 Aug 2021 18:38:26 +0200 Subject: [PATCH] rename files --- gui.py | 10 --- main.py | 206 ++-------------------------------------------------- scrapper.py | 202 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 209 insertions(+), 209 deletions(-) delete mode 100644 gui.py create mode 100644 scrapper.py diff --git a/gui.py b/gui.py deleted file mode 100644 index b101304..0000000 --- a/gui.py +++ /dev/null @@ -1,10 +0,0 @@ -import kivy -from kivy.app import App -from kivy.uix.label import Label - -class MobiDL(App): - def build(self): - return Label(text = "...") - -if __name__ == "__main__": - MobiDL().run() diff --git a/main.py b/main.py index 2721d7c..b101304 100644 --- a/main.py +++ b/main.py @@ -1,202 +1,10 @@ -from sys import argv -from os import environ -from dotenv import load_dotenv -from cloudscraper import CloudScraper, create_scraper -from re import findall, sub +import kivy +from kivy.app import App +from kivy.uix.label import Label -class Scraper: - def __init__(self, pseudo: str, password: str, app: str, debug: bool = False): - self.debug = debug - self.url = "https://forum.mobilism.org" - self.requested_app = app - self.loginData = { - "username": pseudo, - "password": password, - "login": "Login" - } - - def errorFormat(self, code: int = None, message: str = "") -> str: - """Pretty error message.""" - return f"{f'[{code}]' if code else ''}{' ' if len(message) > 0 and code else ''}{message}." - - def connect(self) -> CloudScraper: - """Login to the forum using credentials.""" - session = create_scraper(browser = {"browser": "chrome", "platform": "windows"}) # connect with cloudflare bypasser with a chrome browser on windows - if not session: - raise SystemError(self.errorFormat(message = "The creation of the session failed")) # called only if failed at creating the session - - if self.debug: print("Connection attempt...") - reponse = session.post(f"{self.url}/ucp.php", data = self.loginData, params = {"mode": "login"}) # connect to the forum using credentials - params are set by default but its in case forum changing that - if reponse.status_code != 200: - raise ConnectionRefusedError(self.errorFormat(code = reponse.status_code, message = "Unable to connect")) # called only status code isn't 200 - - return session - - def search(self, session: CloudScraper) -> tuple[list[dict], list[dict]]: - """Do the research.""" - if self.debug: print("Going to search page and check connection...", end = " ") - reponse = session.get(f"{self.url}/search.php", params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"}) # fetch results page - if "Sorry but you are not permitted to use the search system. If you're not logged in please" in reponse.text: - raise ConnectionError(self.errorFormat(message = "Connection failed, check credentials")) # called only if login failed - if reponse.status_code != 200: - raise ConnectionError(self.errorFormat(code = reponse.status_code, message = "Impossible to make the search")) # called only status code isn't 200 - if self.debug: print(f"Connected.") - - if self.debug: print(f"Fetching results for {self.requested_app}...", end = " ") - - topics = self.parse(reponse.text) - - self.save(topics) - - return topics, self.getInfos(session, topics) - - def parse(self, htmlPage: str) -> list[dict]: - """Parse HTML reponse to a clean list""" - if "No suitable matches were found." in htmlPage: - return [] - elements = htmlPage.split("\n")[1:] - elements[-1] = elements[-1].split("\n")[0] - for i in range(0, len(elements)): - try: - _title = findall(r"class=\"topictitle\">(.*)<\/a>", elements[i])[0] - _title = sub(r" ?& ?", " ", _title) - except: - _title = None - try: - _author = findall(r"(
|)\n\n? by (.*)", elements[i])[0][-1] - except: - _author = None - try: - _link = findall(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", elements[i])[0] - _link = {"f": _link[0], "t": _link[1]} - except: - _link = None - try: - _date = findall(r" (.*)", elements[i])[0] - except: - _date = None - print("\n" + elements[i] + "\n") - elements[i] = {"title": _title, "author": _author, "date": _date, "link": f"{self.url}/viewtopic.php?f={_link['f']}&t={_link['t']}", "linkParams": _link} - - return elements - - def getInfos(self, session: CloudScraper, elements: list) -> list: - """Go to the first n pages and get a lot of infos""" - size = len(elements) - if size == 0: - return [] - page = 3 - if self.debug: print(f"Going to the {page} first pages...", end = " ") - results = [] - for i in range(0, size): - if i < page: - reponse = session.get(f"{self.url}/viewtopic.php", params = elements[i]["linkParams"]) # fetch results page - results.append(reponse) - if reponse.status_code != 200: - raise ConnectionError(self.errorFormat(code = reponse.status_code, message = f"Error while doing the search n°{i}")) # called only status code isn't 200 - if self.debug: print(f"Done.") - - if self.debug: print(f"Parsing results page...", end = " ") - results = self.parsingInfos(results) - if self.debug: print(f"Done.") - - return results - - def parsingInfos(self, elements: list) -> list[dict]: - """Parse infos from the page of the app""" - for i in range(0, len(elements)): - elements[i] = elements[i].text - if "Download Instructions" not in elements[i]: - elements[i] = {"changelogs": None, "downloadLinks": None} - continue - try: - _changelogs = findall(r"What's New: ?
(.*)

T", elements[i])[0] - if len(_changelogs) < 2: # if result none, trying other method - _changelogs = findall(r"What's New: ?
(.*)

T", elements[i])[0] - except: - _changelogs = "No changelog found." - try: - elements[i] = sub(r"Download Instructions:(.*)?

?
(.*|[\s\S]*)

Trouble downloading|", elements[i])[0] - if len(_downloadLinks) < 2:# if result none, trying other method - _downloadLinks = findall(r"Download Instructions: ?
(.*|[\s\S]*)", elements[i])[0] - except: - _downloadLinks = None - _downloadLinks = sub(r"\n|||\">(\S*)", "", _downloadLinks) # remove html garbage - _downloadLinks = sub(r"
\n?", "\n", _downloadLinks) # convert newline html to \n - _downloadLinks = sub(r"Mirrors(?!:)|Mirror(?!s)(?!:)", "Mirror:", _downloadLinks) # add ":" - _downloadLinks = _downloadLinks.split('">')[0] - elements[i] = {"changelogs": _changelogs, "downloadLinks": _downloadLinks} - - return elements - - def prettyPrint(self, topics: tuple[list[dict], list[dict]]) -> list: - """Show a pretty message with all the specialized infos""" - topics, topicsInfos = topics - if len(topics) == 0: - return [] - print("\n") - result = [] - for i in range(0, len(topicsInfos)): - result.append({ - "title": topics[i]["title"], - "author": topics[i]["author"], - "date": topics[i]["date"], - "changelogs": str(topicsInfos[i]["changelogs"]).replace("
", "\n"), - "downloadLinks": topicsInfos[i]["downloadLinks"] - }) - print(f"Title: {result[i]['title']}\n") - print(f"Author: {result[i]['author']}\n") - print(f"Date of release: {result[i]['date']}\n") - print(f"Changelogs: \n{result[i]['changelogs']}\n") - print(f"Download links: \n{result[i]['downloadLinks']}") - print("\n\n---\n") - - return result - - def work(self) -> list: - """Call all the others methods.""" - return self.prettyPrint(self.search(self.connect())) - - def save(self, elements: list) -> None: - """Save all the results parsed to a CSV file.""" - size = len(elements) - if size == 0: - print("No elements were found with the search.") - return - filename = "results.csv" - with open(filename, "w") as f: - topCSV = list(elements[0].keys()) # create a copy of the first element keys - topCSV.remove("linkParams") # remove linkParams - f.write(";".join(topCSV)) - f.write("\n") - for element in elements: - if element != "linkParams": - f.write(";".join(str(e) for e in list(element.values())[:-1])) - f.write("\n") - print(f"{size} elements have been registered in the {filename} file.") +class MobiDL(App): + def build(self): + return Label(text = "...") if __name__ == "__main__": - argv = argv[1:] - if len(argv) < 1: # no args - print("No App to retrieve.") - exit(1) - load_dotenv() # load .env file - try: - try: # for logs - debug = environ["DEBUG_MOBILISM"].lower() in ("yes", "true", "1") - except: - debug = False - try: # try to fetch credentials from de .env first - pseudoMobilism = environ["PSEUDO_MOBILISM"] - passwordMobilism = environ["PASSWORD_MOBILISM"] - except: # if it failed try to get from the cli - if len(argv) >= 3: - pseudoMobilism = argv[0] - passwordMobilism = argv[1] - argv = argv[-2:] - else: # if it failed again there is a problem - raise KeyError - Scraper(pseudoMobilism, passwordMobilism, " ".join([n for n in argv]), debug).work() # call the work() function - except KeyError: - print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.') + MobiDL().run() diff --git a/scrapper.py b/scrapper.py new file mode 100644 index 0000000..2721d7c --- /dev/null +++ b/scrapper.py @@ -0,0 +1,202 @@ +from sys import argv +from os import environ +from dotenv import load_dotenv +from cloudscraper import CloudScraper, create_scraper +from re import findall, sub + +class Scraper: + def __init__(self, pseudo: str, password: str, app: str, debug: bool = False): + self.debug = debug + self.url = "https://forum.mobilism.org" + self.requested_app = app + self.loginData = { + "username": pseudo, + "password": password, + "login": "Login" + } + + def errorFormat(self, code: int = None, message: str = "") -> str: + """Pretty error message.""" + return f"{f'[{code}]' if code else ''}{' ' if len(message) > 0 and code else ''}{message}." + + def connect(self) -> CloudScraper: + """Login to the forum using credentials.""" + session = create_scraper(browser = {"browser": "chrome", "platform": "windows"}) # connect with cloudflare bypasser with a chrome browser on windows + if not session: + raise SystemError(self.errorFormat(message = "The creation of the session failed")) # called only if failed at creating the session + + if self.debug: print("Connection attempt...") + reponse = session.post(f"{self.url}/ucp.php", data = self.loginData, params = {"mode": "login"}) # connect to the forum using credentials - params are set by default but its in case forum changing that + if reponse.status_code != 200: + raise ConnectionRefusedError(self.errorFormat(code = reponse.status_code, message = "Unable to connect")) # called only status code isn't 200 + + return session + + def search(self, session: CloudScraper) -> tuple[list[dict], list[dict]]: + """Do the research.""" + if self.debug: print("Going to search page and check connection...", end = " ") + reponse = session.get(f"{self.url}/search.php", params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"}) # fetch results page + if "Sorry but you are not permitted to use the search system. If you're not logged in please" in reponse.text: + raise ConnectionError(self.errorFormat(message = "Connection failed, check credentials")) # called only if login failed + if reponse.status_code != 200: + raise ConnectionError(self.errorFormat(code = reponse.status_code, message = "Impossible to make the search")) # called only status code isn't 200 + if self.debug: print(f"Connected.") + + if self.debug: print(f"Fetching results for {self.requested_app}...", end = " ") + + topics = self.parse(reponse.text) + + self.save(topics) + + return topics, self.getInfos(session, topics) + + def parse(self, htmlPage: str) -> list[dict]: + """Parse HTML reponse to a clean list""" + if "No suitable matches were found." in htmlPage: + return [] + elements = htmlPage.split("\n")[1:] + elements[-1] = elements[-1].split("\n")[0] + for i in range(0, len(elements)): + try: + _title = findall(r"class=\"topictitle\">(.*)<\/a>", elements[i])[0] + _title = sub(r" ?& ?", " ", _title) + except: + _title = None + try: + _author = findall(r"(
|)\n\n? by (.*)", elements[i])[0][-1] + except: + _author = None + try: + _link = findall(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", elements[i])[0] + _link = {"f": _link[0], "t": _link[1]} + except: + _link = None + try: + _date = findall(r" (.*)", elements[i])[0] + except: + _date = None + print("\n" + elements[i] + "\n") + elements[i] = {"title": _title, "author": _author, "date": _date, "link": f"{self.url}/viewtopic.php?f={_link['f']}&t={_link['t']}", "linkParams": _link} + + return elements + + def getInfos(self, session: CloudScraper, elements: list) -> list: + """Go to the first n pages and get a lot of infos""" + size = len(elements) + if size == 0: + return [] + page = 3 + if self.debug: print(f"Going to the {page} first pages...", end = " ") + results = [] + for i in range(0, size): + if i < page: + reponse = session.get(f"{self.url}/viewtopic.php", params = elements[i]["linkParams"]) # fetch results page + results.append(reponse) + if reponse.status_code != 200: + raise ConnectionError(self.errorFormat(code = reponse.status_code, message = f"Error while doing the search n°{i}")) # called only status code isn't 200 + if self.debug: print(f"Done.") + + if self.debug: print(f"Parsing results page...", end = " ") + results = self.parsingInfos(results) + if self.debug: print(f"Done.") + + return results + + def parsingInfos(self, elements: list) -> list[dict]: + """Parse infos from the page of the app""" + for i in range(0, len(elements)): + elements[i] = elements[i].text + if "Download Instructions" not in elements[i]: + elements[i] = {"changelogs": None, "downloadLinks": None} + continue + try: + _changelogs = findall(r"What's New: ?
(.*)

T", elements[i])[0] + if len(_changelogs) < 2: # if result none, trying other method + _changelogs = findall(r"What's New: ?
(.*)

T", elements[i])[0] + except: + _changelogs = "No changelog found." + try: + elements[i] = sub(r"Download Instructions:(.*)?

?
(.*|[\s\S]*)

Trouble downloading|", elements[i])[0] + if len(_downloadLinks) < 2:# if result none, trying other method + _downloadLinks = findall(r"Download Instructions: ?
(.*|[\s\S]*)", elements[i])[0] + except: + _downloadLinks = None + _downloadLinks = sub(r"\n|||\">(\S*)", "", _downloadLinks) # remove html garbage + _downloadLinks = sub(r"
\n?", "\n", _downloadLinks) # convert newline html to \n + _downloadLinks = sub(r"Mirrors(?!:)|Mirror(?!s)(?!:)", "Mirror:", _downloadLinks) # add ":" + _downloadLinks = _downloadLinks.split('">')[0] + elements[i] = {"changelogs": _changelogs, "downloadLinks": _downloadLinks} + + return elements + + def prettyPrint(self, topics: tuple[list[dict], list[dict]]) -> list: + """Show a pretty message with all the specialized infos""" + topics, topicsInfos = topics + if len(topics) == 0: + return [] + print("\n") + result = [] + for i in range(0, len(topicsInfos)): + result.append({ + "title": topics[i]["title"], + "author": topics[i]["author"], + "date": topics[i]["date"], + "changelogs": str(topicsInfos[i]["changelogs"]).replace("
", "\n"), + "downloadLinks": topicsInfos[i]["downloadLinks"] + }) + print(f"Title: {result[i]['title']}\n") + print(f"Author: {result[i]['author']}\n") + print(f"Date of release: {result[i]['date']}\n") + print(f"Changelogs: \n{result[i]['changelogs']}\n") + print(f"Download links: \n{result[i]['downloadLinks']}") + print("\n\n---\n") + + return result + + def work(self) -> list: + """Call all the others methods.""" + return self.prettyPrint(self.search(self.connect())) + + def save(self, elements: list) -> None: + """Save all the results parsed to a CSV file.""" + size = len(elements) + if size == 0: + print("No elements were found with the search.") + return + filename = "results.csv" + with open(filename, "w") as f: + topCSV = list(elements[0].keys()) # create a copy of the first element keys + topCSV.remove("linkParams") # remove linkParams + f.write(";".join(topCSV)) + f.write("\n") + for element in elements: + if element != "linkParams": + f.write(";".join(str(e) for e in list(element.values())[:-1])) + f.write("\n") + print(f"{size} elements have been registered in the {filename} file.") + +if __name__ == "__main__": + argv = argv[1:] + if len(argv) < 1: # no args + print("No App to retrieve.") + exit(1) + load_dotenv() # load .env file + try: + try: # for logs + debug = environ["DEBUG_MOBILISM"].lower() in ("yes", "true", "1") + except: + debug = False + try: # try to fetch credentials from de .env first + pseudoMobilism = environ["PSEUDO_MOBILISM"] + passwordMobilism = environ["PASSWORD_MOBILISM"] + except: # if it failed try to get from the cli + if len(argv) >= 3: + pseudoMobilism = argv[0] + passwordMobilism = argv[1] + argv = argv[-2:] + else: # if it failed again there is a problem + raise KeyError + Scraper(pseudoMobilism, passwordMobilism, " ".join([n for n in argv]), debug).work() # call the work() function + except KeyError: + print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.')