diff --git a/nakama/main.go b/nakama/main.go index 3db8d60..ae6e7dc 100644 --- a/nakama/main.go +++ b/nakama/main.go @@ -37,16 +37,9 @@ const ( UNAUTHENTICATED = 16 ) -type personaTagStatus string - type receiptChan chan *Receipt const ( - personaTagStatusUnknown personaTagStatus = "unknown" - personaTagStatusPending personaTagStatus = "pending" - personaTagStatusAccepted personaTagStatus = "accepted" - personaTagStatusRejected personaTagStatus = "rejected" - EnvCardinalAddr = "CARDINAL_ADDR" EnvCardinalNamespace = "CARDINAL_NAMESPACE" @@ -75,8 +68,8 @@ func InitModule(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runti return fmt.Errorf("failed to init namespace: %w", err) } - if err := initReceiptStreaming(logger); err != nil { - return fmt.Errorf("failed to init receipt streaming: %w", err) + if err := initReceiptDispatcher(logger); err != nil { + return fmt.Errorf("failed to init receipt dispatcher: %w", err) } if err := initReceiptMatch(ctx, logger, db, nk, initializer); err != nil { @@ -91,8 +84,13 @@ func InitModule(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runti return fmt.Errorf("failed to init persona tag assignment map: %w", err) } - if err := initPersonaEndpoints(logger, initializer); err != nil { - return fmt.Errorf("failed to init persona endpoints: %w", err) + ptv, err := initPersonaTagVerifier(logger, nk, globalReceiptsDispatcher) + if err != nil { + return fmt.Errorf("failed to init persona tag verifier: %w", err) + } + + if err := initPersonaTagEndpoints(logger, initializer, ptv); err != nil { + return fmt.Errorf("failed to init persona tag endpoints: %w", err) } if err := initCardinalEndpoints(logger, initializer); err != nil { @@ -110,7 +108,7 @@ func initNamespace() error { return nil } -func initReceiptStreaming(log runtime.Logger) error { +func initReceiptDispatcher(log runtime.Logger) error { globalReceiptsDispatcher = newReceiptsDispatcher() go globalReceiptsDispatcher.pollReceipts(log) go globalReceiptsDispatcher.dispatch(log) @@ -166,8 +164,8 @@ func initPersonaTagAssignmentMap(ctx context.Context, logger runtime.Logger, nk } // initPersonaEndpoints sets up the nakame RPC endpoints that are used to claim a persona tag and display a persona tag. -func initPersonaEndpoints(logger runtime.Logger, initializer runtime.Initializer) error { - if err := initializer.RegisterRpc("nakama/claim-persona", handleClaimPersona); err != nil { +func initPersonaTagEndpoints(logger runtime.Logger, initializer runtime.Initializer, ptv *personaTagVerifier) error { + if err := initializer.RegisterRpc("nakama/claim-persona", handleClaimPersona(ptv)); err != nil { return err } if err := initializer.RegisterRpc("nakama/show-persona", handleShowPersona); err != nil { @@ -185,142 +183,67 @@ func getUserID(ctx context.Context) (string, error) { return userID, nil } -type personaTagStorageObj struct { - PersonaTag string `json:"persona_tag"` - Status personaTagStatus `json:"status"` - Tick uint64 `json:"tick"` - TxHash string `json:"tx_hash"` -} - -// storageObjToPersonaTagStorageObj converts a generic Nakama StorageObject to a locally defined personaTagStorageObj. -func storageObjToPersonaTagStorageObj(obj *api.StorageObject) (*personaTagStorageObj, error) { - var ptr personaTagStorageObj - if err := json.Unmarshal([]byte(obj.Value), &ptr); err != nil { - return nil, fmt.Errorf("unable to unmarshal persona tag storage obj: %w", err) - } - return &ptr, nil -} - -// getPersonaTag returns the persona tag (if any) associated with this user. ErrorNoPersonaTagForUser is returned -// if the user does not currently have a persona tag assigned. -func getPersonaTag(ctx context.Context, nk runtime.NakamaModule) (string, error) { - ptr, err := getPersonaTagStorageObj(ctx, nk) - if err != nil { - return "", err - } - if ptr.Status != personaTagStatusAccepted { - return "", ErrorNoPersonaTagForUser - } - return ptr.PersonaTag, nil -} - -func getPersonaTagStorageObj(ctx context.Context, nk runtime.NakamaModule) (*personaTagStorageObj, error) { - userID, err := getUserID(ctx) - if err != nil { - return nil, err - } - storeObjs, err := nk.StorageRead(ctx, []*runtime.StorageRead{ - { - Collection: cardinalCollection, - Key: personaTagKey, - UserID: userID, - }, - }) - if err != nil { - return nil, err - } - if len(storeObjs) == 0 { - return nil, ErrorPersonaTagStorageObjNotFound - } else if len(storeObjs) > 1 { - return nil, fmt.Errorf("expected 1 storage object, got %d with values %v", len(storeObjs), storeObjs) - } - ptr, err := storageObjToPersonaTagStorageObj(storeObjs[0]) - if err != nil { - return nil, err - } - return ptr, nil -} - -// setPersonaTagStorageObj saves the given personaTagStorageObj to the Nakama DB for the current user. -func setPersonaTagStorageObj(ctx context.Context, nk runtime.NakamaModule, obj *personaTagStorageObj) error { - userID, err := getUserID(ctx) - if err != nil { - return fmt.Errorf("unable to get user ID: %w", err) - } - buf, err := json.Marshal(obj) - if err != nil { - return fmt.Errorf("unable to marshal persona tag storage object: %w", err) - } - write := &runtime.StorageWrite{ - Collection: cardinalCollection, - Key: personaTagKey, - UserID: userID, - Value: string(buf), - PermissionRead: runtime.STORAGE_PERMISSION_NO_READ, - PermissionWrite: runtime.STORAGE_PERMISSION_NO_WRITE, - } - - _, err = nk.StorageWrite(ctx, []*runtime.StorageWrite{write}) - if err != nil { - return err - } - return nil -} +// nakamaRPCHandler is the signature required for handlers that are passed to Nakama's RegisterRpc method. +// This type is defined just to make the function below a little more readable. +type nakamaRPCHandler func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) // handleClaimPersona handles a request to Nakama to associate the current user with the persona tag in the payload. -func handleClaimPersona(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) { - if ptr, err := getPersonaTagStorageObj(ctx, nk); err != nil && err != ErrorPersonaTagStorageObjNotFound { - return logError(logger, "unable to get persona tag storage object: %w", err) - } else if err == nil { - switch ptr.Status { - case personaTagStatusPending: - return logCode(logger, ALREADY_EXISTS, "persona tag %q is pending for this account", ptr.PersonaTag) - case personaTagStatusAccepted: - return logCode(logger, ALREADY_EXISTS, "persona tag %q already associated with this account", ptr.PersonaTag) - default: - // In other cases, allow the user to claim a persona tag. +func handleClaimPersona(ptv *personaTagVerifier) nakamaRPCHandler { + return func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) { + if ptr, err := loadPersonaTagStorageObj(ctx, nk); err != nil && err != ErrorPersonaTagStorageObjNotFound { + return logError(logger, "unable to get persona tag storage object: %w", err) + } else if err == nil { + switch ptr.Status { + case personaTagStatusPending: + return logCode(logger, ALREADY_EXISTS, "persona tag %q is pending for this account", ptr.PersonaTag) + case personaTagStatusAccepted: + return logCode(logger, ALREADY_EXISTS, "persona tag %q already associated with this account", ptr.PersonaTag) + default: + // In other cases, allow the user to claim a persona tag. + } } - } - ptr := &personaTagStorageObj{} - if err := json.Unmarshal([]byte(payload), ptr); err != nil { - return logError(logger, "unable to marshal payload: %w", err) - } - if ptr.PersonaTag == "" { - return logCode(logger, INVALID_ARGUMENT, "persona_tag field must not be empty") - } + ptr := &personaTagStorageObj{} + if err := json.Unmarshal([]byte(payload), ptr); err != nil { + return logError(logger, "unable to marshal payload: %w", err) + } + if ptr.PersonaTag == "" { + return logCode(logger, INVALID_ARGUMENT, "persona_tag field must not be empty") + } - ptr.Status = personaTagStatusPending - if err := setPersonaTagStorageObj(ctx, nk, ptr); err != nil { - return logError(logger, "unable to set persona tag storage object: %w", err) - } + ptr.Status = personaTagStatusPending + if err := ptr.savePersonaTagStorageObj(ctx, nk); err != nil { + return logError(logger, "unable to set persona tag storage object: %w", err) + } - userID, err := getUserID(ctx) - if err != nil { - return logError(logger, "unable to get userID: %w", err) - } + userID, err := getUserID(ctx) + if err != nil { + return logError(logger, "unable to get userID: %w", err) + } - // Try to actually assign this personaTag->UserID in the sync map. If this succeeds, Nakama is OK with this - // user having the persona tag. This assignment still needs to be checked with cardinal. - if ok := setPersonaTagAssignment(ptr.PersonaTag, userID); !ok { - ptr.Status = personaTagStatusRejected - if err := setPersonaTagStorageObj(ctx, nk, ptr); err != nil { - return logError(logger, "unable to set persona tag storage object: %v", err) + // Try to actually assign this personaTag->UserID in the sync map. If this succeeds, Nakama is OK with this + // user having the persona tag. This assignment still needs to be checked with cardinal. + if ok := setPersonaTagAssignment(ptr.PersonaTag, userID); !ok { + ptr.Status = personaTagStatusRejected + if err := ptr.savePersonaTagStorageObj(ctx, nk); err != nil { + return logError(logger, "unable to set persona tag storage object: %v", err) + } + return logCode(logger, ALREADY_EXISTS, "persona tag %q is not available", ptr.PersonaTag) } - return logCode(logger, ALREADY_EXISTS, "persona tag %q is not available", ptr.PersonaTag) - } - txHash, tick, err := cardinalCreatePersona(ctx, nk, ptr.PersonaTag) - if err != nil { - return logError(logger, "unable to make create persona request to cardinal: %v", err) - } + txHash, tick, err := cardinalCreatePersona(ctx, nk, ptr.PersonaTag) + if err != nil { + return logError(logger, "unable to make create persona request to cardinal: %v", err) + } - ptr.Tick = tick - ptr.TxHash = txHash - if err := setPersonaTagStorageObj(ctx, nk, ptr); err != nil { - return logError(logger, "unable to save persona tag storage object: %v", err) + ptr.Tick = tick + ptr.TxHash = txHash + if err := ptr.savePersonaTagStorageObj(ctx, nk); err != nil { + return logError(logger, "unable to save persona tag storage object: %v", err) + } + ptv.addPendingPersonaTag(userID, ptr.TxHash) + return ptr.toJSON() } - return ptr.toJSON() } // verifyPersonaTag makes a request to Cardinal to see if this Nakama instance actually owns the given persona tag. @@ -334,31 +257,15 @@ func verifyPersonaTag(ctx context.Context, ptr *personaTagStorageObj) (verified } func handleShowPersona(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) { - ptr, err := getPersonaTagStorageObj(ctx, nk) + ptr, err := loadPersonaTagStorageObj(ctx, nk) if errors.Is(err, ErrorPersonaTagStorageObjNotFound) { return logError(logger, "no persona tag found: %w", err) } else if err != nil { return logError(logger, "unable to get persona tag storage object: %w", err) } - - if ptr.Status == personaTagStatusPending { - logger.Debug("persona tag status is pending. Attempting to verify against cardinal.") - verified, err := verifyPersonaTag(ctx, ptr) - if err == ErrorPersonaSignerUnknown { - // The status should remain pending - return ptr.toJSON() - } else if err != nil { - return logError(logger, "signer address could not be verified: %w", err) - } - logger.Debug("done with request. verified is %v", verified) - if verified { - ptr.Status = personaTagStatusAccepted - } else { - ptr.Status = personaTagStatusRejected - } - if err := setPersonaTagStorageObj(ctx, nk, ptr); err != nil { - return logError(logger, "unable to set persona tag storage object: %w", err) - } + ptr, err = ptr.attemptToUpdatePending(ctx, nk) + if err != nil { + logError(logger, "unable to update pending state: %v", err) } return ptr.toJSON() } @@ -434,11 +341,19 @@ func setPersonaTagAssignment(personaTag, userID string) (ok bool) { } func makeSignedPayload(ctx context.Context, nk runtime.NakamaModule, payload string) (io.Reader, error) { - personaTag, err := getPersonaTag(ctx, nk) + ptr, err := loadPersonaTagStorageObj(ctx, nk) + if err != nil { + return nil, err + } + ptr, err = ptr.attemptToUpdatePending(ctx, nk) if err != nil { return nil, err } + if ptr.Status != personaTagStatusAccepted { + return nil, ErrorNoPersonaTagForUser + } + personaTag := ptr.PersonaTag pk, nonce, err := getPrivateKeyAndANonce(ctx, nk) sp, err := sign.NewSignedPayload(pk, personaTag, globalNamespace, nonce, payload) if err != nil { @@ -450,8 +365,3 @@ func makeSignedPayload(ctx context.Context, nk runtime.NakamaModule, payload str } return bytes.NewReader(buf), nil } - -func (p *personaTagStorageObj) toJSON() (string, error) { - buf, err := json.Marshal(p) - return string(buf), err -} diff --git a/nakama/persona_tag_storage.go b/nakama/persona_tag_storage.go new file mode 100644 index 0000000..e37eba2 --- /dev/null +++ b/nakama/persona_tag_storage.go @@ -0,0 +1,141 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/heroiclabs/nakama-common/api" + "github.com/heroiclabs/nakama-common/runtime" +) + +// personaTagStorageObj contains persona tag information for a specific user, and keeps track of whether the +// persona tag has been successfully registered with cardinal. +type personaTagStorageObj struct { + PersonaTag string `json:"persona_tag"` + Status personaTagStatus `json:"status"` + Tick uint64 `json:"tick"` + TxHash string `json:"tx_hash"` + // version is used with Nakama storage layer to allow for optimistic locking. Saving this storage + // object succeeds only if the passed in version matches the version in the storage layer. + // see https://heroiclabs.com/docs/nakama/concepts/storage/collections/#conditional-writes for more info. + version string `json:"-"` +} + +type personaTagStatus string + +const ( + personaTagStatusUnknown personaTagStatus = "unknown" + personaTagStatusPending personaTagStatus = "pending" + personaTagStatusAccepted personaTagStatus = "accepted" + personaTagStatusRejected personaTagStatus = "rejected" +) + +// loadPersonaTagStorageObj loads the current user's persona tag storage object from Nakama's storage layer. The +// "current user" comes from the user ID stored in the given context. +func loadPersonaTagStorageObj(ctx context.Context, nk runtime.NakamaModule) (*personaTagStorageObj, error) { + userID, err := getUserID(ctx) + if err != nil { + return nil, err + } + storeObjs, err := nk.StorageRead(ctx, []*runtime.StorageRead{ + { + Collection: cardinalCollection, + Key: personaTagKey, + UserID: userID, + }, + }) + if err != nil { + return nil, err + } + if len(storeObjs) == 0 { + return nil, ErrorPersonaTagStorageObjNotFound + } else if len(storeObjs) > 1 { + return nil, fmt.Errorf("expected 1 storage object, got %d with values %v", len(storeObjs), storeObjs) + } + ptr, err := storageObjToPersonaTagStorageObj(storeObjs[0]) + if err != nil { + return nil, err + } + return ptr, nil +} + +// storageObjToPersonaTagStorageObj converts a generic Nakama StorageObject to a locally defined personaTagStorageObj. +func storageObjToPersonaTagStorageObj(obj *api.StorageObject) (*personaTagStorageObj, error) { + var ptr personaTagStorageObj + if err := json.Unmarshal([]byte(obj.Value), &ptr); err != nil { + return nil, fmt.Errorf("unable to unmarshal persona tag storage obj: %w", err) + } + ptr.version = obj.Version + return &ptr, nil +} + +// attemptToUpdatePending attempts to change the given personaTagStorageObj's Status from "pending" to either "accepted" +// or "rejected" by using cardinal as the source of truth. If the Status is not "pending", this call is a no-op. +func (p *personaTagStorageObj) attemptToUpdatePending(ctx context.Context, nk runtime.NakamaModule) (*personaTagStorageObj, error) { + if p.Status != personaTagStatusPending { + return p, nil + } + + verified, err := p.verifyPersonaTag(ctx) + if err == ErrorPersonaSignerUnknown { + // Leave the Status as pending. + return p, nil + } else if err != nil { + return nil, err + } + if verified { + p.Status = personaTagStatusAccepted + } else { + p.Status = personaTagStatusRejected + } + // Attempt to save the updated Status to Nakama. One reason this can fail is that the underlying record was + // updated while this processing was going on. Whatever the reason, re-fetch this record from Nakama's storage. + if err := p.savePersonaTagStorageObj(ctx, nk); err != nil { + return loadPersonaTagStorageObj(ctx, nk) + } + return p, nil +} + +// verifyPersonaTag queries cardinal to see if the signer address for the given persona tag matches Nakama's signer +// address +func (p *personaTagStorageObj) verifyPersonaTag(ctx context.Context) (verified bool, err error) { + gameSignerAddress, err := cardinalQueryPersonaSigner(ctx, p.PersonaTag, p.Tick) + if err != nil { + return false, err + } + nakamaSignerAddress := getSignerAddress() + return gameSignerAddress == nakamaSignerAddress, nil +} + +// savePersonaTagStorageObj saves the given personaTagStorageObj to the Nakama DB for the current user. +func (p *personaTagStorageObj) savePersonaTagStorageObj(ctx context.Context, nk runtime.NakamaModule) error { + userID, err := getUserID(ctx) + if err != nil { + return fmt.Errorf("unable to get user ID: %w", err) + } + buf, err := json.Marshal(p) + if err != nil { + return fmt.Errorf("unable to marshal persona tag storage object: %w", err) + } + write := &runtime.StorageWrite{ + Collection: cardinalCollection, + Key: personaTagKey, + UserID: userID, + Value: string(buf), + Version: p.version, + PermissionRead: runtime.STORAGE_PERMISSION_NO_READ, + PermissionWrite: runtime.STORAGE_PERMISSION_NO_WRITE, + } + + _, err = nk.StorageWrite(ctx, []*runtime.StorageWrite{write}) + if err != nil { + return err + } + return nil +} + +func (p *personaTagStorageObj) toJSON() (string, error) { + buf, err := json.Marshal(p) + return string(buf), err +} diff --git a/nakama/persona_tag_verifier.go b/nakama/persona_tag_verifier.go new file mode 100644 index 0000000..3f127cd --- /dev/null +++ b/nakama/persona_tag_verifier.go @@ -0,0 +1,140 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/heroiclabs/nakama-common/runtime" +) + +// personaTagVerifier is a helper struct that asynchronously collects both persona tag registration requests (from +// nakama) AND persona tag transaction receipts from cardinal. When the result of both systems has been recorded, +// this struct attempts to update the user's PersonaTagStorageObj to reflect the success/failure of the claim persona +// tag request. +type personaTagVerifier struct { + // txHashToPending keeps track of the state of pending claim persona tag requests. A sync.Map is not required + // because all map updates happen in a single goroutine. Updates are transmitted to the goroutine + // via the receiptCh channel and the pendingCh channel. + txHashToPending map[string]pendingPersonaTagRequest + receiptCh receiptChan + pendingCh chan txHashAndUserID + nk runtime.NakamaModule + logger runtime.Logger +} + +type pendingPersonaTagRequest struct { + lastUpdate time.Time + userID string + status personaTagStatus +} + +type txHashAndUserID struct { + txHash string + userID string +} + +const personaVerifierSessionName = "persona_verifier_session" + +func (p *personaTagVerifier) addPendingPersonaTag(userID, txHash string) { + p.pendingCh <- txHashAndUserID{ + userID: userID, + txHash: txHash, + } +} + +func initPersonaTagVerifier(logger runtime.Logger, nk runtime.NakamaModule, rd *receiptsDispatcher) (*personaTagVerifier, error) { + ptv := &personaTagVerifier{ + txHashToPending: map[string]pendingPersonaTagRequest{}, + receiptCh: make(receiptChan, 100), + pendingCh: make(chan txHashAndUserID), + nk: nk, + logger: logger, + } + rd.subscribe(personaVerifierSessionName, ptv.receiptCh) + go ptv.consume() + return ptv, nil +} + +func (p *personaTagVerifier) consume() { + cleanupTick := time.Tick(time.Minute) + for { + var currTxHash string + select { + case now := <-cleanupTick: + p.cleanupStaleEntries(now) + case receipt := <-p.receiptCh: + currTxHash = p.handleReceipt(receipt) + case pending := <-p.pendingCh: + currTxHash = p.handlePending(pending) + } + if currTxHash == "" { + continue + } + if err := p.attemptVerification(currTxHash); err != nil { + p.logger.Error("failed to verify persona tag: %v", err) + } + } +} + +func (p *personaTagVerifier) cleanupStaleEntries(now time.Time) { + for key, val := range p.txHashToPending { + if diff := now.Sub(val.lastUpdate); diff > time.Minute { + delete(p.txHashToPending, key) + } + } +} + +func (p *personaTagVerifier) handleReceipt(receipt *Receipt) string { + result, ok := receipt.Result["Success"] + if !ok { + return "" + } + success, ok := result.(bool) + if !ok { + return "" + } + pending := p.txHashToPending[receipt.TxHash] + pending.lastUpdate = time.Now() + if success { + pending.status = personaTagStatusAccepted + } else { + pending.status = personaTagStatusRejected + } + p.txHashToPending[receipt.TxHash] = pending + return receipt.TxHash +} + +func (p *personaTagVerifier) handlePending(tuple txHashAndUserID) string { + pending := p.txHashToPending[tuple.txHash] + pending.lastUpdate = time.Now() + pending.userID = tuple.userID + p.txHashToPending[tuple.txHash] = pending + return tuple.txHash +} + +func (p *personaTagVerifier) attemptVerification(txHash string) error { + pending, ok := p.txHashToPending[txHash] + if !ok || pending.userID == "" || pending.status == "" { + // We're missing a success/failure message from cardinal or the initial request from the + // user to claim a persona tag. + return nil + } + // We have both a user ID and a success message. Save this success/failure to nakama's storage system + ctx := context.Background() + ctx = context.WithValue(ctx, runtime.RUNTIME_CTX_USER_ID, pending.userID) + ptr, err := loadPersonaTagStorageObj(ctx, p.nk) + if err != nil { + return fmt.Errorf("unable to get persona tag storage obj: %w", err) + } + if ptr.Status != personaTagStatusPending { + return fmt.Errorf("expected a pending persona tag status but got %q", ptr.Status) + } + ptr.Status = pending.status + if err := ptr.savePersonaTagStorageObj(ctx, p.nk); err != nil { + return fmt.Errorf("unable to set persona tag storage object: %w", err) + } + delete(p.txHashToPending, txHash) + p.logger.Debug("result of associating user %q with persona tag %q: %v", pending.userID, ptr.PersonaTag, pending.status) + return nil +}