Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Decoding of Messages Received from Backend #1

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ let package = Package(
.testTarget(
name: "MemcacheTests",
dependencies: [
.target(name: "Memcache")
.target(name: "Memcache"),
.product(name: "NIOTestUtils", package: "swift-nio")
]
),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import Foundation
import NIOCore

struct MemcacheBackendMessageDecoder: NIOSingleStepByteToMessageDecoder {
typealias InboundOut = MemcacheBackendMessage

func decode(buffer: inout ByteBuffer) throws -> MemcacheBackendMessage? {
// Keep track of the reader index in case we later notice that we need more data
let startReaderIndex = buffer.readerIndex
Copy link

@fabianfett fabianfett Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of handling indexes so much, a better way is normally to get a copy of the bytebuffer.

var peekableBuffer = buffer

// if you were able to decode a buffer, just write the new reader indexes back:
buffer = peakableBuffer


// Peek at the message to read the verb. It is before the first \r\n and before the first <space> if the message
// contains one.
guard let messageSlice = buffer.getCarriageReturnNewlineTerminatedSlice(at: buffer.readerIndex) else {
// reader index wasn't moved, wait for more bytes
return nil
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of finding the first line, if that doesn't ensure that this will be the complete first message? IIUC you are interested in the first VERB and you can get that by finding either a space or a \r\n correct?

Copy link
Owner Author

@moritzsternemann moritzsternemann Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes correct. Though being able to read a line does not always indicate that we have a full message. For value messages (format: VA <data block size> <flags>\r\n<data block>\r\n), we only know how long the message should be when we start parsing the part after the verb.
If we would look for a space first, we might fail to parse the following buffer for example: EN\r\nHD <flags>\r\n.

Maybe the messageSlice name is also a bit misleading here 😇


// The verb in messageSlice is either the entire slice or the part before the first <space>
let endVerbIndex = messageSlice.readableBytesView.firstIndex(of: .space) ?? messageSlice.writerIndex

guard let verbString = messageSlice.getString(at: messageSlice.readerIndex, length: endVerbIndex) else {
// If we can't read a string, the messageSlice must be empty (i.e. no characters before the first occurence of \r\n)
throw MemcacheDecodingError.emptyMessageReceived(bytes: buffer)
}

guard let verb = MemcacheBackendMessage.Verb(rawValue: verbString) else {
throw MemcacheDecodingError.unknownVerbReceived(messageVerb: verbString, messageBytes: messageSlice)
}

// Move the buffer's readerIndex to after the verb so we can continue reading flags and/or data.
buffer.moveReaderIndex(forwardBy: endVerbIndex)

if buffer.readableBytesView.first == .space {
// Move the reader index to after the <space> that is following the verb
buffer.moveReaderIndex(forwardBy: 1)
}

do {
// Pass the buffer instead of messageSlice because .value messages continue after the first \r\n
let result = try MemcacheBackendMessage.decode(from: &buffer, for: verb)
// TODO: Can we make sure the message was read entirely? Difficult because we don't know the length of VA messages here.
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you have ideas!

return result
} catch _ as MemcacheNeedMoreDataError {
// A message decoder told us that it expects more data. Move the reader index back to the start and try again.
buffer.moveReaderIndex(to: startReaderIndex)
return nil
} catch let error as MemcachePartialDecodingError {
throw MemcacheDecodingError.withPartialError(error, messageVerb: verb.rawValue, messageBytes: messageSlice)
} catch {
preconditionFailure("Expected to only see `MemcachePartialDecodingError` here.")
}
}

func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> MemcacheBackendMessage? {
try self.decode(buffer: &buffer)
}
}
52 changes: 52 additions & 0 deletions Sources/Memcache/ChannelHandler/MemcacheDecodingError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import NIOCore

struct MemcacheDecodingError: Error {
let messageVerb: String
let payload: String
let description: String
let file: StaticString
let line: UInt

static func withPartialError(
_ partialError: MemcachePartialDecodingError,
messageVerb: String,
messageBytes: ByteBuffer
) -> Self {
MemcacheDecodingError(
messageVerb: messageVerb,
payload: "", // TODO: Can we get a base64 representation without Foundation?
moritzsternemann marked this conversation as resolved.
Show resolved Hide resolved
description: partialError.description,
file: partialError.file,
line: partialError.line
)
}

static func emptyMessageReceived(
bytes: ByteBuffer,
file: StaticString = #file,
moritzsternemann marked this conversation as resolved.
Show resolved Hide resolved
line: UInt = #line
) -> Self {
MemcacheDecodingError(
messageVerb: "",
payload: "", // TODO: Can we get a base64 representation without Foundation?
description: "Received an empty message (i.e. no characters before the first occurence of \r\n). A valid message has to contain a messageVerb at least.",
file: file,
line: line
)
}

static func unknownVerbReceived(
messageVerb: String,
messageBytes: ByteBuffer,
file: StaticString = #file,
line: UInt = #line
) -> Self {
MemcacheDecodingError(
messageVerb: messageVerb,
payload: "", // TODO: Can we get a base64 representation without Foundation?
moritzsternemann marked this conversation as resolved.
Show resolved Hide resolved
description: "Received a message with messageVerb '\(messageVerb)'. There is no message type associated with this message identifier.",
file: file,
line: line
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
struct MemcacheNeedMoreDataError: Error {}
20 changes: 20 additions & 0 deletions Sources/Memcache/ChannelHandler/MemcachePartialDecodingError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import NIOCore

struct MemcachePartialDecodingError: Error {
let description: String
let file: StaticString
let line: UInt

static func fieldNotDecodable(
as type: Any.Type,
from string: String,
file: StaticString = #file,
line: UInt = #line
) -> Self {
MemcachePartialDecodingError(
description: "Could not read '\(type)' from '\(string)' from the ByteBuffer.",
file: file,
line: line
)
}
}
45 changes: 45 additions & 0 deletions Sources/Memcache/ChannelHandler/Messages/Backend+Flags.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import NIOCore

extension MemcacheBackendMessage {
struct Flags: MemcacheMessagePayloadDecodable, Equatable, ExpressibleByArrayLiteral {
let flags: [String] // TODO: Do we want something like (Character, token: String?) instead? Or a struct?
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we try to parse the flags a bit further in this stage, we can catch malformed messages earlier and it might make parsing the meta-command-specific flags a bit easier. Flags are always a single character followed by an optional token string. Some tokens have a length limit but I don't think it makes sense to try to enforce it in this stage.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the flags should be an enum with associated values. And I think we should already enforce this here. I think users should be able to never decode anything after this stage and trust the values 100%.

Copy link
Owner Author

@moritzsternemann moritzsternemann Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree 👍 One potential issue however is that flags can have slightly different meanings depending on the sent command. For example, the N(token) flags is described as vivify on miss, takes TTL as argument for Meta Get, and as auto create item on miss with supplied TTL for Meta Arithmetic commands. I couldn't find any collisions so we just need to choose good names for the enum cases.

One thing we can't verify here though is which flags can be part of a message. This also depends on the sent command.

Edit: Or do you think we should just be unopinionated and directly use the flag characters as the enum case names?

Copy link
Owner Author

@moritzsternemann moritzsternemann Sep 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented decoding of flags, I think, as far as possible and added checks for things like data type and length where it makes sense.


init(_ flags: [String]) {
self.flags = flags
}

init(arrayLiteral elements: String...) {
flags = elements
}

/// Decode flags from any backend message.
///
/// The following formats can be decoded from the `buffer`:
/// - `<flags>\r\n`. Flags are space-separated strings.
/// - `\r\n`. No flags.
static func decode(from buffer: inout ByteBuffer) throws -> Self {
// Flags are always the last part of a message, which is terminated by \r\n.
// Because we get passed a potentially longer buffer here, we parse until \r\n.
guard var flagsSlice = buffer.readCarriageReturnNewlineTerminatedSlice() else {
// No \r\n? Something went terribly wrong...
preconditionFailure("Expected to only see messages that contain \r\n here.")
}

// Flags can always be empty
guard flagsSlice.readableBytes > 0 else { return [] }

// The slice now only contains the flags separated by <space>
guard let flagsString = flagsSlice.readString(length: flagsSlice.readableBytes) else {
preconditionFailure("We have readable bytes so we should be able to read a string")
}

return Flags(flagsString.components(separatedBy: " "))
}
}
}

extension MemcacheBackendMessage.Flags: CustomDebugStringConvertible {
var debugDescription: String {
"[\(flags.map({ "\"\($0)\"" }).joined(separator: ", "))]"
}
}
60 changes: 60 additions & 0 deletions Sources/Memcache/ChannelHandler/Messages/Backend+Value.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import NIOCore

extension MemcacheBackendMessage {
struct Value: MemcacheMessagePayloadDecodable, Equatable {
var flags: Flags
var data: ByteBuffer

/// Decode a `VA` backend message.
///
/// The message can have the following formats:
/// - `<size> <flags>\r\n<data block>\r\n`. Flags are space-separated strings.
/// - `<size>\r\n<data block>\r\n`
static func decode(from buffer: inout ByteBuffer) throws -> Self {
// Decode the size of the data block, optional flags, and the data block itself
guard let valueMetaSlice = buffer.getCarriageReturnNewlineTerminatedSlice(at: buffer.readerIndex) else {
// No \r\n? Something went terribly wrong...
preconditionFailure("Expected to only see messages that contain \r\n here.")
}

// The size value in valueMetaSlice is either the entire slice or the part before the first <space>
let endSizeIndex = valueMetaSlice.readableBytesView.firstIndex(of: .space) ?? valueMetaSlice.writerIndex

guard let sizeString = valueMetaSlice.getString(at: valueMetaSlice.readerIndex, length: endSizeIndex) else {
preconditionFailure("We have readable bytes so we should be able to read a string")
}
moritzsternemann marked this conversation as resolved.
Show resolved Hide resolved

guard let size = Int(sizeString) else {
throw MemcachePartialDecodingError.fieldNotDecodable(as: Int.self, from: sizeString)
}

// Move the buffer's readerIndex to after the size so we can continue reading flags and/or data.
buffer.moveReaderIndex(forwardBy: endSizeIndex)

if buffer.readableBytesView.first == .space {
// Move the reader index to after the <space> that is following the size
buffer.moveReaderIndex(forwardBy: 1)
}

let flags = try Flags.decode(from: &buffer)

guard let dataBlock = buffer.readSlice(length: size) else {
// Tell the decoder that we expect more data
throw MemcacheNeedMoreDataError()
}

// Make sure we received the final terminating \r\n.
guard let _ = buffer.readCarriageReturnNewlineTerminatedSlice() else {
throw MemcacheNeedMoreDataError()
}

return Value(flags: flags, data: dataBlock)
}
}
}

extension MemcacheBackendMessage.Value: CustomDebugStringConvertible {
var debugDescription: String {
return "flags: \(flags), data: \(data.readableBytes) bytes"
}
}
116 changes: 116 additions & 0 deletions Sources/Memcache/ChannelHandler/Messages/MemcacheBackendMessage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import NIOCore

protocol MemcacheMessagePayloadDecodable {
static func decode(from buffer: inout ByteBuffer) throws -> Self
}

enum MemcacheBackendMessage {
/// Header (`HD <flags>*\r\n`)
case header(Flags)

/// Not found (`NF <flags>*\r\n`)
case notFound(Flags)

/// Not stored (`NS <flags>*\r\n`)
case notStored(Flags)

/// Exists (`EX <flags>*\r\n`)
case exists(Flags)

/// Value (`VA <size> <flags>*\r\n<data block>\r\n`)
case value(Value)

/// End (`EN\r\n`)
case end

// TODO: error responses
}

extension MemcacheBackendMessage {
enum Verb: RawRepresentable {
typealias RawValue = String

case header
case notFound
case notStored
case exists
case value
case end
// TODO: error responses

init?(rawValue: String) {
switch rawValue {
case "HD":
self = .header
case "NF":
self = .notFound
case "NS":
self = .notStored
case "EX":
self = .exists
case "VA":
self = .value
case "EN":
self = .end
default:
return nil
}
}

var rawValue: String {
switch self {
case .header:
return "HD"
case .notFound:
return "NF"
case .notStored:
return "NS"
case .exists:
return "EX"
case .value:
return "VA"
case .end:
return "EN"
}
}
}
}

extension MemcacheBackendMessage {
static func decode(from buffer: inout ByteBuffer, for verb: Verb) throws -> MemcacheBackendMessage {
switch verb {
case .header:
return try .header(.decode(from: &buffer))
case .notFound:
return try .notFound(.decode(from: &buffer))
case .notStored:
return try .notStored(.decode(from: &buffer))
case .exists:
return try .exists(.decode(from: &buffer))
case .value:
return try .value(.decode(from: &buffer))
case .end:
buffer.moveReaderIndex(forwardBy: 2)
return .end
}
}
}

extension MemcacheBackendMessage: CustomDebugStringConvertible {
var debugDescription: String {
switch self {
case let .header(flags):
return ".header(\(String(reflecting: flags)))"
case let .notFound(flags):
return ".notFound(\(String(reflecting: flags)))"
case let .notStored(flags):
return ".notStored(\(String(reflecting: flags)))"
case let .exists(flags):
return ".exists(\(String(reflecting: flags)))"
case let .value(value):
return ".value(\(String(reflecting: value)))"
case .end:
return ".end"
}
}
}
Loading