-
Notifications
You must be signed in to change notification settings - Fork 0
/
map-reduce02.py
46 lines (30 loc) · 1.19 KB
/
map-reduce02.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
def map_function(word):
return word, 1
def shuffle_function(mapped_values):
shuffled = defaultdict(list)
for key, value in mapped_values:
shuffled[key].append(value)
return shuffled.items()
def reduce_function(key_values):
key, values = key_values
return key, sum(values)
# Виконання MapReduce
def map_reduce(text):
words = text.split()
# Паралельний Маппінг
with ThreadPoolExecutor() as executor:
mapped_values = list(executor.map(map_function, words))
# Крок 2: Shuffle
shuffled_values = shuffle_function(mapped_values)
# Паралельна Редукція
with ThreadPoolExecutor() as executor:
reduced_values = list(executor.map(reduce_function, shuffled_values))
return dict(reduced_values)
if __name__ == "__main__":
# Вхідний текст для обробки
text = "hello world hello Python hello Student"
# Виконання MapReduce на вхідному тексті
result = map_reduce(text)
print("Результат підрахунку слів:", result)