forked from jbzdarkid/TFWiki-scripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
66 lines (53 loc) · 1.66 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from queue import Empty, Queue
from threading import Thread, Event
from time import gmtime, strftime
class meta_plural(type):
def __getattr__(cls, word):
if word.endswith('s'):
word = word[:-1]
return lambda num: f'1 {word}' if num == 1 else f'{num} {word}s'
class plural(metaclass=meta_plural):
pass
def time_and_date():
return strftime(r'%H:%M, %d %B %Y (GMT)', gmtime())
def whatlinkshere(title, count, **kwargs):
kwargs.setdefault('limit', min(50, count))
kwargs.setdefault('namespace', 0)
kwargs.setdefault('hideredirs', 1)
query = '&'.join(f'{key}={value}' for key, value in kwargs.items())
return '{{fullurl:Special:WhatLinksHere/%s|%s}}' % (title, query)
class pagescraper_queue:
def __init__(self, thread_func, *args):
self.thread_func = thread_func
self.thread_func_args = args
def __enter__(self):
self.q = Queue()
self.done = Event()
self.threads = []
NUM_THREADS = 50
for _ in range(NUM_THREADS): # Number of threads
thread = Thread(target=self.meta_thread_func)
self.threads.append(thread)
thread.start()
return self
def put(self, obj):
self.q.put(obj)
def __exit__(self, exc_type, exc_val, traceback):
self.done.set()
for thread in self.threads:
thread.join()
def meta_thread_func(self):
while True:
try:
obj = self.q.get(True, 1)
except Empty:
if self.done.is_set():
return
else:
continue
try:
self.thread_func(obj, *self.thread_func_args)
except Exception as e:
print(e)
if __name__ == '__main__':
print(f'There are {plural.translations(2)} but only {plural.dogs(1)}')