diff --git a/network/transport/ipfs/client.go b/network/transport/ipfs/client.go index baf3aea..069217b 100644 --- a/network/transport/ipfs/client.go +++ b/network/transport/ipfs/client.go @@ -1,36 +1,5 @@ package ipfs -import ( - "github.com/joincloud/peers-touch-go/network/transport" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/protocol" -) - type ipfsTransportClient struct { - host host.Host - protocol protocol.ID - conn network.Conn - local string - remote string -} - -func (i *ipfsTransportClient) Recv(message *transport.Message) error { - panic("implement me") -} - -func (i *ipfsTransportClient) Send(message *transport.Message) error { - panic("implement me") -} - -func (i *ipfsTransportClient) Close() error { - panic("implement me") -} - -func (i *ipfsTransportClient) Local() string { - return i.local -} - -func (i *ipfsTransportClient) Remote() string { - return i.remote + *ipfsTransportSocket } diff --git a/network/transport/ipfs/ipfs.go b/network/transport/ipfs/ipfs.go index 93dd775..3e2ca39 100644 --- a/network/transport/ipfs/ipfs.go +++ b/network/transport/ipfs/ipfs.go @@ -2,13 +2,13 @@ package ipfs import ( "fmt" + "github.com/joincloud/peers-touch-go/codec" "time" log "github.com/joincloud/peers-touch-go/logger" "github.com/joincloud/peers-touch-go/network/transport" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" ) @@ -17,6 +17,7 @@ type ipfsTransport struct { opts transport.Options host host.Host protocolID protocol.ID + codec codec.Codec } func (i *ipfsTransport) Init(opts ...transport.Option) (err error) { @@ -50,8 +51,6 @@ func (i *ipfsTransport) Dial(addr string, opts ...transport.DialOption) (c trans opt(&options) } - var conn network.Conn - defer func() { if err != nil { log.Errorf("transport %s dial peer %s error: %s", i.String(), addr, err) @@ -69,16 +68,18 @@ func (i *ipfsTransport) Dial(addr string, opts ...transport.DialOption) (c trans return } - conn, err = i.host.Network().DialPeer(options.Context, id) + conn, err := i.host.NewStream(options.Context, id, i.protocolID) if err != nil { err = fmt.Errorf("transport %s dial peer %s error: %s", i.String(), addr, err) return } c = &ipfsTransportClient{ - conn: conn, - local: conn.LocalMultiaddr().String(), - remote: conn.RemoteMultiaddr().String(), + &ipfsTransportSocket{ + stream: conn, + local: conn.Conn().LocalPeer().String(), + remote: conn.Conn().RemotePeer().String(), + }, } return @@ -105,8 +106,10 @@ func (i *ipfsTransport) Listen(addr string, opts ...transport.ListenOption) (tl i.host = h return &ipfsTransportListener{ - it: i, - opts: options, + host: h, + pid: i.protocolID, + opts: options, + codec: i.codec, }, nil } diff --git a/network/transport/ipfs/listener.go b/network/transport/ipfs/listener.go index fa5f3ac..b87ef30 100644 --- a/network/transport/ipfs/listener.go +++ b/network/transport/ipfs/listener.go @@ -1,24 +1,39 @@ package ipfs import ( + "bufio" + + "github.com/joincloud/peers-touch-go/codec" "github.com/joincloud/peers-touch-go/network/transport" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/protocol" ) type ipfsTransportListener struct { - it *ipfsTransport - opts transport.ListenOptions + host host.Host + pid protocol.ID + opts transport.ListenOptions + codec codec.Codec } func (i *ipfsTransportListener) Addr() string { - return i.it.host.Addrs()[0].String() + return i.host.Addrs()[0].String() } func (i *ipfsTransportListener) Close() error { - return i.it.host.Close() + i.host.RemoveStreamHandler(i.pid) + return nil } -func (i *ipfsTransportListener) Accept(f func(socket transport.Socket)) error { - for { - - } +func (i *ipfsTransportListener) Accept(fn func(socket transport.Socket)) error { + i.host.SetStreamHandler(i.pid, func(s network.Stream) { + fn(&ipfsTransportSocket{ + stream: s, + r: bufio.NewReader(s), + w: bufio.NewWriter(s), + codec: i.codec, + }) + }) + return nil } diff --git a/network/transport/ipfs/socket.go b/network/transport/ipfs/socket.go index 9b95ec9..6fa8819 100644 --- a/network/transport/ipfs/socket.go +++ b/network/transport/ipfs/socket.go @@ -13,7 +13,7 @@ import ( type ipfsTransportSocket struct { stream network.Stream w *bufio.Writer - r *bufio.Writer + r *bufio.Reader codec codec.Codec local string remote string diff --git a/store/bolt/bolt.go b/store/bolt/bolt.go index 9f732a8..3d4e872 100644 --- a/store/bolt/bolt.go +++ b/store/bolt/bolt.go @@ -97,6 +97,7 @@ func (b *boltStore) Write(r *store.Record, opts ...store.WriteOption) error { return b.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(options.Table)) + log.Infof("table: %s", options.Table) // todo codec v, _ := json.Marshal(r) err := b.Put([]byte(r.Key), v)