2015-11-12 00:54:25 +01:00
|
|
|
class BufferFullException(Exception):
    """Signal that a write was attempted on a buffer with no free slot."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BufferEmptyException(Exception):
    """Signal that a read was attempted on a buffer holding no data."""
|
|
|
|
|
|
|
|
|
|
|
2019-11-07 08:58:16 -05:00
|
|
|
class CircularBuffer:
    """Fixed-capacity FIFO ring buffer of single bytes.

    Items are stored in a ``bytearray``; ``read_point`` and ``write_point``
    are the indices of the oldest item and of the next free slot.  The
    number of stored items is tracked explicitly, so a zero byte is a
    legitimate value and is never mistaken for an empty slot.
    """

    def __init__(self, capacity):
        """Create an empty buffer with room for ``capacity`` items."""
        self.buffer = bytearray(capacity)
        self.read_point = 0
        self.write_point = 0
        # Explicit item count: the stored bytes themselves cannot tell an
        # empty slot apart from a written zero.
        self._size = 0

    # (protected) helper method to support python 2/3
    def _update_buffer(self, data):
        """Store ``data`` in the slot at ``write_point``.

        ``data`` may be an integer byte value or a one-character
        string/bytes object; the latter is converted with ``ord``.
        """
        try:
            self.buffer[self.write_point] = data
        except TypeError:
            self.buffer[self.write_point] = ord(data)

    def clear(self):
        """Discard all stored items and return to the initial state."""
        self.buffer = bytearray(len(self.buffer))
        # Reset the cursors as well; otherwise the next read would start
        # from a stale position and return a zeroed slot.
        self.read_point = 0
        self.write_point = 0
        self._size = 0

    def write(self, data):
        """Append ``data`` as the newest item.

        Raises:
            BufferFullException: if every slot is already occupied.
        """
        if self._size == len(self.buffer):
            raise BufferFullException("Circular buffer is full")
        # Store first so a rejected value leaves the count untouched.
        self._update_buffer(data)
        self.write_point = (self.write_point + 1) % len(self.buffer)
        self._size += 1

    def overwrite(self, data):
        """Append ``data``, replacing the oldest item when the buffer is full."""
        self._update_buffer(data)
        if self._size == len(self.buffer):
            # Full: the slot just written held the oldest item, so the
            # next-oldest item becomes the new read position.
            self.read_point = (self.read_point + 1) % len(self.buffer)
        else:
            self._size += 1
        self.write_point = (self.write_point + 1) % len(self.buffer)

    def read(self):
        """Remove the oldest item and return it as a one-character string.

        Raises:
            BufferEmptyException: if the buffer holds no items.
        """
        if self._size == 0:
            raise BufferEmptyException("Circular buffer is empty")
        data = chr(self.buffer[self.read_point])
        self.buffer[self.read_point] = 0
        self.read_point = (self.read_point + 1) % len(self.buffer)
        self._size -= 1
        return data
|