Skip to content

Commit

Permalink
Refactoring: pg to bun
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Oct 14, 2023
1 parent a56ae14 commit 2f69013
Show file tree
Hide file tree
Showing 54 changed files with 935 additions and 946 deletions.
4 changes: 3 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ linters:
- ineffassign
- containedctx
- tenv
- musttag
- musttag
- mirror
- tagalign
9 changes: 3 additions & 6 deletions build/dipdup.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ mempool:
- smart_rollup_cement
datasources:
tzkt: ghostnet_tzkt
rpc:
- ghostnet_rpc
rpc: ghostnet_rpc

nairobinet:
filters:
Expand Down Expand Up @@ -92,8 +91,7 @@ mempool:
- smart_rollup_cement
datasources:
tzkt: nairobinet_tzkt
rpc:
- nairobinet_rpc
rpc: nairobinet_rpc

# oxfordnet:
# filters:
Expand Down Expand Up @@ -136,8 +134,7 @@ mempool:
# - smart_rollup_cement
# datasources:
# tzkt: oxfordnet_tzkt
# rpc:
# - oxfordnet_rpc
# rpc: oxfordnet_rpc

database:
kind: postgres
Expand Down
8 changes: 5 additions & 3 deletions cmd/mempool/bakers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
const unknownBaker = "unknown"

