-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnyaascraper.py
executable file
·147 lines (109 loc) · 4.06 KB
/
nyaascraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/bin/python3
from typing import Any, Dict, Optional
from bs4.element import ResultSet
from bs4 import BeautifulSoup
from urllib.parse import unquote
import os
import requests
import logging as LOGGER
# User-tunable settings. If you need additional information about any of
# them, look at the readme.

# getRows() always collects rows with the "success" class; these two flags are
# the defaults that make it also collect "default" and "danger" rows.
GET_DEFAULT_ROWS: bool = True
GET_DANGER_ROWS: bool = True
# True: prompt and pick through input()/print(). False: go through dmenu.
TUI_MODE: bool = False
# Level applied to the root logger below.
LOGGER_LEVEL: int = LOGGER.ERROR
# Search URL template; main() fills in {query} and getTorrents() appends the
# page number as "&p=<page>".
SEARCH_URL: str = "https://nyaa.si/?q={query}&s=seeders&o=desc"
# Extra command line arguments appended to the webtorrent invocation in main().
WEBTORRENT_ARGS: str = "--keep-seeding --mpv --playlist"
# Bound on the page numbers getTorrents() requests (pages start at 1).
MAX_PAGE_NUM: int = 5
# Keyword arguments forwarded to every dmenu.show() call.
DMENU_ARGS: Dict[Any, Any] = {"font": "Ubuntu-15"}
# Passed to requests.get() as its `proxies` argument.
PROXIES: Optional[Dict] = None
# dmenu is only imported (and therefore only required) when TUI mode is off.
if not TUI_MODE:
    import dmenu
# Set up the root logger and apply the configured level to it.
LOGGER.basicConfig()
LOGGER.getLogger().setLevel(LOGGER_LEVEL)
def getRows(
    soup: BeautifulSoup, getDefault=GET_DEFAULT_ROWS, getDanger=GET_DANGER_ROWS
) -> ResultSet:
    """Collect the table rows of a parsed result page.

    Rows with the "success" class are always collected. Rows with the
    "default" and "danger" classes are appended after them, in that order,
    when the matching flag is truthy.

    Args:
        soup: Parsed page to search for ``<tr>`` elements.
        getDefault: Also collect ``<tr class="default">`` rows.
        getDanger: Also collect ``<tr class="danger">`` rows.

    Returns:
        The result set of the "success" lookup, extended in place with the
        optional row groups.
    """
    collected = soup.find_all("tr", class_="success")
    extraClasses = []
    if getDefault:
        extraClasses.append("default")
    if getDanger:
        extraClasses.append("danger")
    for cssClass in extraClasses:
        collected.extend(soup.find_all("tr", class_=cssClass))
    return collected
def getTorrents(url: str, timeout: float = 30.0) -> list:
    """Scrape the result pages behind *url* and collect their torrents.

    Pages 1 through MAX_PAGE_NUM (inclusive) are requested by appending
    ``&p=<page>`` to *url*. Scraping stops at the first page that yields no
    rows. A page whose request fails is logged and skipped, and so is a row
    that lacks the expected links.

    Args:
        url: Search URL. It must already contain a query string, because the
            page number is appended as ``&p=<page>``.
        timeout: Seconds to wait on the server before giving up on a page.

    Returns:
        A list of ``{"name": ..., "magnet": ...}`` dicts in page order.
        ``name`` is the row title, prefixed with ``"[<size>] "`` when a size
        cell was found. ``magnet`` is the URL-decoded href of the second link
        in the row's first ``text-center`` cell.
    """
    # Binary size suffixes that mark the size cell of a row.
    sizeUnits = ("KiB", "MiB", "GiB", "TiB")
    torrents = []
    # + 1 because range() excludes its stop value and MAX_PAGE_NUM is the last
    # page that should still be requested.
    for pageNum in range(1, MAX_PAGE_NUM + 1):
        pageUrl = f"{url}&p={pageNum}"
        LOGGER.info(f"Getting page {pageNum} with url {pageUrl}")
        try:
            pageHtml = requests.get(pageUrl, proxies=PROXIES, timeout=timeout)
        except requests.RequestException as err:
            # Best effort: a page that cannot be fetched is skipped. Only
            # request errors are caught, so Ctrl+C still interrupts the scrape.
            LOGGER.warning(f"Could not get page {pageNum}: {err}")
            continue
        soup = BeautifulSoup(pageHtml.text, "html.parser")
        LOGGER.info(f"Got page {pageNum} !")
        rows = getRows(soup)
        LOGGER.info(f"Got {len(rows)} rows from page {pageNum}")
        if not rows:
            # An empty page means there is nothing left to scrape.
            break
        for row in rows:
            cells = row.find_all("td", class_="text-center")
            links = cells[0].find_all("a") if cells else []
            # Anchors that directly hold a text string; the first one is used
            # as the title.
            titles = row.find_all("a", string=True)
            # The second link of the first cell is taken as the magnet link
            # (presumably a .torrent download link comes first; verify against
            # the site's markup).
            href = links[1].get("href") if len(links) > 1 else None
            if not href or not titles:
                LOGGER.warning(f"Skipping a row with unexpected layout on page {pageNum}")
                continue
            sizeCell = next(
                (cell for cell in cells if any(unit in cell.text for unit in sizeUnits)),
                None,
            )
            size = f"[{sizeCell.get_text()}] " if sizeCell is not None else ""
            torrents.append({"name": size + titles[0].get_text(), "magnet": unquote(href)})
    return torrents
