Skip to content

Commit

Permalink
comments, layout fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jul 20, 2023
1 parent 4b374cc commit 11f3bb5
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 161 deletions.
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ coverage:
patch: off
ignore:
- "internal/controlpb/control*" # generated code
- "client_experimental.go" # experimental code
128 changes: 18 additions & 110 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ func (c *Client) OnHistory(h HistoryHandler) {
c.eventHub.historyHandler = h
}

// OnStateSnapshot allows settings StateSnapshotHandler.
// This API is EXPERIMENTAL and may be removed in the future versions.
func (c *Client) OnStateSnapshot(h StateSnapshotHandler) {
c.eventHub.stateSnapshotHandler = h
}

const (
// flagSubscribed will be set upon successful Subscription to a channel.
// Until that moment channel exists in client Channels map only to track
Expand Down Expand Up @@ -789,36 +783,6 @@ func (c *Client) Info() []byte {
return info
}

// AcquireStorage returns an attached connection storage (a map) and a function to be
// called when the application finished working with the storage map. Be accurate when
// using this API – avoid acquiring storage for a long time - i.e. on the time of IO operations.
// Do the work fast and release with the updated map. The API designed this way to allow
// reading, modifying or fully overriding storage map and avoid making deep copies each time.
// Note, that if storage map has not been initialized yet - i.e. if it's nil - then it will
// be initialized to an empty map and then returned – so you never receive nil map when
// acquiring. The purpose of this map is to simplify handling user-defined state during the
// lifetime of connection. Try to keep this map reasonably small.
// This API is EXPERIMENTAL.
func (c *Client) AcquireStorage() (map[string]any, func(map[string]any)) {
c.storageMu.Lock()
if c.storage == nil {
c.storage = map[string]any{}
}
return c.storage, func(updatedStorage map[string]any) {
c.storage = updatedStorage
c.storageMu.Unlock()
}
}

// StateSnapshot allows collecting current state copy.
// Mostly useful for connection introspection from the outside.
func (c *Client) StateSnapshot() (any, error) {
if c.eventHub.stateSnapshotHandler != nil {
return c.eventHub.stateSnapshotHandler()
}
return nil, nil
}

// Transport returns client connection transport information.
func (c *Client) Transport() TransportInfo {
return c.transport
Expand Down Expand Up @@ -1119,17 +1083,6 @@ func (c *Client) HandleCommand(cmd *protocol.Command, cmdProtocolSize int) bool
return proceed
}

// dispatchCommand dispatches Command into correct command handler.
func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnect, bool) {
c.mu.Lock()
if c.status == statusClosed {
c.mu.Unlock()
return nil, false
}
c.mu.Unlock()
return c.dispatchCommandV2(cmd, cmdSize)
}

// isPong is a helper method to check whether the command from the client
// is a pong to server ping. It's actually an empty command.
func isPong(cmd *protocol.Command) bool {
Expand Down Expand Up @@ -1179,7 +1132,13 @@ func (c *Client) handleCommandDispatchError(cmd *protocol.Command, method comman
}
}

func (c *Client) dispatchCommandV2(cmd *protocol.Command, cmdSize int) (*Disconnect, bool) {
func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnect, bool) {
c.mu.Lock()
if c.status == statusClosed {
c.mu.Unlock()
return nil, false
}
c.mu.Unlock()
isConnect := cmd.Connect != nil
if !c.authenticated && !isConnect {
return &DisconnectBadRequest, false
Expand Down Expand Up @@ -1635,8 +1594,8 @@ func (c *Client) handleSubscribe(req *protocol.SubscribeRequest, cmd *protocol.C
}

cb := func(reply SubscribeReply, err error) {
if reply.ReplyWritten != nil {
defer close(reply.ReplyWritten)
if reply.SubscriptionReady != nil {
defer close(reply.SubscriptionReady)
}

if err != nil {
Expand Down Expand Up @@ -2243,6 +2202,15 @@ func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command,
subscriptions[ch] = opts
}
}
if reply.ClientReady != nil {
defer func() {
select {
case <-c.ctx.Done():
default:
reply.ClientReady <- c
}
}()
}
} else {
c.startWriter(0, 0, 0)
c.pingInterval, c.pongTimeout = getPingPongPeriodValues(c.transport.PingPongConfig())
Expand Down Expand Up @@ -2999,66 +2967,6 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica
return c.transportEnqueue(data)
}

func (c *Client) WritePublication(channel string, publication *Publication, sp StreamPosition) error {
if !c.IsSubscribed(channel) {
return nil
}

pub := pubToProto(publication)

var (
jsonReplyV2 []byte
protobufReplyV2 []byte

jsonPushV2 []byte
protobufPushV2 []byte
)

protoType := c.transport.Protocol().toProto()

if protoType == protocol.TypeJSON {
if c.transport.Unidirectional() {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
jsonPushV2, err = protocol.DefaultJsonPushEncoder.Encode(push)
if err != nil {
go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c)
return err
}
return c.writePublication(channel, pub, jsonPushV2, sp)
} else {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
jsonReplyV2, err = protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push})
if err != nil {
go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c)
return err
}
return c.writePublication(channel, pub, jsonReplyV2, sp)
}
} else if protoType == protocol.TypeProtobuf {
if c.transport.Unidirectional() {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
protobufPushV2, err = protocol.DefaultProtobufPushEncoder.Encode(push)
if err != nil {
return err
}
return c.writePublication(channel, pub, protobufPushV2, sp)
} else {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
protobufReplyV2, err = protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push})
if err != nil {
return err
}
return c.writePublication(channel, pub, protobufReplyV2, sp)
}
}

