Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear presence when housekeeping process #936

Draft
wants to merge 35 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5d48edb
Modify `clients.Deactivate` to handle presence clear
raararaara Jul 19, 2024
fb631fa
Resolve conflict
raararaara Jul 22, 2024
90cfe15
Fix invalid argument passing
hackerwins Jul 22, 2024
e496ad2
Fix lint warning
hackerwins Jul 22, 2024
a87a9f1
Add related test
raararaara Jul 22, 2024
1da3ec3
Add logic to prevent duplicate presence clear change
raararaara Jul 22, 2024
525d4c4
Simplify presenceChange clear condition
raararaara Jul 23, 2024
28b0db5
Add test related to duplicate presence clear
raararaara Jul 23, 2024
1c5b05c
Lint
raararaara Jul 23, 2024
e0b9fce
Fix test to return correct result
raararaara Jul 23, 2024
597b867
Add additional comments
raararaara Jul 23, 2024
e39b542
Cleanup code
raararaara Jul 23, 2024
74bca05
Cleanup code
raararaara Jul 23, 2024
3409bf1
Revise the deactivation logic
hackerwins Jul 23, 2024
e4ded21
Revert queries and test related to `DeactivateClient`
raararaara Jul 24, 2024
13211b1
Modify `clients.Deactivate` to load client within the method
raararaara Jul 26, 2024
cc14a6c
Fix rpcAddr to cover other testcases
raararaara Jul 26, 2024
ab278e7
Lint
raararaara Jul 26, 2024
17de638
Fix `ToClient` to allow token value to be received when creating clie…
raararaara Jul 29, 2024
532f87d
Lint
raararaara Jul 29, 2024
5ad0b89
Refactored `clients.Deactivate` to use `client.Detach`
raararaara Aug 1, 2024
6034b7d
Add gateway host to Yorkie deployment
raararaara Aug 6, 2024
5733c0e
Lint
raararaara Aug 6, 2024
c2da835
Modified client creation to be performed at `clients.Deactivate`
raararaara Aug 6, 2024
98d5938
Refactored comment to clarify APIhost determination logic based on en…
raararaara Aug 6, 2024
a1d8ed1
Move backend-gateway-addr to flag
hackerwins Aug 9, 2024
6619673
Added server token to ensure authorization for client deactivation du…
raararaara Aug 14, 2024
0aae9a0
Add systemService to handle perform background deactivation
raararaara Aug 19, 2024
3c5f5e1
Revert "Add systemService to handle perform background deactivation"
raararaara Aug 19, 2024
4ae640a
Add system server to handle perform housekeeping deactivation
raararaara Sep 24, 2024
9728100
Fix lint
raararaara Sep 24, 2024
c24411b
Fix lint proto
raararaara Sep 24, 2024
ab77581
Fix lint proto
raararaara Sep 24, 2024
60425b7
Modify buf.yaml temporarily to allow buf lint for CI test verification
raararaara Sep 24, 2024
1ea7a8d
Introduce system client
raararaara Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ...
return err
}

