-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Decoding of Messages Received from Backend #1
base: main
Are you sure you want to change the base?
Changes from 4 commits
3df2fb9
f6eebff
1b8357f
1058a5b
e4ef67b
ec9769c
139909d
b37d880
b7502d1
edc61fa
80f2774
5cc95b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import Foundation | ||
import NIOCore | ||
|
||
struct MemcacheBackendMessageDecoder: NIOSingleStepByteToMessageDecoder { | ||
typealias InboundOut = MemcacheBackendMessage | ||
|
||
func decode(buffer: inout ByteBuffer) throws -> MemcacheBackendMessage? { | ||
// Keep track of the reader index in case we later notice that we need more data | ||
let startReaderIndex = buffer.readerIndex | ||
|
||
// Peek at the message to read the verb. It is before the first \r\n and before the first <space> if the message | ||
// contains one. | ||
guard let messageSlice = buffer.getCarriageReturnNewlineTerminatedSlice(at: buffer.readerIndex) else { | ||
// reader index wasn't moved, wait for more bytes | ||
return nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the benefit of finding the first line, if that doesn't ensure that this will be the complete first message? IIUC you are interested in the first There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes correct. Though being able to read a line does not always indicate that we have a full message. For value messages (format: Maybe the |
||
|
||
// The verb in messageSlice is either the entire slice or the part before the first <space> | ||
let endVerbIndex = messageSlice.readableBytesView.firstIndex(of: .space) ?? messageSlice.writerIndex | ||
|
||
guard let verbString = messageSlice.getString(at: messageSlice.readerIndex, length: endVerbIndex) else { | ||
// If we can't read a string, the messageSlice must be empty (i.e. no characters before the first occurence of \r\n) | ||
throw MemcacheDecodingError.emptyMessageReceived(bytes: buffer) | ||
} | ||
|
||
guard let verb = MemcacheBackendMessage.Verb(rawValue: verbString) else { | ||
throw MemcacheDecodingError.unknownVerbReceived(messageVerb: verbString, messageBytes: messageSlice) | ||
} | ||
|
||
// Move the buffer's readerIndex to after the verb so we can continue reading flags and/or data. | ||
buffer.moveReaderIndex(forwardBy: endVerbIndex) | ||
|
||
if buffer.readableBytesView.first == .space { | ||
// Move the reader index to after the <space> that is following the verb | ||
buffer.moveReaderIndex(forwardBy: 1) | ||
} | ||
|
||
do { | ||
// Pass the buffer instead of messageSlice because .value messages continue after the first \r\n | ||
let result = try MemcacheBackendMessage.decode(from: &buffer, for: verb) | ||
// TODO: Can we make sure the message was read entirely? Difficult because we don't know the length of VA messages here. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know if you have ideas! |
||
return result | ||
} catch _ as MemcacheNeedMoreDataError { | ||
// A message decoder told us that it expects more data. Move the reader index back to the start and try again. | ||
buffer.moveReaderIndex(to: startReaderIndex) | ||
return nil | ||
} catch let error as MemcachePartialDecodingError { | ||
throw MemcacheDecodingError.withPartialError(error, messageVerb: verb.rawValue, messageBytes: messageSlice) | ||
} catch { | ||
preconditionFailure("Expected to only see `MemcachePartialDecodingError` here.") | ||
} | ||
} | ||
|
||
func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> MemcacheBackendMessage? { | ||
try self.decode(buffer: &buffer) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import NIOCore | ||
|
||
struct MemcacheDecodingError: Error { | ||
let messageVerb: String | ||
let payload: String | ||
let description: String | ||
let file: StaticString | ||
let line: UInt | ||
|
||
static func withPartialError( | ||
_ partialError: MemcachePartialDecodingError, | ||
messageVerb: String, | ||
messageBytes: ByteBuffer | ||
) -> Self { | ||
MemcacheDecodingError( | ||
messageVerb: messageVerb, | ||
payload: "", // TODO: Can we get a base64 representation without Foundation? | ||
moritzsternemann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
description: partialError.description, | ||
file: partialError.file, | ||
line: partialError.line | ||
) | ||
} | ||
|
||
static func emptyMessageReceived( | ||
bytes: ByteBuffer, | ||
file: StaticString = #file, | ||
moritzsternemann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
line: UInt = #line | ||
) -> Self { | ||
MemcacheDecodingError( | ||
messageVerb: "", | ||
payload: "", // TODO: Can we get a base64 representation without Foundation? | ||
description: "Received an empty message (i.e. no characters before the first occurence of \r\n). A valid message has to contain a messageVerb at least.", | ||
file: file, | ||
line: line | ||
) | ||
} | ||
|
||
static func unknownVerbReceived( | ||
messageVerb: String, | ||
messageBytes: ByteBuffer, | ||
file: StaticString = #file, | ||
line: UInt = #line | ||
) -> Self { | ||
MemcacheDecodingError( | ||
messageVerb: messageVerb, | ||
payload: "", // TODO: Can we get a base64 representation without Foundation? | ||
moritzsternemann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
description: "Received a message with messageVerb '\(messageVerb)'. There is no message type associated with this message identifier.", | ||
file: file, | ||
line: line | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
struct MemcacheNeedMoreDataError: Error {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import NIOCore | ||
|
||
struct MemcachePartialDecodingError: Error { | ||
let description: String | ||
let file: StaticString | ||
let line: UInt | ||
|
||
static func fieldNotDecodable( | ||
as type: Any.Type, | ||
from string: String, | ||
file: StaticString = #file, | ||
line: UInt = #line | ||
) -> Self { | ||
MemcachePartialDecodingError( | ||
description: "Could not read '\(type)' from '\(string)' from the ByteBuffer.", | ||
file: file, | ||
line: line | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import NIOCore | ||
|
||
extension MemcacheBackendMessage { | ||
struct Flags: MemcacheMessagePayloadDecodable, Equatable, ExpressibleByArrayLiteral { | ||
let flags: [String] // TODO: Do we want something like (Character, token: String?) instead? Or a struct? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we try to parse the flags a bit further in this stage, we can catch malformed messages earlier and it might make parsing the meta-command-specific flags a bit easier. Flags are always a single character followed by an optional token string. Some tokens have a length limit but I don't think it makes sense to try to enforce it in this stage. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the flags should be an enum with associated values. And I think we should already enforce this here. I think users should be able to never decode anything after this stage and trust the values 100%. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Totally agree 👍 One potential issue however is that flags can have slightly different meanings depending on the sent command. For example, the One thing we can't verify here though is which flags can be part of a message. This also depends on the sent command. Edit: Or do you think we should just be unopinionated and directly use the flag characters as the enum case names? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I implemented decoding of flags, I think, as far as possible and added checks for things like data type and length where it makes sense. |
||
|
||
init(_ flags: [String]) { | ||
self.flags = flags | ||
} | ||
|
||
init(arrayLiteral elements: String...) { | ||
flags = elements | ||
} | ||
|
||
/// Decode flags from any backend message. | ||
/// | ||
/// The following formats can be decoded from the `buffer`: | ||
/// - `<flags>\r\n`. Flags are space-separated strings. | ||
/// - `\r\n`. No flags. | ||
static func decode(from buffer: inout ByteBuffer) throws -> Self { | ||
// Flags are always the last part of a message, which is terminated by \r\n. | ||
// Because we get passed a potentially longer buffer here, we parse until \r\n. | ||
guard var flagsSlice = buffer.readCarriageReturnNewlineTerminatedSlice() else { | ||
// No \r\n? Something went terribly wrong... | ||
preconditionFailure("Expected to only see messages that contain \r\n here.") | ||
} | ||
|
||
// Flags can always be empty | ||
guard flagsSlice.readableBytes > 0 else { return [] } | ||
|
||
// The slice now only contains the flags separated by <space> | ||
guard let flagsString = flagsSlice.readString(length: flagsSlice.readableBytes) else { | ||
preconditionFailure("We have readable bytes so we should be able to read a string") | ||
} | ||
|
||
return Flags(flagsString.components(separatedBy: " ")) | ||
} | ||
} | ||
} | ||
|
||
extension MemcacheBackendMessage.Flags: CustomDebugStringConvertible { | ||
var debugDescription: String { | ||
"[\(flags.map({ "\"\($0)\"" }).joined(separator: ", "))]" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import NIOCore | ||
|
||
extension MemcacheBackendMessage { | ||
struct Value: MemcacheMessagePayloadDecodable, Equatable { | ||
var flags: Flags | ||
var data: ByteBuffer | ||
|
||
/// Decode a `VA` backend message. | ||
/// | ||
/// The message can have the following formats: | ||
/// - `<size> <flags>\r\n<data block>\r\n`. Flags are space-separated strings. | ||
/// - `<size>\r\n<data block>\r\n` | ||
static func decode(from buffer: inout ByteBuffer) throws -> Self { | ||
// Decode the size of the data block, optional flags, and the data block itself | ||
guard let valueMetaSlice = buffer.getCarriageReturnNewlineTerminatedSlice(at: buffer.readerIndex) else { | ||
// No \r\n? Something went terribly wrong... | ||
preconditionFailure("Expected to only see messages that contain \r\n here.") | ||
} | ||
|
||
// The size value in valueMetaSlice is either the entire slice or the part before the first <space> | ||
let endSizeIndex = valueMetaSlice.readableBytesView.firstIndex(of: .space) ?? valueMetaSlice.writerIndex | ||
|
||
guard let sizeString = valueMetaSlice.getString(at: valueMetaSlice.readerIndex, length: endSizeIndex) else { | ||
preconditionFailure("We have readable bytes so we should be able to read a string") | ||
} | ||
moritzsternemann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
guard let size = Int(sizeString) else { | ||
throw MemcachePartialDecodingError.fieldNotDecodable(as: Int.self, from: sizeString) | ||
} | ||
|
||
// Move the buffer's readerIndex to after the size so we can continue reading flags and/or data. | ||
buffer.moveReaderIndex(forwardBy: endSizeIndex) | ||
|
||
if buffer.readableBytesView.first == .space { | ||
// Move the reader index to after the <space> that is following the size | ||
buffer.moveReaderIndex(forwardBy: 1) | ||
} | ||
|
||
let flags = try Flags.decode(from: &buffer) | ||
|
||
guard let dataBlock = buffer.readSlice(length: size) else { | ||
// Tell the decoder that we expect more data | ||
throw MemcacheNeedMoreDataError() | ||
} | ||
|
||
// Make sure we received the final terminating \r\n. | ||
guard let _ = buffer.readCarriageReturnNewlineTerminatedSlice() else { | ||
throw MemcacheNeedMoreDataError() | ||
} | ||
|
||
return Value(flags: flags, data: dataBlock) | ||
} | ||
} | ||
} | ||
|
||
extension MemcacheBackendMessage.Value: CustomDebugStringConvertible { | ||
var debugDescription: String { | ||
return "flags: \(flags), data: \(data.readableBytes) bytes" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
import NIOCore | ||
|
||
protocol MemcacheMessagePayloadDecodable { | ||
static func decode(from buffer: inout ByteBuffer) throws -> Self | ||
} | ||
|
||
enum MemcacheBackendMessage { | ||
/// Header (`HD <flags>*\r\n`) | ||
case header(Flags) | ||
|
||
/// Not found (`NF <flags>*\r\n`) | ||
case notFound(Flags) | ||
|
||
/// Not stored (`NS <flags>*\r\n`) | ||
case notStored(Flags) | ||
|
||
/// Exists (`EX <flags>*\r\n`) | ||
case exists(Flags) | ||
|
||
/// Value (`VA <size> <flags>*\r\n<data block>\r\n`) | ||
case value(Value) | ||
|
||
/// End (`EN\r\n`) | ||
case end | ||
|
||
// TODO: error responses | ||
} | ||
|
||
extension MemcacheBackendMessage { | ||
enum Verb: RawRepresentable { | ||
typealias RawValue = String | ||
|
||
case header | ||
case notFound | ||
case notStored | ||
case exists | ||
case value | ||
case end | ||
// TODO: error responses | ||
|
||
init?(rawValue: String) { | ||
switch rawValue { | ||
case "HD": | ||
self = .header | ||
case "NF": | ||
self = .notFound | ||
case "NS": | ||
self = .notStored | ||
case "EX": | ||
self = .exists | ||
case "VA": | ||
self = .value | ||
case "EN": | ||
self = .end | ||
default: | ||
return nil | ||
} | ||
} | ||
|
||
var rawValue: String { | ||
switch self { | ||
case .header: | ||
return "HD" | ||
case .notFound: | ||
return "NF" | ||
case .notStored: | ||
return "NS" | ||
case .exists: | ||
return "EX" | ||
case .value: | ||
return "VA" | ||
case .end: | ||
return "EN" | ||
} | ||
} | ||
} | ||
} | ||
|
||
extension MemcacheBackendMessage { | ||
static func decode(from buffer: inout ByteBuffer, for verb: Verb) throws -> MemcacheBackendMessage { | ||
switch verb { | ||
case .header: | ||
return try .header(.decode(from: &buffer)) | ||
case .notFound: | ||
return try .notFound(.decode(from: &buffer)) | ||
case .notStored: | ||
return try .notStored(.decode(from: &buffer)) | ||
case .exists: | ||
return try .exists(.decode(from: &buffer)) | ||
case .value: | ||
return try .value(.decode(from: &buffer)) | ||
case .end: | ||
buffer.moveReaderIndex(forwardBy: 2) | ||
return .end | ||
} | ||
} | ||
} | ||
|
||
extension MemcacheBackendMessage: CustomDebugStringConvertible { | ||
var debugDescription: String { | ||
switch self { | ||
case let .header(flags): | ||
return ".header(\(String(reflecting: flags)))" | ||
case let .notFound(flags): | ||
return ".notFound(\(String(reflecting: flags)))" | ||
case let .notStored(flags): | ||
return ".notStored(\(String(reflecting: flags)))" | ||
case let .exists(flags): | ||
return ".exists(\(String(reflecting: flags)))" | ||
case let .value(value): | ||
return ".value(\(String(reflecting: value)))" | ||
case .end: | ||
return ".end" | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of handling indexes so much, a better way is normally to get a copy of the bytebuffer.