Skip to content

Commit

Permalink
drop DB interface, attempt to fix max conn issues using interpolatePa…
Browse files Browse the repository at this point in the history
…rams
  • Loading branch information
lyoshenka committed Aug 7, 2018
1 parent baba10c commit 0e0b2aa
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 73 deletions.
104 changes: 51 additions & 53 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,27 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/querytools"
"github.com/lbryio/reflector.go/dht/bits"
"github.com/lbryio/reflector.go/types"
// blank import for db driver
_ "github.com/go-sql-driver/mysql"

_ "github.com/go-sql-driver/mysql" // blank import for db driver
log "github.com/sirupsen/logrus"
)

// DB interface communicates to a backend database with a simple set of methods that supports tracking blobs that are
// used together with a BlobStore. The DB tracks pointers and the BlobStore stores the data.
type DB interface {
Connect(string) error
HasBlob(string) (bool, error)
AddBlob(string, int, bool) error
AddSDBlob(string, int, types.SdBlob) error
HasFullStream(string) (bool, error)
// SdBlob is a special blob that contains information on the rest of the blobs in the stream
type SdBlob struct {
StreamName string `json:"stream_name"`
Blobs []struct {
Length int `json:"length"`
BlobNum int `json:"blob_num"`
BlobHash string `json:"blob_hash,omitempty"`
Iv string `json:"iv"`
} `json:"blobs"`
StreamType string `json:"stream_type"`
Key string `json:"key"`
SuggestedFileName string `json:"suggested_file_name"`
StreamHash string `json:"stream_hash"`
}

// SQL is the container for the supporting MySQL database connection.
// SQL implements the DB interface
type SQL struct {
conn *sql.DB
}
Expand All @@ -40,7 +44,10 @@ func logQuery(query string, args ...interface{}) {
// Connect will create a connection to the database
func (s *SQL) Connect(dsn string) error {
var err error
dsn += "?parseTime=1&collation=utf8mb4_unicode_ci"
// interpolateParams is necessary. otherwise uploading a stream with thousands of blobs
// will hit MySQL's max_prepared_stmt_count limit because the prepared statements are all
// opened inside a transaction. closing them manually doesn't seem to help
dsn += "?parseTime=1&collation=utf8mb4_unicode_ci&interpolateParams=1"
s.conn, err = sql.Open("mysql", dsn)
if err != nil {
return errors.Err(err)
Expand All @@ -49,7 +56,7 @@ func (s *SQL) Connect(dsn string) error {
return errors.Err(s.conn.Ping())
}

// AddBlob adds a blobs information to the database.
// AddBlob adds a blob to the database.
func (s *SQL) AddBlob(hash string, length int, isStored bool) error {
if s.conn == nil {
return errors.Err("not connected")
Expand All @@ -65,17 +72,10 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error {
return errors.Err("length must be positive")
}

query := "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))"
args := []interface{}{hash, isStored, length}

logQuery(query, args...)

stmt, err := tx.Prepare(query)
if err != nil {
return errors.Err(err)
}

_, err = stmt.Exec(args...)
err := execPrepared(tx,
"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
[]interface{}{hash, isStored, length},
)
if err != nil {
return errors.Err(err)
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (s *SQL) HasFullStream(sdHash string) (bool, error) {
// AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information
// into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is
// rolled back and error(s) are returned.
func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) error {
func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
if s.conn == nil {
return errors.Err("not connected")
}
Expand All @@ -200,17 +200,10 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) er
}

// insert stream
query := "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)"
args := []interface{}{sdBlob.StreamHash, sdHash}

logQuery(query, args...)

stmt, err := tx.Prepare(query)
if err != nil {
return errors.Err(err)
}

_, err = stmt.Exec(args...)
err = execPrepared(tx,
"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
[]interface{}{sdBlob.StreamHash, sdHash},
)
if err != nil {
return errors.Err(err)
}
Expand All @@ -227,22 +220,10 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) er
return err
}

query := "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)"
args := []interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum}

logQuery(query, args...)

stmt, err := tx.Prepare(query)
if err != nil {
return errors.Err(err)
}

_, err = stmt.Exec(args...)
if err != nil {
return errors.Err(err)
}

err = stmt.Close()
err = execPrepared(tx,
"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
[]interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum},
)
if err != nil {
return errors.Err(err)
}
Expand Down Expand Up @@ -371,6 +352,23 @@ func closeRows(rows *sql.Rows) {
}
}

func execPrepared(tx *sql.Tx, query string, args []interface{}) error {
logQuery(query, args...)

stmt, err := tx.Prepare(query)
if err != nil {
return errors.Err(err)
}

_, err = stmt.Exec(args...)
if err != nil {
return errors.Err(err)
}

err = stmt.Close()
return errors.Err(err)
}

/* SQL schema
CREATE TABLE blob_ (
Expand Down
7 changes: 3 additions & 4 deletions store/dbbacked.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ import (

"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/types"
)

// DBBackedS3Store is an instance of an S3 Store that is backed by a DB for what is stored.
type DBBackedS3Store struct {
s3 *S3BlobStore
db db.DB
db *db.SQL
}

// NewDBBackedS3Store returns an initialized store pointer.
func NewDBBackedS3Store(s3 *S3BlobStore, db db.DB) *DBBackedS3Store {
func NewDBBackedS3Store(s3 *S3BlobStore, db *db.SQL) *DBBackedS3Store {
return &DBBackedS3Store{s3: s3, db: db}
}

Expand All @@ -42,7 +41,7 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error {
// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
// there is an error storing the blob information in the DB.
func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
var blobContents types.SdBlob
var blobContents db.SdBlob
err := json.Unmarshal(blob, &blobContents)
if err != nil {
return err
Expand Down
16 changes: 0 additions & 16 deletions types/types.go

This file was deleted.

0 comments on commit 0e0b2aa

Please sign in to comment.