Skip to content

Commit

Permalink
Refactor KVDB, Update P2P Mock, and Streamline Tests and Hoarder Stre…
Browse files Browse the repository at this point in the history
…aming

This commit introduces a significant refactor of the KVDB package, implements a new P2P mock, and addresses various issues in tests and hoarder streaming routes. The changes include:

- A new PubSubBroadcaster structure that utilizes libp2p PubSub for broadcasting.
- A mechanism for registering and unregistering topics within the PubSub system.
- Enhanced synchronization with the addition of mutex locks to prevent race conditions.
- Improved error handling and subscription logic to ensure reliable message broadcasting and reception.

These updates aim to optimize the underlying data distribution mechanisms, improve the robustness of the testing framework, and ensure that streaming routes are defined more consistently.
  • Loading branch information
samyfodil authored Nov 8, 2023
1 parent f254414 commit 1c3b1f4
Show file tree
Hide file tree
Showing 20 changed files with 952 additions and 69 deletions.
14 changes: 7 additions & 7 deletions cli/app/start_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ func startCommand() *cli.Command {
return fmt.Errorf("parsing config failed with: %s", err)
}

// Migration Start
// Migration Start (from odo to tau)
// TODO: Delete this after a few releases
if _, err := os.Stat(fmt.Sprintf("/tb/storage/databases/%s", shape)); !os.IsNotExist(err) {
err = migrateDatabase(ctx.Context, shape, len(protocolConfig.Protocols) == 0)
if err != nil {
return fmt.Errorf("migrating shape %s failed with: %w", shape, err)
}
}

cmd := exec.Command("sudo", "systemctl", "stop", fmt.Sprintf("odo@%s.service", shape))
cmd.CombinedOutput()
cmd := exec.Command("sudo", "systemctl", "stop", fmt.Sprintf("odo@%s.service", shape))
cmd.CombinedOutput()

cmd = exec.Command("sudo", "systemctl", "disable", fmt.Sprintf("odo@%s.service", shape))
cmd.CombinedOutput()
// Migration End
cmd = exec.Command("sudo", "systemctl", "disable", fmt.Sprintf("odo@%s.service", shape))
cmd.CombinedOutput()
}

