Skip to content

Commit

Permalink
tx: lock-free catalog updates
Browse files Browse the repository at this point in the history
  • Loading branch information
asdine committed Nov 4, 2023
1 parent cd14cd2 commit ce7c2a3
Show file tree
Hide file tree
Showing 49 changed files with 503 additions and 1,485 deletions.
343 changes: 152 additions & 191 deletions internal/database/catalog.go

Large diffs are not rendered by default.

162 changes: 108 additions & 54 deletions internal/database/catalog_test.go

Large diffs are not rendered by default.

39 changes: 14 additions & 25 deletions internal/database/catalogstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,44 @@ import (
"github.com/genjidb/genji/types"
)

func LoadCatalog(tx *database.Transaction) (*database.Catalog, error) {
c := database.NewCatalog()
func LoadCatalog(tx *database.Transaction) error {
cw := tx.CatalogWriter()

err := c.Init(tx)
err := cw.Init(tx)
if err != nil {
return nil, err
return err
}

tables, indexes, sequences, err := loadCatalogStore(tx, c.CatalogTable)
tables, indexes, sequences, err := loadCatalogStore(tx, tx.Catalog.CatalogTable)
if err != nil {
return nil, errors.Wrap(err, "failed to load catalog store")
}

for _, tb := range tables {
// bind default values with catalog
for _, fc := range tb.FieldConstraints.Ordered {
if fc.DefaultValue == nil {
continue
}

fc.DefaultValue.Bind(c)
}
return errors.Wrap(err, "failed to load catalog store")
}

// add the __genji_catalog table to the list of tables
// so that it can be queried
ti := c.CatalogTable.Info().Clone()
ti := tx.Catalog.CatalogTable.Info().Clone()
// make sure that table is read-only
ti.ReadOnly = true
tables = append(tables, *ti)

// load tables and indexes first
c.Cache.Load(tables, indexes, nil)
tx.Catalog.Cache.Load(tables, indexes, nil)

if len(sequences) > 0 {
var seqList []database.Sequence
seqList, err = loadSequences(tx, c, sequences)
seqList, err = loadSequences(tx, sequences)
if err != nil {
return nil, errors.Wrap(err, "failed to load sequences")
return errors.Wrap(err, "failed to load sequences")
}

c.Cache.Load(nil, nil, seqList)
tx.Catalog.Cache.Load(nil, nil, seqList)
}

return c, nil
return nil
}

func loadSequences(tx *database.Transaction, c *database.Catalog, info []database.SequenceInfo) ([]database.Sequence, error) {
tb, err := c.GetTable(tx, database.SequenceTableName)
func loadSequences(tx *database.Transaction, info []database.SequenceInfo) ([]database.Sequence, error) {
tb, err := tx.Catalog.GetTable(tx, database.SequenceTableName)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion internal/database/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ func (f FieldConstraints) convertArrayAtPath(path document.Path, a types.Array,
}

type TableExpression interface {
Bind(catalog *Catalog)
Eval(tx *Transaction, d types.Document) (types.Value, error)
String() string
}
Expand Down
62 changes: 43 additions & 19 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ const (
)

type Database struct {
DB *pebble.DB
Catalog *Catalog
DB *pebble.DB

catalogMu sync.RWMutex
catalog *Catalog

// If this is non-nil, the user is running an explicit transaction
// using the BEGIN statement.
Expand All @@ -27,8 +29,12 @@ type Database struct {
attachedTransaction *Transaction
attachedTxMu sync.Mutex

// This is used to prevent creating a new transaction
// during certain operations (commit, close, etc.)
txmu sync.RWMutex

// This limits the number of write transactions to 1.
writetxmu *sync.Mutex
writetxmu sync.Mutex

// TransactionIDs is used to assign transaction an ID at runtime.
// Since transaction IDs are not persisted and not used for concurrent
Expand All @@ -45,7 +51,7 @@ type Database struct {
// Options are passed to Open to control
// how the database is loaded.
type Options struct {
CatalogLoader func(tx *Transaction) (*Catalog, error)
CatalogLoader func(tx *Transaction) error
}

// CatalogLoader loads the catalog from the disk.
Expand Down Expand Up @@ -113,8 +119,7 @@ var DefaultComparer = &pebble.Comparer{

func New(pdb *pebble.DB, opts *Options) (*Database, error) {
db := Database{
DB: pdb,
writetxmu: &sync.Mutex{},
DB: pdb,
Store: kv.NewStore(pdb, kv.Options{
RollbackSegmentNamespace: int64(RollbackSegmentNamespace),
}),
Expand All @@ -133,25 +138,26 @@ func New(pdb *pebble.DB, opts *Options) (*Database, error) {
}
defer tx.Rollback()

if opts.CatalogLoader == nil {
db.Catalog = NewCatalog()
} else {
db.Catalog, err = opts.CatalogLoader(tx)
db.catalog = NewCatalog()
tx.Catalog = db.catalog

if opts.CatalogLoader != nil {
err = opts.CatalogLoader(tx)
if err != nil {
return nil, errors.Wrap(err, "failed to load catalog")
}
} else {
err = tx.CatalogWriter().Init(tx)
if err != nil {
return nil, err
}
}

err = db.cleanupTransientNamespaces(tx)
if err != nil {
return nil, err
}

err = db.Catalog.Init(tx)
if err != nil {
return nil, err
}

err = tx.Commit()
if err != nil {
return nil, err
Expand Down Expand Up @@ -187,13 +193,13 @@ func (db *Database) closeDatabase() error {
}
defer tx.Session.Close()

for _, seqName := range db.Catalog.ListSequences() {
seq, err := db.Catalog.GetSequence(seqName)
for _, seqName := range tx.Catalog.ListSequences() {
seq, err := tx.Catalog.GetSequence(seqName)
if err != nil {
return err
}

err = seq.Release(tx, db.Catalog)
err = seq.Release(tx)
if err != nil {
return err
}
Expand Down Expand Up @@ -232,6 +238,9 @@ func (db *Database) Begin(writable bool) (*Transaction, error) {
// attached to the database and prevents any other transaction to be opened afterwards
// until it gets rolled back or commited.
func (db *Database) BeginTx(opts *TxOptions) (*Transaction, error) {
db.txmu.RLock()
defer db.txmu.RUnlock()

if opts == nil {
opts = new(TxOptions)
}
Expand Down Expand Up @@ -264,14 +273,16 @@ func (db *Database) beginTx(opts *TxOptions) (*Transaction, error) {
}

tx := Transaction{
db: db,
Store: db.Store,
Session: sess,
Writable: !opts.ReadOnly,
ID: atomic.AddUint64(&db.TransactionIDs, 1),
Catalog: db.Catalog(),
}

if !opts.ReadOnly {
tx.WriteTxMu = db.writetxmu
tx.WriteTxMu = &db.writetxmu
}

if opts.Attached {
Expand All @@ -283,6 +294,19 @@ func (db *Database) beginTx(opts *TxOptions) (*Transaction, error) {
return &tx, nil
}

func (db *Database) Catalog() *Catalog {
db.catalogMu.RLock()
c := db.catalog
db.catalogMu.RUnlock()
return c
}

func (db *Database) SetCatalog(c *Catalog) {
db.catalogMu.Lock()
db.catalog = c
db.catalogMu.Unlock()
}

func (db *Database) releaseAttachedTx() {
db.attachedTxMu.Lock()
defer db.attachedTxMu.Unlock()
Expand Down
26 changes: 13 additions & 13 deletions internal/database/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (s *Sequence) key() *tree.Key {
return s.Key
}

func (s *Sequence) Init(tx *Transaction, catalog *Catalog) error {
tb, err := s.GetOrCreateTable(tx, catalog)
func (s *Sequence) Init(tx *Transaction) error {
tb, err := s.GetOrCreateTable(tx)
if err != nil {
return err
}
Expand All @@ -99,7 +99,7 @@ func (s *Sequence) Drop(tx *Transaction, catalog *Catalog) error {
return tb.Delete(k)
}

func (s *Sequence) Next(tx *Transaction, catalog *Catalog) (int64, error) {
func (s *Sequence) Next(tx *Transaction) (int64, error) {
if !tx.Writable {
return 0, errors.New("cannot increment sequence on read-only transaction")
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *Sequence) Next(tx *Transaction, catalog *Catalog) (int64, error) {
}

// store the new lease
err := s.SetLease(tx, catalog, s.Info.Name, newLease)
err := s.SetLease(tx, s.Info.Name, newLease)
if err != nil {
return 0, err
}
Expand All @@ -166,8 +166,8 @@ func (s *Sequence) Next(tx *Transaction, catalog *Catalog) (int64, error) {
return newValue, nil
}

func (s *Sequence) SetLease(tx *Transaction, catalog *Catalog, name string, v int64) error {
tb, err := s.GetOrCreateTable(tx, catalog)
func (s *Sequence) SetLease(tx *Transaction, name string, v int64) error {
tb, err := s.GetOrCreateTable(tx)
if err != nil {
return err
}
Expand All @@ -182,18 +182,18 @@ func (s *Sequence) SetLease(tx *Transaction, catalog *Catalog, name string, v in
return err
}

func (s *Sequence) GetOrCreateTable(tx *Transaction, catalog *Catalog) (*Table, error) {
tb, err := catalog.GetTable(tx, SequenceTableName)
func (s *Sequence) GetOrCreateTable(tx *Transaction) (*Table, error) {
tb, err := tx.Catalog.GetTable(tx, SequenceTableName)
if err == nil || !errs.IsNotFoundError(err) {
return tb, err
}

err = catalog.CreateTable(tx, SequenceTableName, sequenceTableInfo)
err = tx.CatalogWriter().CreateTable(tx, SequenceTableName, sequenceTableInfo)
if err != nil {
return nil, err
}

return catalog.GetTable(tx, SequenceTableName)
return tx.Catalog.GetTable(tx, SequenceTableName)
}

func (s *Sequence) Type() string {
Expand Down Expand Up @@ -221,12 +221,12 @@ func (s *Sequence) GenerateBaseName() string {

// Release the sequence by storing the actual current value to the sequence table.
// If the sequence has cache, the cached value is overwritten.
func (s *Sequence) Release(tx *Transaction, catalog *Catalog) error {
func (s *Sequence) Release(tx *Transaction) error {
if s.CurrentValue == nil {
return nil
}

err := s.SetLease(tx, catalog, s.Info.Name, *s.CurrentValue)
err := s.SetLease(tx, s.Info.Name, *s.CurrentValue)
if err != nil {
return err
}
Expand All @@ -235,7 +235,7 @@ func (s *Sequence) Release(tx *Transaction, catalog *Catalog) error {
return nil
}

func (s *Sequence) Clone() *Sequence {
func (s *Sequence) Clone() Relation {
return &Sequence{
Info: s.Info.Clone(),
CurrentValue: s.CurrentValue,
Expand Down
Loading

0 comments on commit ce7c2a3

Please sign in to comment.