func (indexer *Indexer) setEndorsementBakers(ctx context.Context) {
defer indexer.wg.Done()

indexer.info().Msg("Thread for finding endorsement baker started")

for {
Expand All @@ -35,7 +33,11 @@ func (indexer *Indexer) setEndorsementBakers(ctx context.Context) {
} else {
endorsement.Baker = unknownBaker
}
if _, err := indexer.db.DB().Model(endorsement).WherePK().Update("baker", endorsement.Baker); err != nil {
if _, err := indexer.db.DB().NewUpdate().
Model(endorsement).
WherePK().
Set("baker = ?", endorsement.Baker).
Exec(ctx); err != nil {
log.Err(err).Msg("set baker to endorsement")
}
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/mempool/block_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ func fromMessage(block tzkt.BlockMessage) Block {
type BlockQueue struct {
queue []Block
levels map[string]uint64
onPop func(block Block) error
onPop func(ctx context.Context, block Block) error
onRollback func(ctx context.Context, block Block) error
capacity uint64
}

func newBlockQueue(capacity uint64, onPop func(block Block) error, onRollback func(ctx context.Context, block Block) error) *BlockQueue {
func newBlockQueue(capacity uint64, onPop func(ctx context.Context, block Block) error, onRollback func(ctx context.Context, block Block) error) *BlockQueue {
if capacity == 0 {
capacity = 60
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func (bq *BlockQueue) Add(ctx context.Context, block tzkt.BlockMessage) error {
item := bq.queue[0]
bq.queue = bq.queue[1:]
if bq.onPop != nil {
if err := bq.onPop(item); err != nil {
if err := bq.onPop(ctx, item); err != nil {
return err
}
}
Expand Down
26 changes: 14 additions & 12 deletions cmd/mempool/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"

"github.com/dipdup-io/workerpool"
)

// Cache -
Expand All @@ -13,7 +15,7 @@ type Cache struct {
ticker *time.Ticker
ttl time.Duration

wg sync.WaitGroup
g workerpool.Group
}

// NewCache -
Expand All @@ -22,6 +24,7 @@ func NewCache(ttl time.Duration) *Cache {
lookup: make(map[string]int64),
ttl: ttl,
ticker: time.NewTicker(time.Minute),
g: workerpool.NewGroup(),
}
}

Expand All @@ -43,31 +46,30 @@ func (c *Cache) Set(key string) {

// Start -
func (c *Cache) Start(ctx context.Context) {
c.wg.Add(1)
go c.checkExpiration(ctx)
c.g.GoCtx(ctx, c.checkExpiration)
}

func (c *Cache) checkExpiration(ctx context.Context) {
defer c.wg.Done()
func (c *Cache) Close() error {
c.g.Wait()
c.ticker.Stop()
return nil
}

func (c *Cache) checkExpiration(ctx context.Context) {
for {
select {
case <-ctx.Done():
c.ticker.Stop()
return

case <-c.ticker.C:
c.mux.RLock()
c.mux.Lock()
for key, expiration := range c.lookup {
if time.Now().UnixNano() <= expiration {
continue
}
c.mux.RUnlock()
c.mux.Lock()
delete(c.lookup, key)
c.mux.Unlock()
c.mux.RLock()
}
c.mux.RUnlock()
c.mux.Unlock()
}
}
}
35 changes: 17 additions & 18 deletions cmd/mempool/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@ import (
// Config
type Config struct {
config.Config `yaml:",inline"`
Mempool Mempool `yaml:"mempool" validate:"required"`
Mempool Mempool `validate:"required" yaml:"mempool"`
Profiler *profiler.Config `yaml:"profiler,omitempty"`
}

// Mempool -
type Mempool struct {
Indexers map[string]*Indexer `yaml:"indexers" validate:"required"`
Settings Settings `yaml:"settings" validate:"required"`
Indexers map[string]*Indexer `validate:"required" yaml:"indexers"`
Settings Settings `validate:"required" yaml:"settings"`
}

// Indexer -
type Indexer struct {
Filters Filters `yaml:"filters" validate:"required"`
DataSource MempoolDataSource `yaml:"datasources" validate:"required"`
Filters Filters `validate:"required" yaml:"filters"`
DataSource MempoolDataSource `validate:"required" yaml:"datasources"`
}

// Filters -
type Filters struct {
Accounts []*config.Alias[config.Contract] `yaml:"accounts" validate:"max=50"`
Kinds []string `yaml:"kinds" validate:"required,min=1,dive,oneof=activate_account ballot delegation double_baking_evidence double_endorsement_evidence endorsement endorsement_with_slot origination proposals reveal seed_nonce_revelation transaction register_global_constant"`
Accounts []*config.Alias[config.Contract] `validate:"max=50" yaml:"accounts"`
Kinds []string `validate:"required,min=1,dive,oneof=activate_account ballot delegation double_baking_evidence double_endorsement_evidence endorsement endorsement_with_slot origination proposals reveal seed_nonce_revelation transaction register_global_constant" yaml:"kinds"`
}

// Addresses -
Expand All @@ -41,23 +41,22 @@ func (f Filters) Addresses() []string {

// MempoolDataSource -
type MempoolDataSource struct {
Tzkt *config.Alias[config.DataSource] `yaml:"tzkt" validate:"required,url"`
RPC []*config.Alias[config.DataSource] `yaml:"rpc" validate:"required,min=1,dive,url"`
Tzkt *config.Alias[config.DataSource] `validate:"required,url" yaml:"tzkt"`
RPC *config.Alias[config.DataSource] `validate:"required,min=1,dive,url" yaml:"rpc"`
}

// URLs -
func (ds MempoolDataSource) URLs() []string {
urls := make([]string, 0)
for i := range ds.RPC {
urls = append(urls, ds.RPC[i].Struct().URL)
func (ds MempoolDataSource) URL() string {
if ds.RPC == nil {
return ""
}
return urls
return ds.RPC.Struct().URL
}

// Settings -
type Settings struct {
KeepOperations uint64 `yaml:"keep_operations_seconds" validate:"required,min=1"`
ExpiredAfter uint64 `yaml:"expired_after_blocks" validate:"required,min=1"`
KeepInChainBlocks uint64 `yaml:"keep_in_chain_blocks" validate:"required,min=1"`
GasStatsLifetime uint64 `yaml:"gas_stats_lifetime" validate:"required,min=1"`
KeepOperations uint64 `validate:"required,min=1" yaml:"keep_operations_seconds"`
ExpiredAfter uint64 `validate:"required,min=1" yaml:"expired_after_blocks"`
KeepInChainBlocks uint64 `validate:"required,min=1" yaml:"keep_in_chain_blocks"`
GasStatsLifetime uint64 `validate:"required,min=1" yaml:"gas_stats_lifetime"`
}
17 changes: 8 additions & 9 deletions cmd/mempool/config/substitutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ func substituteDataSources(c *Config, dataSource *MempoolDataSource) error {
dataSource.Tzkt.SetStruct(source)
}

for i, link := range dataSource.RPC {
source, ok := c.DataSources[link.Name()]
if !ok {
continue
}
if source.Kind != DataSourceKindNode {
return errors.Errorf("Invalid RPC data source kind. Expected `tezos-node`, got `%s`", source.Kind)
}
dataSource.RPC[i].SetStruct(source)
source, ok := c.DataSources[dataSource.RPC.Name()]
if !ok {
return errors.Errorf("invalid rpc datasource: %s", dataSource.RPC.Name())
}
if source.Kind != DataSourceKindNode {
return errors.Errorf("Invalid RPC data source kind. Expected `tezos-node`, got `%s`", source.Kind)
}
dataSource.RPC.SetStruct(source)

return nil
}
Loading

0 comments on commit 2f69013

Please sign in to comment.