-
Notifications
You must be signed in to change notification settings - Fork 6
/
Action.py
178 lines (144 loc) · 5.44 KB
/
Action.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from RandomFileQueue import RandomFileQueue
from weakref import WeakKeyDictionary
from threading import RLock
import Downloader
import TaskSystem
import Index
import FileSysIntf
import Logging
from typing import Dict
# Serializes all lookups/insertions into ``randomWalkers``.
lock = RLock()
# One cached walker per base directory. Keys are held weakly, so an entry may
# disappear once nothing else references its ``Index.Dir``.
# NOTE(review): if RandomFileQueue keeps a strong reference to ``root_dir``,
# each value keeps its own key alive and entries are never evicted -- confirm.
randomWalkers = WeakKeyDictionary()  # type: Dict[Index.Dir, RandomFileQueue]


def get_random_walker(base):
    """
    Return the shared :class:`RandomFileQueue` rooted at ``base``.

    The walker is created on first request and cached in ``randomWalkers``;
    later calls with the same ``base`` get the cached instance.

    :param Index.Dir base: root directory the walker explores
    :rtype: RandomFileQueue
    """
    with lock:
        if base not in randomWalkers:
            randomWalkers[base] = RandomFileQueue(root_dir=base, filesystem=Index.filesystem)
        return randomWalkers[base]
class BaseAction:
    """
    Common base for the actions in this module.

    Defaults: an action is equal only to itself and hashes by ``id()``.
    When ``other`` is an instance of this action's class (or a subclass), the
    two are ordered by ``id()``; otherwise they are ordered by class name.
    """

    def __hash__(self):
        return id(self)

    def __eq__(self, other):
        return other is self

    def __lt__(self, other):
        if isinstance(other, type(self)):
            return id(self) < id(other)
        # Different kinds of actions: order deterministically by class name.
        return type(self).__name__ < type(other).__name__

    def __repr__(self):
        return "%s()" % type(self).__name__
class Download(BaseAction):
    """
    Action that runs a :class:`Downloader.Downloader` for one URL.

    Two ``Download`` actions compare and hash by their URL string.
    """

    def __init__(self, url):
        """
        :param url: the URL to fetch; converted to ``str`` and kept as ``self.url``
        """
        self.url = str(url)
        self.downloader = Downloader.Downloader(url=self.url)

    def __call__(self):
        """
        Run the download unless ``main.allowed_by_blacklist`` rejects the URL.

        On ``DownloadTemporaryError``, the error is printed and the download is
        queued again, preceded by a ``RandomNextFile`` so other downloads also get
        a turn. On ``DownloadFatalError``, the error is printed and the download is dropped.
        """
        # Imported lazily, as before (presumably to avoid an import cycle -- confirm).
        import main
        if main.allowed_by_blacklist(self.url):
            try:
                self.downloader.run()
            except Downloader.DownloadTemporaryError as exc:
                print("%s: %s %s" % (self, type(exc).__name__, exc))
                # Retry later, but queue a random action first to let other downloads proceed.
                TaskSystem.queue_work(RandomNextFile())
                TaskSystem.queue_work(self)
            except Downloader.DownloadFatalError as exc:
                # Cannot be handled: report it and give up on this URL.
                print("%s: %s %s" % (self, type(exc).__name__, exc))

    def __hash__(self):
        return hash(str(self.url))

    def __eq__(self, other):
        return isinstance(other, self.__class__) and str(self.url) == str(other.url)

    def __lt__(self, other):
        if isinstance(other, self.__class__):
            return str(self.url) < str(other.url)
        return self.__class__.__name__ < other.__class__.__name__

    def __repr__(self):
        return "Download(%r)" % str(self.url)

    def __str__(self):
        state = self.downloader.describe_state()
        return "%s, %s" % (repr(self), state)
