-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue_c.pyx
112 lines (83 loc) · 2.35 KB
/
queue_c.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from cpython.mem cimport PyMem_Malloc, PyMem_Free
from utils import print_func_name, set_stdout, restore_stdout
""" ################## Queue in C ######################### """
cdef queue* create_queue(size_t n):
cdef queue* q = <queue*> PyMem_Malloc(sizeof(queue))
if q == NULL: exit(1)
q.items = <size_t *> PyMem_Malloc(n * sizeof(size_t))
if q.items == NULL: exit(1)
q.front = 0
q.rear = -1
q.capacity = n
return q
cdef void free_queue(queue* q):
PyMem_Free(q.items)
PyMem_Free(q)
cdef inline bint is_full_q(queue* q):
return q.rear == q.capacity - 1
cdef inline bint is_empty_q(queue* q):
return q.front == q.rear + 1
cdef void enqueue(queue* q, size_t x):
if not is_full_q(q):
q.rear += 1
q.items[q.rear] = x
else:
print("Enqueue error: queue is full")
exit(1)
cdef size_t dequeue(queue* q):
cdef size_t x
if not is_empty_q(q):
x = q.items[q.front]
q.front += 1
return x
else:
print("Dequeue error: queue is empty")
exit(1)
cdef inline size_t size_q(queue* q):
return q.rear + 1 - q.front
cdef void print_queue(queue* q):
cdef:
size_t i, idx
size_t n = size_q(q)
if is_empty_q(q):
print("[]")
return
print("[", end="")
# idx = q.front
for i in range(q.front, q.front + n - 1):
print(q.items[i], end=", ")
i += 1
print(q.items[i], end="]\n")
""" ################################################################ """
""" ######################### UNIT TESTS ########################### """
""" ################################################################ """
def test_enqueue():
cdef queue* q = create_queue(3)
enqueue(q, 1)
enqueue(q, 2)
enqueue(q, 3)
assert dequeue(q) == 1
assert dequeue(q) == 2
assert dequeue(q) == 3
free_queue(q)
def test_empty():
cdef queue* q = create_queue(3)
assert is_empty_q(q)
enqueue(q, 1)
dequeue(q)
assert is_empty_q(q)
def test_full():
cdef queue* q = create_queue(2)
enqueue(q, 1)
enqueue(q, 2)
assert is_full_q(q)
def test_print():
cdef queue* q = create_queue(3)
enqueue(q, 1)
enqueue(q, 2)
enqueue(q, 3)
s = set_stdout()
print_queue(q)
out = s.getvalue()
restore_stdout()
assert out == '[1, 2, 3]\n'