Skip to content

Commit

Permalink
core: add engine package
Browse files Browse the repository at this point in the history
  • Loading branch information
asdine committed Dec 25, 2023
1 parent 69da8a8 commit 372aa7b
Show file tree
Hide file tree
Showing 16 changed files with 140 additions and 118 deletions.
4 changes: 3 additions & 1 deletion cmd/chai/commands/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"github.com/chaisql/chai/cmd/chai/dbutil"
"github.com/chaisql/chai/internal/kv"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -35,7 +36,8 @@ func NewPebbleCommand() *cli.Command {
}
defer db.Close()

return dbutil.DumpPebble(c.Context, db.DB.Store.DB(), dbutil.DumpPebbleOptions{
ng := db.DB.Engine.(*kv.PebbleEngine)
return dbutil.DumpPebble(c.Context, ng.DB(), dbutil.DumpPebbleOptions{
KeysOnly: c.Bool("keys-only"),
})
}
Expand Down
21 changes: 11 additions & 10 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"
"time"

"github.com/chaisql/chai/internal/engine"
"github.com/chaisql/chai/internal/kv"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ type Database struct {
closeOnce sync.Once

// Underlying kv store.
Store *kv.Store
Engine engine.Engine
}

// Options are passed to Open to control
Expand All @@ -54,7 +55,7 @@ type Options struct {
// It may parse a SQL representation of the catalog
// and return a Catalog that represents all entities stored on disk.
type CatalogLoader interface {
LoadCatalog(kv.Session) (*Catalog, error)
LoadCatalog(engine.Session) (*Catalog, error)
}

// TxOptions are passed to Begin to configure transactions.
Expand All @@ -78,18 +79,18 @@ func Open(path string, opts *Options) (*Database, error) {
}

db := Database{
Store: store,
Engine: store,
}

// ensure the rollback segment doesn't contain any data that needs to be rolled back
// due to a previous crash.
err = db.Store.ResetRollbackSegment()
err = db.Engine.Recover()
if err != nil {
return nil, err
}

// clean up the transient namespaces
err = db.Store.CleanupTransientNamespaces()
err = db.Engine.CleanupTransientNamespaces()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +168,7 @@ func (db *Database) closeDatabase() error {
return err
}

return db.Store.Close()
return db.Engine.Close()
}

// GetAttachedTx returns the transaction attached to the database. It returns nil if there is no
Expand Down Expand Up @@ -222,16 +223,16 @@ func (db *Database) beginTx(opts *TxOptions) (*Transaction, error) {
opts = &TxOptions{}
}

var sess kv.Session
var sess engine.Session
if opts.ReadOnly {
sess = db.Store.NewSnapshotSession()
sess = db.Engine.NewSnapshotSession()
} else {
sess = db.Store.NewBatchSession()
sess = db.Engine.NewBatchSession()
}

tx := Transaction{
db: db,
Store: db.Store,
Engine: db.Engine,
Session: sess,
Writable: !opts.ReadOnly,
ID: atomic.AddUint64(&db.TransactionIDs, 1),
Expand Down
4 changes: 2 additions & 2 deletions internal/database/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"fmt"

"github.com/chaisql/chai/internal/kv"
"github.com/chaisql/chai/internal/engine"
"github.com/chaisql/chai/internal/tree"
"github.com/chaisql/chai/internal/types"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -122,7 +122,7 @@ func (idx *Index) Delete(vs []types.Value, key []byte) error {
return err
}

return errors.WithStack(kv.ErrKeyNotFound)
return errors.WithStack(engine.ErrKeyNotFound)
}

func (idx *Index) IterateOnRange(rng *tree.Range, reverse bool, fn func(key *tree.Key) error) error {
Expand Down
8 changes: 4 additions & 4 deletions internal/database/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package database
import (
"fmt"

"github.com/chaisql/chai/internal/engine"
errs "github.com/chaisql/chai/internal/errors"
"github.com/chaisql/chai/internal/kv"
"github.com/chaisql/chai/internal/object"
"github.com/chaisql/chai/internal/tree"
"github.com/chaisql/chai/internal/types"
Expand Down Expand Up @@ -55,7 +55,7 @@ func (t *Table) Insert(o types.Object) (*tree.Key, Row, error) {
err = t.Tree.Put(key, enc)
}
if err != nil {
if errors.Is(err, kv.ErrKeyAlreadyExists) {
if errors.Is(err, engine.ErrKeyAlreadyExists) {
return nil, nil, &ConstraintViolationError{
Constraint: "PRIMARY KEY",
Paths: t.Info.PrimaryKey.Paths,
Expand Down Expand Up @@ -95,7 +95,7 @@ func (t *Table) Delete(key *tree.Key) error {
}

err := t.Tree.Delete(key)
if errors.Is(err, kv.ErrKeyNotFound) {
if errors.Is(err, engine.ErrKeyNotFound) {
return errors.WithStack(errs.NewNotFoundError(key.String()))
}

Expand Down Expand Up @@ -178,7 +178,7 @@ func (t *Table) IterateOnRange(rng *Range, reverse bool, fn func(key *tree.Key,
func (t *Table) GetRow(key *tree.Key) (Row, error) {
enc, err := t.Tree.Get(key)
if err != nil {
if errors.Is(err, kv.ErrKeyNotFound) {
if errors.Is(err, engine.ErrKeyNotFound) {
return nil, errors.WithStack(errs.NewNotFoundError(key.String()))
}
return nil, fmt.Errorf("failed to fetch row %q: %w", key, err)
Expand Down
8 changes: 4 additions & 4 deletions internal/database/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"sync"
"time"

"github.com/chaisql/chai/internal/kv"
"github.com/chaisql/chai/internal/engine"
"github.com/cockroachdb/errors"
)

Expand All @@ -19,8 +19,8 @@ type Transaction struct {
// The timestamp must use the local timezone.
TxStart time.Time

Session kv.Session
Store *kv.Store
Session engine.Session
Engine engine.Engine
ID uint64
Writable bool
WriteTxMu *sync.Mutex
Expand All @@ -41,7 +41,7 @@ func (tx *Transaction) Rollback() error {
}

if tx.Writable {
err = tx.Store.Rollback()
err = tx.Engine.Rollback()
if err != nil {
return err
}
Expand Down
66 changes: 66 additions & 0 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package engine

import "github.com/cockroachdb/errors"

// Common errors returned by the engine.
var (
// ErrKeyNotFound is returned when the targeted key doesn't exist.
ErrKeyNotFound = errors.New("key not found")

// ErrKeyAlreadyExists is returned when the targeted key already exists.
ErrKeyAlreadyExists = errors.New("key already exists")
)

type Engine interface {
Close() error
Rollback() error
Recover() error
LockSharedSnapshot()
UnlockSharedSnapshot()
CleanupTransientNamespaces() error
NewSnapshotSession() Session
NewBatchSession() Session
NewTransientSession() Session
}

type Session interface {
Commit() error
Close() error
// Insert inserts a key-value pair. If it already exists, it returns ErrKeyAlreadyExists.
Insert(k, v []byte) error
// Put stores a key-value pair. If it already exists, it overrides it.
Put(k, v []byte) error
// Get returns a value associated with the given key. If not found, returns ErrKeyNotFound.
Get(k []byte) ([]byte, error)
// Exists returns whether a key exists and is visible by the current session.
Exists(k []byte) (bool, error)
// Delete a record by key. If not found, returns ErrKeyNotFound.
Delete(k []byte) error
DeleteRange(start []byte, end []byte) error
Iterator(opts *IterOptions) (Iterator, error)
}

type Iterator interface {
Close() error
First() bool
Last() bool
Valid() bool
Next() bool
Prev() bool
Error() error
Key() []byte
Value() ([]byte, error)
}

type IterOptions struct {
// LowerBound specifies the smallest key (inclusive) that the iterator will
// return during iteration. If the iterator is seeked or iterated past this
// boundary the iterator will return Valid()==false. Setting LowerBound
// effectively truncates the key space visible to the iterator.
LowerBound []byte
// UpperBound specifies the largest key (exclusive) that the iterator will
// return during iteration. If the iterator is seeked or iterated past this
// boundary the iterator will return Valid()==false. Setting UpperBound
// effectively truncates the key space visible to the iterator.
UpperBound []byte
}
13 changes: 7 additions & 6 deletions internal/kv/batch.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package kv

import (
"github.com/chaisql/chai/internal/engine"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)

var _ Session = (*BatchSession)(nil)
var _ engine.Session = (*BatchSession)(nil)

var (
tombStone = []byte{0}
)

type BatchSession struct {
Store *Store
Store *PebbleEngine
DB *pebble.DB
Batch *pebble.Batch
closed bool
Expand All @@ -21,7 +22,7 @@ type BatchSession struct {
keys map[string]struct{}
}

func (s *Store) NewBatchSession() *BatchSession {
func (s *PebbleEngine) NewBatchSession() engine.Session {
// before creating a batch session, create a shared snapshot
// at this point-in-time.
s.LockSharedSnapshot()
Expand Down Expand Up @@ -142,7 +143,7 @@ func (s *BatchSession) Insert(k, v []byte) error {
return err
}
if ok {
return ErrKeyAlreadyExists
return engine.ErrKeyAlreadyExists
}

s.keys[string(k)] = struct{}{}
Expand Down Expand Up @@ -190,7 +191,7 @@ func (s *BatchSession) Delete(k []byte) error {
// DeleteRange deletes all keys in the given range.
// This implementation deletes all keys one by one to simplify the rollback.
func (s *BatchSession) DeleteRange(start []byte, end []byte) error {
it, err := s.Iterator(&IterOptions{
it, err := s.Iterator(&engine.IterOptions{
LowerBound: start,
UpperBound: end,
})
Expand All @@ -209,7 +210,7 @@ func (s *BatchSession) DeleteRange(start []byte, end []byte) error {
return nil
}

func (s *BatchSession) Iterator(opts *IterOptions) (Iterator, error) {
func (s *BatchSession) Iterator(opts *engine.IterOptions) (engine.Iterator, error) {
err := s.applyBatch()
if err != nil {
return nil, err
Expand Down
24 changes: 12 additions & 12 deletions internal/kv/store.go → internal/kv/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
defaultMaxTransientBatchSize int = 1 << 19 // 512KB
)

type Store struct {
type PebbleEngine struct {
db *pebble.DB
opts Options
rollbackSegment *RollbackSegment
Expand All @@ -47,7 +47,7 @@ type Options struct {
MaxTransientNamespace uint64
}

func NewEngineWith(path string, opts Options, popts *pebble.Options) (*Store, error) {
func NewEngineWith(path string, opts Options, popts *pebble.Options) (*PebbleEngine, error) {
if popts == nil {
popts = &pebble.Options{}
}
Expand All @@ -68,7 +68,7 @@ func NewEngineWith(path string, opts Options, popts *pebble.Options) (*Store, er
return NewStore(db, opts), nil
}

func NewEngine(path string, opts Options) (*Store, error) {
func NewEngine(path string, opts Options) (*PebbleEngine, error) {
var popts pebble.Options
var pbpath string

Expand Down Expand Up @@ -116,7 +116,7 @@ var DefaultComparer = &pebble.Comparer{
Name: "leveldb.BytewiseComparator",
}

func NewStore(db *pebble.DB, opts Options) *Store {
func NewStore(db *pebble.DB, opts Options) *PebbleEngine {
if opts.MaxBatchSize <= 0 {
opts.MaxBatchSize = defaultMaxBatchSize
}
Expand All @@ -130,26 +130,26 @@ func NewStore(db *pebble.DB, opts Options) *Store {
panic("max transient namespace cannot be 0")
}

return &Store{
return &PebbleEngine{
db: db,
opts: opts,
rollbackSegment: NewRollbackSegment(db, opts.RollbackSegmentNamespace),
}
}

func (s *Store) Close() error {
func (s *PebbleEngine) Close() error {
return s.db.Close()
}

func (s *Store) Rollback() error {
func (s *PebbleEngine) Rollback() error {
return s.rollbackSegment.Rollback()
}

func (s *Store) ResetRollbackSegment() error {
func (s *PebbleEngine) Recover() error {
return s.rollbackSegment.Reset()
}

func (s *Store) LockSharedSnapshot() {
func (s *PebbleEngine) LockSharedSnapshot() {
s.sharedSnapshot.Lock()
s.sharedSnapshot.snapshot = &snapshot{
snapshot: s.db.NewSnapshot(),
Expand All @@ -159,18 +159,18 @@ func (s *Store) LockSharedSnapshot() {
s.sharedSnapshot.Unlock()
}

func (s *Store) UnlockSharedSnapshot() {
func (s *PebbleEngine) UnlockSharedSnapshot() {
s.sharedSnapshot.Lock()
s.sharedSnapshot.snapshot.Done()
s.sharedSnapshot.snapshot = nil
s.sharedSnapshot.Unlock()
}

func (s *Store) DB() *pebble.DB {
func (s *PebbleEngine) DB() *pebble.DB {
return s.db
}

func (s *Store) CleanupTransientNamespaces() error {
func (s *PebbleEngine) CleanupTransientNamespaces() error {
return s.db.DeleteRange(
encoding.EncodeUint(nil, uint64(s.minTransientNamespace)),
encoding.EncodeUint(nil, uint64(s.maxTransientNamespace)),
Expand Down
Loading

0 comments on commit 372aa7b

Please sign in to comment.