Skip to content

Commit

Permalink
Merge pull request #119 from soygul/queue-interface
Browse files Browse the repository at this point in the history
Queue is now an interface
  • Loading branch information
soygul authored Sep 16, 2016
2 parents 771367e + 0ced2a0 commit 94dbed0
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 183 deletions.
12 changes: 6 additions & 6 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 120 additions & 0 deletions data/inmem/inmemqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package inmem

import (
"sync"

"github.com/neptulon/neptulon"
"github.com/titan-x/titan/data"
)

// Queue is a message queue for queueing and sending messages to users.
type Queue struct {
senderFunc SenderFunc // sender function to send and receive messages through
conns map[string]string // user ID -> conn ID
reqChans map[string]queueChan // user ID -> queueProcessor
mutex sync.Mutex
}

// NewQueue creates a new queue object.
func NewQueue(senderFunc SenderFunc) *Queue {
q := Queue{
senderFunc: senderFunc,
conns: make(map[string]string),
reqChans: make(map[string]queueChan),
}

return &q
}

// SenderFunc is a function for sending messages over connections ID.
type SenderFunc func(connID string, method string, params interface{}, resHandler func(ctx *neptulon.ResCtx) error) (reqID string, err error)

type queuedReq struct {
Method string
Params interface{}
ResHandler func(ctx *neptulon.ResCtx) error
}

type queueChan struct {
req chan queuedReq
quit chan bool
}

// Middleware registers a queue middleware to register user/connection IDs
// for connecting users (upon their first incoming-message).
func (q *Queue) Middleware(ctx *neptulon.ReqCtx) error {
q.mutex.Lock()
userID := ctx.Conn.Session.Get("userid").(string)
connID := ctx.Conn.ID

// start queue gorutine only once per connection
if _, ok := q.conns[userID]; !ok {
q.conns[userID] = connID
data.UserCount.Add(1)
go q.processQueue(userID, connID)
}
q.mutex.Unlock()

return ctx.Next()
}

// RemoveConn removes a user's associated connection ID.
func (q *Queue) RemoveConn(userID string) {
q.mutex.Lock()
defer q.mutex.Unlock()

q.getQueueChan(userID).quit <- true
delete(q.conns, userID)
data.UserCount.Add(-1)
}

// this is not thread safe and must be used inside a mutex lock
func (q *Queue) getQueueChan(userID string) queueChan {
c, ok := q.reqChans[userID]
if !ok {
c = queueChan{req: make(chan queuedReq, 5000), quit: make(chan bool)}
q.reqChans[userID] = c
}
return c
}

// AddRequest queues a request message to be sent to the given user.
func (q *Queue) AddRequest(userID string, method string, params interface{}, resHandler func(ctx *neptulon.ResCtx) error) error {
q.mutex.Lock()
defer q.mutex.Unlock()

data.QueueLength.Add(1)
q.getQueueChan(userID).req <- queuedReq{Method: method, Params: params, ResHandler: resHandler}

return nil
}

func (q *Queue) processQueue(userID, connID string) {
q.mutex.Lock()
qc := q.getQueueChan(userID)
q.mutex.Unlock()

errc := 0 // protect against infinite retry loop

for {
select {
case req := <-qc.req:
_, err := q.senderFunc(connID, req.Method, req.Params, req.ResHandler)

if err != nil {
errc++
qc.req <- req
if errc > 10 {
return
}
continue
}

data.QueueLength.Add(-1)
errc = 0

case <-qc.quit:
return
}
}
}
23 changes: 16 additions & 7 deletions data/queue.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package data

// var queueLength = expvar.NewInt("queue-length")
// var conns = expvar.NewInt("conns")
import (
"expvar"

"github.com/neptulon/neptulon"
)

// Queue is a message queue for queueing and sending messages to users.
type Queue interface {
// todo: buffered channels or basic locks or a concurrent multimap?
// todo: at-least-once delivery relaxes things a bit for queueProcessor
//
// actually queue should not be interacted with directly, just like DB, it should be an interface
// and server.send(userID) should use it automatically behind the scenes
Middleware(ctx *neptulon.ReqCtx) error
RemoveConn(userID string)
AddRequest(userID string, method string, params interface{}, resHandler func(ctx *neptulon.ResCtx) error) error
}

// QueueLength is the total request queue for all users combined.
// This should be handled by the implementing struct.
var QueueLength = expvar.NewInt("queue-length")

// UserCount is the total authenticated live user count.
// This should be handled by the implementing struct.
var UserCount = expvar.NewInt("users")
116 changes: 0 additions & 116 deletions queue.go

This file was deleted.

17 changes: 10 additions & 7 deletions route_priv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,30 @@ import (
"github.com/neptulon/neptulon"
"github.com/neptulon/neptulon/middleware"
"github.com/titan-x/titan/client"
"github.com/titan-x/titan/data"
"github.com/titan-x/titan/models"
)

func initPrivRoutes(r *middleware.Router, q *Queue) {
r.Request("auth.jwt", initJWTAuthHandler(q))
// We need *data.Queue (pointer to interface) so that the closure below won't capture the actual value that pointer points to
// so we can swap queues whenever we want using Server.SetQueue(...)
func initPrivRoutes(r *middleware.Router, q *data.Queue) {
r.Request("auth.jwt", initJWTAuthHandler())
r.Request("echo", middleware.Echo)
r.Request("msg.send", initSendMsgHandler(q))
}

// Used for a client to authenticate and announce its presence.
// If there are any messages meant for this user, they are started to be sent after this call.
func initJWTAuthHandler(q *Queue) func(ctx *neptulon.ReqCtx) error {
func initJWTAuthHandler() func(ctx *neptulon.ReqCtx) error {
return func(ctx *neptulon.ReqCtx) error {
q.SetConn(ctx.Conn.Session.Get("userid").(string), ctx.Conn.ID)
ctx.Res = client.ACK // todo: this could rather send the remaining queue size for the client
// todo: this could rather send the remaining queue size for the client so client can disconnect if there is nothing else to do
ctx.Res = client.ACK
return ctx.Next()
}
}

// Allows clients to send messages to each other, online or offline.
func initSendMsgHandler(q *Queue) func(ctx *neptulon.ReqCtx) error {
func initSendMsgHandler(q *data.Queue) func(ctx *neptulon.ReqCtx) error {
return func(ctx *neptulon.ReqCtx) error {
var sMsgs []models.Message
if err := ctx.Params(&sMsgs); err != nil {
Expand All @@ -47,7 +50,7 @@ func initSendMsgHandler(q *Queue) func(ctx *neptulon.ReqCtx) error {
}

// submit the messages to send queue
err := q.AddRequest(to, "msg.recv", []models.Message{models.Message{From: from, Message: sMsg.Message}}, func(ctx *neptulon.ResCtx) error {
err := (*q).AddRequest(to, "msg.recv", []models.Message{models.Message{From: from, Message: sMsg.Message}}, func(ctx *neptulon.ResCtx) error {
var res string
ctx.Result(&res)
if res == client.ACK {
Expand Down
Loading

0 comments on commit 94dbed0

Please sign in to comment.