Skip to content

Commit

Permalink
Broker Touch
Browse files Browse the repository at this point in the history
  • Loading branch information
printfcoder committed Oct 24, 2020
1 parent 18ac4b7 commit de42c4b
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 30 deletions.
99 changes: 70 additions & 29 deletions pubsub/broker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package pubsub

import (
"bufio"
"context"
"fmt"
"sync"

iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/joincloud/peers-touch-go/codec"
"github.com/joincloud/peers-touch-go/logger"
"github.com/joincloud/peers-touch-go/peer"
peerlib "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)

Expand All @@ -18,6 +23,7 @@ type Broker interface {
Unsub(topic string) error
Join(ctx context.Context, opts ...ChannelOption) (ch *Channel, err error)
// Connect(id peer.PeerID) (err error)
Touch(ctx context.Context, opts ...TouchOption) (err error)
Codec() codec.Codec
Close() error
}
Expand Down Expand Up @@ -50,12 +56,60 @@ func (e *event) Message() Message {
type broker struct {
codec codec.Codec
coreAPI iface.CoreAPI
host peer.Host
subscribers map[string]Subscriber
muMux sync.RWMutex
exit chan chan error
chans map[string]*Channel
}

func (b *broker) Touch(ctx context.Context, opts ...TouchOption) (err error) {
options := TouchOptions{}
for _, opt := range opts {
opt(&options)
}

defer func() {
if err != nil {
logger.Errorf("touch dest error: %s", err)
}
}()

// todo check more options
if options.DestAddr == "" {
return fmt.Errorf("touch dest node shouldn't be nil")
}

maddr, err := multiaddr.NewMultiaddr(options.DestAddr)
if err != nil {
return fmt.Errorf("touch dest node err: %s", err)
}

info, err := peerlib.AddrInfoFromP2pAddr(maddr)
if err != nil {
return fmt.Errorf("touch get dest node addr info err: %s", err)
}

b.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)

s, err := b.host.NewStream(context.Background(), info.ID, "/chat/1.0.0")
if err != nil {
return fmt.Errorf("touch make new stream err: %s", err)
}

rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

if options.Writer != nil {
go options.Writer(rw)
}

if options.Reader != nil {
go options.Reader(rw)
}

return nil
}

func (b *broker) Join(ctx context.Context, opts ...ChannelOption) (ch *Channel, err error) {
ch, err = joinChannel(ctx, opts...)
if err != nil {
Expand All @@ -65,7 +119,22 @@ func (b *broker) Join(ctx context.Context, opts ...ChannelOption) (ch *Channel,
return
}

func (b *broker) Init(...BrokerOption) error {
func (b *broker) Init(opts ...BrokerOption) (err error) {
options := BrokerOptions{}
for _, opt := range opts {
opt(&options)
}

defer func() {
if err != nil {
logger.Errorf("init broker error: %s", err)
}
}()

if options.host == nil {
return fmt.Errorf("broker's host shouldn't be nil")
}

b.chans = make(map[string]*Channel)
return nil
}
Expand Down Expand Up @@ -93,34 +162,6 @@ func (b *broker) Pub(ctx context.Context, event Event) (err error) {
return
}

/*func (b *broker) Connect(id peer.PeerID) (err error) {
// Turn the destination into a multiaddr.
maddr, err := multiaddr.NewMultiaddr(*dest)
if err != nil {
log.Fatalln(err)
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
log.Fatalln(err)
}
// Add the destination's peer multiaddress in the peerstore.
// This will be used during connection and stream creation by libp2p.
host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
// Start a stream with the destination.
// Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
s, err := host.NewStream(context.Background(), info.ID, "/chat/1.0.0")
if err != nil {
panic(err)
}
// Create a buffered stream so that read and writes are non blocking.
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
}*/

func (b *broker) Sub(ctx context.Context, topic string, handler Handler) (Subscriber, error) {
b.muMux.Lock()
defer b.muMux.Unlock()
Expand Down
36 changes: 35 additions & 1 deletion pubsub/options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package pubsub

import (
"bufio"
"context"

iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/joincloud/peers-touch-go/codec"
"github.com/joincloud/peers-touch-go/peer"
Expand All @@ -12,6 +12,7 @@ import (
type BrokerOptions struct {
coreAPI iface.CoreAPI
codec codec.Codec
host peer.Host
}

type BrokerOption func(o *BrokerOptions)
Expand All @@ -28,6 +29,12 @@ func BrokerCodec(codec codec.Codec) BrokerOption {
}
}

func BrokerHost(host peer.Host) BrokerOption {
return func(o *BrokerOptions) {
o.host = host
}
}

type SubOptions struct {
Topic string
codec codec.Codec
Expand Down Expand Up @@ -125,3 +132,30 @@ type PushOptions struct {
}

type PushOption func(o *PushOptions)

type TouchOptions struct {
// multiaddr like /ip4/0.0.0.0/tcp/8988
DestAddr string
Writer func(rw *bufio.ReadWriter)
Reader func(rw *bufio.ReadWriter)
}

type TouchOption func(o *TouchOptions)

func TouchAddr(addr string) TouchOption {
return func(o *TouchOptions) {
o.DestAddr = addr
}
}

func TouchReader(reader func(rw *bufio.ReadWriter)) TouchOption {
return func(o *TouchOptions) {
o.Reader = reader
}
}

func TouchWriter(writer func(rw *bufio.ReadWriter)) TouchOption {
return func(o *TouchOptions) {
o.Writer = writer
}
}

0 comments on commit de42c4b

Please sign in to comment.