setNetworkDomains(sourceConfig)
return node.Start(ctx.Context, protocolConfig)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ require (
github.com/taubyte/go-simple-git v0.2.5
github.com/taubyte/go-specs v0.10.8-0.20230912140105-e8d804edc77c
github.com/taubyte/http v0.10.5
github.com/taubyte/p2p v0.10.1-0.20230919152907-f26fd82a39d3
github.com/taubyte/p2p v0.11.0
github.com/taubyte/utils v0.1.7
github.com/taubyte/vm v1.0.3
github.com/taubyte/vm v1.0.4
github.com/taubyte/vm-core-plugins v0.3.4
github.com/taubyte/vm-orbit v1.0.0
github.com/urfave/cli/v2 v2.25.7
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -906,12 +906,12 @@ github.com/taubyte/go-specs v0.10.8-0.20230912140105-e8d804edc77c/go.mod h1:lhCi
github.com/taubyte/http v0.10.5 h1:0dpSYi1XvryxO7U1Df5cgh+6LWKtaMypILQjgfJdExw=
github.com/taubyte/http v0.10.5/go.mod h1:hSJYC7/yqRkrsZYe1Q0c194GKkVD0KGWDzw0sC+SAiU=
github.com/taubyte/odo v0.0.0-20230726203810-bdae9f37e8f8 h1:+r57pU7UW1DZkBgOUsBh0zmRSg77sz+bJ3DaybLR3z0=
github.com/taubyte/p2p v0.10.1-0.20230919152907-f26fd82a39d3 h1:NbgrtVT1D0IOnIidXWIA8JN7SykD5QRT20Ak+YTFflc=
github.com/taubyte/p2p v0.10.1-0.20230919152907-f26fd82a39d3/go.mod h1:Uo464wZDiMeuP7GDHvjUjwGbnm72ByuGofk2O2m58yI=
github.com/taubyte/p2p v0.11.0 h1:gtrCZk6/AqIB62UuRO3iz1mcqmqUc0/AoOmISGHyrB8=
github.com/taubyte/p2p v0.11.0/go.mod h1:Uo464wZDiMeuP7GDHvjUjwGbnm72ByuGofk2O2m58yI=
github.com/taubyte/utils v0.1.7 h1:iFMOxhBNJKdUrD+Z1UjP9W5559LlQN8178V54uLngdk=
github.com/taubyte/utils v0.1.7/go.mod h1:MWO21qZu7AcBwJkJLsltwb3zWyN84xSexLCU28JC4gM=
github.com/taubyte/vm v1.0.3 h1:Z9IYHdHiWVh0/v1+dvWblGe6Ny+ni3t4yr8GIgfhavg=
github.com/taubyte/vm v1.0.3/go.mod h1:5pRQKirHakEXSdhCPtkbom1VA2z64JO01ce1F4JibBM=
github.com/taubyte/vm v1.0.4 h1:yHM61nU1GYwt+gfqXMvu+AkmA9tr7CuPYja4TJKeY7E=
github.com/taubyte/vm v1.0.4/go.mod h1:WXBnk6d4WaOvT5g05Co66LDXe+gt8SFfO9nEq1srdQo=
github.com/taubyte/vm-core-plugins v0.3.4 h1:NPYZfcL6JTS+YsQ5TYn/fO657L0Sqqk8vte7GBEtlqA=
github.com/taubyte/vm-core-plugins v0.3.4/go.mod h1:jQBHrYYVvHjNP0fy0Sx3KsFQ4JMh8f0cmbSE6OXr+j8=
github.com/taubyte/vm-orbit v1.0.0 h1:oSzlnGUTJWItGoz9WTS6nW68sh0zcllOOQkLtbc0LkU=
Expand Down
143 changes: 143 additions & 0 deletions pkgs/kvdb/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package kvdb

import (
"context"
"strings"
"sync"

crdt "github.com/ipfs/go-ds-crdt"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// PubSubBroadcaster implements a Broadcaster using libp2p PubSub.
type PubSubBroadcaster struct {
lock sync.Mutex
ctx context.Context
psub *pubsub.PubSub
topic *pubsub.Topic
subs *pubsub.Subscription
}

var (
broadcasters = make(map[string]*PubSubBroadcaster)
broadcastersLock sync.Mutex
)

func registerTopic(topic string, b *PubSubBroadcaster) {
broadcastersLock.Lock()
defer broadcastersLock.Unlock()
broadcasters[topic] = b
}

func unregisterTopic(topic string) {
broadcastersLock.Lock()
defer broadcastersLock.Unlock()
delete(broadcasters, topic)
}

func getTopic(topic string) *PubSubBroadcaster {
broadcastersLock.Lock()
defer broadcastersLock.Unlock()
return broadcasters[topic]
}

// NewPubSubBroadcaster returns a new broadcaster using the given PubSub and
// a topic to subscribe/broadcast to. The given context can be used to cancel
// the broadcaster.
// Please register any topic validators before creating the Broadcaster.
//
// The broadcaster can be shut down by cancelling the given context.
// This must be done before Closing the crdt.Datastore, otherwise things
// may hang.
func NewPubSubBroadcaster(ctx context.Context, psub *pubsub.PubSub, topic string) (b *PubSubBroadcaster, err error) {
if b = getTopic(topic); b != nil {
return b, nil
}

b = &PubSubBroadcaster{
ctx: ctx,
psub: psub,
}

b.topic, err = psub.Join(topic)
if err != nil {
return nil, err
}

registerTopic(topic, b)

if err = b.ensureSubscribed(); err != nil {
return nil, err
}

go func() {
<-b.ctx.Done()
b.lock.Lock()
defer b.lock.Unlock()
if b.subs != nil {
b.subs.Cancel()
}
b.topic.Close()
unregisterTopic(topic)
}()

return
}

func (pbc *PubSubBroadcaster) ensureSubscribed() (err error) {
pbc.lock.Lock()
defer pbc.lock.Unlock()
pbc.subs, err = pbc.topic.Subscribe()
return
}

// Broadcast publishes some data.
func (pbc *PubSubBroadcaster) Broadcast(data []byte) error {
return pbc.topic.Publish(pbc.ctx, data)
}

// Next returns published data.
func (pbc *PubSubBroadcaster) Next() ([]byte, error) {
for try := 3; try > 0; try-- {
msg, err := pbc.next()
if err != crdt.ErrNoMoreBroadcast {
return msg, err
}

// try again
if pbc.ensureSubscribed() != nil {
break
}
}

return nil, crdt.ErrNoMoreBroadcast
}

func (pbc *PubSubBroadcaster) next() ([]byte, error) {
var msg *pubsub.Message
var err error

select {
case <-pbc.ctx.Done():
return nil, crdt.ErrNoMoreBroadcast
default:
}

pbc.lock.Lock()
defer pbc.lock.Unlock()

if pbc.subs == nil {
return nil, crdt.ErrNoMoreBroadcast
}

msg, err = pbc.subs.Next(pbc.ctx)
if err != nil {
if strings.Contains(err.Error(), "subscription cancelled") ||
strings.Contains(err.Error(), "context") {
return nil, crdt.ErrNoMoreBroadcast
}
return nil, err
}

return msg.GetData(), nil
}
Loading

0 comments on commit 1c3b1f4

Please sign in to comment.