diff --git a/pubsub/broker.go b/pubsub/broker.go index a23c998..c6a8b2a 100644 --- a/pubsub/broker.go +++ b/pubsub/broker.go @@ -1,6 +1,7 @@ package pubsub import ( + "bufio" "context" "fmt" "sync" @@ -8,6 +9,10 @@ import ( iface "github.com/ipfs/interface-go-ipfs-core" "github.com/joincloud/peers-touch-go/codec" "github.com/joincloud/peers-touch-go/logger" + "github.com/joincloud/peers-touch-go/peer" + peerlib "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" ) @@ -18,6 +23,7 @@ type Broker interface { Unsub(topic string) error Join(ctx context.Context, opts ...ChannelOption) (ch *Channel, err error) // Connect(id peer.PeerID) (err error) + Touch(ctx context.Context, opts ...TouchOption) (err error) Codec() codec.Codec Close() error } @@ -50,12 +56,60 @@ func (e *event) Message() Message { type broker struct { codec codec.Codec coreAPI iface.CoreAPI + host peer.Host subscribers map[string]Subscriber muMux sync.RWMutex exit chan chan error chans map[string]*Channel } +func (b *broker) Touch(ctx context.Context, opts ...TouchOption) (err error) { + options := TouchOptions{} + for _, opt := range opts { + opt(&options) + } + + defer func() { + if err != nil { + logger.Errorf("touch dest error: %s", err) + } + }() + + // todo check more options + if options.DestAddr == "" { + return fmt.Errorf("touch dest node shouldn't be nil") + } + + maddr, err := multiaddr.NewMultiaddr(options.DestAddr) + if err != nil { + return fmt.Errorf("touch dest node err: %s", err) + } + + info, err := peerlib.AddrInfoFromP2pAddr(maddr) + if err != nil { + return fmt.Errorf("touch get dest node addr info err: %s", err) + } + + b.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + + s, err := b.host.NewStream(context.Background(), info.ID, "/chat/1.0.0") + if err != nil { + return fmt.Errorf("touch make new stream err: %s", err) + } + + rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) + + if options.Writer != nil { + go options.Writer(rw) + } + + if options.Reader != nil { + go options.Reader(rw) + } + + return nil +} + func (b *broker) Join(ctx context.Context, opts ...ChannelOption) (ch *Channel, err error) { ch, err = joinChannel(ctx, opts...) if err != nil { @@ -65,7 +119,22 @@ func (b *broker) Join(ctx context.Context, opts ...ChannelOption) (ch *Channel, return } -func (b *broker) Init(...BrokerOption) error { +func (b *broker) Init(opts ...BrokerOption) (err error) { + options := BrokerOptions{} + for _, opt := range opts { + opt(&options) + } + + defer func() { + if err != nil { + logger.Errorf("init broker error: %s", err) + } + }() + + if options.host == nil { + return fmt.Errorf("broker's host shouldn't be nil") + } + b.chans = make(map[string]*Channel) return nil } @@ -93,34 +162,6 @@ func (b *broker) Pub(ctx context.Context, event Event) (err error) { return } -/*func (b *broker) Connect(id peer.PeerID) (err error) { - // Turn the destination into a multiaddr. - maddr, err := multiaddr.NewMultiaddr(*dest) - if err != nil { - log.Fatalln(err) - } - - // Extract the peer ID from the multiaddr. - info, err := peer.AddrInfoFromP2pAddr(maddr) - if err != nil { - log.Fatalln(err) - } - - // Add the destination's peer multiaddress in the peerstore. - // This will be used during connection and stream creation by libp2p. - host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) - - // Start a stream with the destination. - // Multiaddress of the destination peer is fetched from the peerstore using 'peerId'. - s, err := host.NewStream(context.Background(), info.ID, "/chat/1.0.0") - if err != nil { - panic(err) - } - - // Create a buffered stream so that read and writes are non blocking. - rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) -}*/ - func (b *broker) Sub(ctx context.Context, topic string, handler Handler) (Subscriber, error) { b.muMux.Lock() defer b.muMux.Unlock() diff --git a/pubsub/options.go b/pubsub/options.go index 8df01d2..475070d 100644 --- a/pubsub/options.go +++ b/pubsub/options.go @@ -1,8 +1,8 @@ package pubsub import ( + "bufio" "context" - iface "github.com/ipfs/interface-go-ipfs-core" "github.com/joincloud/peers-touch-go/codec" "github.com/joincloud/peers-touch-go/peer" @@ -12,6 +12,7 @@ import ( type BrokerOptions struct { coreAPI iface.CoreAPI codec codec.Codec + host peer.Host } type BrokerOption func(o *BrokerOptions) @@ -28,6 +29,12 @@ func BrokerCodec(codec codec.Codec) BrokerOption { } } +func BrokerHost(host peer.Host) BrokerOption { + return func(o *BrokerOptions) { + o.host = host + } +} + type SubOptions struct { Topic string codec codec.Codec @@ -125,3 +132,30 @@ type PushOptions struct { } type PushOption func(o *PushOptions) + +type TouchOptions struct { + // multiaddr like /ip4/0.0.0.0/tcp/8988 + DestAddr string + Writer func(rw *bufio.ReadWriter) + Reader func(rw *bufio.ReadWriter) +} + +type TouchOption func(o *TouchOptions) + +func TouchAddr(addr string) TouchOption { + return func(o *TouchOptions) { + o.DestAddr = addr + } +} + +func TouchReader(reader func(rw *bufio.ReadWriter)) TouchOption { + return func(o *TouchOptions) { + o.Reader = reader + } +} + +func TouchWriter(writer func(rw *bufio.ReadWriter)) TouchOption { + return func(o *TouchOptions) { + o.Writer = writer + } +}