rename files
This commit is contained in:
parent
f6bbed53d7
commit
adb4a3b675
3 changed files with 209 additions and 209 deletions
10
gui.py
10
gui.py
|
@ -1,10 +0,0 @@
|
|||
import kivy
|
||||
from kivy.app import App
|
||||
from kivy.uix.label import Label
|
||||
|
||||
class MobiDL(App):
    """Kivy application whose root widget is a single placeholder label."""

    def build(self):
        """Create and return the root widget: a label showing "..."."""
        root_widget = Label(text = "...")
        return root_widget
|
||||
|
||||
if __name__ == "__main__":
    # Start the GUI only when this file is executed as a script.
    application = MobiDL()
    application.run()
|
206
main.py
206
main.py
|
@ -1,202 +1,10 @@
|
|||
from sys import argv
|
||||
from os import environ
|
||||
from dotenv import load_dotenv
|
||||
from cloudscraper import CloudScraper, create_scraper
|
||||
from re import findall, sub
|
||||
import kivy
|
||||
from kivy.app import App
|
||||
from kivy.uix.label import Label
|
||||
|
||||
class Scraper:
    """Search the Mobilism forum for an app and gather release details.

    The instance stores the forum URL, the requested app name and the login
    form payload. ``work()`` chains the whole flow: log in, run the search,
    write the topic list to ``results.csv``, fetch the first topic pages,
    then print and return the merged information.
    """

    # Extraction patterns are tried in order; the first capture holding at
    # least two characters wins. They are kept as separate patterns (rather
    # than one pattern with a top-level ``|``) so that an unrelated fragment
    # earlier in the page cannot produce an empty first match.
    _CHANGELOG_PATTERNS = (
        r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"c",
        r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"font-weight: bold\">T",
    )
    _DOWNLOAD_PATTERNS = (
        r"Download Instructions:</span> ?<br />(.*|[\s\S]*)<br /><br />Trouble downloading",
        r"Download Instructions:</span> ?<br />(.*|[\s\S]*)</a></div>",
    )

    def __init__(self, pseudo: str, password: str, app: str, debug: bool = False):
        """Store the credentials, the app to look for and the debug flag.

        Args:
            pseudo: Forum username, sent as the ``username`` form field.
            password: Forum password, sent as the ``password`` form field.
            app: Keywords used for the forum search.
            debug: When true, progress messages are printed.
        """
        self.debug = debug
        self.url = "https://forum.mobilism.org"
        self.requested_app = app
        self.loginData = {
            "username": pseudo,
            "password": password,
            "login": "Login",
        }

    @staticmethod
    def _first_match(pattern: str, text: str):
        """Return the first ``findall`` result of *pattern* in *text*, or None."""
        matches = findall(pattern, text)
        return matches[0] if matches else None

    @staticmethod
    def _first_usable(patterns, text: str):
        """Return the first capture of at least two characters, or None.

        For each pattern only its first match in *text* is considered.
        """
        for pattern in patterns:
            matches = findall(pattern, text)
            if matches and len(matches[0]) >= 2:
                return matches[0]
        return None

    def errorFormat(self, code: int = None, message: str = "") -> str:
        """Build an error message shaped like ``[code] message.``.

        The ``[code]`` prefix is omitted when *code* is falsy; the trailing
        dot is always appended.
        """
        prefix = f"[{code}]" if code else ""
        separator = " " if message and code else ""
        return f"{prefix}{separator}{message}."

    def connect(self) -> "CloudScraper":
        """Create a scraper session and post the login form.

        Returns:
            The session used for the login request.

        Raises:
            SystemError: If the session object is falsy.
            ConnectionRefusedError: If the login request does not answer 200.
        """
        # Session presenting itself as Chrome on Windows.
        session = create_scraper(browser = {"browser": "chrome", "platform": "windows"})
        if not session:
            raise SystemError(self.errorFormat(message = "The creation of the session failed"))

        if self.debug:
            print("Connection attempt...")
        # "mode=login" is passed explicitly in case the forum default changes.
        reponse = session.post(f"{self.url}/ucp.php", data = self.loginData, params = {"mode": "login"})
        if reponse.status_code != 200:
            raise ConnectionRefusedError(self.errorFormat(code = reponse.status_code, message = "Unable to connect"))

        return session

    def search(self, session: "CloudScraper") -> tuple[list[dict], list[dict]]:
        """Run the forum search and collect details for the first topics.

        Args:
            session: Session returned by ``connect()``.

        Returns:
            A pair ``(topics, infos)``: the parsed result list and the
            details returned by ``getInfos()`` for its first entries.

        Raises:
            ConnectionError: If the forum refuses the search (login failed)
                or the request does not answer 200.
        """
        if self.debug:
            print("Going to search page and check connection...", end = " ")
        reponse = session.get(
            f"{self.url}/search.php",
            params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"},
        )
        if "Sorry but you are not permitted to use the search system. If you're not logged in please" in reponse.text:
            raise ConnectionError(self.errorFormat(message = "Connection failed, check credentials"))
        if reponse.status_code != 200:
            raise ConnectionError(self.errorFormat(code = reponse.status_code, message = "Impossible to make the search"))
        if self.debug:
            print(f"Connected.")

        if self.debug:
            print(f"Fetching results for {self.requested_app}...", end = " ")

        topics = self.parse(reponse.text)

        self.save(topics)

        return topics, self.getInfos(session, topics)

    def parse(self, htmlPage: str) -> list[dict]:
        """Turn the search result page into a list of topic dictionaries.

        Each topic has the keys ``title``, ``author``, ``date``, ``link`` and
        ``linkParams``; a value is ``None`` when it could not be extracted
        (``link`` is ``None`` whenever ``linkParams`` is).

        Returns:
            One dictionary per result row, or an empty list when the page
            reports no match or contains no result row.
        """
        if "No suitable matches were found." in htmlPage:
            return []
        rows = htmlPage.split("<tr>\n<td>")[1:]
        if not rows:
            # No result row at all: nothing to trim, nothing to parse.
            return []
        # Drop everything that follows the last row.
        rows[-1] = rows[-1].split("</td>\n</tr>")[0]

        topics = []
        for row in rows:
            _title = self._first_match(r"class=\"topictitle\">(.*)<\/a>", row)
            if _title is not None:
                _title = sub(r" ?& ?", " ", _title)

            # The pattern has three groups; the author name is the last one.
            _author = self._first_match(
                r"(<br />|</strong>)\n\n?<i class=\"icon-user\"></i> by <a href=\"\./memberlist\.php\?mode=viewprofile&u=\d+\"( style=\"color: #.*;\" class=\"username-coloured\")?>(.*)</a>",
                row,
            )
            if _author is not None:
                _author = _author[-1]

            _link = self._first_match(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", row)
            if _link is not None:
                _link = {"f": _link[0], "t": _link[1]}

            _date = self._first_match(r"</a> <i class=\"icon-time\"></i> <small>(.*)</small>", row)

            if self.debug:
                # Raw row dump, only useful when troubleshooting the patterns.
                print("\n" + row + "\n")

            topics.append({
                "title": _title,
                "author": _author,
                "date": _date,
                # Without link parameters no URL can be built.
                "link": f"{self.url}/viewtopic.php?f={_link['f']}&t={_link['t']}" if _link else None,
                "linkParams": _link,
            })

        return topics

    def getInfos(self, session: "CloudScraper", elements: list) -> list:
        """Fetch the pages of the first topics and parse their details.

        Only the first three topics are requested. A topic without
        ``linkParams`` is not requested and yields empty details, so the
        returned list stays aligned with *elements*.

        Raises:
            ConnectionError: If a topic page does not answer 200.
        """
        if not elements:
            return []
        page = 3
        if self.debug:
            print(f"Going to the {page} first pages...", end = " ")
        results = []
        for i, topic in enumerate(elements[:page]):
            if topic["linkParams"] is None:
                results.append(None)  # placeholder handled by parsingInfos()
                continue
            reponse = session.get(f"{self.url}/viewtopic.php", params = topic["linkParams"])
            if reponse.status_code != 200:
                raise ConnectionError(self.errorFormat(code = reponse.status_code, message = f"Error while doing the search n°{i}"))
            results.append(reponse)
        if self.debug:
            print(f"Done.")

        if self.debug:
            print(f"Parsing results page...", end = " ")
        results = self.parsingInfos(results)
        if self.debug:
            print(f"Done.")

        return results

    def parsingInfos(self, elements: list) -> list[dict]:
        """Extract changelog and download links from fetched topic pages.

        Args:
            elements: Objects exposing a ``text`` attribute (or ``None`` for
                a topic that was not fetched). The list is modified in place.

        Returns:
            The same list, each entry replaced by a dictionary with the keys
            ``changelogs`` and ``downloadLinks``. Both are ``None`` when the
            page has no "Download Instructions" section; ``downloadLinks`` is
            ``None`` when no link block could be extracted.
        """
        for i, reponse in enumerate(elements):
            page = "" if reponse is None else reponse.text
            if "Download Instructions" not in page:
                elements[i] = {"changelogs": None, "downloadLinks": None}
                continue

            _changelogs = self._first_usable(self._CHANGELOG_PATTERNS, page)
            if _changelogs is None:
                _changelogs = "No changelog found."

            # Strip whatever sits between the section title and the last
            # "<br /><s" on that line before looking for the links.
            page = sub(r"Download Instructions:</span>(.*)?<br /><s", "Download Instructions:</span><br /><s", page)
            _downloadLinks = self._first_usable(self._DOWNLOAD_PATTERNS, page)
            if _downloadLinks is not None:
                _downloadLinks = sub(r"\n|<a class=\"postlink\" href=\"|\(Closed Filehost\) ?|<span style=\"font-weight: bold\">|</span>|\">(\S*)</a>", "", _downloadLinks)  # remove html garbage
                _downloadLinks = sub(r"<br />\n?", "\n", _downloadLinks)  # convert html newlines to \n
                _downloadLinks = sub(r"Mirrors(?!:)|Mirror(?!s)(?!:)", "Mirror:", _downloadLinks)  # add ":"
                _downloadLinks = _downloadLinks.split('">')[0]

            elements[i] = {"changelogs": _changelogs, "downloadLinks": _downloadLinks}

        return elements

    def prettyPrint(self, topics: tuple[list[dict], list[dict]]) -> list:
        """Print and return the merged information of the detailed topics.

        Args:
            topics: The ``(topics, infos)`` pair returned by ``search()``.

        Returns:
            One dictionary per detailed topic with the keys ``title``,
            ``author``, ``date``, ``changelogs`` and ``downloadLinks``;
            an empty list when the search found nothing.
        """
        topics, topicsInfos = topics
        if not topics:
            return []
        print("\n")
        result = []
        for topic, infos in zip(topics, topicsInfos):
            entry = {
                "title": topic["title"],
                "author": topic["author"],
                "date": topic["date"],
                "changelogs": str(infos["changelogs"]).replace("<br />", "\n"),
                "downloadLinks": infos["downloadLinks"],
            }
            result.append(entry)
            print(f"Title: {entry['title']}\n")
            print(f"Author: {entry['author']}\n")
            print(f"Date of release: {entry['date']}\n")
            print(f"Changelogs: \n{entry['changelogs']}\n")
            print(f"Download links: \n{entry['downloadLinks']}")
            print("\n\n---\n")

        return result

    def work(self) -> list:
        """Run the whole flow: connect, search, then print the results."""
        return self.prettyPrint(self.search(self.connect()))

    def save(self, elements: list) -> None:
        """Write the parsed topics to ``results.csv`` (``;`` separated).

        The ``linkParams`` entry of each topic is left out. Values are
        written through ``str()``; fields containing the separator are
        quoted by the ``csv`` module. Nothing is written for an empty list.
        """
        import csv  # local import keeps this method self-contained

        size = len(elements)
        if size == 0:
            print("No elements were found with the search.")
            return
        filename = "results.csv"
        # Select columns by name instead of relying on "linkParams" being last.
        columns = [key for key in elements[0] if key != "linkParams"]
        with open(filename, "w", newline = "", encoding = "utf-8") as f:
            writer = csv.writer(f, delimiter = ";", lineterminator = "\n")
            writer.writerow(columns)
            for element in elements:
                writer.writerow(str(element.get(column)) for column in columns)
        print(f"{size} elements have been registered in the {filename} file.")
|
||||
class MobiDL(App):
    """Kivy application whose root widget is a single placeholder label."""

    def build(self):
        """Create and return the root widget: a label showing "..."."""
        root_widget = Label(text = "...")
        return root_widget
|
||||
|
||||
def resolve_credentials(args, env):
    """Pick the forum credentials and the app keywords.

    Credentials are read from *env* (``PSEUDO_MOBILISM`` and
    ``PASSWORD_MOBILISM``) first; in that case every argument is an app
    keyword. Otherwise the first two arguments are the username and the
    password and only the remaining ones are app keywords.

    Args:
        args: Command line arguments without the program name.
        env: Mapping of environment variables.

    Returns:
        A ``(username, password, app_words)`` tuple.

    Raises:
        KeyError: If neither source provides credentials plus an app name.
    """
    try:
        return env["PSEUDO_MOBILISM"], env["PASSWORD_MOBILISM"], list(args)
    except KeyError:
        if len(args) >= 3:
            # Drop the two credential arguments so that the password never
            # ends up in the search keywords.
            return args[0], args[1], list(args[2:])
        raise


if __name__ == "__main__":
    cli_args = argv[1:]  # keep the imported argv untouched
    if not cli_args:  # no args
        print("No App to retrieve.")
        raise SystemExit(1)

    load_dotenv()  # load .env file

    # Logs are enabled only for an explicit yes/true/1 value.
    debug = environ.get("DEBUG_MOBILISM", "").lower() in ("yes", "true", "1")

    try:
        pseudoMobilism, passwordMobilism, app_words = resolve_credentials(cli_args, environ)
    except KeyError:
        print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.')
    else:
        # Run outside the try block so that a KeyError raised while scraping
        # is not reported as a credentials problem.
        Scraper(pseudoMobilism, passwordMobilism, " ".join(app_words), debug).work()

    MobiDL().run()
|
||||
|
|
202
scrapper.py
Normal file
202
scrapper.py
Normal file
|
@ -0,0 +1,202 @@
|
|||
from sys import argv
|
||||
from os import environ
|
||||
from dotenv import load_dotenv
|
||||
from cloudscraper import CloudScraper, create_scraper
|
||||
from re import findall, sub
|
||||
|
||||
class Scraper:
    """Search the Mobilism forum for an app and gather release details.

    The instance stores the forum URL, the requested app name and the login
    form payload. ``work()`` chains the whole flow: log in, run the search,
    write the topic list to ``results.csv``, fetch the first topic pages,
    then print and return the merged information.
    """

    # Extraction patterns are tried in order; the first capture holding at
    # least two characters wins. They are kept as separate patterns (rather
    # than one pattern with a top-level ``|``) so that an unrelated fragment
    # earlier in the page cannot produce an empty first match.
    _CHANGELOG_PATTERNS = (
        r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"c",
        r"What's New:</span> ?<br />(.*)<br /><br /><span style=\"font-weight: bold\">T",
    )
    _DOWNLOAD_PATTERNS = (
        r"Download Instructions:</span> ?<br />(.*|[\s\S]*)<br /><br />Trouble downloading",
        r"Download Instructions:</span> ?<br />(.*|[\s\S]*)</a></div>",
    )

    def __init__(self, pseudo: str, password: str, app: str, debug: bool = False):
        """Store the credentials, the app to look for and the debug flag.

        Args:
            pseudo: Forum username, sent as the ``username`` form field.
            password: Forum password, sent as the ``password`` form field.
            app: Keywords used for the forum search.
            debug: When true, progress messages are printed.
        """
        self.debug = debug
        self.url = "https://forum.mobilism.org"
        self.requested_app = app
        self.loginData = {
            "username": pseudo,
            "password": password,
            "login": "Login",
        }

    @staticmethod
    def _first_match(pattern: str, text: str):
        """Return the first ``findall`` result of *pattern* in *text*, or None."""
        matches = findall(pattern, text)
        return matches[0] if matches else None

    @staticmethod
    def _first_usable(patterns, text: str):
        """Return the first capture of at least two characters, or None.

        For each pattern only its first match in *text* is considered.
        """
        for pattern in patterns:
            matches = findall(pattern, text)
            if matches and len(matches[0]) >= 2:
                return matches[0]
        return None

    def errorFormat(self, code: int = None, message: str = "") -> str:
        """Build an error message shaped like ``[code] message.``.

        The ``[code]`` prefix is omitted when *code* is falsy; the trailing
        dot is always appended.
        """
        prefix = f"[{code}]" if code else ""
        separator = " " if message and code else ""
        return f"{prefix}{separator}{message}."

    def connect(self) -> "CloudScraper":
        """Create a scraper session and post the login form.

        Returns:
            The session used for the login request.

        Raises:
            SystemError: If the session object is falsy.
            ConnectionRefusedError: If the login request does not answer 200.
        """
        # Session presenting itself as Chrome on Windows.
        session = create_scraper(browser = {"browser": "chrome", "platform": "windows"})
        if not session:
            raise SystemError(self.errorFormat(message = "The creation of the session failed"))

        if self.debug:
            print("Connection attempt...")
        # "mode=login" is passed explicitly in case the forum default changes.
        reponse = session.post(f"{self.url}/ucp.php", data = self.loginData, params = {"mode": "login"})
        if reponse.status_code != 200:
            raise ConnectionRefusedError(self.errorFormat(code = reponse.status_code, message = "Unable to connect"))

        return session

    def search(self, session: "CloudScraper") -> tuple[list[dict], list[dict]]:
        """Run the forum search and collect details for the first topics.

        Args:
            session: Session returned by ``connect()``.

        Returns:
            A pair ``(topics, infos)``: the parsed result list and the
            details returned by ``getInfos()`` for its first entries.

        Raises:
            ConnectionError: If the forum refuses the search (login failed)
                or the request does not answer 200.
        """
        if self.debug:
            print("Going to search page and check connection...", end = " ")
        reponse = session.get(
            f"{self.url}/search.php",
            params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"},
        )
        if "Sorry but you are not permitted to use the search system. If you're not logged in please" in reponse.text:
            raise ConnectionError(self.errorFormat(message = "Connection failed, check credentials"))
        if reponse.status_code != 200:
            raise ConnectionError(self.errorFormat(code = reponse.status_code, message = "Impossible to make the search"))
        if self.debug:
            print(f"Connected.")

        if self.debug:
            print(f"Fetching results for {self.requested_app}...", end = " ")

        topics = self.parse(reponse.text)

        self.save(topics)

        return topics, self.getInfos(session, topics)

    def parse(self, htmlPage: str) -> list[dict]:
        """Turn the search result page into a list of topic dictionaries.

        Each topic has the keys ``title``, ``author``, ``date``, ``link`` and
        ``linkParams``; a value is ``None`` when it could not be extracted
        (``link`` is ``None`` whenever ``linkParams`` is).

        Returns:
            One dictionary per result row, or an empty list when the page
            reports no match or contains no result row.
        """
        if "No suitable matches were found." in htmlPage:
            return []
        rows = htmlPage.split("<tr>\n<td>")[1:]
        if not rows:
            # No result row at all: nothing to trim, nothing to parse.
            return []
        # Drop everything that follows the last row.
        rows[-1] = rows[-1].split("</td>\n</tr>")[0]

        topics = []
        for row in rows:
            _title = self._first_match(r"class=\"topictitle\">(.*)<\/a>", row)
            if _title is not None:
                _title = sub(r" ?& ?", " ", _title)

            # The pattern has three groups; the author name is the last one.
            _author = self._first_match(
                r"(<br />|</strong>)\n\n?<i class=\"icon-user\"></i> by <a href=\"\./memberlist\.php\?mode=viewprofile&u=\d+\"( style=\"color: #.*;\" class=\"username-coloured\")?>(.*)</a>",
                row,
            )
            if _author is not None:
                _author = _author[-1]

            _link = self._first_match(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", row)
            if _link is not None:
                _link = {"f": _link[0], "t": _link[1]}

            _date = self._first_match(r"</a> <i class=\"icon-time\"></i> <small>(.*)</small>", row)

            if self.debug:
                # Raw row dump, only useful when troubleshooting the patterns.
                print("\n" + row + "\n")

            topics.append({
                "title": _title,
                "author": _author,
                "date": _date,
                # Without link parameters no URL can be built.
                "link": f"{self.url}/viewtopic.php?f={_link['f']}&t={_link['t']}" if _link else None,
                "linkParams": _link,
            })

        return topics

    def getInfos(self, session: "CloudScraper", elements: list) -> list:
        """Fetch the pages of the first topics and parse their details.

        Only the first three topics are requested. A topic without
        ``linkParams`` is not requested and yields empty details, so the
        returned list stays aligned with *elements*.

        Raises:
            ConnectionError: If a topic page does not answer 200.
        """
        if not elements:
            return []
        page = 3
        if self.debug:
            print(f"Going to the {page} first pages...", end = " ")
        results = []
        for i, topic in enumerate(elements[:page]):
            if topic["linkParams"] is None:
                results.append(None)  # placeholder handled by parsingInfos()
                continue
            reponse = session.get(f"{self.url}/viewtopic.php", params = topic["linkParams"])
            if reponse.status_code != 200:
                raise ConnectionError(self.errorFormat(code = reponse.status_code, message = f"Error while doing the search n°{i}"))
            results.append(reponse)
        if self.debug:
            print(f"Done.")

        if self.debug:
            print(f"Parsing results page...", end = " ")
        results = self.parsingInfos(results)
        if self.debug:
            print(f"Done.")

        return results

    def parsingInfos(self, elements: list) -> list[dict]:
        """Extract changelog and download links from fetched topic pages.

        Args:
            elements: Objects exposing a ``text`` attribute (or ``None`` for
                a topic that was not fetched). The list is modified in place.

        Returns:
            The same list, each entry replaced by a dictionary with the keys
            ``changelogs`` and ``downloadLinks``. Both are ``None`` when the
            page has no "Download Instructions" section; ``downloadLinks`` is
            ``None`` when no link block could be extracted.
        """
        for i, reponse in enumerate(elements):
            page = "" if reponse is None else reponse.text
            if "Download Instructions" not in page:
                elements[i] = {"changelogs": None, "downloadLinks": None}
                continue

            _changelogs = self._first_usable(self._CHANGELOG_PATTERNS, page)
            if _changelogs is None:
                _changelogs = "No changelog found."

            # Strip whatever sits between the section title and the last
            # "<br /><s" on that line before looking for the links.
            page = sub(r"Download Instructions:</span>(.*)?<br /><s", "Download Instructions:</span><br /><s", page)
            _downloadLinks = self._first_usable(self._DOWNLOAD_PATTERNS, page)
            if _downloadLinks is not None:
                _downloadLinks = sub(r"\n|<a class=\"postlink\" href=\"|\(Closed Filehost\) ?|<span style=\"font-weight: bold\">|</span>|\">(\S*)</a>", "", _downloadLinks)  # remove html garbage
                _downloadLinks = sub(r"<br />\n?", "\n", _downloadLinks)  # convert html newlines to \n
                _downloadLinks = sub(r"Mirrors(?!:)|Mirror(?!s)(?!:)", "Mirror:", _downloadLinks)  # add ":"
                _downloadLinks = _downloadLinks.split('">')[0]

            elements[i] = {"changelogs": _changelogs, "downloadLinks": _downloadLinks}

        return elements

    def prettyPrint(self, topics: tuple[list[dict], list[dict]]) -> list:
        """Print and return the merged information of the detailed topics.

        Args:
            topics: The ``(topics, infos)`` pair returned by ``search()``.

        Returns:
            One dictionary per detailed topic with the keys ``title``,
            ``author``, ``date``, ``changelogs`` and ``downloadLinks``;
            an empty list when the search found nothing.
        """
        topics, topicsInfos = topics
        if not topics:
            return []
        print("\n")
        result = []
        for topic, infos in zip(topics, topicsInfos):
            entry = {
                "title": topic["title"],
                "author": topic["author"],
                "date": topic["date"],
                "changelogs": str(infos["changelogs"]).replace("<br />", "\n"),
                "downloadLinks": infos["downloadLinks"],
            }
            result.append(entry)
            print(f"Title: {entry['title']}\n")
            print(f"Author: {entry['author']}\n")
            print(f"Date of release: {entry['date']}\n")
            print(f"Changelogs: \n{entry['changelogs']}\n")
            print(f"Download links: \n{entry['downloadLinks']}")
            print("\n\n---\n")

        return result

    def work(self) -> list:
        """Run the whole flow: connect, search, then print the results."""
        return self.prettyPrint(self.search(self.connect()))

    def save(self, elements: list) -> None:
        """Write the parsed topics to ``results.csv`` (``;`` separated).

        The ``linkParams`` entry of each topic is left out. Values are
        written through ``str()``; fields containing the separator are
        quoted by the ``csv`` module. Nothing is written for an empty list.
        """
        import csv  # local import keeps this method self-contained

        size = len(elements)
        if size == 0:
            print("No elements were found with the search.")
            return
        filename = "results.csv"
        # Select columns by name instead of relying on "linkParams" being last.
        columns = [key for key in elements[0] if key != "linkParams"]
        with open(filename, "w", newline = "", encoding = "utf-8") as f:
            writer = csv.writer(f, delimiter = ";", lineterminator = "\n")
            writer.writerow(columns)
            for element in elements:
                writer.writerow(str(element.get(column)) for column in columns)
        print(f"{size} elements have been registered in the {filename} file.")
|
||||
|
||||
def resolve_credentials(args, env):
    """Pick the forum credentials and the app keywords.

    Credentials are read from *env* (``PSEUDO_MOBILISM`` and
    ``PASSWORD_MOBILISM``) first; in that case every argument is an app
    keyword. Otherwise the first two arguments are the username and the
    password and only the remaining ones are app keywords.

    Args:
        args: Command line arguments without the program name.
        env: Mapping of environment variables.

    Returns:
        A ``(username, password, app_words)`` tuple.

    Raises:
        KeyError: If neither source provides credentials plus an app name.
    """
    try:
        return env["PSEUDO_MOBILISM"], env["PASSWORD_MOBILISM"], list(args)
    except KeyError:
        if len(args) >= 3:
            # Drop the two credential arguments so that the password never
            # ends up in the search keywords.
            return args[0], args[1], list(args[2:])
        raise


if __name__ == "__main__":
    cli_args = argv[1:]  # keep the imported argv untouched
    if not cli_args:  # no args
        print("No App to retrieve.")
        raise SystemExit(1)

    load_dotenv()  # load .env file

    # Logs are enabled only for an explicit yes/true/1 value.
    debug = environ.get("DEBUG_MOBILISM", "").lower() in ("yes", "true", "1")

    try:
        pseudoMobilism, passwordMobilism, app_words = resolve_credentials(cli_args, environ)
    except KeyError:
        print('Please fill in the username and password (with quotes) by args or with .env file and give an app to retrieve.')
    else:
        # Run outside the try block so that a KeyError raised while scraping
        # is not reported as a credentials problem.
        Scraper(pseudoMobilism, passwordMobilism, " ".join(app_words), debug).work()
|
Reference in a new issue