Skip to content

Commit

Permalink
ipfs transport continue
Browse files Browse the repository at this point in the history
  • Loading branch information
printfcoder committed Nov 2, 2020
1 parent af83519 commit 9a4f883
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 50 deletions.
33 changes: 1 addition & 32 deletions network/transport/ipfs/client.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,5 @@
package ipfs

import (
"github.com/joincloud/peers-touch-go/network/transport"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
)

type ipfsTransportClient struct {
host host.Host
protocol protocol.ID
conn network.Conn
local string
remote string
}

func (i *ipfsTransportClient) Recv(message *transport.Message) error {
panic("implement me")
}

func (i *ipfsTransportClient) Send(message *transport.Message) error {
panic("implement me")
}

func (i *ipfsTransportClient) Close() error {
panic("implement me")
}

func (i *ipfsTransportClient) Local() string {
return i.local
}

func (i *ipfsTransportClient) Remote() string {
return i.remote
*ipfsTransportSocket
}
21 changes: 12 additions & 9 deletions network/transport/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package ipfs

import (
"fmt"
"github.com/joincloud/peers-touch-go/codec"
"time"

log "github.com/joincloud/peers-touch-go/logger"
"github.com/joincloud/peers-touch-go/network/transport"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
Expand All @@ -17,6 +17,7 @@ type ipfsTransport struct {
opts transport.Options
host host.Host
protocolID protocol.ID
codec codec.Codec
}

func (i *ipfsTransport) Init(opts ...transport.Option) (err error) {
Expand Down Expand Up @@ -50,8 +51,6 @@ func (i *ipfsTransport) Dial(addr string, opts ...transport.DialOption) (c trans
opt(&options)
}

var conn network.Conn

defer func() {
if err != nil {
log.Errorf("transport %s dial peer %s error: %s", i.String(), addr, err)
Expand All @@ -69,16 +68,18 @@ func (i *ipfsTransport) Dial(addr string, opts ...transport.DialOption) (c trans
return
}

conn, err = i.host.Network().DialPeer(options.Context, id)
conn, err := i.host.NewStream(options.Context, id, i.protocolID)
if err != nil {
err = fmt.Errorf("transport %s dial peer %s error: %s", i.String(), addr, err)
return
}

c = &ipfsTransportClient{
conn: conn,
local: conn.LocalMultiaddr().String(),
remote: conn.RemoteMultiaddr().String(),
&ipfsTransportSocket{
stream: conn,
local: conn.Conn().LocalPeer().String(),
remote: conn.Conn().RemotePeer().String(),
},
}

return
Expand All @@ -105,8 +106,10 @@ func (i *ipfsTransport) Listen(addr string, opts ...transport.ListenOption) (tl
i.host = h

return &ipfsTransportListener{
it: i,
opts: options,
host: h,
pid: i.protocolID,
opts: options,
codec: i.codec,
}, nil
}

Expand Down
31 changes: 23 additions & 8 deletions network/transport/ipfs/listener.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
package ipfs

import (
"bufio"

"github.com/joincloud/peers-touch-go/codec"
"github.com/joincloud/peers-touch-go/network/transport"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
)

type ipfsTransportListener struct {
it *ipfsTransport
opts transport.ListenOptions
host host.Host
pid protocol.ID
opts transport.ListenOptions
codec codec.Codec
}

func (i *ipfsTransportListener) Addr() string {
return i.it.host.Addrs()[0].String()
return i.host.Addrs()[0].String()
}

func (i *ipfsTransportListener) Close() error {
return i.it.host.Close()
i.host.RemoveStreamHandler(i.pid)
return nil
}

func (i *ipfsTransportListener) Accept(f func(socket transport.Socket)) error {
for {

}
func (i *ipfsTransportListener) Accept(fn func(socket transport.Socket)) error {
i.host.SetStreamHandler(i.pid, func(s network.Stream) {
fn(&ipfsTransportSocket{
stream: s,
r: bufio.NewReader(s),
w: bufio.NewWriter(s),
codec: i.codec,
})
})
return nil
}
2 changes: 1 addition & 1 deletion network/transport/ipfs/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type ipfsTransportSocket struct {
stream network.Stream
w *bufio.Writer
r *bufio.Writer
r *bufio.Reader
codec codec.Codec
local string
remote string
Expand Down
1 change: 1 addition & 0 deletions store/bolt/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (b *boltStore) Write(r *store.Record, opts ...store.WriteOption) error {

return b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(options.Table))
log.Infof("table: %s", options.Table)
// todo codec
v, _ := json.Marshal(r)
err := b.Put([]byte(r.Key), v)
Expand Down

0 comments on commit 9a4f883

Please sign in to comment.