Skip to content

Commit

Permalink
refactor: router supports a more extensive set of rules. (#697)
Browse files Browse the repository at this point in the history
# Description

Refactor `Router` interface, the new interface is:

```go
// Router routes data that is written by source/sfn according to parameters be passed.
// Users should define their own rules that tells zipper how to route data and how to store the rules.
type Router interface {
	// Add adds the route rule to the router.
	Add(connID string, observeDataTags []uint32, md metadata.M) error
	// Route gets the ID list of connections from the router.
	Route(dataTag uint32, md metadata.M) (connIDs []string)
	// Remove removes the route rule from the router.
	Remove(connID string)
	// Release release the router and removes all the route rules.
	Release()
}
```
  • Loading branch information
woorui authored Jan 8, 2024
1 parent c650727 commit 048dd08
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 190 deletions.
4 changes: 2 additions & 2 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,15 @@ type hookTester struct {
}

func (a *hookTester) connMiddleware(next ConnHandler) ConnHandler {
return func(c *Connection, r router.Route) {
return func(c *Connection) {
a.mu.Lock()
if a.connNames == nil {
a.connNames = make([]string, 0)
}
a.connNames = append(a.connNames, c.Name())
a.mu.Unlock()

next(c, r)
next(c)

a.mu.Lock()
assert.Contains(a.t, a.connNames, c.Name())
Expand Down
7 changes: 1 addition & 6 deletions core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
"github.com/yomorun/yomo/core/router"
"golang.org/x/exp/slog"
)

Expand All @@ -22,8 +21,6 @@ type Context struct {
Frame *frame.DataFrame
// FrameMetadata is the merged metadata from the frame.
FrameMetadata metadata.M
// Route is the route from handshake.
Route router.Route
// mu is used to protect Keys from concurrent read and write operations.
mu sync.RWMutex
// Keys stores the key/value pairs in context, It is Lazy initialized.
Expand Down Expand Up @@ -88,7 +85,7 @@ func (c *Context) Value(key any) any {
// The YoMo context is used to manage the lifecycle of a connection and provides a way to pass data and metadata
// between connection processing functions. The lifecycle of the context is equal to the lifecycle of the connection
// that it is associated with. The context can be used to manage timeouts, cancellations, and other aspects of connection processing.
func newContext(conn *Connection, route router.Route, df *frame.DataFrame) (c *Context, err error) {
func newContext(conn *Connection, df *frame.DataFrame) (c *Context, err error) {
fmd, err := metadata.Decode(df.Metadata)
if err != nil {
return nil, err
Expand All @@ -111,7 +108,6 @@ func newContext(conn *Connection, route router.Route, df *frame.DataFrame) (c *C
c.FrameMetadata = fmd

c.Connection = conn
c.Route = route

// log with tid
c.Logger = c.Connection.Logger.With("tid", GetTIDFromMetadata(fmd))
Expand Down Expand Up @@ -140,7 +136,6 @@ func (c *Context) Release() {

func (c *Context) reset() {
c.Connection = nil
c.Route = nil
c.Frame = nil
c.FrameMetadata = nil
c.Logger = nil
Expand Down
91 changes: 0 additions & 91 deletions core/router/default.go

This file was deleted.

40 changes: 0 additions & 40 deletions core/router/default_test.go

This file was deleted.

89 changes: 75 additions & 14 deletions core/router/router.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,86 @@
// Package router defines the interface of router and route.
// Package router defines the interface of router.
package router

import (
"sync"

"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
)

// Router is the interface to manage the routes for applications.
// Router routes data that is written by source/sfn according to parameters be passed.
// Users should define their own rules that tells zipper how to route data and how to store the rules.
type Router interface {
// Route gets the route
Route(metadata metadata.M) Route
// Clean the routes.
Clean()
// Add adds the route rule to the router.
Add(connID string, observeDataTags []uint32, md metadata.M) error
// Route gets the ID list of connections from the router.
Route(dataTag uint32, md metadata.M) (connIDs []string)
// Remove removes the route rule from the router.
Remove(connID string)
// Release release the router and removes all the route rules.
Release()
}

// Route manages data subscribers according to their observed data tags.
type Route interface {
// Add a route.
Add(connID string, observeDataTags []frame.Tag) error
// Remove a route.
Remove(connID string) error
// GetForwardRoutes returns all the subscribers by the given data tag.
GetForwardRoutes(tag frame.Tag) (connIDs []string)
type defaultRouter struct {
// mu protects data.
mu sync.RWMutex

// data stores tag and connID connection.
// The key is frame tag, The value is connID connection.
data map[frame.Tag]map[string]struct{}
}

// DefaultRouter provides a default implementation of `router`,
// It routes data according to observed tag or connID.
func Default() *defaultRouter {
return &defaultRouter{
data: make(map[frame.Tag]map[string]struct{}),
}
}

func (r *defaultRouter) Add(connID string, ObserveDataTags []uint32, md metadata.M) error {
r.mu.Lock()
defer r.mu.Unlock()

for _, tag := range ObserveDataTags {
conns := r.data[tag]
if conns == nil {
conns = map[string]struct{}{}
r.data[tag] = conns
}
r.data[tag][connID] = struct{}{}
}

return nil
}

func (r *defaultRouter) Route(dataTag uint32, md metadata.M) []string {
r.mu.RLock()
defer r.mu.RUnlock()

var connID []string
if conns, ok := r.data[dataTag]; ok {
for k := range conns {
connID = append(connID, k)
}
}
return connID
}

func (r *defaultRouter) Remove(connID string) {
r.mu.Lock()
defer r.mu.Unlock()

for _, conns := range r.data {
delete(conns, connID)
}
}

func (r *defaultRouter) Release() {
r.mu.Lock()
defer r.mu.Unlock()

for key := range r.data {
delete(r.data, key)
}
}
34 changes: 34 additions & 0 deletions core/router/router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package router

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/yomorun/yomo/core/metadata"
)

func TestRouter(t *testing.T) {
router := Default()

err := router.Add("conn-1", []uint32{1}, metadata.M{})
assert.NoError(t, err)

err = router.Add("conn-2", []uint32{1}, metadata.M{})
assert.NoError(t, err)

err = router.Add("conn-3", []uint32{1}, metadata.M{})
assert.NoError(t, err)

ids := router.Route(1, nil)
assert.ElementsMatch(t, []string{"conn-1", "conn-2", "conn-3"}, ids)

router.Remove("conn-1")

ids = router.Route(1, nil)
assert.ElementsMatch(t, []string{"conn-2", "conn-3"}, ids)

router.Release()

ids = router.Route(1, nil)
assert.Equal(t, []string(nil), ids)
}
Loading

1 comment on commit 048dd08

@vercel
Copy link

@vercel vercel bot commented on 048dd08 Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

yomo – ./

yomo-yomorun.vercel.app
yomo.run
www.yomo.run
yomo.vercel.app
yomo-git-master-yomorun.vercel.app

Please sign in to comment.