Skip to content

Commit

Permalink
redis: reload data pipeline immediately upon reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Aug 29, 2021
1 parent 1167e4b commit db074ca
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
2 changes: 1 addition & 1 deletion _examples/chat_protobuf/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
const input = document.getElementById("input");
const container = document.getElementById('messages');

const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket?format=protobuf', {
const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket', {
protocol: 'protobuf' // Note we are setting protobuf protocol here!
});

Expand Down
1 change: 1 addition & 0 deletions broker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ func (b *RedisBroker) runPubSub(s *RedisShard, eventHandler BrokerEventHandler)
case redis.Subscription:
case error:
b.node.Log(NewLogEntry(LogLevelError, "Redis receiver error", map[string]interface{}{"error": n.Error()}))
s.reloadPipeline()
return
}
}
Expand Down
38 changes: 26 additions & 12 deletions redis_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ type redisConnPool interface {
}

type RedisShard struct {
config RedisShardConfig
pool redisConnPool
subCh chan subRequest
pubCh chan pubRequest
dataCh chan *dataRequest
useCluster bool
scriptsMu sync.RWMutex
scripts []*redis.Script
scriptsCh chan struct{}
config RedisShardConfig
pool redisConnPool
subCh chan subRequest
pubCh chan pubRequest
dataCh chan *dataRequest
useCluster bool
scriptsMu sync.RWMutex
scripts []*redis.Script
scriptsCh chan struct{}
reloadPipelineCh chan struct{}
}

func confFromAddress(address string, conf RedisShardConfig) (RedisShardConfig, error) {
Expand Down Expand Up @@ -102,9 +103,10 @@ func NewRedisShard(n *Node, conf RedisShardConfig) (*RedisShard, error) {
}
}
shard := &RedisShard{
config: conf,
scriptsCh: make(chan struct{}, 1),
useCluster: len(conf.ClusterAddresses) > 0,
config: conf,
scriptsCh: make(chan struct{}, 1),
useCluster: len(conf.ClusterAddresses) > 0,
reloadPipelineCh: make(chan struct{}),
}
pool, err := newPool(shard, n, conf)
if err != nil {
Expand Down Expand Up @@ -249,6 +251,16 @@ func (s *RedisShard) registerScripts(scripts ...*redis.Script) {
}
}

// Best effort to process a signal for reloading data pipeline if we know
// that connection should be re-established. If the signal can't be processed
// then pipeline will be automatically reload upon first error from Redis.
func (s *RedisShard) reloadPipeline() {
select {
case s.reloadPipelineCh <- struct{}{}:
default:
}
}

func (s *RedisShard) getDataResponse(r *dataRequest) *dataResponse {
if s.useCluster {
reply, err := s.processClusterDataRequest(r)
Expand Down Expand Up @@ -318,6 +330,8 @@ func (s *RedisShard) runDataPipeline() error {

for {
select {
case <-s.reloadPipelineCh:
return nil
case <-s.scriptsCh:
s.scriptsMu.RLock()
if len(s.scripts) == len(scripts) {
Expand Down

0 comments on commit db074ca

Please sign in to comment.