class RandomNextFile(BaseAction):
    """
    This will search through the sources, via :func:`get_random_walker`,
    which will lazily build up the file index and explore all the directories,
    and add some random file to the download queue.
    """

    def __init__(self, base=None, url=None):
        """
        :param None|Index.Dir base: directory to draw a file from
        :param None|str url: alternative to ``base``; wrapped as ``Index.Dir(url=url)``
        :raises ValueError: if both ``base`` and ``url`` are given

        If neither is given, ``Index.index.get_random_source()`` picks the base.
        """
        if url:
            # Explicit check rather than ``assert``: asserts are stripped under
            # ``python -O``, which would silently drop ``base`` in favor of ``url``.
            if base:
                raise ValueError("RandomNextFile: give either base or url, not both")
            base = Index.Dir(url=url)
        if not base:
            base = Index.index.get_random_source()  # type: Index.Dir
        self.base = base

    def __call__(self):
        """
        Take the next file from the walker for ``self.base`` and queue a
        :class:`Download` for it. Nothing is queued if the walker raises a
        temporary error, yields no file, or the task queue has reached its
        suggested maximum.
        """
        walker = get_random_walker(self.base)
        try:
            # Either raises FileSysIntf.TemporaryException or returns None if empty.
            url = walker.get_next_file()
        except FileSysIntf.TemporaryException as exc:
            # Nothing is re-queued here; the original note says another action
            # "will automatically be added" later (presumably by TaskSystem -- confirm).
            Logging.log("%s: TemporaryException:" % self, exc)
            return
        if not url:
            # Can happen if we end up in an empty source or directory.
            # Remove this base from the index's sources (presumably so it is not picked again).
            Logging.log("%s: no file found" % self)
            Index.index.remove_source(self.base)
            return
        if TaskSystem.reached_suggested_max_queue():
            # Better go exploring a bit more, and handle the current queue first.
            Logging.log("%s: reached suggested max queue, will not queue download" % self)
            return
        TaskSystem.queue_work(Download(url))

    def __hash__(self):
        return hash(str(self.base.url))

    def __eq__(self, other):
        # Equal iff other is a RandomNextFile (or subclass) with the same base URL.
        if not isinstance(other, self.__class__):
            return False
        return str(self.base.url) == str(other.base.url)

    def __lt__(self, other):
        if not isinstance(other, self.__class__):
            return self.__class__.__name__ < other.__class__.__name__
        return str(self.base.url) < str(other.base.url)

    def __repr__(self):
        # Doesn't matter which base, just take another random next time.
        # This repr will be used for the persistence storage.
        return "RandomNextFile()"

    def __str__(self):
        return "RandomNextFile{%r}" % self.base.url
class CheckDownloadsFinished(BaseAction):
    """
    Check, in ``main.DownloadOnly`` mode, whether the remaining downloads are done.

    If the task system's work set holds at most this action, log that all
    downloads finished and schedule :class:`IssueSystemExit` on the main thread.
    Otherwise queue another check. Outside ``DownloadOnly`` mode this is a no-op.
    """

    def __call__(self):
        import main
        if not main.DownloadOnly:
            # This can happen if we saved this action at an earlier run
            # where we had the option enabled.
            # Just ignore it now.
            return
        # TaskSystem is already imported at module level; only Threading is
        # imported lazily here, and only once we know it is needed.
        import Threading
        # Check if there are no more downloads running.
        if len(TaskSystem.currentWorkSet) <= 1:  # should only be ourselves
            Logging.log("All downloads finished.")
            Threading.do_in_main_thread(IssueSystemExit(), wait=False)
        else:
            # Check again later.
            # NOTE(review): this re-queues immediately; whether that amounts to
            # tight polling depends on TaskSystem's scheduling -- confirm.
            TaskSystem.queue_work(CheckDownloadsFinished())
class IssueSystemExit(BaseAction):
    """Action that logs "Exit now." and then raises :class:`SystemExit`."""

    def __call__(self):
        Logging.log("Exit now.")
        raise SystemExit()
def get_new_action():
    """
    Create the default new action: a :class:`RandomNextFile` constructed
    without ``base`` or ``url``, so its constructor picks a random source.
    """
    action = RandomNextFile()
    return action