Skip to content

Commit

Permalink
sync sessions now have trace ids
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Sep 11, 2024
1 parent d70971d commit 2a9aebb
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
20 changes: 12 additions & 8 deletions op.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,32 @@ func ParsePacket(pack []byte) (lit byte, id, ref rdx.ID, body []byte, err error)
return
}

func ParseHandshake(body []byte) (mode SyncMode, vv rdx.VV, err error) {
// handshake: H(T{pro,src} M(mode) V(V{p,s}+) ...)
func ParseHandshake(body []byte) (mode SyncMode, vv rdx.VV, trace_id []byte, err error) {
// handshake: H(T{pro,src} M(mode) V(V{p,s}+), T(trace_id) ...)
var mbody, vbody []byte
rest := body
mbody, rest = protocol.Take('M', rest)
if mbody == nil {
return 0, nil, ErrBadHPacket
return 0, nil, nil, ErrBadHPacket
}

vbody, _ = protocol.Take('V', rest)
vbody, rest = protocol.Take('V', rest)
if vbody == nil {
return 0, nil, ErrBadHPacket
return 0, nil, nil, ErrBadHPacket
}

vv = make(rdx.VV)
if err := vv.PutTLV(vbody); err != nil {
return 0, nil, err
return 0, nil, nil, err
}

if err := mode.Unzip(mbody); err != nil {
return 0, nil, err
return 0, nil, nil, err
}

return mode, vv, nil
trace_id, _ = protocol.Take('S', rest)
if trace_id == nil {
return 0, nil, nil, ErrBadHPacket
}
return mode, vv, trace_id, nil
}
40 changes: 37 additions & 3 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package chotki
import (
"bytes"
"context"
"crypto/sha1"
"encoding/hex"
"errors"
"io"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -13,6 +16,7 @@ import (
"github.com/drpcorg/chotki/protocol"
"github.com/drpcorg/chotki/rdx"
"github.com/drpcorg/chotki/utils"
"github.com/google/uuid"
)

const SyncBlockBits = 28
Expand Down Expand Up @@ -74,6 +78,8 @@ func (s SyncState) String() string {
return []string{"SendHandshake", "SendDiff", "SendLive", "SendEOF", "SendNone", "SendPing", "SendPong"}[s]
}

const TraceSize = 10

type Syncer struct {
Src uint64
Name string
Expand All @@ -93,6 +99,8 @@ type Syncer struct {
hostvv, peervv rdx.VV
vpack []byte
reason error
myTraceId [TraceSize]byte
theirsTraceid [TraceSize]byte

lock sync.Mutex
cond sync.Cond
Expand All @@ -101,7 +109,7 @@ type Syncer struct {
}

func (sync *Syncer) withDefaultArgs(args ...any) []any {
return append([]any{"name", sync.Name}, args...)
return append([]any{"name", sync.Name, "trace_id", sync.getTraceId()}, args...)
}

func (sync *Syncer) Close() error {
Expand Down Expand Up @@ -275,11 +283,19 @@ func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) {
sync.lock.Lock()
mode := sync.Mode.Zip()
sync.lock.Unlock()
// handshake: H(T{pro,src} M(mode) V(V{p,s}+))
uuid, err := uuid.NewV7()
if err != nil {
return nil, err
}
hash := sha1.Sum(uuid[:])
sync.myTraceId = [TraceSize]byte(hash[:TraceSize])

// handshake: H(T{pro,src} M(mode) V(V{p,s}+), T(trace_ids))
hs := protocol.Record('H',
protocol.TinyRecord('T', sync.snaplast.ZipBytes()),
protocol.TinyRecord('M', mode),
protocol.Record('V', sync.vvit.Value()),
protocol.Record('S', sync.myTraceId[:]),
)

return protocol.Records{hs}, nil
Expand Down Expand Up @@ -487,13 +503,31 @@ func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error
return
}

func (sync *Syncer) getTraceId() string {
theirs := hex.EncodeToString(sync.theirsTraceid[:])
mine := hex.EncodeToString(sync.myTraceId[:])
if strings.Compare(mine, theirs) >= 0 {
return mine + "-" + theirs
} else {
return theirs + "-" + mine
}
}

func (sync *Syncer) DrainHandshake(recs protocol.Records) (err error) {
lit, _, _, body, e := ParsePacket(recs[0])
if lit != 'H' || e != nil {
return ErrBadHPacket
}
var mode SyncMode
mode, sync.peervv, err = ParseHandshake(body)
var trace_id []byte
mode, sync.peervv, trace_id, err = ParseHandshake(body)
if trace_id != nil {
if len(trace_id) != len(sync.theirsTraceid) {
err = ErrBadHPacket
} else {
sync.theirsTraceid = [TraceSize]byte(trace_id)
}
}
sync.lock.Lock()
sync.Mode &= mode
sync.lock.Unlock()
Expand Down

0 comments on commit 2a9aebb

Please sign in to comment.