Skip to content

Commit

Permalink
Merge pull request #43 from xconnio/subsave
Browse files Browse the repository at this point in the history
Correctly save subscription, log fixes
  • Loading branch information
om26er authored Jun 30, 2024
2 parents 0fe0eeb + 493d1dd commit 316020c
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
9 changes: 5 additions & 4 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (b *Broker) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
case messages.MessageTypeSubscribe:
_, exists := b.subscriptionsBySession[sessionID]
if !exists {
return nil, fmt.Errorf("dealer: cannot subscribe, session %d doesn't exist", sessionID)
return nil, fmt.Errorf("broker: cannot subscribe, session %d doesn't exist", sessionID)
}

subscribe := msg.(*messages.Subscribe)
Expand All @@ -98,6 +98,7 @@ func (b *Broker) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
Topic: subscribe.Topic(),
Subscribers: map[int64]int64{sessionID: sessionID},
}
b.subscriptionsByTopic[subscribe.Topic()] = subscription
}

b.subscriptionsBySession[sessionID][subscription.ID] = subscription
Expand All @@ -109,7 +110,7 @@ func (b *Broker) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
unsubscribe := msg.(*messages.UnSubscribe)
subscriptions, exists := b.subscriptionsBySession[sessionID]
if !exists {
return nil, fmt.Errorf("dealer: cannot unsubscribe, session %d doesn't exist", sessionID)
return nil, fmt.Errorf("broker: cannot unsubscribe, session %d doesn't exist", sessionID)
}

subscription, exists := subscriptions[unsubscribe.SubscriptionID()]
Expand All @@ -129,9 +130,9 @@ func (b *Broker) ReceiveMessage(sessionID int64, msg messages.Message) (*Message
result := &MessageWithRecipient{Message: unsubscribed, Recipient: sessionID}
return result, nil
case messages.MessageTypeError:
return nil, fmt.Errorf("dealer: error handling is not implemented yet")
return nil, fmt.Errorf("broker: error handling is not implemented yet")
default:
return nil, fmt.Errorf("dealer: received unexpected message of type %T", msg)
return nil, fmt.Errorf("broker: received unexpected message of type %T", msg)
}
}

Expand Down
2 changes: 0 additions & 2 deletions dealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ func (d *Dealer) ReceiveMessage(sessionID int64, msg messages.Message) (*Message

var result *messages.Result
if yield.PayloadIsBinary() && d.sessions[pending.CallerID].StaticSerializer() {
// FIXME: If YIELD has binary payload, we need to make sure the caller
// also supports binary payloads.
result = messages.NewResultBinary(pending.RequestID, nil, yield.Payload(), yield.PayloadSerializer())
} else {
result = messages.NewResult(pending.RequestID, nil, yield.Args(), yield.KwArgs())
Expand Down

0 comments on commit 316020c

Please sign in to comment.