Add save function and error messages
This commit is contained in:
parent
2fc183e10a
commit
e51102cfd8
1 changed file with 34 additions and 15 deletions
49
main.py
49
main.py
|
@ -16,15 +16,17 @@ class Scraper:
|
||||||
}
|
}
|
||||||
|
|
||||||
def errorFormat(self, code: int = None, message: str = "") -> str:
|
def errorFormat(self, code: int = None, message: str = "") -> str:
|
||||||
return f"{f'[{code}]' if code else ''}{' ' if len(message) > 0 and code else ''}{message}"
|
return f"{f'[{code}]' if code else ''}{' ' if len(message) > 0 and code else ''}{message}."
|
||||||
|
|
||||||
def connect(self) -> CloudScraper:
|
def connect(self) -> CloudScraper:
|
||||||
session = create_scraper(browser = {"browser": "chrome", "platform": "windows"}) # connect with cloudflare bypasser with a chrome browser on windows
|
session = create_scraper(browser = {"browser": "chrome", "platform": "windows"}) # connect with cloudflare bypasser with a chrome browser on windows
|
||||||
|
if not session:
|
||||||
|
raise SystemError(self.errorFormat(message = "The creation of the session failed"))
|
||||||
|
|
||||||
if self.debug: print("Retrieval of the login SID...", end = " ")
|
if self.debug: print("Retrieval of the login SID...", end = " ")
|
||||||
reponse = session.get(f"{self.url}/ucp.php", params = {"mode": "login"}) # get login page to get "sid"
|
reponse = session.get(f"{self.url}/ucp.php", params = {"mode": "login"}) # get login page to get "sid"
|
||||||
if reponse.status_code != 200:
|
if reponse.status_code != 200:
|
||||||
raise ConnectionError(self.errorFormat(code = reponse.status_code))
|
raise ConnectionError(self.errorFormat(code = reponse.status_code, message = "Login page not available"))
|
||||||
try:
|
try:
|
||||||
self.loginData["sid"] = reponse.cookies.get_dict()["ppcw_29d3s_sid"] # register "sid"
|
self.loginData["sid"] = reponse.cookies.get_dict()["ppcw_29d3s_sid"] # register "sid"
|
||||||
except:
|
except:
|
||||||
|
@ -34,27 +36,28 @@ class Scraper:
|
||||||
if self.debug: print("connection attempt...", end = " ")
|
if self.debug: print("connection attempt...", end = " ")
|
||||||
reponse = session.post(f"{self.url}/ucp.php", data = self.loginData, params = {"mode": "login"}) # connect to the forum using credentials
|
reponse = session.post(f"{self.url}/ucp.php", data = self.loginData, params = {"mode": "login"}) # connect to the forum using credentials
|
||||||
if reponse.status_code != 200:
|
if reponse.status_code != 200:
|
||||||
raise ConnectionRefusedError(self.errorFormat(code = reponse.status_code))
|
raise ConnectionRefusedError(self.errorFormat(code = reponse.status_code, message = "Unable to connect"))
|
||||||
if self.debug: print("Connection done.")
|
if self.debug: print("Connection done.")
|
||||||
|
|
||||||
reponse = session.get(f"{self.url}/index.php", cookies = reponse.cookies, params = {"sid": self.loginData["sid"]}) # back to index page
|
reponse = session.get(f"{self.url}/index.php", cookies = reponse.cookies, params = {"sid": self.loginData["sid"]}) # back to index page
|
||||||
|
if reponse.status_code != 200:
|
||||||
|
raise ConnectionError(self.errorFormat(code = reponse.status_code, message = "Unable to get to the index page"))
|
||||||
|
|
||||||
return session
|
return session
|
||||||
|
|
||||||
def search(self, session) -> str:
|
def search(self, session) -> list:
|
||||||
if self.debug: print("Going to search page...", end = " ")
|
if self.debug: print("Going to search page...", end = " ")
|
||||||
reponse = session.get(f"{self.url}/search.php", params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"})
|
reponse = session.get(f"{self.url}/search.php", params = {"keywords": self.requested_app, "sr": "topics", "sf": "titleonly"})
|
||||||
|
if reponse.status_code != 200:
|
||||||
|
raise ConnectionError(self.errorFormat(code = reponse.status_code, message = "Impossible to make the search"))
|
||||||
|
|
||||||
if self.debug: print("Results retrieval...", end = " ")
|
if self.debug: print(f"Results retrieval for {self.requested_app}...", end = " ")
|
||||||
linkList = self.parse(reponse.text)
|
|
||||||
# if self.debug: print(reponse.status_code, reponse.url)
|
|
||||||
# with open("temp2.log", "w") as f: # debug
|
|
||||||
# f.writelines(res)
|
|
||||||
|
|
||||||
link = "No link for your application was found."
|
return self.parse(reponse.text)
|
||||||
return link
|
|
||||||
|
|
||||||
def parse(self, htmlPage: str) -> list:
|
def parse(self, htmlPage: str) -> list:
|
||||||
|
if "No suitable matches were found." in htmlPage:
|
||||||
|
return []
|
||||||
elements = htmlPage.split("<tr>\n<td>")[1:]
|
elements = htmlPage.split("<tr>\n<td>")[1:]
|
||||||
elements[-1] = elements[-1].split("</td>\n</tr>")[0]
|
elements[-1] = elements[-1].split("</td>\n</tr>")[0]
|
||||||
for i in range(0, len(elements)):
|
for i in range(0, len(elements)):
|
||||||
|
@ -66,13 +69,12 @@ class Scraper:
|
||||||
_author = findall(r"<br />\n<i class=\"icon-user\"></i> by <a href=\"\./memberlist\.php\?mode=viewprofile&u=\d+\"( style=\"color: #.*;\" class=\"username-coloured\")?>(.*)</a>", elements[i])[0][-1]
|
_author = findall(r"<br />\n<i class=\"icon-user\"></i> by <a href=\"\./memberlist\.php\?mode=viewprofile&u=\d+\"( style=\"color: #.*;\" class=\"username-coloured\")?>(.*)</a>", elements[i])[0][-1]
|
||||||
except:
|
except:
|
||||||
_author = None
|
_author = None
|
||||||
print(elements[i])
|
|
||||||
try:
|
try:
|
||||||
_link = findall(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", elements[i])[0]
|
_link = findall(r"\./viewtopic\.php\?f=(\d*)&t=(\d*)&", elements[i])[0]
|
||||||
_link = {"f": _link[0], "t": _link[1]}
|
_link = {"f": _link[0], "t": _link[1]}
|
||||||
except:
|
except:
|
||||||
_link = None
|
_link = None
|
||||||
elements[i] = {"title": _title, "author": _author, "linkParams": _link}
|
elements[i] = {"title": _title, "author": _author, "link": f"https://forum.mobilism.org/viewtopic.php?f={_link['f']}&t={_link['t']}", "linkParams": _link}
|
||||||
|
|
||||||
return elements
|
return elements
|
||||||
|
|
||||||
|
@ -82,10 +84,27 @@ class Scraper:
|
||||||
|
|
||||||
return link
|
return link
|
||||||
|
|
||||||
|
def save(elements):
    """Write the search results to ``results.csv`` as semicolon-separated values.

    Each item of *elements* is expected to be a dict such as the ones built
    by ``Scraper.parse`` in this diff (keys ``title``, ``author``, ``link``
    and ``linkParams``).  The ``linkParams`` entry is a nested dict and is
    left out of the file; every other key of the first element becomes a
    column, in order.

    Args:
        elements: list of result dicts; may be empty (or ``None``).

    Returns:
        None.  When there is nothing to save, a message is printed and no
        file is created or modified.
    """
    # Function-scope import: the file's top-level import block is not
    # visible from this part of the file.
    import csv

    if not elements:
        print("Aucun élément n'a été trouvé avec la recherche.")
        return

    count = len(elements)
    filename = "results.csv"

    # Exclude the nested "linkParams" dict by name instead of relying on it
    # being the last key of every element.
    columns = [key for key in elements[0] if key != "linkParams"]

    # newline="" is what the csv module requires so that quoted fields with
    # embedded newlines round-trip; utf-8 keeps non-ASCII titles writable on
    # every platform.
    with open(filename, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, delimiter=";", lineterminator="\n")
        writer.writerow(columns)
        for element in elements:
            # The parser stores None when a field could not be extracted;
            # write an empty cell instead of crashing on str.join.
            row = [
                "" if element.get(column) is None else str(element.get(column))
                for column in columns
            ]
            print(";".join(row))  # echo each saved row, as before
            # csv.writer quotes values containing ';', quotes or newlines.
            writer.writerow(row)

    print(f"{count} éléments ont étés enrengistés dans le fichier {filename}.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
argv = argv[1:]
|
argv = argv[1:]
|
||||||
if len(argv) >= 3 and len(argv) <= 4:
|
if len(argv) >= 3 and len(argv) <= 4:
|
||||||
print(Scraper(*argv).work())
|
save(Scraper(*argv).work())
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
@ -93,6 +112,6 @@ if __name__ == "__main__":
|
||||||
debug = environ["DEBUG_MOBILISM"].lower() in ("yes", "true", "1")
|
debug = environ["DEBUG_MOBILISM"].lower() in ("yes", "true", "1")
|
||||||
except:
|
except:
|
||||||
debug = False
|
debug = False
|
||||||
print(Scraper(environ["PSEUDO_MOBILISM"], environ["PASSWORD_MOBILISM"], environ["APP_MOBILISM"], debug).work())
|
save(Scraper(environ["PSEUDO_MOBILISM"], environ["PASSWORD_MOBILISM"], environ["APP_MOBILISM"], debug).work())
|
||||||
except KeyError:
|
except KeyError:
|
||||||
print('Please fill in the username and password (with ") by args or with .env file.')
|
print('Please fill in the username and password (with ") by args or with .env file.')
|
||||||
|
|
Reference in a new issue