def _choiceD(dict: dict, subElem="") -> str:
    """Let the user pick one entry through dmenu.

    Args:
        dict: Iterable of mapping-like entries (it is iterated twice).
        subElem: Key whose value is shown in the menu for each entry.

    Returns:
        The first entry whose ``subElem`` value equals what dmenu returned,
        or None when no entry matches.
    """
    labels = (entry.get(subElem) for entry in dict)
    picked = dmenu.show(labels, lines=25, **DMENU_ARGS)
    for entry in dict:
        if entry.get(subElem) == picked:
            return entry
    return None
def _choiceT(dict: dict, subElem="") -> str:
elems = list((x.get(subElem) for x in dict))
elemsReverse = list(elems)
elemsReverse.reverse()
for i, elem in enumerate(elemsReverse):
print(f"{str(len(elems) - i)}: {elem}")
# seems to be working
index = int(input("Enter your choice: ")) - 1
return dict[index]
def choice(dict: dict, subElem="") -> str:  # lazy
    """Let the user pick one entry with the configured front end.

    Delegates to _choiceT when TUI_MODE is truthy and to _choiceD otherwise,
    and returns whatever the delegate returns.

    Args:
        dict: Entries to choose from, forwarded unchanged.
        subElem: Key whose value labels each entry, forwarded unchanged.
    """
    if TUI_MODE:
        return _choiceT(dict, subElem)
    return _choiceD(dict, subElem)
def ask(prompt: str) -> str:
    """Ask the user for free-form input with the configured front end.

    Args:
        prompt: Text shown to the user.

    Returns:
        The result of input() in TUI mode, otherwise the result of a dmenu
        call that offers no items and shows *prompt*.
    """
    if not TUI_MODE:
        return dmenu.show([], prompt=prompt, **DMENU_ARGS)
    return input(prompt)
def main(*args: str) -> int:
    """Search, let the user pick a result and hand its magnet link to webtorrent.

    Args:
        *args: Search terms. They are joined with spaces; when none are given
            the user is asked for them.

    Returns:
        0 on success. 1 when no query was given, nothing was found, nothing
        was picked, or webtorrent could not be started.
    """
    # Local imports, like the ones in the __main__ guard of this file.
    import shlex
    import subprocess
    from urllib.parse import quote_plus

    query = " ".join(args) if args else ask("Search tags:")
    if query is None:
        # NOTE(review): dmenu presumably returns None when the prompt is
        # dismissed - confirm. Either way there is nothing to search for.
        LOGGER.info("No search query given")
        return 1
    # quote_plus turns spaces into "+" and escapes characters such as "&" or
    # "#" that would otherwise cut the query short or corrupt the URL.
    torrents = getTorrents(SEARCH_URL.format(query=quote_plus(query)))
    LOGGER.info(f"Got {len(torrents)} total entries")
    if not torrents:
        return 1
    picked = choice(torrents, subElem="name")
    if picked is None:
        # No entry matched the user's selection (e.g. the menu was dismissed).
        LOGGER.info("Nothing was selected")
        return 1
    magnet = picked.get("magnet")
    LOGGER.info(f"Got magnet link: {magnet}")
    LOGGER.info("Loading webtorrent")
    if os.name == "posix":
        # The magnet string comes from scraped HTML, so it is passed as a
        # single argv entry instead of being interpolated into a shell command.
        command = ["webtorrent", magnet, *shlex.split(WEBTORRENT_ARGS)]
        try:
            subprocess.run(command)
        except OSError as err:
            LOGGER.error(f"Could not start webtorrent: {err}")
            return 1
    else:
        print(
            "TODO: find how to run webtorrent-cli on windows. don't make an issue for this except if it's been 2 months since the last commit"
        )
    return 0
# Script entry point: run main() with the command line arguments and turn the
# outcome into the process exit status (1 unless main() returned a status).
if __name__ == "__main__":
    import sys
    import traceback

    exit_code = 1
    try:
        exit_code = main(*sys.argv[1:])
    except KeyboardInterrupt:
        print("\n\nEnding execution due to user interaction.")
    except Exception as err:
        # Top-level boundary: report the error and its traceback, then exit
        # with the failure status set above.
        print()
        print("An error occured!")
        print(err)
        traceback.print_tb(err.__traceback__)
    finally:
        print()
    # sys.exit() rather than the bare exit() helper, which is only injected by
    # the `site` module. It sits after the try statement so that it cannot
    # override an exception that is still propagating.
    sys.exit(exit_code)