return nil
}

func (c *Client) writePublication(ch string, pub *protocol.Publication, data []byte, sp StreamPosition) error {
if c.node.LogEnabled(LogLevelTrace) {
c.traceOutPush(&protocol.Push{Channel: ch, Pub: pub})
Expand Down
101 changes: 101 additions & 0 deletions client_experimental.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package centrifuge

import (
"errors"

"github.com/centrifugal/protocol"
)

var errNoSubscription = errors.New("no subscription to a channel")

// WritePublication allows sending publications to Client subscription directly
// without HUB and Broker semantics. The possible use case is to turn subscription
// to a channel into an individual data stream.
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) WritePublication(channel string, publication *Publication, sp StreamPosition) error {
if !c.IsSubscribed(channel) {
return errNoSubscription
}

pub := pubToProto(publication)
protoType := c.transport.Protocol().toProto()

if protoType == protocol.TypeJSON {
if c.transport.Unidirectional() {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
jsonPush, err := protocol.DefaultJsonPushEncoder.Encode(push)
if err != nil {
go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c)
return err
}
return c.writePublication(channel, pub, jsonPush, sp)
} else {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
jsonReply, err := protocol.DefaultJsonReplyEncoder.Encode(&protocol.Reply{Push: push})
if err != nil {
go func(c *Client) { c.Disconnect(DisconnectInappropriateProtocol) }(c)
return err
}
return c.writePublication(channel, pub, jsonReply, sp)
}
} else if protoType == protocol.TypeProtobuf {
if c.transport.Unidirectional() {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
protobufPush, err := protocol.DefaultProtobufPushEncoder.Encode(push)
if err != nil {
return err
}
return c.writePublication(channel, pub, protobufPush, sp)
} else {
push := &protocol.Push{Channel: channel, Pub: pub}
var err error
protobufReply, err := protocol.DefaultProtobufReplyEncoder.Encode(&protocol.Reply{Push: push})
if err != nil {
return err
}
return c.writePublication(channel, pub, protobufReply, sp)
}
}

return errors.New("unknown protocol type")
}

// AcquireStorage returns an attached connection storage (a map) and a function to be
// called when the application finished working with the storage map. Be accurate when
// using this API – avoid acquiring storage for a long time - i.e. on the time of IO operations.
// Do the work fast and release with the updated map. The API designed this way to allow
// reading, modifying or fully overriding storage map and avoid making deep copies each time.
// Note, that if storage map has not been initialized yet - i.e. if it's nil - then it will
// be initialized to an empty map and then returned – so you never receive nil map when
// acquiring. The purpose of this map is to simplify handling user-defined state during the
// lifetime of connection. Try to keep this map reasonably small.
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) AcquireStorage() (map[string]any, func(map[string]any)) {
c.storageMu.Lock()
if c.storage == nil {
c.storage = map[string]any{}
}
return c.storage, func(updatedStorage map[string]any) {
c.storage = updatedStorage
c.storageMu.Unlock()
}
}

// OnStateSnapshot allows settings StateSnapshotHandler.
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) OnStateSnapshot(h StateSnapshotHandler) {
c.eventHub.stateSnapshotHandler = h
}

// StateSnapshot allows collecting current state copy.
// Mostly useful for connection introspection from the outside.
// This API is EXPERIMENTAL and may be changed/removed.
func (c *Client) StateSnapshot() (any, error) {
if c.eventHub.stateSnapshotHandler != nil {
return c.eventHub.stateSnapshotHandler()
}
return nil, nil
}
17 changes: 14 additions & 3 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ type ConnectReply struct {
// PingPongConfig if set, will override Transport's PingPongConfig to enable setting ping/pong interval
// for individual client.
PingPongConfig *PingPongConfig

// ClientReady is a channel to be filled once with a Client as soon as it's ready to be used.
// This solves a case when app requires access to a Client in the goroutines started as part
// of the Node.OnConnecting handler. Callers should check connection context for the cancellation
// to handle possible Client disconnection. In usual flow you don't need to provide this channel
// at all.
// This is EXPERIMENTAL and may be removed in the future.
ClientReady chan *Client
}

// ConnectingHandler called when new client authenticates on server.
Expand Down Expand Up @@ -192,9 +200,12 @@ type SubscribeReply struct {
// SubRefresh handler will be used.
ClientSideRefresh bool

// ReplyWritten channel if provided will be closed as soon as Centrifuge written
// subscribe reply to the connection.
ReplyWritten chan struct{}
// SubscriptionReady channel if provided will be closed as soon as Centrifuge
// written subscribe reply to the connection, so it's possible to start writing
// publications into a channel using experimental Client.WritePublication method.
// In usual flow you don't need to provide this channel at all.
// This is EXPERIMENTAL and may be removed in the future.
SubscriptionReady chan struct{}
}

// SubscribeHandler called when client wants to subscribe on channel.
Expand Down
Loading

0 comments on commit 11f3bb5

Please sign in to comment.