From e7b93000ac420eec240ac81f9491eb609c65c93e Mon Sep 17 00:00:00 2001 From: Kaveh Ghasemloo Date: Sat, 7 Mar 2020 18:46:09 -0800 Subject: [PATCH] cleanup dsstore PiperOrigin-RevId: 299617479 Change-Id: If059239b1e59d77135287aea9319ad399c2548e4 --- gcp/dam/main.go | 2 +- gcp/dam_import/main.go | 4 +- gcp/ic/main.go | 2 +- gcp/ic_import/main.go | 2 +- lib/dsstore/datastore.go | 268 ++++++++++++++++++++++++--------------- 5 files changed, 171 insertions(+), 107 deletions(-) diff --git a/gcp/dam/main.go b/gcp/dam/main.go index 3cbad83d..8c80c562 100644 --- a/gcp/dam/main.go +++ b/gcp/dam/main.go @@ -85,7 +85,7 @@ func main() { var store storage.Store switch storageType { case "datastore": - store = dsstore.NewDatastoreStorage(ctx, project, srvName, cfgPath) + store = dsstore.NewStore(ctx, project, srvName, cfgPath) case "memory": store = storage.NewMemoryStorage(srvName, cfgPath) // Import and resolve template variables, if any. diff --git a/gcp/dam_import/main.go b/gcp/dam_import/main.go index 6b0cdeeb..069dcae6 100644 --- a/gcp/dam_import/main.go +++ b/gcp/dam_import/main.go @@ -53,7 +53,7 @@ func main() { accountPrefix = *pre } ctx := context.Background() - store := dsstore.NewDatastoreStorage(context.Background(), project, service, *path) + store := dsstore.NewStore(context.Background(), project, service, *path) wh := saw.MustNew(ctx, store) vars := map[string]string{ "${YOUR_PROJECT_ID}": project, @@ -81,7 +81,7 @@ func main() { glog.Infof("SUCCESS resetting DAM service %q", service) } -func cleanupServiceAccounts(ctx context.Context, accountPrefix, project string, store *dsstore.DatastoreStorage) { +func cleanupServiceAccounts(ctx context.Context, accountPrefix, project string, store *dsstore.Store) { wh := saw.MustNew(ctx, store) var ( removed, skipped, errors int diff --git a/gcp/ic/main.go b/gcp/ic/main.go index f6a93a07..60442ad4 100644 --- a/gcp/ic/main.go +++ b/gcp/ic/main.go @@ -86,7 +86,7 @@ func main() { var store storage.Store switch storageType { case "datastore": - store = dsstore.NewDatastoreStorage(ctx, project, srvName, cfgPath) + store = dsstore.NewStore(ctx, project, srvName, cfgPath) case "memory": store = storage.NewMemoryStorage(srvName, cfgPath) // Import and resolve template variables, if any. diff --git a/gcp/ic_import/main.go b/gcp/ic_import/main.go index 38671372..18745c1e 100644 --- a/gcp/ic_import/main.go +++ b/gcp/ic_import/main.go @@ -42,7 +42,7 @@ func main() { envPrefix = "-" + env service += envPrefix } - store := dsstore.NewDatastoreStorage(context.Background(), project, service, *path) + store := dsstore.NewStore(context.Background(), project, service, *path) vars := map[string]string{ "${YOUR_PROJECT_ID}": project, diff --git a/lib/dsstore/datastore.go b/lib/dsstore/datastore.go index c0eb0b9f..a33221f0 100644 --- a/lib/dsstore/datastore.go +++ b/lib/dsstore/datastore.go @@ -20,7 +20,6 @@ import ( "fmt" "math/rand" "strings" - "sync" "time" glog "github.com/golang/glog" /* copybara-comment */ @@ -35,28 +34,35 @@ import ( cpb "github.com/GoogleCloudPlatform/healthcare-federated-access-services/proto/common/v1" /* copybara-comment: go_proto */ ) +// storageVersion is the version of store data model. +// If there is a breaking change to the store data model, the version needs to +// be updated. + const ( storageType = "gcpDatastore" storageVersion = "v0" + metaVersion = "version" +) + +// Data +var ( entityKind = "entity" historyKind = "history" metaKind = "meta" - - metaVersion = "version" - - multiDeleteChunkSize = 400 // must not exceed 500 as per Datastore API - minJitter = 1 * 1e9 // nanoseconds as integer for math - maxJitter = 3 * 1e9 // nanoseconds as integer for math ) -var ( - mutex = &sync.Mutex{} - wipeKinds = []string{historyKind, entityKind} -) +// Key is the key for items. +type Key struct { + Datatype string `datastore:"type"` + Realm string `datastore:"realm"` + User string `datastore:"user_id"` + ID string `datastore:"id"` + Rev int64 `datastore:"rev"` +} -// DatastoreStorage is a datastore based implementation of storage. -type DatastoreStorage struct { +// Store is a datastore based implementation of storage. +type Store struct { client *datastore.Client // TODO: these fileds are only used for Info and are not related to the store. @@ -69,55 +75,58 @@ type DatastoreStorage struct { path string } -type DatastoreEntity struct { +// Entity is a datastore entity for data. +type Entity struct { Key *datastore.Key `datastore:"__key__"` Service string `datastore:"service"` Datatype string `datastore:"type"` Realm string `datastore:"realm"` User string `datastore:"user_id"` - Id string `datastore:"id"` + ID string `datastore:"id"` Rev int64 `datastore:"rev"` Version string `datastore:"version,noindex"` Modified int64 `datastore:"modified"` Content string `datastore:"content,noindex"` } -type DatastoreHistory struct { +// History is an datastore entity for history. +type History struct { Key *datastore.Key `datastore:"__key__"` Service string `datastore:"service"` Datatype string `datastore:"type"` Realm string `datastore:"realm"` User string `datastore:"user_id"` - Id string `datastore:"id"` + ID string `datastore:"id"` Rev int64 `datastore:"rev"` Version string `datastore:"version,noindex"` Modified int64 `datastore:"modified"` Content string `datastore:"content,noindex"` } -type DatastoreMeta struct { +// Meta is a datastore entity for meta. +type Meta struct { Key *datastore.Key `datastore:"__key__"` Name string `datastore:"name"` Value string `datastore:"value,noindex"` } -// NewDatastoreStorage creates a new datastore storace and initilizes it. +// NewStore creates a new datastore storace and initilizes it. // TODO: create the client for datastore in the main and inject it. -func NewDatastoreStorage(ctx context.Context, project, service, path string) *DatastoreStorage { +func NewStore(ctx context.Context, project, service, path string) *Store { client, err := datastore.NewClient(ctx, project) if err != nil { glog.Fatalf("cannot initialize datastore: %v", err) } s := New(client, project, service, path) - if err := s.Init(); err != nil { + if err := s.Init(context.Background()); err != nil { glog.Fatalf("Datastore failed to initialize: %v", err) } return s } // New creates a new storage. -func New(client *datastore.Client, project, service, path string) *DatastoreStorage { - return &DatastoreStorage{ +func New(client *datastore.Client, project, service, path string) *Store { + return &Store{ client: client, project: project, service: service, @@ -125,7 +134,9 @@ func New(client *datastore.Client, project, service, path string) *DatastoreStor } } -func (s *DatastoreStorage) Info() map[string]string { +// Info returns some information about the store. +// TODO: delete this and pass the information directly rather than through store. +func (s *Store) Info() map[string]string { return map[string]string{ "type": storageType, "version": storageVersion, @@ -135,26 +146,27 @@ func (s *DatastoreStorage) Info() map[string]string { } // Exists checks if a data entity with the given name exists. -func (s *DatastoreStorage) Exists(datatype, realm, user, id string, rev int64) (bool, error) { - k := datastore.NameKey(entityKind, s.entityKey(datatype, realm, user, id, rev), nil) - e := new(DatastoreEntity) - err := s.client.Get(context.Background() /* TODO: pass ctx from request */, k, e) - if err == nil { - return true, nil - } else if err == datastore.ErrNoSuchEntity { +func (s *Store) Exists(datatype, realm, user, id string, rev int64) (bool, error) { + ctx := context.Background() /* TODO: pass ctx from request */ + k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil) + err := s.client.Get(ctx, k, &Entity{}) + if err == datastore.ErrNoSuchEntity { return false, nil } - return false, err + if err != nil { + return false, err + } + return true, nil } // Read reads a data entity. -func (s *DatastoreStorage) Read(datatype, realm, user, id string, rev int64, content proto.Message) error { +func (s *Store) Read(datatype, realm, user, id string, rev int64, content proto.Message) error { return s.ReadTx(datatype, realm, user, id, rev, content, nil) } // ReadTx reads a data entity inside a transaction. // ReadTx will not see the writes inside the transaction. -func (s *DatastoreStorage) ReadTx(datatype, realm, user, id string, rev int64, content proto.Message, tx storage.Tx) (ferr error) { +func (s *Store) ReadTx(datatype, realm, user, id string, rev int64, content proto.Message, tx storage.Tx) (ferr error) { if tx == nil { var err error tx, err = s.Tx(false) @@ -168,13 +180,14 @@ func (s *DatastoreStorage) ReadTx(datatype, realm, user, id string, rev int64, c } }() } - dstx, ok := tx.(*DatastoreTx) + + dstx, ok := tx.(*Tx) if !ok { return status.Errorf(codes.InvalidArgument, "invalid transaction") } - k := datastore.NameKey(entityKind, s.entityKey(datatype, realm, user, id, rev), nil) - e, err := s.datastoreEntity(k, datatype, realm, user, id, rev, content) + k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil) + e, err := s.newEntity(k, datatype, realm, user, id, rev, content) if err != nil { return err } @@ -196,7 +209,8 @@ func (s *DatastoreStorage) ReadTx(datatype, realm, user, id string, rev int64, c // if user is "" reads all users. // Returns the number of items matching the filter. // content is a map of user and id to values. -func (s *DatastoreStorage) MultiReadTx(datatype, realm, user string, filters [][]storage.Filter, offset, pageSize int, content map[string]map[string]proto.Message, typ proto.Message, tx storage.Tx) (_ int, ferr error) { +func (s *Store) MultiReadTx(datatype, realm, user string, filters [][]storage.Filter, offset, pageSize int, content map[string]map[string]proto.Message, typ proto.Message, tx storage.Tx) (_ int, ferr error) { + ctx := context.Background() /* TODO: pass ctx from request */ if tx == nil { var err error tx, err = s.Tx(false) @@ -210,11 +224,14 @@ func (s *DatastoreStorage) MultiReadTx(datatype, realm, user string, filters [][ } }() } + if pageSize > storage.MaxPageSize { pageSize = storage.MaxPageSize } - q := datastore.NewQuery(entityKind).Filter("service =", s.service).Filter("type =", datatype) + q := datastore.NewQuery(entityKind). + Filter("service =", s.service). + Filter("type =", datatype) if realm != storage.AllRealms { q = q.Filter("realm =", realm) } @@ -229,10 +246,10 @@ func (s *DatastoreStorage) MultiReadTx(datatype, realm, user string, filters [][ offset = 0 } - it := s.client.Run(context.Background() /* TODO: pass ctx from request */, q) + it := s.client.Run(ctx, q) count := 0 for { - var e DatastoreEntity + var e Entity _, err := it.Next(&e) if err == iterator.Done { break @@ -257,12 +274,10 @@ func (s *DatastoreStorage) MultiReadTx(datatype, realm, user string, filters [][ continue } if pageSize == 0 || pageSize > count { - userContent, ok := content[e.User] - if !ok { + if _, ok := content[e.User]; !ok { content[e.User] = make(map[string]proto.Message) - userContent = content[e.User] } - userContent[e.Id] = p + content[e.User][e.ID] = p } count++ } @@ -270,12 +285,13 @@ func (s *DatastoreStorage) MultiReadTx(datatype, realm, user string, filters [][ } // ReadHistory reads the history. -func (s *DatastoreStorage) ReadHistory(datatype, realm, user, id string, content *[]proto.Message) error { +func (s *Store) ReadHistory(datatype, realm, user, id string, content *[]proto.Message) error { return s.ReadHistoryTx(datatype, realm, user, id, content, nil) } // ReadHistoryTx reads the history inside a transaction. -func (s *DatastoreStorage) ReadHistoryTx(datatype, realm, user, id string, content *[]proto.Message, tx storage.Tx) (ferr error) { +func (s *Store) ReadHistoryTx(datatype, realm, user, id string, content *[]proto.Message, tx storage.Tx) (ferr error) { + ctx := context.Background() /* TODO: pass ctx from request */ if tx == nil { var err error tx, err = s.Tx(false) @@ -291,16 +307,24 @@ func (s *DatastoreStorage) ReadHistoryTx(datatype, realm, user, id string, conte } // TODO: handle pagination. - q := datastore.NewQuery(historyKind).Filter("service =", s.service).Filter("type =", datatype).Filter("realm =", realm).Filter("user_id =", user).Filter("id =", id).Order("rev").Limit(storage.MaxPageSize) - results := make([]DatastoreHistory, storage.MaxPageSize) - if _, err := s.client.GetAll(context.Background() /* TODO: pass ctx from request */, q, &results); err != nil { + q := datastore.NewQuery(historyKind).Filter("service =", s.service). + Filter("type =", datatype). + Filter("realm =", realm). + Filter("user_id =", user). + Filter("id =", id). + Order("rev"). + Limit(storage.MaxPageSize) + + results := make([]History, storage.MaxPageSize) + if _, err := s.client.GetAll(ctx, q, &results); err != nil { return err } + for _, e := range results { - he := new(cpb.HistoryEntry) if len(e.Content) == 0 { continue } + he := &cpb.HistoryEntry{} if err := jsonpb.Unmarshal(strings.NewReader(e.Content), he); err != nil { return err } @@ -310,12 +334,12 @@ func (s *DatastoreStorage) ReadHistoryTx(datatype, realm, user, id string, conte } // Write writes a data entity. -func (s *DatastoreStorage) Write(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message) error { +func (s *Store) Write(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message) error { return s.WriteTx(datatype, realm, user, id, rev, content, history, nil) } // WriteTx writes a data entity inside a transaction. -func (s *DatastoreStorage) WriteTx(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message, tx storage.Tx) (ferr error) { +func (s *Store) WriteTx(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message, tx storage.Tx) (ferr error) { if tx == nil { var err error tx, err = s.Tx(true) @@ -329,14 +353,16 @@ func (s *DatastoreStorage) WriteTx(datatype, realm, user, id string, rev int64, } }() } - dstx, ok := tx.(*DatastoreTx) + dstx, ok := tx.(*Tx) if !ok { return status.Errorf(codes.InvalidArgument, "invalid transaction") } + // TODO: ensure that the handling of last rev between write and delete are correct. + if rev != storage.LatestRev { - rk := datastore.NameKey(entityKind, s.entityKey(datatype, realm, user, id, rev), nil) - re, err := s.datastoreEntity(rk, datatype, realm, user, id, rev, content) + rk := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil) + re, err := s.newEntity(rk, datatype, realm, user, id, rev, content) if err != nil { return err } @@ -345,9 +371,10 @@ func (s *DatastoreStorage) WriteTx(datatype, realm, user, id string, rev int64, return err } } + if history != nil { - hk := datastore.NameKey(historyKind, s.historyKey(datatype, realm, user, id, rev), nil) - he, err := s.datastoreHistory(hk, datatype, realm, user, id, rev, history) + hk := datastore.NameKey(historyKind, s.newHistoryKey(datatype, realm, user, id, rev), nil) + he, err := s.newHistory(hk, datatype, realm, user, id, rev, history) if err != nil { dstx.Rollback() return err @@ -357,8 +384,9 @@ func (s *DatastoreStorage) WriteTx(datatype, realm, user, id string, rev int64, return err } } - k := datastore.NameKey(entityKind, s.entityKey(datatype, realm, user, id, storage.LatestRev), nil) - e, err := s.datastoreEntity(k, datatype, realm, user, id, storage.LatestRev, content) + + k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, storage.LatestRev), nil) + e, err := s.newEntity(k, datatype, realm, user, id, storage.LatestRev, content) if err != nil { dstx.Rollback() return err @@ -371,12 +399,12 @@ func (s *DatastoreStorage) WriteTx(datatype, realm, user, id string, rev int64, } // Delete deletes a data entity. -func (s *DatastoreStorage) Delete(datatype, realm, user, id string, rev int64) error { +func (s *Store) Delete(datatype, realm, user, id string, rev int64) error { return s.DeleteTx(datatype, realm, user, id, rev, nil) } // DeleteTx deletes a data entity inside a transaction. -func (s *DatastoreStorage) DeleteTx(datatype, realm, user, id string, rev int64, tx storage.Tx) (ferr error) { +func (s *Store) DeleteTx(datatype, realm, user, id string, rev int64, tx storage.Tx) (ferr error) { if tx == nil { var err error tx, err = s.Tx(true) @@ -390,12 +418,13 @@ func (s *DatastoreStorage) DeleteTx(datatype, realm, user, id string, rev int64, } }() } - dstx, ok := tx.(*DatastoreTx) + + dstx, ok := tx.(*Tx) if !ok { return status.Errorf(codes.InvalidArgument, "invalid transaction") } - k := datastore.NameKey(entityKind, s.entityKey(datatype, realm, user, id, rev), nil) + k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil) if err := dstx.Tx.Delete(k); err != nil { dstx.Rollback() if err == datastore.ErrNoSuchEntity { @@ -403,28 +432,35 @@ func (s *DatastoreStorage) DeleteTx(datatype, realm, user, id string, rev int64, } return err } + return nil } // MultiDeleteTx deletes all records of a certain data type within a realm. // If user is "", deletes for all users. -func (s *DatastoreStorage) MultiDeleteTx(datatype, realm, user string, tx storage.Tx) error { - q := datastore.NewQuery(entityKind).Filter("service =", s.service).Filter("type =", datatype).Filter("realm =", realm) +func (s *Store) MultiDeleteTx(datatype, realm, user string, tx storage.Tx) error { + q := datastore.NewQuery(entityKind). + Filter("service =", s.service). + Filter("type =", datatype). + Filter("realm =", realm) if user != storage.DefaultUser { q = q.Filter("user_id =", user) } - q = q.Filter("rev = ", storage.LatestRev).Order("id") + q = q.Filter("rev = ", storage.LatestRev). + Order("id") + _, err := s.multiDelete(q) return err } // Wipe deletes all data and history within a realm. // If realm is "" deletes for all realms. -func (s *DatastoreStorage) Wipe(realm string) error { +func (s *Store) Wipe(realm string) error { glog.Infof("Datastore wipe project %q service %q realm %q: started", s.project, s.service, realm) results := make(map[string]int) - for _, kind := range wipeKinds { - q := datastore.NewQuery(kind).Filter("service =", s.service) + for _, kind := range []string{historyKind, entityKind} { + q := datastore.NewQuery(kind). + Filter("service =", s.service) if realm != storage.AllRealms { q = q.Filter("realm =", realm) } @@ -438,11 +474,17 @@ func (s *DatastoreStorage) Wipe(realm string) error { return nil } -func (s *DatastoreStorage) multiDelete(q *datastore.Query) (int, error) { - keys, err := s.client.GetAll(context.Background() /* TODO: pass ctx from request */, q.KeysOnly(), nil) +// multiDelete all entities matching the provided query. +// Returns the total number of items matching the query. +func (s *Store) multiDelete(q *datastore.Query) (int, error) { + ctx := context.Background() /* TODO: pass ctx from request */ + keys, err := s.client.GetAll(ctx, q.KeysOnly(), nil) if err != nil { return 0, err } + + // Datastore API doesn't allow more than 500 per MultiDelete rpc. + const multiDeleteChunkSize = 400 total := len(keys) for i := 0; i < total; i += multiDeleteChunkSize { end := i + multiDeleteChunkSize @@ -457,7 +499,8 @@ func (s *DatastoreStorage) multiDelete(q *datastore.Query) (int, error) { return total, nil } -func (s *DatastoreStorage) Tx(update bool) (storage.Tx, error) { +// Tx creates a new transaction for the store. +func (s *Store) Tx(update bool) (storage.Tx, error) { var err error var dstx *datastore.Transaction if update { @@ -468,15 +511,22 @@ func (s *DatastoreStorage) Tx(update bool) (storage.Tx, error) { if err != nil { return nil, err } - return &DatastoreTx{ - writer: update, + return &Tx{ + update: update, Tx: dstx, }, nil } +const ( + minJitter = 1 * 1e9 // nanoseconds as integer for math + maxJitter = 3 * 1e9 // nanoseconds as integer for math +) + // LockTx returns a storage-wide lock by the given name. Only one such lock should // be requested at a time. If Tx is provided, it must be an update Tx. -func (s *DatastoreStorage) LockTx(lockName string, minFrequency time.Duration, tx storage.Tx) storage.Tx { +// TODO: get rid of this function and fix the code using it. +// Note: This doesn't provide distributed mutual exclusion, don't use this. +func (s *Store) LockTx(lockName string, minFrequency time.Duration, tx storage.Tx) storage.Tx { if tx == nil { var err error tx, err = s.Tx(true) @@ -515,12 +565,15 @@ func (s *DatastoreStorage) LockTx(lockName string, minFrequency time.Duration, t return tx } -// Init initilizes the storage. -func (s *DatastoreStorage) Init() error { - k := datastore.NameKey(metaKind, s.metaKey(metaVersion), nil) - meta := new(DatastoreMeta) +// Init initilizes the store. +// It creates some metadata information about the store on datastore. +// If metada information already exists on datastore, it comapres to see if they +// are compatible with the metadata information of the current store. +func (s *Store) Init(ctx context.Context) error { + k := datastore.NameKey(metaKind, s.newMetaKey(metaVersion), nil) + meta := &Meta{} if err := s.client.Get(context.Background() /* TODO: pass ctx from request */, k, meta); err == datastore.ErrNoSuchEntity { - meta = &DatastoreMeta{ + meta = &Meta{ Key: k, Name: metaVersion, Value: storageVersion, @@ -539,7 +592,9 @@ func (s *DatastoreStorage) Init() error { return nil } -func (s *DatastoreStorage) entityKey(datatype, realm, user, id string, rev int64) string { +// Data + +func (s *Store) newHistoryKey(datatype, realm, user, id string, rev int64) string { r := storage.LatestRevName if rev > 0 { r = fmt.Sprintf("%06d", rev) @@ -547,14 +602,22 @@ func (s *DatastoreStorage) entityKey(datatype, realm, user, id string, rev int64 if user == storage.DefaultUser { user = "~" } - return fmt.Sprintf("%s/%s/%s/%s/%s/%s", s.service, datatype, realm, user, id, r) + return fmt.Sprintf("%s/%s.%s/%s/%s/%s/%s", s.service, datatype, storage.HistoryRevName, realm, user, id, r) +} + +func (s *Store) newMeta(key *datastore.Key) *Meta { + return &Meta{ + Key: key, + Name: "version", + Value: storageVersion, + } } -func (s *DatastoreStorage) metaKey(id string) string { +func (s *Store) newMetaKey(id string) string { return fmt.Sprintf("%s/%s/%s/%s", s.service, "meta", id, "meta") } -func (s *DatastoreStorage) historyKey(datatype, realm, user, id string, rev int64) string { +func (s *Store) newEntityKey(datatype, realm, user, id string, rev int64) string { r := storage.LatestRevName if rev > 0 { r = fmt.Sprintf("%06d", rev) @@ -562,22 +625,21 @@ func (s *DatastoreStorage) historyKey(datatype, realm, user, id string, rev int6 if user == storage.DefaultUser { user = "~" } - return fmt.Sprintf("%s/%s.%s/%s/%s/%s/%s", s.service, datatype, storage.HistoryRevName, realm, user, id, r) + return fmt.Sprintf("%s/%s/%s/%s/%s/%s", s.service, datatype, realm, user, id, r) } -func (s *DatastoreStorage) datastoreEntity(key *datastore.Key, datatype, realm, user, id string, rev int64, content proto.Message) (*DatastoreEntity, error) { - m := jsonpb.Marshaler{} - js, err := m.MarshalToString(content) +func (s *Store) newEntity(key *datastore.Key, datatype, realm, user, id string, rev int64, content proto.Message) (*Entity, error) { + js, err := (&jsonpb.Marshaler{}).MarshalToString(content) if err != nil { return nil, err } - return &DatastoreEntity{ + return &Entity{ Key: key, Service: s.service, Datatype: datatype, Realm: realm, User: user, - Id: id, + ID: id, Rev: rev, Version: storageVersion, Modified: time.Now().Unix(), @@ -585,19 +647,18 @@ func (s *DatastoreStorage) datastoreEntity(key *datastore.Key, datatype, realm, }, nil } -func (s *DatastoreStorage) datastoreHistory(key *datastore.Key, datatype, realm, user, id string, rev int64, content proto.Message) (*DatastoreHistory, error) { - m := jsonpb.Marshaler{} - js, err := m.MarshalToString(content) +func (s *Store) newHistory(key *datastore.Key, datatype, realm, user, id string, rev int64, content proto.Message) (*History, error) { + js, err := (&jsonpb.Marshaler{}).MarshalToString(content) if err != nil { return nil, err } - return &DatastoreHistory{ + return &History{ Key: key, Service: s.service, Datatype: datatype, Realm: realm, User: user, - Id: id, + ID: id, Rev: rev, Version: storageVersion, Modified: time.Now().Unix(), @@ -605,18 +666,21 @@ func (s *DatastoreStorage) datastoreHistory(key *datastore.Key, datatype, realm, }, nil } -type DatastoreTx struct { - writer bool +// Transaction + +// Tx is a transaction. +type Tx struct { + update bool Tx *datastore.Transaction } // IsUpdate tells if the transaction is an update or read-only. -func (tx *DatastoreTx) IsUpdate() bool { - return tx.writer +func (tx *Tx) IsUpdate() bool { + return tx.update } // Finish attempts to commit a transaction. -func (tx *DatastoreTx) Finish() error { +func (tx *Tx) Finish() error { if tx.Tx == nil { return nil } @@ -630,7 +694,7 @@ func (tx *DatastoreTx) Finish() error { } // Rollback attempts to rollback a transaction. -func (tx *DatastoreTx) Rollback() error { +func (tx *Tx) Rollback() error { if tx.Tx == nil { return nil }