-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathheap.py
64 lines (52 loc) · 1.6 KB
/
heap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""Binary heap.

By default this is a min-heap; pass a ``key`` function to order items
differently.
"""
import abc
from typing import Callable, List, Optional, Self, TypeVar, Protocol
class Comparable(Protocol):
    """Structural type for any value that can be ordered with ``<``."""

    def __lt__(self, other: 'Comparable') -> bool:
        """Return whether ``self`` orders strictly before ``other``."""
        ...


# Type of the items stored in the heap.
T = TypeVar('T')
# Type variable limited to values that support ``<``.  ``TypeVar`` must be
# *called* (subscripting it raises ``TypeError`` at import time), its name
# string must match the variable it is assigned to, and a single upper bound
# is spelled ``bound=`` rather than passed as a lone constraint.
K = TypeVar('K', bound=Comparable)
class Heap:
    """Array-backed binary heap ordered by an optional key function.

    The item whose key is smallest sits at index 0 of ``items``; the children
    of index ``i`` live at ``2 * i + 1`` and ``2 * i + 2``.  With the default
    identity key this is a min-heap; a key such as ``lambda x: -x`` yields
    max-heap behaviour.  Keys are only ever compared with ``<``.
    """

    # Annotations that mention module-level type names are quoted so they are
    # not evaluated when the methods are defined.

    def __init__(
        self,
        items: "Optional[List[T]]" = None,
        key: "Optional[Callable[[T], Comparable]]" = None,
    ) -> None:
        """Build a heap from ``items``, ordered by ``key``.

        Args:
            items: Initial contents.  A non-empty list is adopted as the
                backing store and reordered in place; ``None`` or an empty
                list gives the heap a fresh list of its own.
            key: Maps an item to the value it is ordered by.  Defaults to
                the item itself.
        """
        self.items = items if items else []
        self.key = key if key is not None else (lambda x: x)
        # Establish the heap invariant bottom-up: indices >= len // 2 are
        # leaves, so only the internal nodes need sinking.
        for i in reversed(range(len(self.items) // 2)):
            self._sink(i)

    def insert(self, item: "T") -> None:
        """Add ``item`` to the heap."""
        self.items.append(item)
        self._swim(len(self.items) - 1)

    def pop(self) -> "T":
        """Remove and return the item with the smallest key.

        Raises:
            IndexError: If the heap is empty.
        """
        if not self.items:
            raise IndexError("pop from empty heap")
        # Detach the last leaf first so the extracted root can never take
        # part in the sift-down that follows.
        last = self.items.pop()
        if not self.items:
            return last
        smallest = self.items[0]
        self.items[0] = last
        self._sink(0)
        return smallest

    def __len__(self) -> int:
        """Return the number of items currently stored."""
        return len(self.items)

    def _swap(self, i: int, j: int) -> None:
        """Exchange the items at indices ``i`` and ``j``."""
        self.items[i], self.items[j] = self.items[j], self.items[i]

    def _eval(self, i: int):
        """Return the ordering key of the item at index ``i``."""
        return self.key(self.items[i])

    def _swim(self, i: int) -> None:
        """Move the item at index ``i`` up until its parent is not larger."""
        while i > 0:
            parent = (i - 1) // 2
            # Stop unless the item is strictly smaller than its parent; this
            # also avoids pointless swaps between equal keys.
            if not self._eval(i) < self._eval(parent):
                break
            self._swap(parent, i)
            i = parent

    def _sink(self, i: int) -> None:
        """Move the item at index ``i`` down until no child is smaller."""
        size = len(self.items)
        while 2 * i + 1 < size:
            child = 2 * i + 1
            # Pick the smaller of the two children when a right child exists.
            if child + 1 < size and self._eval(child + 1) < self._eval(child):
                child += 1
            # Heap order holds once the smaller child is not below the parent.
            if not self._eval(child) < self._eval(i):
                break
            self._swap(i, child)
            i = child