diff --git a/.codecov.yml b/.codecov.yml index 39687e8e..a7a1a9e0 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -6,3 +6,4 @@ coverage: patch: off ignore: - "internal/controlpb/control*" # generated code + - "client_experimental.go" # experimental code diff --git a/client.go b/client.go index 30896b0a..4c6b4e7e 100644 --- a/client.go +++ b/client.go @@ -125,12 +125,6 @@ func (c *Client) OnHistory(h HistoryHandler) { c.eventHub.historyHandler = h } -// OnStateSnapshot allows settings StateSnapshotHandler. -// This API is EXPERIMENTAL and may be removed in the future versions. -func (c *Client) OnStateSnapshot(h StateSnapshotHandler) { - c.eventHub.stateSnapshotHandler = h -} - const ( // flagSubscribed will be set upon successful Subscription to a channel. // Until that moment channel exists in client Channels map only to track @@ -789,36 +783,6 @@ func (c *Client) Info() []byte { return info } -// AcquireStorage returns an attached connection storage (a map) and a function to be -// called when the application finished working with the storage map. Be accurate when -// using this API – avoid acquiring storage for a long time - i.e. on the time of IO operations. -// Do the work fast and release with the updated map. The API designed this way to allow -// reading, modifying or fully overriding storage map and avoid making deep copies each time. -// Note, that if storage map has not been initialized yet - i.e. if it's nil - then it will -// be initialized to an empty map and then returned – so you never receive nil map when -// acquiring. The purpose of this map is to simplify handling user-defined state during the -// lifetime of connection. Try to keep this map reasonably small. -// This API is EXPERIMENTAL. -func (c *Client) AcquireStorage() (map[string]any, func(map[string]any)) { - c.storageMu.Lock() - if c.storage == nil { - c.storage = map[string]any{} - } - return c.storage, func(updatedStorage map[string]any) { - c.storage = updatedStorage - c.storageMu.Unlock() - } -} - -// StateSnapshot allows collecting current state copy. -// Mostly useful for connection introspection from the outside. -func (c *Client) StateSnapshot() (any, error) { - if c.eventHub.stateSnapshotHandler != nil { - return c.eventHub.stateSnapshotHandler() - } - return nil, nil -} - // Transport returns client connection transport information. func (c *Client) Transport() TransportInfo { return c.transport @@ -1119,17 +1083,6 @@ func (c *Client) HandleCommand(cmd *protocol.Command, cmdProtocolSize int) bool return proceed } -// dispatchCommand dispatches Command into correct command handler. -func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnect, bool) { - c.mu.Lock() - if c.status == statusClosed { - c.mu.Unlock() - return nil, false - } - c.mu.Unlock() - return c.dispatchCommandV2(cmd, cmdSize) -} - // isPong is a helper method to check whether the command from the client // is a pong to server ping. It's actually an empty command. func isPong(cmd *protocol.Command) bool { @@ -1179,7 +1132,13 @@ func (c *Client) handleCommandDispatchError(cmd *protocol.Command, method comman } } -func (c *Client) dispatchCommandV2(cmd *protocol.Command, cmdSize int) (*Disconnect, bool) { +func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnect, bool) { + c.mu.Lock() + if c.status == statusClosed { + c.mu.Unlock() + return nil, false + } + c.mu.Unlock() isConnect := cmd.Connect != nil if !c.authenticated && !isConnect { return &DisconnectBadRequest, false @@ -1635,8 +1594,8 @@ func (c *Client) handleSubscribe(req *protocol.SubscribeRequest, cmd *protocol.C } cb := func(reply SubscribeReply, err error) { - if reply.ReplyWritten != nil { - defer close(reply.ReplyWritten) + if reply.SubscriptionReady != nil { + defer close(reply.SubscriptionReady) } if err != nil { @@ -2243,6 +2202,15 @@ func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command, subscriptions[ch] = opts } } + if reply.ClientReady != nil { + defer func() { + select { + case <-c.ctx.Done(): + default: + reply.ClientReady <- c + } + }() + } } else { c.startWriter(0, 0, 0) c.pingInterval, c.pongTimeout = getPingPongPeriodValues(c.transport.PingPongConfig()) @@ -2999,66 +2967,6 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica return c.transportEnqueue(data) } -func (c *Client) WritePublication(channel string, publication *Publication, sp StreamPosition) error { - if !c.IsSubscribed(channel) { - return nil - } - - pub := pubToProto(publication) - - var ( - jsonReplyV2 []byte - protobufReplyV2 []byte - - jsonPushV2 []byte - protobufPushV2 []byte - ) - - protoType := c.transport.Protocol().toProto() - - if protoType == protocol.TypeJSON { - if c.transport.Unidirectional() { - push := &protocol.Push{Channel: channel, Pub: pub} - var err error - jsonPushV2, err = protocol.DefaultJsonPushEncoder.Encode(push) - if err != nil { - go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) - return err - } - return c.writePublication(channel, pub, jsonPushV2, sp) - } else { - push := &protocol.Push{Channel: channel, Pub: pub} - var err error - jsonReplyV2, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) - if err != nil { - go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) - return err - } - return c.writePublication(channel, pub, jsonReplyV2, sp) - } - } else if protoType == protocol.TypeProtobuf { - if c.transport.Unidirectional() { - push := &protocol.Push{Channel: channel, Pub: pub} - var err error - protobufPushV2, err = protocol.DefaultProtobufPushEncoder.Encode(push) - if err != nil { - return err - } - return c.writePublication(channel, pub, protobufPushV2, sp) - } else { - push := &protocol.Push{Channel: channel, Pub: pub} - var err error - protobufReplyV2, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) - if err != nil { - return err - } - return c.writePublication(channel, pub, protobufReplyV2, sp) - } - } - - return nil -} - func (c *Client) writePublication(ch string, pub *protocol.Publication, data []byte, sp StreamPosition) error { if c.node.LogEnabled(LogLevelTrace) { c.traceOutPush(&protocol.Push{Channel: ch, Pub: pub}) diff --git a/client_experimental.go b/client_experimental.go new file mode 100644 index 00000000..000c7f21 --- /dev/null +++ b/client_experimental.go @@ -0,0 +1,101 @@ +package centrifuge + +import ( + "errors" + + "github.com/centrifugal/protocol" +) + +var errNoSubscription = errors.New("no subscription to a channel") + +// WritePublication allows sending publications to Client subscription directly +// without HUB and Broker semantics. The possible use case is to turn subscription +// to a channel into an individual data stream. +// This API is EXPERIMENTAL and may be changed/removed. +func (c *Client) WritePublication(channel string, publication *Publication, sp StreamPosition) error { + if !c.IsSubscribed(channel) { + return errNoSubscription + } + + pub := pubToProto(publication) + protoType := c.transport.Protocol().toProto() + + if protoType == protocol.TypeJSON { + if c.transport.Unidirectional() { + push := &protocol.Push{Channel: channel, Pub: pub} + var err error + jsonPush, err := protocol.DefaultJsonPushEncoder.Encode(push) + if err != nil { + go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) + return err + } + return c.writePublication(channel, pub, jsonPush, sp) + } else { + push := &protocol.Push{Channel: channel, Pub: pub} + var err error + jsonReply, err := protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) + if err != nil { + go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) + return err + } + return c.writePublication(channel, pub, jsonReply, sp) + } + } else if protoType == protocol.TypeProtobuf { + if c.transport.Unidirectional() { + push := &protocol.Push{Channel: channel, Pub: pub} + var err error + protobufPush, err := protocol.DefaultProtobufPushEncoder.Encode(push) + if err != nil { + return err + } + return c.writePublication(channel, pub, protobufPush, sp) + } else { + push := &protocol.Push{Channel: channel, Pub: pub} + var err error + protobufReply, err := protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) + if err != nil { + return err + } + return c.writePublication(channel, pub, protobufReply, sp) + } + } + + return errors.New("unknown protocol type") +} + +// AcquireStorage returns an attached connection storage (a map) and a function to be +// called when the application finished working with the storage map. Be accurate when +// using this API – avoid acquiring storage for a long time - i.e. on the time of IO operations. +// Do the work fast and release with the updated map. The API designed this way to allow +// reading, modifying or fully overriding storage map and avoid making deep copies each time. +// Note, that if storage map has not been initialized yet - i.e. if it's nil - then it will +// be initialized to an empty map and then returned – so you never receive nil map when +// acquiring. The purpose of this map is to simplify handling user-defined state during the +// lifetime of connection. Try to keep this map reasonably small. +// This API is EXPERIMENTAL and may be changed/removed. +func (c *Client) AcquireStorage() (map[string]any, func(map[string]any)) { + c.storageMu.Lock() + if c.storage == nil { + c.storage = map[string]any{} + } + return c.storage, func(updatedStorage map[string]any) { + c.storage = updatedStorage + c.storageMu.Unlock() + } +} + +// OnStateSnapshot allows settings StateSnapshotHandler. +// This API is EXPERIMENTAL and may be changed/removed. +func (c *Client) OnStateSnapshot(h StateSnapshotHandler) { + c.eventHub.stateSnapshotHandler = h +} + +// StateSnapshot allows collecting current state copy. +// Mostly useful for connection introspection from the outside. +// This API is EXPERIMENTAL and may be changed/removed. +func (c *Client) StateSnapshot() (any, error) { + if c.eventHub.stateSnapshotHandler != nil { + return c.eventHub.stateSnapshotHandler() + } + return nil, nil +} diff --git a/events.go b/events.go index 3d960caa..2562b6b6 100644 --- a/events.go +++ b/events.go @@ -75,6 +75,14 @@ type ConnectReply struct { // PingPongConfig if set, will override Transport's PingPongConfig to enable setting ping/pong interval // for individual client. PingPongConfig *PingPongConfig + + // ClientReady is a channel to be filled once with a Client as soon as it's ready to be used. + // This solves a case when app requires access to a Client in the goroutines started as part + // of the Node.OnConnecting handler. Callers should check connection context for the cancellation + // to handle possible Client disconnection. In usual flow you don't need to provide this channel + // at all. + // This is EXPERIMENTAL and may be removed in the future. + ClientReady chan *Client } // ConnectingHandler called when new client authenticates on server. @@ -192,9 +200,12 @@ type SubscribeReply struct { // SubRefresh handler will be used. ClientSideRefresh bool - // ReplyWritten channel if provided will be closed as soon as Centrifuge written - // subscribe reply to the connection. - ReplyWritten chan struct{} + // SubscriptionReady channel if provided will be closed as soon as Centrifuge + // written subscribe reply to the connection, so it's possible to start writing + // publications into a channel using experimental Client.WritePublication method. + // In usual flow you don't need to provide this channel at all. + // This is EXPERIMENTAL and may be removed in the future. + SubscriptionReady chan struct{} } // SubscribeHandler called when client wants to subscribe on channel. diff --git a/hub.go b/hub.go index 1fda1232..7759b1d9 100644 --- a/hub.go +++ b/hub.go @@ -546,11 +546,11 @@ func (h *subShard) broadcastPublication(channel string, pub *protocol.Publicatio } var ( - jsonReplyV2 []byte - protobufReplyV2 []byte + jsonReply []byte + protobufReply []byte - jsonPushV2 []byte - protobufPushV2 []byte + jsonPush []byte + protobufPush []byte jsonEncodeErr *encodeError ) @@ -563,51 +563,51 @@ func (h *subShard) broadcastPublication(channel string, pub *protocol.Publicatio continue } if c.transport.Unidirectional() { - if jsonPushV2 == nil { + if jsonPush == nil { push := &protocol.Push{Channel: channel, Pub: pub} var err error - jsonPushV2, err = protocol.DefaultJsonPushEncoder.Encode(push) + jsonPush, err = protocol.DefaultJsonPushEncoder.Encode(push) if err != nil { jsonEncodeErr = &encodeError{client: c.ID(), user: c.UserID(), error: err} go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) continue } } - _ = c.writePublication(channel, pub, jsonPushV2, sp) + _ = c.writePublication(channel, pub, jsonPush, sp) } else { - if jsonReplyV2 == nil { + if jsonReply == nil { push := &protocol.Push{Channel: channel, Pub: pub} var err error - jsonReplyV2, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) + jsonReply, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) if err != nil { jsonEncodeErr = &encodeError{client: c.ID(), user: c.UserID(), error: err} go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) continue } } - _ = c.writePublication(channel, pub, jsonReplyV2, sp) + _ = c.writePublication(channel, pub, jsonReply, sp) } } else if protoType == protocol.TypeProtobuf { if c.transport.Unidirectional() { - if protobufPushV2 == nil { + if protobufPush == nil { push := &protocol.Push{Channel: channel, Pub: pub} var err error - protobufPushV2, err = protocol.DefaultProtobufPushEncoder.Encode(push) + protobufPush, err = protocol.DefaultProtobufPushEncoder.Encode(push) if err != nil { return err } } - _ = c.writePublication(channel, pub, protobufPushV2, sp) + _ = c.writePublication(channel, pub, protobufPush, sp) } else { - if protobufReplyV2 == nil { + if protobufReply == nil { push := &protocol.Push{Channel: channel, Pub: pub} var err error - protobufReplyV2, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) + protobufReply, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) if err != nil { return err } } - _ = c.writePublication(channel, pub, protobufReplyV2, sp) + _ = c.writePublication(channel, pub, protobufReply, sp) } } } @@ -634,11 +634,11 @@ func (h *subShard) broadcastJoin(channel string, join *protocol.Join) error { } var ( - jsonReplyV2 []byte - protobufReplyV2 []byte + jsonReply []byte + protobufReply []byte - jsonPushV2 []byte - protobufPushV2 []byte + jsonPush []byte + protobufPush []byte jsonEncodeErr *encodeError ) @@ -651,51 +651,51 @@ func (h *subShard) broadcastJoin(channel string, join *protocol.Join) error { continue } if c.transport.Unidirectional() { - if jsonPushV2 == nil { + if jsonPush == nil { push := &protocol.Push{Channel: channel, Join: join} var err error - jsonPushV2, err = protocol.DefaultJsonPushEncoder.Encode(push) + jsonPush, err = protocol.DefaultJsonPushEncoder.Encode(push) if err != nil { jsonEncodeErr = &encodeError{client: c.ID(), user: c.UserID(), error: err} go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) continue } } - _ = c.writeJoin(channel, join, jsonPushV2) + _ = c.writeJoin(channel, join, jsonPush) } else { - if jsonReplyV2 == nil { + if jsonReply == nil { push := &protocol.Push{Channel: channel, Join: join} var err error - jsonReplyV2, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) + jsonReply, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) if err != nil { jsonEncodeErr = &encodeError{client: c.ID(), user: c.UserID(), error: err} go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) continue } } - _ = c.writeJoin(channel, join, jsonReplyV2) + _ = c.writeJoin(channel, join, jsonReply) } } else if protoType == protocol.TypeProtobuf { if c.transport.Unidirectional() { - if protobufPushV2 == nil { + if protobufPush == nil { push := &protocol.Push{Channel: channel, Join: join} var err error - protobufPushV2, err = protocol.DefaultProtobufPushEncoder.Encode(push) + protobufPush, err = protocol.DefaultProtobufPushEncoder.Encode(push) if err != nil { return err } } - _ = c.writeJoin(channel, join, protobufPushV2) + _ = c.writeJoin(channel, join, protobufPush) } else { - if protobufReplyV2 == nil { + if protobufReply == nil { push := &protocol.Push{Channel: channel, Join: join} var err error - protobufReplyV2, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) + protobufReply, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) if err != nil { return err } } - _ = c.writeJoin(channel, join, protobufReplyV2) + _ = c.writeJoin(channel, join, protobufReply) } } } @@ -722,11 +722,11 @@ func (h *subShard) broadcastLeave(channel string, leave *protocol.Leave) error { } var ( - jsonReplyV2 []byte - protobufReplyV2 []byte + jsonReply []byte + protobufReply []byte - jsonPushV2 []byte - protobufPushV2 []byte + jsonPush []byte + protobufPush []byte jsonEncodeErr *encodeError ) @@ -739,51 +739,51 @@ func (h *subShard) broadcastLeave(channel string, leave *protocol.Leave) error { continue } if c.transport.Unidirectional() { - if jsonPushV2 == nil { + if jsonPush == nil { push := &protocol.Push{Channel: channel, Leave: leave} var err error - jsonPushV2, err = protocol.DefaultJsonPushEncoder.Encode(push) + jsonPush, err = protocol.DefaultJsonPushEncoder.Encode(push) if err != nil { jsonEncodeErr = &encodeError{client: c.ID(), user: c.UserID(), error: err} go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) continue } } - _ = c.writeLeave(channel, leave, jsonPushV2) + _ = c.writeLeave(channel, leave, jsonPush) } else { - if jsonReplyV2 == nil { + if jsonReply == nil { push := &protocol.Push{Channel: channel, Leave: leave} var err error - jsonReplyV2, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) + jsonReply, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push}) if err != nil { jsonEncodeErr = &encodeError{client: c.ID(), user: c.UserID(), error: err} go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c) continue } } - _ = c.writeLeave(channel, leave, jsonReplyV2) + _ = c.writeLeave(channel, leave, jsonReply) } } else if protoType == protocol.TypeProtobuf { if c.transport.Unidirectional() { - if protobufPushV2 == nil { + if protobufPush == nil { push := &protocol.Push{Channel: channel, Leave: leave} var err error - protobufPushV2, err = protocol.DefaultProtobufPushEncoder.Encode(push) + protobufPush, err = protocol.DefaultProtobufPushEncoder.Encode(push) if err != nil { return err } } - _ = c.writeLeave(channel, leave, protobufPushV2) + _ = c.writeLeave(channel, leave, protobufPush) } else { - if protobufReplyV2 == nil { + if protobufReply == nil { push := &protocol.Push{Channel: channel, Leave: leave} var err error - protobufReplyV2, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) + protobufReply, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push}) if err != nil { return err } } - _ = c.writeLeave(channel, leave, protobufReplyV2) + _ = c.writeLeave(channel, leave, protobufReply) } } }