-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWorkersPipeline.py
291 lines (219 loc) · 10.5 KB
/
WorkersPipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
from WarcFileSave import WarcSaver
from DebugPrinter import JsonPrinter
from threading import Lock, Event
from collections import deque
from bs4 import BeautifulSoup
from Parser import HTMLParser
import logging
import urllib3
import utils
class WorkersPipeline():
    """
    Shared object the crawler workers use to communicate with each other.

    For every worker id (the keys of ``workers``) it keeps:
      * a deque of links sent to that worker by other workers, guarded by a
        per-worker lock;
      * an ``Event`` that is set when links are delivered to the worker or
        when the crawl is finished, guarded by its own per-worker lock.

    It also tracks global crawl state: the number of pages successfully saved
    (through a ``WarcSaver``), the page limit, an "all done" flag, how many
    workers are currently waiting, and a per-host resource counter.
    """

    MAX_RESULTS_PER_WARC_FILE = 1000

    def __init__(self, workers: dict, maxNumPagesCrawled: int, debug: bool = False):
        """
        Args:
            workers: mapping whose keys are the worker ids; only the keys and
                their count are used by this class.
            maxNumPagesCrawled: page limit; the crawl is flagged as done once
                the number of saved pages exceeds it.
            debug: when True, ``printIfOnDebugMode`` prints page summaries.
        """
        self._workers = workers
        self._numWorkers = len(workers)
        self._debugMode = debug
        self._numPagesCrawledLock = Lock()
        self._numPagesCrawled = 0
        self._maxNumPagesToCrawl = maxNumPagesCrawled
        # Per-worker inbox of links sent by other workers, and the lock guarding it.
        self._workerCommLinksRecv = {}
        self._workersCommLocks = {}
        self._numWorkersWaiting = 0
        self._numWorkersWaitingLock = Lock()
        # Per-worker wake-up event (set on link delivery or shutdown) and its lock.
        self._workerWaitingLinksEvents = {}
        self._workerWaitingLinksEventsLocks = {}
        # For debugging purposes: which workers have called setSaiu ("got out").
        self._workersThatGotOut = dict()
        self._workersThatGotOutLock = Lock()
        for workerId in self._workers:
            self._workerCommLinksRecv[workerId] = deque()
            self._workersCommLocks[workerId] = Lock()
            self._workerWaitingLinksEvents[workerId] = Event()
            self._workerWaitingLinksEventsLocks[workerId] = Lock()
            self._workersThatGotOut[workerId] = False
        self._allDone = False
        self._allDoneLock = Lock()
        self._warcSaver = WarcSaver()
        self._debugPrinter = JsonPrinter()
        self._resourcesPerHost = dict()
        self._resourcesPerHostLock = Lock()

    @property
    def numWorkers(self) -> int:
        """Number of workers sharing this pipeline (read-only)."""
        return self._numWorkers

    @numWorkers.setter
    def numWorkers(self, newNumWorkers):
        raise AttributeError("numWorkers is not writable")

    @property
    def pagesCrawled(self) -> int:
        """Number of pages successfully saved so far (read-only)."""
        return self._numPagesCrawled

    @pagesCrawled.setter
    def pagesCrawled(self, pagesCrawled):
        raise AttributeError("pagesCrawled is not writable")

    @property
    def allDone(self) -> bool:
        """Whether the crawl has been flagged as finished (read under its lock)."""
        with self._allDoneLock:
            return self._allDone

    @allDone.setter
    def allDone(self, newAllDone):
        raise AttributeError("allDone is not writable")

    @property
    def workers(self) -> dict:
        """The workers mapping given at construction (read-only)."""
        return self._workers

    @workers.setter
    def workers(self, newWorkers):
        raise AttributeError("workers is not writable")

    # The following properties exist only to block external access.
    @property
    def workerToWorkerLink(self):
        raise AttributeError("workerToWorkerLink is not readable or writable")

    @workerToWorkerLink.setter
    def workerToWorkerLink(self, newWorkerToWorkerLink):
        raise AttributeError("workerToWorkerLink is not readable or writable")

    @property
    def workerToWorkerLock(self):
        raise AttributeError("workerToWorkerLock is not readable or writable")

    @workerToWorkerLock.setter
    def workerToWorkerLock(self, newWorkerToWorkerLock):
        raise AttributeError("workerToWorkerLock is not readable or writable")

    @property
    def workerWaitingLinks(self):
        raise AttributeError("workerWaitingLinks is not readable or writable")

    @workerWaitingLinks.setter
    def workerWaitingLinks(self, newWorkerWaitingLinks):
        raise AttributeError("workerWaitingLinks is not readable or writable")

    def getLinksSentToWorker(self, workerId: int) -> deque:
        """
        Drain and return every link queued for ``workerId``.

        The inbox is copied and emptied, and the worker's wake-up event is
        cleared, while both the inbox lock and the event lock are held. This
        way a concurrent sender (which takes the inbox lock) cannot add a
        link between the copy and the clear.
        """
        with self._workersCommLocks[workerId], self._workerWaitingLinksEventsLocks[workerId]:
            inbox = self._workerCommLinksRecv[workerId]
            linksReceived = inbox.copy()
            inbox.clear()
            self._workerWaitingLinksEvents[workerId].clear()
        return linksReceived

    def sendLinksToProperWorkers(self, linksByWorker: dict):
        """
        Deliver links to their destination workers and wake them.

        Args:
            linksByWorker: mapping of worker id -> iterable of links, e.g. as
                returned by ``separateLinksByWorker``.
        """
        hostsAndResourcesToWorkerMap = {
            # Each value is a list of (hostWithSchema, setOfResources) tuples.
            workerId: list(self._mapLinkResoursesToHosts(linksToSend).items())
            for workerId, linksToSend in linksByWorker.items()
        }
        self._sendResourcesToWorkers(hostsAndResourcesToWorkerMap)

    def _mapLinkResoursesToHosts(self, linkList: list) -> dict:
        """Group links as {hostWithSchema: set of resources} (duplicates collapse)."""
        hostsToLinksMap = dict()
        for link in linkList:
            hostWithSchema, resource = utils.getHostWithSchemaAndResourcesFromLink(link)
            hostsToLinksMap.setdefault(hostWithSchema, set()).add(resource)
        return hostsToLinksMap

    def _sendResourcesToWorkers(self, hostsAndResourcesToWorkerMap: dict):
        """
        Rebuild full links from (host, resources) tuples and append them to
        each worker's inbox. Each worker that received at least one link is
        then signalled.
        """
        for workerId, mappedLinks in hostsAndResourcesToWorkerMap.items():
            if not mappedLinks:
                continue
            # `with` releases the inbox lock even if building a link raises;
            # a leaked lock would block this worker's inbox forever.
            with self._workersCommLocks[workerId]:
                workerLinkDeque = self._workerCommLinksRecv[workerId]
                for currHost, resourcesOfHost in mappedLinks:
                    for resource in resourcesOfHost:
                        completeLink = utils.getCompleteLinkFromHostAndResource(currHost, resource)
                        workerLinkDeque.append(completeLink)
            self._signalWorkerReceivedLinkEvent(workerId)

    def _signalWorkerReceivedLinkEvent(self, workerId: int):
        """Set ``workerId``'s wake-up event under its event lock."""
        with self._workerWaitingLinksEventsLocks[workerId]:
            self._workerWaitingLinksEvents[workerId].set()

    def waitForLinkOrAllDoneEvent(self, workerId: int):
        """
        Block ``workerId`` until links are sent to it or the crawl finishes.

        The worker is counted as waiting for the duration of the call, so that
        ``_shouldStop`` can detect the case where every worker is idle.
        """
        self._setWorkerWaiting()
        try:
            if not self.allDone and not self._shouldStop():
                self._workerWaitingLinksEvents[workerId].wait()
        finally:
            self._unsetWorkerWaiting()

    def _setWorkerWaiting(self):
        with self._numWorkersWaitingLock:
            self._numWorkersWaiting += 1

    def _unsetWorkerWaiting(self):
        with self._numWorkersWaitingLock:
            self._numWorkersWaiting -= 1

    def _acquireAllEventLocks(self) -> list:
        """Acquire every per-worker event lock, always in dict order; return them."""
        eventLocks = list(self._workerWaitingLinksEventsLocks.values())
        for lock in eventLocks:
            lock.acquire()
        return eventLocks

    @staticmethod
    def _releaseLocks(locks: list):
        """Release locks previously returned by ``_acquireAllEventLocks``."""
        for lock in reversed(locks):
            lock.release()

    def _shouldStop(self) -> bool:
        """
        Decide whether the crawl should end, and end it if so.

        The crawl ends when the page limit has been passed. It also ends when
        every worker is waiting and no worker's event is set (nobody has links
        pending). Ending the crawl calls ``setAllDone``, which also wakes
        every worker.
        """
        with self._numWorkersWaitingLock:
            everyWorkerWaiting = self._numWorkersWaiting == self._numWorkers
        with self._numPagesCrawledLock:
            shouldStop = self._crawledPassMaxNumPages()
        if not shouldStop:
            # Inspect all events atomically with respect to senders/receivers.
            eventLocks = self._acquireAllEventLocks()
            try:
                aWorkerSentLinksToAnother = any(
                    event.is_set() for event in self._workerWaitingLinksEvents.values()
                )
            finally:
                self._releaseLocks(eventLocks)
            shouldStop = everyWorkerWaiting and not aWorkerSentLinksToAnother
        if shouldStop:
            logging.info("TIME TO STOP")
            self.setAllDone()
        return shouldStop

    def setAllDone(self):
        """
        Flag the crawl as finished and wake every worker.

        The wake-up is required. Once ``allDone`` is True,
        ``waitForLinkOrAllDoneEvent`` no longer calls ``_shouldStop``. Without
        it, workers already blocked on their event when the page limit is hit
        would never be released.
        """
        with self._allDoneLock:
            self._allDone = True
        self._wakeEveryWorkerToDie()

    def _wakeEveryWorkerToDie(self):
        """Set every worker's wake-up event while holding all event locks.

        Must not be called while already holding any event lock (non-reentrant).
        """
        eventLocks = self._acquireAllEventLocks()
        try:
            for event in self._workerWaitingLinksEvents.values():
                event.set()
        finally:
            self._releaseLocks(eventLocks)

    def separateLinksByWorker(self, urls: set) -> dict:
        """
        Partition ``urls`` by responsible worker.

        Each URL goes to the worker chosen by ``utils.threadOfHost`` for its
        host. Every worker id in ``range(numWorkers)`` gets a list, even if
        that list is empty.
        """
        linkByHost = {workerId: list() for workerId in range(self._numWorkers)}
        for url in urls:
            hostWithSchema = utils.getHostWithSchemaOfLink(url)
            workerId = utils.threadOfHost(self._numWorkers, hostWithSchema)
            linkByHost[workerId].append(url)
        return linkByHost

    def setSaiu(self, workerId: int):
        """Debug helper: mark ``workerId`` as having got out ("saiu")."""
        with self._workersThatGotOutLock:
            self._workersThatGotOut[workerId] = True

    def getNaoSairam(self):
        """Debug helper: string of the worker ids not yet marked by ``setSaiu``."""
        with self._workersThatGotOutLock:
            notOut = [threadId for threadId, gotOut in self._workersThatGotOut.items() if not gotOut]
        return f"{notOut}"

    # String annotations: the class body does not evaluate third-party
    # attributes at definition time.
    def saveResponse(self, response: "urllib3.response.HTTPResponse", link: str):
        """Save ``response`` through the WARC saver; count it only if saving succeeded."""
        if self._warcSaver.saveAndReturnIfSuccess(response, link):
            self._addPageCrawledAndSaved(link)

    def _addPageCrawledAndSaved(self, link: str):
        """Increment the saved-page counter and finish the crawl once past the limit."""
        with self._numPagesCrawledLock:
            self._numPagesCrawled += 1
            logging.info(f"NUM PAGES: {self._numPagesCrawled}")
            reachedLimit = self._crawledPassMaxNumPages()
        # setAllDone is called outside the counter lock because it takes every
        # event lock to wake the workers.
        if reachedLimit:
            logging.info("ATINGIU MAX PAGES")  # "reached max pages"
            self.setAllDone()

    def _crawledPassMaxNumPages(self) -> bool:
        """True when the saved-page count exceeds the limit; caller holds the counter lock.

        NOTE(review): strict ``>`` means the crawl stops after max + 1 pages;
        confirm whether ``>=`` was intended.
        """
        return self._numPagesCrawled > self._maxNumPagesToCrawl

    def printIfOnDebugMode(self, link: str, reqTimestamp: float, parsedHTML: "BeautifulSoup"):
        """In debug mode, print the link, timestamp, page title and first words as JSON."""
        if not self._debugMode:
            return
        NUM_WORDS_TO_PRINT = 20
        textToPrint = HTMLParser.getNFirstTextWords(parsedHTML, NUM_WORDS_TO_PRINT)
        titleTag = parsedHTML.find('title')
        # find() returns None for pages without <title>. Pass a None title
        # instead of raising AttributeError; `.string` can itself already be None.
        title = titleTag.string if titleTag is not None else None
        self._debugPrinter.printJson(link, reqTimestamp, title, textToPrint)

    def addResourcesPerHost(self, hostAndNumResourcesMap: dict):
        """Accumulate per-host resource counts from ``{host: count}``."""
        with self._resourcesPerHostLock:
            for host, numResources in hostAndNumResourcesMap.items():
                if host in self._resourcesPerHost:
                    self._resourcesPerHost[host] += numResources
                else:
                    self._resourcesPerHost[host] = numResources

    def getTotalResourcesPerHost(self):
        """Return the accumulated ``{host: count}`` mapping.

        NOTE(review): this is the live internal dict, not a copy. Other
        threads may still modify it through ``addResourcesPerHost``.
        """
        return self._resourcesPerHost