Skip to content

Commit

Permalink
rfq+rfqmsg: lookup RFQ session while parsing incoming messages
Browse files Browse the repository at this point in the history
Future RFQ quote accept messages will not contain enough information
for the RFQ service to classify them as buy or sell. Therefore, before
these messages can be fully interpreted, the corresponding quote request
message must be retrieved and inspected.

This commit modifies the parsing of incoming quote accept messages so
they can be accurately classified as buy or sell by looking up the
associated quote request message.

As a beneficial side effect, parsed quote accept message fields are now
fully populated within the parsing routine.
  • Loading branch information
ffranr committed Sep 19, 2024
1 parent 793ddf6 commit b91ebfd
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 77 deletions.
75 changes: 13 additions & 62 deletions rfq/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ func NewStreamHandler(ctx context.Context,
func (h *StreamHandler) handleIncomingWireMessage(
wireMsg rfqmsg.WireMessage) error {

// Parse the wire message as an RFQ message.
msg, err := rfqmsg.NewIncomingMsgFromWire(wireMsg)
// Parse the wire message as an RFQ message. The session cache load
// function is provided to associate incoming wire messages with their
// corresponding outgoing requests during parsing.
msg, err := rfqmsg.NewIncomingMsgFromWire(
wireMsg, h.outgoingRequests.Load,
)
if err != nil {
if errors.Is(err, rfqmsg.ErrUnknownMessageType) {
// Silently disregard the message if we don't recognise
Expand All @@ -109,66 +113,13 @@ func (h *StreamHandler) handleIncomingWireMessage(

log.Debugf("Stream handling incoming message: %s", msg)

// If the incoming message is an accept message, lookup the
// corresponding outgoing request message. Assign the outgoing request
// to a field on the accept message. This step allows us to easily
// access the request that the accept message is responding to. Some of
// the request fields are not present in the accept message.
//
// If the incoming message is a reject message, remove the corresponding
// outgoing request from the store.
switch typedMsg := msg.(type) {
case *rfqmsg.Reject:
// Delete the corresponding outgoing request from the store.
h.outgoingRequests.Delete(typedMsg.ID)

case *rfqmsg.BuyAccept:
// Load and delete the corresponding outgoing request from the
// store.
outgoingRequest, found := h.outgoingRequests.LoadAndDelete(
typedMsg.ID,
)

// Ensure that we have an outgoing request to match the incoming
// accept message.
if !found {
return fmt.Errorf("no outgoing request found for "+
"incoming accept message: %s", typedMsg.ID)
}

// Type cast the outgoing message to a BuyRequest (the request
// type that corresponds to a buy accept message).
buyReq, ok := outgoingRequest.(*rfqmsg.BuyRequest)
if !ok {
return fmt.Errorf("expected BuyRequest, got %T",
outgoingRequest)
}

typedMsg.Request = *buyReq

case *rfqmsg.SellAccept:
// Load and delete the corresponding outgoing request from the
// store.
outgoingRequest, found := h.outgoingRequests.LoadAndDelete(
typedMsg.ID,
)

// Ensure that we have an outgoing request to match the incoming
// accept message.
if !found {
return fmt.Errorf("no outgoing request found for "+
"incoming accept message: %s", typedMsg.ID)
}

// Type cast the outgoing message to a SellRequest (the request
// type that corresponds to a sell accept message).
req, ok := outgoingRequest.(*rfqmsg.SellRequest)
if !ok {
return fmt.Errorf("expected SellRequest, got %T",
outgoingRequest)
}

typedMsg.Request = *req
// If the incoming message is a response to an outgoing request, we
// will remove the corresponding session from the store. We can safely
// remove the session at this point because we have received the only
// response we expect for this session.
switch msg.(type) {
case *rfqmsg.BuyAccept, *rfqmsg.SellAccept, *rfqmsg.Reject:
h.outgoingRequests.Delete(msg.MsgID())
}

// Send the incoming message to the RFQ manager.
Expand Down
37 changes: 26 additions & 11 deletions rfqmsg/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ func (m *acceptWireMsgData) Bytes() ([]byte, error) {
// asset to us. Conversely, an incoming sell accept message indicates that our
// peer accepts our sell request, meaning they are willing to buy the asset from
// us.
func NewIncomingAcceptFromWire(wireMsg WireMessage) (IncomingMsg, error) {
func NewIncomingAcceptFromWire(wireMsg WireMessage,
sessionLookup SessionLookup) (IncomingMsg, error) {

// Ensure that the message type is a quote accept message.
if wireMsg.MsgType != MsgTypeAccept {
return nil, ErrUnknownMessageType
Expand All @@ -248,17 +250,30 @@ func NewIncomingAcceptFromWire(wireMsg WireMessage) (IncomingMsg, error) {
"quote accept message: %w", err)
}

// We will now determine whether this is a buy or sell accept. We can
// distinguish between buy/sell accept messages by inspecting which tick
// rate field is populated.
isBuyAccept := msgData.InOutRateTick.IsSome()
// Before we can determine whether this is a buy or sell accept, we need
// to look up the corresponding outgoing request message. This step is
// necessary because the accept message data does not contain sufficient
// data to distinguish between buy and sell accept messages.
if sessionLookup == nil {
return nil, fmt.Errorf("RFQ session lookup function is " +
"required")
}

// If this is a buy request, then we will create a new buy request
// message.
if isBuyAccept {
return newBuyAcceptFromWireMsg(wireMsg, msgData)
request, found := sessionLookup(msgData.ID.Val)
if !found {
return nil, fmt.Errorf("no outgoing request found for "+
"incoming accept message: %s", msgData.ID.Val)
}

// Otherwise, this is a sell request.
return newSellAcceptFromWireMsg(wireMsg, msgData)
// Use the corresponding request to determine the type of accept
// message.
switch typedRequest := request.(type) {
case *BuyRequest:
return newBuyAcceptFromWireMsg(wireMsg, msgData, *typedRequest)
case *SellRequest:
return newSellAcceptFromWireMsg(wireMsg, msgData, *typedRequest)
default:
return nil, fmt.Errorf("unknown request type for incoming "+
"accept message: %T", request)
}
}
3 changes: 2 additions & 1 deletion rfqmsg/buy_accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewBuyAcceptFromRequest(request BuyRequest, askPrice lnwire.MilliSatoshi,

// newBuyAcceptFromWireMsg instantiates a new instance from a wire message.
func newBuyAcceptFromWireMsg(wireMsg WireMessage,
msgData acceptWireMsgData) (*BuyAccept, error) {
msgData acceptWireMsgData, request BuyRequest) (*BuyAccept, error) {

// Ensure that the message type is an accept message.
if wireMsg.MsgType != MsgTypeAccept {
Expand All @@ -79,6 +79,7 @@ func newBuyAcceptFromWireMsg(wireMsg WireMessage,

return &BuyAccept{
Peer: wireMsg.Peer,
Request: request,
Version: msgData.Version.Val,
ID: msgData.ID.Val,
Expiry: msgData.Expiry.Val,
Expand Down
10 changes: 8 additions & 2 deletions rfqmsg/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,19 @@ type WireMessage struct {
Data []byte
}

// SessionLookup is a function that can be used to look up a session quote
// request message given a session ID.
type SessionLookup func(id ID) (OutgoingMsg, bool)

// NewIncomingMsgFromWire creates a new RFQ message from a wire message.
func NewIncomingMsgFromWire(wireMsg WireMessage) (IncomingMsg, error) {
func NewIncomingMsgFromWire(wireMsg WireMessage,
sessionLookup SessionLookup) (IncomingMsg, error) {

switch wireMsg.MsgType {
case MsgTypeRequest:
return NewIncomingRequestFromWire(wireMsg)
case MsgTypeAccept:
return NewIncomingAcceptFromWire(wireMsg)
return NewIncomingAcceptFromWire(wireMsg, sessionLookup)
case MsgTypeReject:
return NewQuoteRejectFromWireMsg(wireMsg)
default:
Expand Down
4 changes: 3 additions & 1 deletion rfqmsg/sell_accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func NewSellAcceptFromRequest(request SellRequest, bidPrice lnwire.MilliSatoshi,

// newSellAcceptFromWireMsg instantiates a new instance from a wire message.
func newSellAcceptFromWireMsg(wireMsg WireMessage,
msgData acceptWireMsgData) (*SellAccept, error) {
msgData acceptWireMsgData, request SellRequest) (*SellAccept,
error) {

// Ensure that the message type is an accept message.
if wireMsg.MsgType != MsgTypeAccept {
Expand All @@ -82,6 +83,7 @@ func newSellAcceptFromWireMsg(wireMsg WireMessage,
// service.
return &SellAccept{
Peer: wireMsg.Peer,
Request: request,
Version: msgData.Version.Val,
ID: msgData.ID.Val,
BidPrice: bidPrice,
Expand Down

0 comments on commit b91ebfd

Please sign in to comment.