Skip to content

Commit

Permalink
Refactor transaction into batch and snapshot (#2182)
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored Oct 3, 2024
1 parent 0c0700c commit 262c51f
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 150 deletions.
3 changes: 0 additions & 3 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ type Transaction interface {
// Get fetches the value for the given key, should return ErrKeyNotFound if key is not present
// Caller should not assume that the slice would stay valid after the call to cb
Get(key []byte, cb func([]byte) error) error

// Impl returns the underlying transaction object
Impl() any
}

// View : see db.DB.View
Expand Down
105 changes: 105 additions & 0 deletions db/pebble/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package pebble

import (
"errors"
"sync"
"time"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
"github.com/cockroachdb/pebble"
)

var _ db.Transaction = (*batch)(nil)

type batch struct {
batch *pebble.Batch
lock *sync.Mutex
listener db.EventListener
}

func NewBatch(dbBatch *pebble.Batch, lock *sync.Mutex, listener db.EventListener) *batch {
return &batch{
batch: dbBatch,
lock: lock,
listener: listener,
}
}

// Discard : see db.Transaction.Discard
func (b *batch) Discard() error {
if b.batch == nil {
return nil
}

err := b.batch.Close()
b.batch = nil
b.lock.Unlock()
b.lock = nil

return err
}

// Commit : see db.Transaction.Commit
func (b *batch) Commit() error {
if b.batch == nil {
return ErrDiscardedTransaction
}

start := time.Now()
defer func() { b.listener.OnCommit(time.Since(start)) }()
return utils.RunAndWrapOnError(b.Discard, b.batch.Commit(pebble.Sync))
}

// Set : see db.Transaction.Set
func (b *batch) Set(key, val []byte) error {
start := time.Now()
if len(key) == 0 {
return errors.New("empty key")
}

if b.batch == nil {
return ErrDiscardedTransaction
}

defer func() { b.listener.OnIO(true, time.Since(start)) }()

return b.batch.Set(key, val, pebble.Sync)
}

// Delete : see db.Transaction.Delete
func (b *batch) Delete(key []byte) error {
if b.batch == nil {
return ErrDiscardedTransaction
}

start := time.Now()
defer func() { b.listener.OnIO(true, time.Since(start)) }()

return b.batch.Delete(key, pebble.Sync)
}

// Get : see db.Transaction.Get
func (b *batch) Get(key []byte, cb func([]byte) error) error {
if b.batch == nil {
return ErrDiscardedTransaction
}
return get(b.batch, key, cb, b.listener)
}

// NewIterator : see db.Transaction.NewIterator
func (b *batch) NewIterator() (db.Iterator, error) {
var iter *pebble.Iterator
var err error

if b.batch == nil {
return nil, ErrDiscardedTransaction
}

iter, err = b.batch.NewIter(nil)
if err != nil {
return nil, err
}

return &iterator{iter: iter}, nil
}
34 changes: 34 additions & 0 deletions db/pebble/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package pebble

import (
"errors"
"io"
"time"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
"github.com/cockroachdb/pebble"
)

type getter interface {
Get([]byte) ([]byte, io.Closer, error)
}

func get(g getter, key []byte, cb func([]byte) error, listener db.EventListener) error {
start := time.Now()
var val []byte
var closer io.Closer

val, closer, err := g.Get(key)

// We need it evaluated immediately so the duration doesn't include the runtime of the user callback that we call below.
defer listener.OnIO(false, time.Since(start)) //nolint:govet
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return db.ErrKeyNotFound
}
return err
}

return utils.RunAndWrapOnError(closer.Close, cb(val))
}
16 changes: 8 additions & 8 deletions db/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pebble

import (
"context"
"errors"
"fmt"
"sync"
"testing"
Expand All @@ -18,6 +19,10 @@ const (
minCacheSizeMB = 8
)

var (
ErrDiscardedTransaction = errors.New("discarded transaction")
ErrReadOnlyTransaction = errors.New("read-only transaction")
)
var _ db.DB = (*DB)(nil)

type DB struct {
Expand Down Expand Up @@ -83,19 +88,14 @@ func (d *DB) WithListener(listener db.EventListener) db.DB {
}

// NewTransaction : see db.DB.NewTransaction
// Batch is used for read-write operations, while snapshot is used for read-only operations
func (d *DB) NewTransaction(update bool) (db.Transaction, error) {
txn := &Transaction{
listener: d.listener,
}
if update {
d.wMutex.Lock()
txn.lock = d.wMutex
txn.batch = d.pebble.NewIndexedBatch()
} else {
txn.snapshot = d.pebble.NewSnapshot()
return NewBatch(d.pebble.NewIndexedBatch(), d.wMutex, d.listener), nil
}

return txn, nil
return NewSnapshot(d.pebble.NewSnapshot(), d.listener), nil
}

// Close : see io.Closer.Close
Expand Down
2 changes: 1 addition & 1 deletion db/pebble/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func TestPanic(t *testing.T) {
require.ErrorIs(t, testDB.View(func(txn db.Transaction) error {
return txn.Get([]byte{0}, func(b []byte) error { return nil })
}), db.ErrKeyNotFound)
require.EqualError(t, panicingTxn.Get([]byte{0}, func(b []byte) error { return nil }), "discarded txn")
require.EqualError(t, panicingTxn.Get([]byte{0}, func(b []byte) error { return nil }), "discarded transaction")
}()

require.NoError(t, testDB.Update(func(txn db.Transaction) error {
Expand Down
75 changes: 75 additions & 0 deletions db/pebble/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package pebble

import (
"github.com/NethermindEth/juno/db"
"github.com/cockroachdb/pebble"
)

var _ db.Transaction = (*snapshot)(nil)

type snapshot struct {
snapshot *pebble.Snapshot
listener db.EventListener
}

func NewSnapshot(dbSnapshot *pebble.Snapshot, listener db.EventListener) *snapshot {
return &snapshot{
snapshot: dbSnapshot,
listener: listener,
}
}

// Discard : see db.Transaction.Discard
func (s *snapshot) Discard() error {
if s.snapshot == nil {
return nil
}

if err := s.snapshot.Close(); err != nil {
return err
}

s.snapshot = nil

return nil
}

// Commit : see db.Transaction.Commit
func (s *snapshot) Commit() error {
return ErrReadOnlyTransaction
}

// Set : see db.Transaction.Set
func (s *snapshot) Set(key, val []byte) error {
return ErrReadOnlyTransaction
}

// Delete : see db.Transaction.Delete
func (s *snapshot) Delete(key []byte) error {
return ErrReadOnlyTransaction
}

// Get : see db.Transaction.Get
func (s *snapshot) Get(key []byte, cb func([]byte) error) error {
if s.snapshot == nil {
return ErrDiscardedTransaction
}
return get(s.snapshot, key, cb, s.listener)
}

// NewIterator : see db.Transaction.NewIterator
func (s *snapshot) NewIterator() (db.Iterator, error) {
var iter *pebble.Iterator
var err error

if s.snapshot == nil {
return nil, ErrDiscardedTransaction
}

iter, err = s.snapshot.NewIter(nil)
if err != nil {
return nil, err
}

return &iterator{iter: iter}, nil
}
Loading

0 comments on commit 262c51f

Please sign in to comment.