Skip to content

Commit 5c0d9bc

Browse files
authored
Feature: circular_buffer.py
1 parent 6978607 commit 5c0d9bc

File tree

1 file changed

+96
-0
lines changed

1 file changed

+96
-0
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""
2+
Module implements Circular buffer.
3+
"""
4+
5+
__all__ = ['CircularBuffer']
6+
7+
from pydatastructs.utils.misc_util import _check_type
8+
9+
class CircularBuffer(object):
10+
11+
__slots__ = ['size', 'buffer', 'rear', 'front', 'dtype', 'count']
12+
13+
def __new__(cls, size, dtype):
14+
obj = object.__new__(cls)
15+
obj.size = size
16+
obj.buffer = [None] * size
17+
obj.rear = obj.front = -1
18+
obj.dtype = dtype
19+
obj.count = 0
20+
return obj
21+
22+
def __str__(self):
23+
return ' -> '.join([str(i) for i in self.buffer])
24+
25+
def __repr__(self):
26+
return self.__str__()
27+
28+
def enqueue(self, data):
29+
"""
30+
Adds data to the buffer.
31+
32+
Parameters
33+
==========
34+
35+
data
36+
Data to be added to the buffer.
37+
"""
38+
_check_type(data, self.dtype)
39+
if self.is_full():
40+
raise OverflowError("Circular buffer is full")
41+
if self.front == -1:
42+
self.front = 0
43+
self.rear = (self.rear + 1) % self.size
44+
self.buffer[self.rear] = self.dtype(data)
45+
self.count += 1
46+
47+
def dequeue(self):
48+
"""
49+
Removes and returns the data from the buffer.
50+
"""
51+
if self.is_empty():
52+
raise ValueError("Circular buffer is empty")
53+
data = self.buffer[self.front]
54+
self.buffer[self.front] = None
55+
if self.front == self.rear:
56+
self.front = self.rear = -1
57+
else:
58+
self.front = (self.front + 1) % self.size
59+
self.count -= 1
60+
return data
61+
62+
def peek(self):
63+
"""
64+
Returns the data at the front of the buffer without removing it.
65+
"""
66+
if self.is_empty():
67+
raise ValueError("Circular buffer is empty")
68+
return self.buffer[self.front]
69+
70+
def get(self, index):
71+
"""
72+
Get the data at the index.
73+
74+
Parameters
75+
==========
76+
77+
index: int
78+
The index of the data to be fetched.
79+
"""
80+
if self.is_empty():
81+
raise IndexError("The buffer is empty.")
82+
if index < 0 or index >= self.size:
83+
raise IndexError("Index out of bounds.")
84+
return self.buffer[index]
85+
86+
def is_empty(self):
87+
"""
88+
Checks if the buffer is empty.
89+
"""
90+
return self.count == 0
91+
92+
def is_full(self):
93+
"""
94+
Checks if the buffer is full.
95+
"""
96+
return self.count == self.size

0 commit comments

Comments
 (0)