Skip to content

Commit

Permalink
Apply VersionVector to GC
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Jul 18, 2024
1 parent c78ffe7 commit fb3a582
Show file tree
Hide file tree
Showing 27 changed files with 1,150 additions and 886 deletions.
6 changes: 6 additions & 0 deletions api/docs/yorkie/v1/resources.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ components:
description: ""
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
6 changes: 6 additions & 0 deletions api/docs/yorkie/v1/yorkie.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ components:
description: ""
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
1,569 changes: 792 additions & 777 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ message ChangePack {
bytes snapshot = 3;
VersionVector snapshot_version_vector = 7;
repeated Change changes = 4;
TimeTicket min_synced_ticket = 5;
TimeTicket min_synced_ticket = 5; // Deprecated
VersionVector min_synced_version_vector = 8;
bool is_removed = 6;
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/document/change/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Change) PresenceChange() *innerpresence.PresenceChange {
return c.presenceChange
}

// CausallyAfter returns whether this change is causally after the given change.
func (c *Change) CausallyAfter(other *Change) bool {
return c.id.CausallyAfter(other.id)
// AfterOrEqual returns whether this change is after or equal to the given change.
func (c *Change) AfterOrEqual(other *Change) bool {
return c.id.AfterOrEqual(other.id)
}
6 changes: 3 additions & 3 deletions pkg/document/change/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (id ID) VersionVector() time.VersionVector {
return id.versionVector
}

// CausallyAfter returns whether this ID is causally after the given ID.
func (id ID) CausallyAfter(other ID) bool {
return id.versionVector.CausallyAfter(other.versionVector)
// AfterOrEqual returns whether this ID is causally after or equal the given ID.
func (id ID) AfterOrEqual(other ID) bool {
return id.versionVector.AfterOrEqual(other.versionVector)
}
4 changes: 4 additions & 0 deletions pkg/document/change/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Pack struct {
// SnapshotVersionVector is the version vector of the snapshot if it exists.
SnapshotVersionVector time.VersionVector

// MinSyncedVersionVector is the minimum version vector taken by clients who
// attach the document.
MinSyncedVersionVector time.VersionVector

// MinSyncedTicket is the minimum logical time taken by clients who attach the document.
// It used to collect garbage on the replica on the client.
MinSyncedTicket *time.Ticket
Expand Down
6 changes: 3 additions & 3 deletions pkg/document/crdt/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ func (r *Root) DeepCopy() (*Root, error) {
}

// GarbageCollect purge elements that were removed before the given time.
func (r *Root) GarbageCollect(ticket *time.Ticket) (int, error) {
func (r *Root) GarbageCollect(vector time.VersionVector) (int, error) {
count := 0

for _, pair := range r.gcElementPairMap {
if ticket.Compare(pair.elem.RemovedAt()) >= 0 {
if vector.After(pair.elem.RemovedAt()) {
if err := pair.parent.Purge(pair.elem); err != nil {
return 0, err
}
Expand All @@ -157,7 +157,7 @@ func (r *Root) GarbageCollect(ticket *time.Ticket) (int, error) {
}

for _, pair := range r.gcNodePairMap {
if ticket.Compare(pair.Child.RemovedAt()) >= 0 {
if vector.After(pair.Child.RemovedAt()) {
if err := pair.Parent.Purge(pair.Child); err != nil {
return 0, err
}
Expand Down
59 changes: 26 additions & 33 deletions pkg/document/crdt/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/pkg/document/crdt"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/test/helper"
)

Expand Down Expand Up @@ -54,7 +53,7 @@ func TestRoot(t *testing.T) {
assert.Equal(t, "[0,2]", array.Marshal())
assert.Equal(t, 1, root.GarbageLen())

n, err := root.GarbageCollect(time.MaxTicket)
n, err := root.GarbageCollect(helper.MaxVectorClock())
assert.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, 0, root.GarbageLen())
Expand All @@ -66,39 +65,39 @@ func TestRoot(t *testing.T) {
text := crdt.NewText(crdt.NewRGATreeSplit(crdt.InitialTextNode()), ctx.IssueTimeTicket())

fromPos, toPos, _ := text.CreateRange(0, 0)
_, _, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
_, _, pairs, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
registerGCPairs(root, pairs)
assert.Equal(t, "Hello World", text.String())
assert.Equal(t, 0, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(5, 10)
_, _, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
_, _, pairs, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
registerGCPairs(root, pairs)
assert.Equal(t, "HelloYorkied", text.String())
assert.Equal(t, 1, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(0, 5)
_, _, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
_, _, pairs, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
registerGCPairs(root, pairs)
assert.Equal(t, "Yorkied", text.String())
assert.Equal(t, 2, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(6, 7)
_, _, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
_, _, pairs, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
registerGCPairs(root, pairs)
assert.Equal(t, "Yorkie", text.String())
assert.Equal(t, 3, root.GarbageLen())

// It contains code marked tombstone.
// CausallyAfter calling the garbage collector, the node will be removed.
// After calling the garbage collector, the node will be removed.
nodeLen := len(text.Nodes())
assert.Equal(t, 4, nodeLen)

n, err := root.GarbageCollect(time.MaxTicket)
n, err := root.GarbageCollect(helper.MaxVectorClock())
assert.NoError(t, err)
assert.Equal(t, 3, n)
assert.Equal(t, 0, root.GarbageLen())
Expand All @@ -107,43 +106,37 @@ func TestRoot(t *testing.T) {
})

t.Run("garbage collection for fragments of text", func(t *testing.T) {
type test struct {
steps := []struct {
from int
to int
content string
want string
garbage int
}{
{0, 0, "Hi World", `[0:0:00:0 {} ""][0:2:00:0 {} "Hi World"]`, 0},
{2, 7, "Earth", `[0:0:00:0 {} ""][0:2:00:0 {} "Hi"][0:3:00:0 {} "Earth"]{0:2:00:2 {} " Worl"}[0:2:00:7 {} "d"]`, 1},
{0, 2, "", `[0:0:00:0 {} ""]{0:2:00:0 {} "Hi"}[0:3:00:0 {} "Earth"]{0:2:00:2 {} " Worl"}[0:2:00:7 {} "d"]`, 2},
{5, 6, "", `[0:0:00:0 {} ""]{0:2:00:0 {} "Hi"}[0:3:00:0 {} "Earth"]{0:2:00:2 {} " Worl"}{0:2:00:7 {} "d"}`, 3},
{from: 0, to: 0, content: "Yorkie", want: "Yorkie", garbage: 0},
{from: 4, to: 5, content: "", want: "Yorke", garbage: 1},
{from: 2, to: 3, content: "", want: "Yoke", garbage: 2},
{from: 0, to: 1, content: "", want: "oke", garbage: 3},
}

root := helper.TestRoot()
ctx := helper.TextChangeContext(root)
text := crdt.NewText(crdt.NewRGATreeSplit(crdt.InitialTextNode()), ctx.IssueTimeTicket())

for _, step := range steps {
fromPos, toPos, _ := text.CreateRange(step.from, step.to)
_, _, pairs, err := text.Edit(fromPos, toPos, nil, step.content, nil, ctx.IssueTimeTicket())
for _, tc := range steps {
fromPos, toPos, _ := text.CreateRange(tc.from, tc.to)
_, _, pairs, err := text.Edit(fromPos, toPos, nil, tc.content, nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerGCPairs(root, pairs)
assert.Equal(t, step.want, text.ToTestString())
assert.Equal(t, step.garbage, root.GarbageLen())
assert.Equal(t, tc.want, text.String())
assert.Equal(t, tc.garbage, root.GarbageLen())
}

// It contains code marked tombstone.
// After calling the garbage collector, the node will be removed.
assert.Equal(t, 4, len(text.Nodes()))

n, err := root.GarbageCollect(time.MaxTicket)
n, err := root.GarbageCollect(helper.MaxVectorClock())
assert.NoError(t, err)
assert.Equal(t, 3, n)
assert.Equal(t, 0, root.GarbageLen())
assert.Equal(t, 1, len(text.Nodes()))
})

t.Run("garbage collection for fragments of text", func(t *testing.T) {
steps := []struct {
from int
Expand Down Expand Up @@ -171,7 +164,7 @@ func TestRoot(t *testing.T) {
assert.Equal(t, tc.garbage, root.GarbageLen())
}

n, err := root.GarbageCollect(time.MaxTicket)
n, err := root.GarbageCollect(helper.MaxVectorClock())
assert.NoError(t, err)
assert.Equal(t, 3, n)
assert.Equal(t, 0, root.GarbageLen())
Expand Down Expand Up @@ -208,7 +201,7 @@ func TestRoot(t *testing.T) {
nodeLen := len(text.Nodes())
assert.Equal(t, 3, nodeLen)

garbageLen, err := root.GarbageCollect(time.MaxTicket)
garbageLen, err := root.GarbageCollect(helper.MaxVectorClock())
assert.NoError(t, err)
assert.Equal(t, 2, garbageLen)
assert.Equal(t, 0, root.GarbageLen())
Expand Down Expand Up @@ -246,7 +239,7 @@ func TestRoot(t *testing.T) {
assert.Equal(t, `{"1":1,"3":3}`, obj.Marshal())
assert.Equal(t, 4, root.GarbageLen())

n, err := root.GarbageCollect(time.MaxTicket)
n, err := root.GarbageCollect(helper.MaxVectorClock())
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, 0, root.GarbageLen())
Expand All @@ -256,7 +249,7 @@ func TestRoot(t *testing.T) {
assert.Equal(t, `{"1":1}`, obj.Marshal())
assert.Equal(t, 1, root.GarbageLen())

n, err = root.GarbageCollect(time.MaxTicket)
n, err = root.GarbageCollect(helper.MaxVectorClock())
assert.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, 0, root.GarbageLen())
Expand Down
10 changes: 6 additions & 4 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Option func(*Options)
// Options configures how we set up the document.
type Options struct {
// DisableGC disables garbage collection.
// NOTE(hackerwins): This is temporary option. We need to remove this option
// after introducing the garbage collection based on the version vector.
DisableGC bool
}

Expand Down Expand Up @@ -214,7 +216,7 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {

// 04. Do Garbage collection.
if !d.options.DisableGC {
d.GarbageCollect(pack.MinSyncedTicket)
d.GarbageCollect(pack.MinSyncedVersionVector)
}

// 05. Update the status.
Expand Down Expand Up @@ -324,14 +326,14 @@ func (d *Document) Root() *json.Object {
}

// GarbageCollect purge elements that were removed before the given time.
func (d *Document) GarbageCollect(ticket *time.Ticket) int {
func (d *Document) GarbageCollect(vector time.VersionVector) int {
if d.cloneRoot != nil {
if _, err := d.cloneRoot.GarbageCollect(ticket); err != nil {
if _, err := d.cloneRoot.GarbageCollect(vector); err != nil {
panic(err)
}
}

n, err := d.doc.GarbageCollect(ticket)
n, err := d.doc.GarbageCollect(vector)
if err != nil {
panic(err)
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/json"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/test/helper"
)

var (
Expand Down Expand Up @@ -487,7 +488,7 @@ func TestDocument(t *testing.T) {
)

assert.Equal(t, 1, doc.GarbageLen())
doc.GarbageCollect(time.MaxTicket)
doc.GarbageCollect(helper.MaxVectorClock(doc.ActorID()))
assert.Equal(t, 0, doc.GarbageLen())
assert.Equal(
t,
Expand Down Expand Up @@ -520,7 +521,7 @@ func TestDocument(t *testing.T) {
assert.Equal(t, "{}", doc.Marshal())
assert.Equal(t, 2, doc.GarbageLen())

doc.GarbageCollect(time.MaxTicket)
doc.GarbageCollect(helper.MaxVectorClock(doc.ActorID()))
assert.Equal(t, "{}", doc.Marshal())
assert.Equal(t, 0, doc.GarbageLen())
})
Expand All @@ -547,8 +548,8 @@ func TestDocument(t *testing.T) {

packA := docA.CreateChangePack()
packA.MinSyncedTicket = time.InitialTicket
assert.True(t, packA.Changes[1].CausallyAfter(packA.Changes[0]))
assert.False(t, packA.Changes[0].CausallyAfter(packA.Changes[1]))
assert.True(t, packA.Changes[1].AfterOrEqual(packA.Changes[0]))
assert.False(t, packA.Changes[0].AfterOrEqual(packA.Changes[1]))

// 02. create document with actorB and apply change packA of docA to docB and check version vector.
actorB, err := time.ActorIDFromHex("000000000000000000000002")
Expand All @@ -567,7 +568,7 @@ func TestDocument(t *testing.T) {
assert.Equal(t, int64(3), docB.VersionVector().VersionOf(actorB))
packB := docB.CreateChangePack()
packB.MinSyncedTicket = time.InitialTicket
assert.True(t, packB.Changes[0].CausallyAfter(packA.Changes[1]))
assert.True(t, packB.Changes[0].AfterOrEqual(packA.Changes[1]))

// 03. update docA and docB concurrently and check version vector.
assert.NoError(t, docA.Update(func(r *json.Object, p *presence.Presence) error {
Expand All @@ -580,6 +581,6 @@ func TestDocument(t *testing.T) {
}))
packA = docA.CreateChangePack()
packB = docB.CreateChangePack()
assert.False(t, packA.Changes[2].CausallyAfter(packB.Changes[1]))
assert.False(t, packA.Changes[2].AfterOrEqual(packB.Changes[1]))
})
}
10 changes: 5 additions & 5 deletions pkg/document/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/json"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/test/helper"
)

func TestTreeGC(t *testing.T) {
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestTreeGC(t *testing.T) {
} else if s.op.code == DeleteNode {
root.GetTree("t").Edit(0, 2, nil, 0)
} else if s.op.code == GC {
doc.GarbageCollect(time.MaxTicket)
doc.GarbageCollect(helper.MaxVectorClock(doc.ActorID()))
}
return nil
}))
Expand All @@ -140,7 +140,7 @@ func TestTreeGC(t *testing.T) {
}

// 03. Garbage collect
doc.GarbageCollect(time.MaxTicket)
doc.GarbageCollect(helper.MaxVectorClock(doc.ActorID()))
assert.Equal(t, 0, doc.GarbageLen())
})
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestTextGC(t *testing.T) {
} else if s.op.code == DeleteNode {
root.GetText("t").Edit(0, 2, "")
} else if s.op.code == GC {
doc.GarbageCollect(time.MaxTicket)
doc.GarbageCollect(helper.MaxVectorClock(doc.ActorID()))
}
return nil
}))
Expand All @@ -215,7 +215,7 @@ func TestTextGC(t *testing.T) {
}

// 03. Garbage collect
doc.GarbageCollect(time.MaxTicket)
doc.GarbageCollect(helper.MaxVectorClock(doc.ActorID()))
assert.Equal(t, 0, doc.GarbageLen())
})
}
Expand Down
Loading

0 comments on commit fb3a582

Please sign in to comment.