Skip to content

Commit

Permalink
Implement Step8 persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
ngokchaoho committed Dec 25, 2023
1 parent 8f2a695 commit bc8980b
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.pyc
*.aof
12 changes: 8 additions & 4 deletions pyredis/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pyredis.asyncserver import RedisServerProtocol
from pyredis.trioserver import TrioServer
from pyredis.datastore import DataStore
from pyredis.persistence import AppendOnlyPersister


REDIS_DEFAULT_PORT = 6379
Expand All @@ -18,13 +19,13 @@
def check_expiry_task(datastore):
while True:
datastore.remove_expired_keys()
sleep(0.1)
sleep(1)


async def acheck_expiry_task(datastore):
while True:
datastore.remove_expired_keys()
await asyncio.sleep(0.1)
await asyncio.sleep(1)


async def amain(args):
Expand All @@ -34,10 +35,12 @@ async def amain(args):

loop = asyncio.get_running_loop()

monitor_task = loop.create_task(acheck_expiry_task(datastore))
loop.create_task(acheck_expiry_task(datastore))

persister = AppendOnlyPersister("ccdb.aof")

server = await loop.create_server(
lambda: RedisServerProtocol(datastore), "127.0.0.1", args.port
lambda: RedisServerProtocol(datastore, persister), "127.0.0.1", args.port
)

async with server:
Expand All @@ -53,6 +56,7 @@ def main(args):
log.info(f"Starting PyRedis on port: {args.port}")

datastore = DataStore()

expiration_monitor = threading.Thread(target=check_expiry_task, args=(datastore,))
expiration_monitor.start()

Expand Down
5 changes: 3 additions & 2 deletions pyredis/asyncserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@


class RedisServerProtocol(asyncio.Protocol):
def __init__(self, datastore):
def __init__(self, datastore, persister):
self.buffer = bytearray()
self._datastore = datastore
self._persister = persister

def connection_made(self, transport):
self.transport = transport
Expand All @@ -22,5 +23,5 @@ def data_received(self, data):

if frame:
self.buffer = self.buffer[frame_size:]
result = handle_command(frame, self._datastore)
result = handle_command(frame, self._datastore, self._persister)
self.transport.write(encode_message(result))
46 changes: 31 additions & 15 deletions pyredis/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ def _handle_ping(command):
return Error(data="ERR wrong number of arguments for 'ping' command")


def _handle_set(command, datastore):
def _handle_set(command, datastore, persister):
length = len(command)
if length >= 3:
key = command[1].data.decode()
value = command[2].data.decode()

if length == 3:
datastore[key] = value
if persister:
persister.log_command(command)
return SimpleString("OK")
elif length == 5:
expiry_mode = command[3].data.decode()
Expand All @@ -39,9 +41,13 @@ def _handle_set(command, datastore):

if expiry_mode == "ex":
datastore.set_with_expiry(key, value, expiry * 1000)
if persister:
persister.log_command(command)
return SimpleString("OK")
elif expiry_mode == "px":
datastore.set_with_expiry(key, value, expiry)
if persister:
persister.log_command(command)
return SimpleString("OK")
return Error("ERR syntax error")

Expand Down Expand Up @@ -70,41 +76,47 @@ def _handle_exists(command, datastore):
return Error("ERR wrong number of arguments for 'exists' command")


def _handle_del(command, datastore):
def _handle_del(command, datastore, persister):
if len(command) >= 2:
found = 0
for key in command[1:]:
if key.data.decode() in datastore:
del datastore._data[key.data.decode()]
found += 1
if persister:
persister.log_command(command)
return Integer(found)
else:
return Error("ERR wrong number of arguments for 'del' command")


def _handle_incr(command, datastore):
def _handle_incr(command, datastore, persister):
if len(command) == 2:
key = command[1].data.decode()
try:
value = datastore.incr(key)
if persister:
persister.log_command(command)
return Integer(value)
except TypeError:
return Error("ERR value is not an integer or out of range")
return Integer(value)
return Error("ERR wrong number of arguments for 'incr' command")


def _handle_decr(command, datastore):
def _handle_decr(command, datastore, persister):
if len(command) == 2:
key = command[1].data.decode()
try:
value = datastore.decr(key)
if persister:
persister.log_command(command)
return Integer(value)
except TypeError:
return Error("ERR value is not an integer or out of range")
return Integer(value)
return Error("ERR wrong number of arguments for 'decr' command")


def _handle_lpush(command, datastore):
def _handle_lpush(command, datastore, persister):
if len(command) >= 2:
count = 0
key = command[1].data.decode()
Expand All @@ -113,6 +125,8 @@ def _handle_lpush(command, datastore):
for c in command[2:]:
item = c.data.decode()
count = datastore.prepend(key, item)
if persister:
persister.log_command(command)
return Integer(count)
except TypeError:
return Error(
Expand All @@ -138,7 +152,7 @@ def _handle_lrange(command, datastore):
return Error("ERR wrong number of arguments for 'lrange' command")


def _handle_rpush(command, datastore):
def _handle_rpush(command, datastore, persister):
if len(command) >= 2:
count = 0
key = command[1].data.decode()
Expand All @@ -147,6 +161,8 @@ def _handle_rpush(command, datastore):
for c in command[2:]:
item = c.data.decode()
count = datastore.append(key, item)
if persister:
persister.log_command(command)
return Integer(count)
except TypeError:
return Error(
Expand All @@ -162,7 +178,7 @@ def _handle_unrecognised_command(command, *args):
)


def handle_command(command, datastore):
def handle_command(command, datastore, persister):
match command[0].data.decode().upper():
case "ECHO":
return _handle_echo(command)
Expand All @@ -171,21 +187,21 @@ def handle_command(command, datastore):
return _handle_ping(command)

case "SET":
return _handle_set(command, datastore)
return _handle_set(command, datastore, persister)
case "GET":
return _handle_get(command, datastore)
case "EXISTS":
return _handle_exists(command, datastore)
case "DEL":
return _handle_del(command, datastore)
return _handle_del(command, datastore, persister)
case "INCR":
return _handle_incr(command, datastore)
return _handle_incr(command, datastore, persister)
case "DECR":
return _handle_decr(command, datastore)
return _handle_decr(command, datastore, persister)
case "LPUSH":
return _handle_lpush(command, datastore)
return _handle_lpush(command, datastore, persister)
case "RPUSH":
return _handle_rpush(command, datastore)
return _handle_rpush(command, datastore, persister)
case "LRANGE":
return _handle_lrange(command, datastore)
return _handle_unrecognised_command(command)
10 changes: 10 additions & 0 deletions pyredis/persistence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class AppendOnlyPersister:
def __init__(self, filename):
self._filename = filename
self._file = open(filename, mode="ab", buffering=0)

def log_command(self, command):
self._file.write(f"*{len(command)}\r\n".encode())

for item in command:
self._file.write(item.resp_encode())
9 changes: 8 additions & 1 deletion pyredis/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ def resp_encode(self):
if self.data is None:
return "$-1\r\n".encode()
else:
return f"${len(self.data)}\r\n{self.data}\r\n".encode()
if isinstance(self.data, str):
return f"${len(self.data)}\r\n{self.data}\r\n".encode()
else:
return (
f"${len(self.data)}\r\n".encode()
+ bytes(self.data)
+ "\r\n".encode()
)


@dataclass
Expand Down
Loading

0 comments on commit bc8980b

Please sign in to comment.