Skip to content

Commit

Permalink
Consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
mgdigital committed Mar 2, 2025
1 parent 317bc39 commit 7471f9e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
2 changes: 0 additions & 2 deletions internal/blocking/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package blocking

import (
"context"
"github.com/bitmagnet-io/bitmagnet/internal/bloom"
"github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy"
"github.com/bitmagnet-io/bitmagnet/internal/protocol"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -32,7 +31,6 @@ func New(params Params) Result {
params.PgxPoolWait.Add(1)
return &manager{
pool: pool,
filter: bloom.NewDefaultStableBloomFilter(),
buffer: make(map[protocol.ID]struct{}, 1000),
maxBufferSize: 1000,
maxFlushWait: time.Minute * 5,
Expand Down
22 changes: 11 additions & 11 deletions internal/blocking/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func (m *manager) flush(ctx context.Context) error {
}()

if len(hashes) > 0 {
_, err = tx.Exec(ctx, "delete from torrents where info_hash = any($1)", hashes)
_, err = tx.Exec(ctx, "DELETE FROM torrents WHERE info_hash = any($1)", hashes)
if err != nil {
return fmt.Errorf("error deleting from torrents table: %w", err)
return fmt.Errorf("failed to delete from torrents table: %w", err)
}
}

Expand All @@ -113,24 +113,24 @@ func (m *manager) flush(ctx context.Context) error {
oid = uint32(nullOid.Int32)
obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeRead)
if err != nil {
return fmt.Errorf("error opening large object for reading: %w", err)
return fmt.Errorf("failed to open large object for reading: %w", err)
}
_, err = bf.ReadFrom(obj)
obj.Close()
if err != nil {
return fmt.Errorf("error reading current bloom filter: %w", err)
return fmt.Errorf("failed to read current bloom filter: %w", err)
}
}
} else if !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("error getting bloom filter object ID: %w", err)
return fmt.Errorf("failed to get bloom filter object ID: %w", err)
}

if oid == 0 {
// Create a new Large Object.
// We pass 0, so the DB can pick an available oid for us.
oid, err = lobs.Create(ctx, 0)
if err != nil {
return fmt.Errorf("error creating large object: %w", err)
return fmt.Errorf("failed to create large object: %w", err)
}
}

Expand All @@ -140,30 +140,30 @@ func (m *manager) flush(ctx context.Context) error {

obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeWrite)
if err != nil {
return fmt.Errorf("error opening large object for writing: %w", err)
return fmt.Errorf("failed to open large object for writing: %w", err)
}

_, err = bf.WriteTo(obj)
if err != nil {
return fmt.Errorf("error writing to large object: %w", err)
return fmt.Errorf("failed to write to large object: %w", err)
}

now := time.Now()
if !found {
_, err = tx.Exec(ctx, "INSERT INTO bloom_filters (key, oid, created_at, updated_at) VALUES ($1, $2, $3, $4)", blockedTorrentsBloomFilterKey, oid, now, now)
if err != nil {
return fmt.Errorf("error saving new bloom filter record: %w", err)
return fmt.Errorf("failed to save new bloom filter record: %w", err)
}
} else if !nullOid.Valid {
_, err = tx.Exec(ctx, "UPDATE bloom_filters SET oid = $1, updated_at = $2 WHERE key = $3", oid, now, blockedTorrentsBloomFilterKey)
if err != nil {
return fmt.Errorf("error updating bloom filter record: %w", err)
return fmt.Errorf("failed to update bloom filter record: %w", err)
}
}

err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("error committing transaction: %w", err)
return fmt.Errorf("failed to commit transaction: %w", err)
}

m.buffer = make(map[protocol.ID]struct{})
Expand Down

0 comments on commit 7471f9e

Please sign in to comment.