// TODO(raararaara): We need to revert the presence clearing from the local
// changes, if the server fails to detach the document.
res, err := c.client.DetachDocument(
ctx,
withShardKey(connect.NewRequest(&api.DetachDocumentRequest{
Expand Down
5 changes: 5 additions & 0 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,11 @@ func (d *Document) BroadcastEventHandlers() map[string]func(
return d.broadcastEventHandlers
}

// setInternalDoc sets the given internal document to this document.
func (d *Document) setInternalDoc(internalDoc *InternalDocument) {
d.doc = internalDoc
}

func messageFromMsgAndArgs(msgAndArgs ...interface{}) string {
if len(msgAndArgs) == 0 {
return ""
Expand Down
19 changes: 19 additions & 0 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ func (d *InternalDocument) Checkpoint() change.Checkpoint {
return d.checkpoint
}

// SyncCheckpoint syncs the checkpoint and the changeID with the given serverSeq
// and clientSeq.
func (d *InternalDocument) SyncCheckpoint(serverSeq int64, clientSeq uint32) {
d.changeID = change.NewID(
clientSeq,
serverSeq,
d.changeID.Lamport(),
d.changeID.ActorID(),
)
d.checkpoint = d.checkpoint.SyncClientSeq(clientSeq)
}

// HasLocalChanges returns whether this document has local changes or not.
func (d *InternalDocument) HasLocalChanges() bool {
return len(d.localChanges) > 0
Expand Down Expand Up @@ -372,3 +384,10 @@ func (d *InternalDocument) AddOnlineClient(clientID string) {
func (d *InternalDocument) RemoveOnlineClient(clientID string) {
d.onlineClients.Delete(clientID)
}

// ToDocument converts this document to Document.
func (d *InternalDocument) ToDocument() *Document {
doc := New(d.key)
doc.setInternalDoc(d)
return doc
}
93 changes: 67 additions & 26 deletions server/clients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ import (
"errors"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/json"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/packs"
)

var (
Expand All @@ -46,42 +51,78 @@ func Activate(
// Deactivate deactivates the given client.
func Deactivate(
ctx context.Context,
db database.Database,
be *backend.Backend,
refKey types.ClientRefKey,
) (*database.ClientInfo, error) {
// TODO(hackerwins): We need to remove the presence of the client from the document.
// Be careful that housekeeping is executed by the leader. And documents are sharded
// by the servers in the cluster. So, we need to consider the case where the leader is
// not the same as the server that handles the document.

// TODO(raararaara): When deactivating a client, we need to update three DB properties
// (ClientInfo.Status, ClientInfo.Documents, SyncedSeq) in DB.
// Updating the sub-properties of ClientInfo guarantees atomicity as it involves a single MongoDB document.
// However, SyncedSeqs are stored in separate documents, so we can't ensure atomic updates for both.
// Currently, if SyncedSeqs update fails, it mainly impacts GC efficiency without causing major issues.
// We need to consider implementing a correction logic to remove SyncedSeqs in the future.
clientInfo, err := db.DeactivateClient(ctx, refKey)
// NOTE(hackerwins): Before deactivating the client, we need to detach all
// attached documents from the client.
// Because detachments and deactivation are separate steps, failure of steps
// must be considered. If each step of detachments is failed, some documents
// are still attached and the client is not deactivated. In this case,
// the client or housekeeping process should retry the deactivation.
clientInfo, err := be.DB.FindClientInfoByRefKey(ctx, refKey)
if err != nil {
return nil, err
}

// 01. Detach attached documents from the client.
actorID, err := clientInfo.ID.ToActorID()
if err != nil {
return nil, err
}

projectInfo, err := be.DB.FindProjectInfoByID(ctx, clientInfo.ProjectID)
if err != nil {
return nil, err
}
project := projectInfo.ToProject()

for docID, info := range clientInfo.Documents {
if info.Status != database.DocumentAttached {
continue
}

docInfo, err := be.DB.FindDocInfoByRefKey(ctx, types.DocRefKey{
ProjectID: clientInfo.ProjectID,
DocID: docID,
})
if err != nil {
return nil, err
}

doc, err := packs.BuildDocForCheckpoint(ctx, be, docInfo, info.ServerSeq, info.ClientSeq, actorID)
if err != nil {
return nil, err
}

if err := doc.Update(func(root *json.Object, p *presence.Presence) error {
p.Clear()
return nil
}); err != nil {
return nil, err
}

// TODO(raararaara): We're currently updating SyncedSeq one by one. This approach is similar
// to n+1 query problem. We need to investigate if we can optimize this process by using a single query in the future.
for docID, clientDocInfo := range clientInfo.Documents {
if err := db.UpdateSyncedSeq(
ctx,
clientInfo,
types.DocRefKey{
ProjectID: refKey.ProjectID,
DocID: docID,
},
clientDocInfo.ServerSeq,
); err != nil {
// TODO(hackerwins): This is a temporary solution to detach the document
// from the client. Documents are shared between multiple servers in the
// cluster to simplify the implementation including the distributed lock.
// In the future, we need to request the detachments to the load balancer
// and the load balancer will forward the request to the server that has
// the document.
if _, err = packs.PushPull(ctx, be, project, clientInfo, docInfo, doc.CreateChangePack(), packs.PushPullOptions{
Mode: types.SyncModePushOnly,
Status: document.StatusDetached,
}); err != nil {
return nil, err
}
}

return clientInfo, err
// 02. Deactivate the client.
clientInfo, err = be.DB.DeactivateClient(ctx, refKey)
if err != nil {
return nil, err
}

return clientInfo, nil
}

// FindActiveClientInfo find the active client info by the given ref key.
Expand Down
2 changes: 1 addition & 1 deletion server/clients/housekeeping.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func DeactivateInactives(

deactivatedCount := 0
for _, clientInfo := range candidates {
if _, err := Deactivate(ctx, be.DB, clientInfo.RefKey()); err != nil {
if _, err := Deactivate(ctx, be, clientInfo.RefKey()); err != nil {
return database.DefaultProjectID, err
}

Expand Down
8 changes: 4 additions & 4 deletions server/documents/documents.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ListDocumentSummaries(
}

if includeSnapshot {
doc, err := packs.BuildDocumentForServerSeq(ctx, be, docInfo, docInfo.ServerSeq)
doc, err := packs.BuildInternalDocForServerSeq(ctx, be, docInfo, docInfo.ServerSeq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func GetDocumentSummary(
return nil, err
}

doc, err := packs.BuildDocumentForServerSeq(ctx, be, docInfo, docInfo.ServerSeq)
doc, err := packs.BuildInternalDocForServerSeq(ctx, be, docInfo, docInfo.ServerSeq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -135,7 +135,7 @@ func GetDocumentSummaries(
snapshot := ""
if includeSnapshot {
// TODO(hackerwins, kokodak): Resolve the N+1 problem.
doc, err := packs.BuildDocumentForServerSeq(ctx, be, docInfo, docInfo.ServerSeq)
doc, err := packs.BuildInternalDocForServerSeq(ctx, be, docInfo, docInfo.ServerSeq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func GetDocumentByServerSeq(
return nil, err
}

doc, err := packs.BuildDocumentForServerSeq(ctx, be, docInfo, serverSeq)
doc, err := packs.BuildInternalDocForServerSeq(ctx, be, docInfo, serverSeq)
if err != nil {
return nil, err
}
Expand Down
24 changes: 22 additions & 2 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/pkg/units"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
Expand Down Expand Up @@ -210,8 +211,27 @@ func PushPull(
return respPack, nil
}

// BuildDocumentForServerSeq returns a new document for the given serverSeq.
func BuildDocumentForServerSeq(
// BuildDocForCheckpoint returns a new document for the given checkpoint.
func BuildDocForCheckpoint(
ctx context.Context,
be *backend.Backend,
docInfo *database.DocInfo,
serverSeq int64,
clientSeq uint32,
actorID *time.ActorID,
) (*document.Document, error) {
internalDoc, err := BuildInternalDocForServerSeq(ctx, be, docInfo, serverSeq)
if err != nil {
return nil, err
}

internalDoc.SetActor(actorID)
internalDoc.SyncCheckpoint(serverSeq, clientSeq)
return internalDoc.ToDocument(), nil
}

// BuildInternalDocForServerSeq returns a new document for the given serverSeq.
func BuildInternalDocForServerSeq(
ctx context.Context,
be *backend.Backend,
docInfo *database.DocInfo,
Expand Down
2 changes: 1 addition & 1 deletion server/packs/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func pullSnapshot(
initialServerSeq int64,
) (*ServerPack, error) {
// Build document from DB if the size of changes for the response is greater than the snapshot threshold.
doc, err := BuildDocumentForServerSeq(ctx, be, docInfo, initialServerSeq)
doc, err := BuildInternalDocForServerSeq(ctx, be, docInfo, initialServerSeq)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/rpc/yorkie_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *yorkieServer) DeactivateClient(
}

project := projects.From(ctx)
_, err = clients.Deactivate(ctx, s.backend.DB, types.ClientRefKey{
_, err = clients.Deactivate(ctx, s.backend, types.ClientRefKey{
ProjectID: project.ID,
ClientID: types.IDFromActorID(actorID),
})
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (r *Yorkie) DeactivateClient(ctx context.Context, c1 *client.Client) error
return err
}

_, err = clients.Deactivate(ctx, r.backend.DB, types.ClientRefKey{
_, err = clients.Deactivate(ctx, r.backend, types.ClientRefKey{
ProjectID: project.ID,
ClientID: types.IDFromActorID(c1.ID()),
})
Expand Down
23 changes: 23 additions & 0 deletions test/integration/presence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,4 +466,27 @@ func TestPresence(t *testing.T) {

assert.Equal(t, expected, responsePairs)
})

t.Run("presence clear by server test", func(t *testing.T) {
ctx := context.Background()

d1 := document.New(helper.TestDocKey(t))
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
assert.NoError(t, c2.Attach(ctx, d2))
defer func() {
// No need to detach c1
assert.NoError(t, c2.Detach(ctx, d2))
}()

assert.NoError(t, c1.Sync(ctx))
assert.NoError(t, c2.Sync(ctx))
assert.Equal(t, 2, len(d1.AllPresences()))
assert.Equal(t, 2, len(d2.AllPresences()))

assert.NoError(t, defaultServer.DeactivateClient(ctx, c1))
assert.NoError(t, c2.Sync(ctx))

assert.Equal(t, 1, len(d2.AllPresences()))
})
}
Loading