Skip to content

Commit

Permalink
process connect proxy result subs (#874)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Aug 12, 2024
1 parent 3a2558a commit 8189ecb
Showing 1 changed file with 74 additions and 3 deletions.
77 changes: 74 additions & 3 deletions internal/proxy/connect_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,19 @@ func (h *ConnectHandler) Handle(node *centrifuge.Node) ConnectingHandlerFunc {
reply.Data = data
}
if len(result.Channels) > 0 {
subscriptions := make(map[string]centrifuge.SubscribeOptions, len(result.Channels))
if reply.Subscriptions == nil {
reply.Subscriptions = make(map[string]centrifuge.SubscribeOptions, len(result.Channels))
}
for _, ch := range result.Channels {
_, _, chOpts, found, err := h.ruleContainer.ChannelOptions(ch)
if err != nil {
return centrifuge.ConnectReply{}, ConnectExtra{}, err
}
if !found {
node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "unknown channel in connect result channels", map[string]any{"client": e.ClientID, "channel": ch}))
return centrifuge.ConnectReply{}, ConnectExtra{}, centrifuge.ErrorUnknownChannel
}
subscriptions[ch] = centrifuge.SubscribeOptions{
reply.Subscriptions[ch] = centrifuge.SubscribeOptions{
EmitPresence: chOpts.Presence,
EmitJoinLeave: chOpts.JoinLeave,
PushJoinLeave: chOpts.ForcePushJoinLeave,
Expand All @@ -148,8 +151,76 @@ func (h *ConnectHandler) Handle(node *centrifuge.Node) ConnectingHandlerFunc {
HistoryMetaTTL: time.Duration(chOpts.HistoryMetaTTL),
}
}
reply.Subscriptions = subscriptions
}
if len(result.Subs) > 0 {
if reply.Subscriptions == nil {
reply.Subscriptions = make(map[string]centrifuge.SubscribeOptions, len(result.Subs))
}
for ch, options := range result.Subs {
_, _, chOpts, found, err := h.ruleContainer.ChannelOptions(ch)
if err != nil {
return centrifuge.ConnectReply{}, ConnectExtra{}, err
}
if !found {
node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "unknown channel in connect result subs", map[string]any{"client": e.ClientID, "channel": ch}))
return centrifuge.ConnectReply{}, ConnectExtra{}, centrifuge.ErrorUnknownChannel
}
var chInfo []byte
if options.B64Info != "" {
byteInfo, err := base64.StdEncoding.DecodeString(options.B64Info)
if err != nil {
return centrifuge.ConnectReply{}, ConnectExtra{}, err
}
chInfo = byteInfo
} else {
chInfo = options.Info
}
var chData []byte
if options.B64Data != "" {
byteInfo, err := base64.StdEncoding.DecodeString(options.B64Data)
if err != nil {
return centrifuge.ConnectReply{}, ConnectExtra{}, err
}
chData = byteInfo
} else {
chData = options.Data
}
presence := chOpts.Presence
if options.Override != nil && options.Override.Presence != nil {
presence = options.Override.Presence.Value
}
joinLeave := chOpts.JoinLeave
if options.Override != nil && options.Override.JoinLeave != nil {
joinLeave = options.Override.JoinLeave.Value
}
pushJoinLeave := chOpts.ForcePushJoinLeave
if options.Override != nil && options.Override.ForcePushJoinLeave != nil {
pushJoinLeave = options.Override.ForcePushJoinLeave.Value
}
recovery := chOpts.ForceRecovery
if options.Override != nil && options.Override.ForceRecovery != nil {
recovery = options.Override.ForceRecovery.Value
}
positioning := chOpts.ForcePositioning
if options.Override != nil && options.Override.ForcePositioning != nil {
positioning = options.Override.ForcePositioning.Value
}
recoveryMode := chOpts.GetRecoveryMode()
reply.Subscriptions[ch] = centrifuge.SubscribeOptions{
ChannelInfo: chInfo,
EmitPresence: presence,
EmitJoinLeave: joinLeave,
PushJoinLeave: pushJoinLeave,
EnableRecovery: recovery,
EnablePositioning: positioning,
RecoveryMode: recoveryMode,
Data: chData,
Source: subsource.ConnectProxy,
HistoryMetaTTL: time.Duration(chOpts.HistoryMetaTTL),
}
}
}

if result.Meta != nil {
reply.Storage = map[string]any{
clientstorage.KeyMeta: json.RawMessage(result.Meta),
Expand Down

0 comments on commit 8189ecb

Please sign in to comment.