Merging to release-1.11: TT-13421 create indexes on sharded sql pumps (#861) #863

Merged
12 changes: 6 additions & 6 deletions analytics/analytics.go
@@ -53,14 +53,14 @@ type AnalyticsRecord struct {
Month time.Month `json:"month" sql:"-"`
Year int `json:"year" sql:"-"`
Hour int `json:"hour" sql:"-"`
ResponseCode int `json:"response_code" gorm:"column:responsecode;index"`
APIKey string `json:"api_key" gorm:"column:apikey;index"`
TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp;index"`
ResponseCode int `json:"response_code" gorm:"column:responsecode"`
APIKey string `json:"api_key" gorm:"column:apikey"`
TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp"`
APIVersion string `json:"api_version" gorm:"column:apiversion"`
APIName string `json:"api_name" sql:"-"`
APIID string `json:"api_id" gorm:"column:apiid;index"`
OrgID string `json:"org_id" gorm:"column:orgid;index"`
OauthID string `json:"oauth_id" gorm:"column:oauthid;index"`
APIID string `json:"api_id" gorm:"column:apiid"`
OrgID string `json:"org_id" gorm:"column:orgid"`
OauthID string `json:"oauth_id" gorm:"column:oauthid"`
RequestTime int64 `json:"request_time" gorm:"column:requesttime"`
RawRequest string `json:"raw_request" gorm:"column:rawrequest"`
RawResponse string `json:"raw_response" gorm:"column:rawresponse"`
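With the index gorm tags removed from AnalyticsRecord, AutoMigrate/CreateTable no longer creates these indexes implicitly; pumps/sql.go (next file) now creates them explicitly for every sharded table, so each index gets a name that is unique to its shard. A minimal, runnable sketch of the resulting naming; the table name here is hypothetical, and the real name is produced by buildIndexName further down:

package main

import "fmt"

func main() {
	table := "tyk_analytics_20240102" // hypothetical sharded table name
	for _, base := range []string{"idx_responsecode", "idx_apikey", "idx_apiid"} {
		// mirrors buildIndexName below: "<table>_<baseName>", unique per shard,
		// so CREATE INDEX ... IF NOT EXISTS and HasIndex can be run table by table
		fmt.Printf("%s_%s\n", table, base)
	}
}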
115 changes: 113 additions & 2 deletions pumps/sql.go
@@ -5,6 +5,9 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"

"github.com/sirupsen/logrus"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
@@ -45,6 +48,9 @@ type SQLPump struct {
db *gorm.DB
dbType string
dialect gorm.Dialector

// this channel is used to signal that the background index creation has finished - this is used for testing
backgroundIndexCreated chan bool
}

// @PumpConf SQL
@@ -107,6 +113,18 @@ var (
SQLPrefix = "SQL-pump"
SQLDefaultENV = PUMPS_ENV_PREFIX + "_SQL" + PUMPS_ENV_META_PREFIX
SQLDefaultQueryBatchSize = 1000

indexes = []struct {
baseName string
column string
}{
{"idx_responsecode", "responsecode"},
{"idx_apikey", "apikey"},
{"idx_timestamp", "timestamp"},
{"idx_apiid", "apiid"},
{"idx_orgid", "orgid"},
{"idx_oauthid", "oauthid"},
}
)

func (c *SQLPump) New() Pump {
@@ -231,8 +249,8 @@ func (c *SQLPump) WriteData(ctx context.Context, data []interface{}) error {

table := analytics.SQLTable + "_" + recDate
c.db = c.db.Table(table)
if !c.db.Migrator().HasTable(table) {
c.db.AutoMigrate(&analytics.AnalyticsRecord{})
if errTable := c.ensureTable(table); errTable != nil {
return errTable
}
} else {
i = dataLen // write all records at once for non-sharded case, stop for loop after 1 iteration
@@ -354,3 +372,96 @@ func (c *SQLPump) WriteUptimeData(data []interface{}) {

c.log.Debug("Purged ", len(data), " records...")
}

func (c *SQLPump) buildIndexName(indexBaseName, tableName string) string {
return fmt.Sprintf("%s_%s", tableName, indexBaseName)
}

func (c *SQLPump) createIndex(indexBaseName, tableName, column string) error {
indexName := c.buildIndexName(indexBaseName, tableName)
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column)
if !columnExist {
return errors.New("cannot create index for non-existent column " + column)
}

query := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column)
err := c.db.Exec(query).Error
if err != nil {
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).WithError(err).Error("Error creating index")
return err
}

c.log.Infof("Index %s created for table %s", indexName, tableName)
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).Info("Index created")
return nil
}

// ensureIndex checks that all the indexes for the analytics SQL table are in place
func (c *SQLPump) ensureIndex(tableName string, background bool) error {
if !c.db.Migrator().HasTable(tableName) {
return errors.New("cannot create indexes as table doesn't exist: " + tableName)
}

// waitgroup to facilitate testing and track when all indexes are created
var wg sync.WaitGroup

for _, idx := range indexes {
indexName := c.buildIndexName(idx.baseName, tableName)

if c.db.Migrator().HasIndex(tableName, indexName) {
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).Info("Index already exists")
continue
}

if background {
wg.Add(1)
go func(baseName, cols string) {
defer wg.Done()
if err := c.createIndex(baseName, tableName, cols); err != nil {
c.log.Error(err)
}
}(idx.baseName, idx.column)
} else {
if err := c.createIndex(idx.baseName, tableName, idx.column); err != nil {
return err
}
}
}

if background {
wg.Wait()
c.backgroundIndexCreated <- true
}
return nil
}

// ensureTable creates the table if it doesn't exist
func (c *SQLPump) ensureTable(tableName string) error {
if !c.db.Migrator().HasTable(tableName) {
c.db = c.db.Table(tableName)
if err := c.db.Migrator().CreateTable(&analytics.AnalyticsRecord{}); err != nil {
c.log.Error("error creating table", err)
return err
}
if err := c.ensureIndex(tableName, false); err != nil {
return err
}
}
return nil
}
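Taken together, ensureTable and ensureIndex replace the old bare AutoMigrate call in WriteData: the first write to a new shard creates the table and its indexes in one synchronous pass, and later writes skip both. A minimal sketch of that flow, assuming a *SQLPump already configured by Init; the helper name and the date-suffix format are illustrative only, WriteData derives the real suffix from the record timestamp:

// writeSharded is an illustrative helper, not part of this diff.
func writeSharded(c *SQLPump, rec analytics.AnalyticsRecord) error {
	// assumed suffix format, for illustration only
	table := analytics.SQLTable + "_" + rec.TimeStamp.Format("20060102")

	// creates the table and its per-table indexes on first use, no-op afterwards
	if err := c.ensureTable(table); err != nil {
		return err
	}
	return c.db.Table(table).Create(&rec).Error
}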
9 changes: 4 additions & 5 deletions pumps/sql_aggregate.go
@@ -160,15 +160,15 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("omit_index_creation set to true, omitting index creation..")
return nil
}

if !c.db.Migrator().HasIndex(tableName, newAggregatedIndexName) {
indexName := fmt.Sprintf("%s_%s", tableName, newAggregatedIndexName)
if !c.db.Migrator().HasIndex(tableName, indexName) {
createIndexFn := func(c *SQLAggregatePump) error {
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, newAggregatedIndexName, tableName)).Error
err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, indexName, tableName)).Error
if err != nil {
c.log.Errorf("error creating index for table %s : %s", tableName, err.Error())
return err
@@ -178,7 +178,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.backgroundIndexCreated <- true
}

c.log.Info("Index ", newAggregatedIndexName, " for table ", tableName, " created successfully")
c.log.Info("Index ", indexName, " for table ", tableName, " created successfully")

return nil
}
@@ -198,7 +198,6 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("Creating index for table ", tableName, "...")
return createIndexFn(c)
}
c.log.Info(newAggregatedIndexName, " already exists.")

return nil
}
47 changes: 44 additions & 3 deletions pumps/sql_aggregate_test.go
@@ -3,6 +3,7 @@ package pumps
import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"
@@ -31,7 +32,8 @@ func TestSQLAggregateInit(t *testing.T) {
assert.Equal(t, "sqlite", pmp.db.Dialector.Name())
assert.Equal(t, true, pmp.db.Migrator().HasTable(analytics.AggregateSQLTable))

assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, newAggregatedIndexName))
indexName := fmt.Sprintf("%s_%s", analytics.AggregateSQLTable, newAggregatedIndexName)
assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, indexName))

// Checking with invalid type
cfg["type"] = "invalid"
@@ -337,7 +339,7 @@ func TestDecodeRequestAndDecodeResponseSQLAggregate(t *testing.T) {
assert.False(t, newPump.GetDecodedResponse())
}

func TestEnsureIndex(t *testing.T) {
func TestEnsureIndexSQLAggregate(t *testing.T) {
//nolint:govet
tcs := []struct {
testName string
@@ -419,6 +421,44 @@
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background on sharded pump",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
pmp := &SQLAggregatePump{}
cfg := &SQLAggregatePumpConf{}
cfg.Type = "sqlite"
cfg.TableSharding = true
cfg.ConnectionString = ""
pmp.SQLConf = cfg

pmp.log = log.WithField("prefix", "sql-aggregate-pump")
dialect, errDialect := Dialect(&pmp.SQLConf.SQLConf)
if errDialect != nil {
return nil
}
db, err := gorm.Open(dialect, &gorm.Config{
AutoEmbedd: true,
UseJSONTags: true,
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil
}
pmp.db = db

pmp.backgroundIndexCreated = make(chan bool, 1)

if err := pmp.ensureTable(tableName); err != nil {
return nil
}

return pmp
},
givenTableName: "shard1",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created on non existing table, not background",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
@@ -499,7 +539,8 @@
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
} else {
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, newAggregatedIndexName)
indexName := fmt.Sprintf("%s_%s", tc.givenTableName, newAggregatedIndexName)
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
} else {
110 changes: 110 additions & 0 deletions pumps/sql_test.go
@@ -383,3 +383,113 @@ func TestDecodeRequestAndDecodeResponseSQL(t *testing.T) {
assert.False(t, newPump.GetDecodedRequest())
assert.False(t, newPump.GetDecodedResponse())
}

func setupSQLPump(t *testing.T, tableName string, useBackground bool) *SQLPump {
t.Helper()
pmp := &SQLPump{}
pmp.log = log.WithField("prefix", "sql-pump")
cfg := map[string]interface{}{
"type": "sqlite",
"connection_string": "",
}

assert.NoError(t, pmp.Init(cfg))
if useBackground {
pmp.backgroundIndexCreated = make(chan bool, 1)
}
assert.NoError(t, pmp.ensureTable(tableName))

return pmp
}

func TestEnsureIndexSQL(t *testing.T) {
//nolint:govet
tcs := []struct {
testName string
givenTableName string
expectedErr error
pmpSetupFn func(t *testing.T, tableName string) *SQLPump
givenRunInBackground bool
shouldHaveIndex bool
}{
{
testName: "index created correctly, not background",
pmpSetupFn: func(t *testing.T, tableName string) *SQLPump {
return setupSQLPump(t, tableName, false)
},
givenTableName: "analytics_no_background",
givenRunInBackground: false,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background",
pmpSetupFn: func(t *testing.T, tableName string) *SQLPump {
return setupSQLPump(t, tableName, true)
},
givenTableName: "analytics_background",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
}

for _, tc := range tcs {
t.Run(tc.testName, func(t *testing.T) {
pmp := tc.pmpSetupFn(t, tc.givenTableName)
defer func() {
err := pmp.db.Migrator().DropTable(tc.givenTableName)
if err != nil {
t.Errorf("Failed to drop table: %v", err)
}
}()
assert.NotNil(t, pmp)

actualErr := pmp.ensureIndex(tc.givenTableName, tc.givenRunInBackground)
isErrExpected := tc.expectedErr != nil
didErr := actualErr != nil
assert.Equal(t, isErrExpected, didErr)

if isErrExpected {
assert.Equal(t, tc.expectedErr.Error(), actualErr.Error())
}

if actualErr == nil {
if tc.givenRunInBackground {
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
}

indexToUse := indexes[0]
indexName := pmp.buildIndexName(indexToUse.baseName, tc.givenTableName)
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
})
}
}

func TestBuildIndexName(t *testing.T) {
tests := []struct {
indexBaseName string
tableName string
expected string
}{
{"idx_responsecode", "users", "users_idx_responsecode"},
{"idx_apikey", "transactions", "transactions_idx_apikey"},
{"idx_timestamp", "logs", "logs_idx_timestamp"},
{"idx_apiid", "api_calls", "api_calls_idx_apiid"},
{"idx_orgid", "organizations", "organizations_idx_orgid"},
}

c := &SQLPump{} // Create an instance of SQLPump.

for _, tt := range tests {
t.Run(tt.indexBaseName+"_"+tt.tableName, func(t *testing.T) {
result := c.buildIndexName(tt.indexBaseName, tt.tableName)
if result != tt.expected {
t.Errorf("buildIndexName(%s, %s) = %s; want %s", tt.indexBaseName, tt.tableName, result, tt.expected)
}
})
}
}
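Both pumps now share the same hook for tests: when ensureIndex runs in the background, completion is signalled on the buffered backgroundIndexCreated channel, so a test can block until the indexes really exist before asserting on them. A minimal sketch of that pattern, reusing the setupSQLPump helper above; the test name and table name are hypothetical:

// TestWaitForBackgroundIndex is an illustrative sketch, not part of this diff.
func TestWaitForBackgroundIndex(t *testing.T) {
	pmp := setupSQLPump(t, "analytics_sketch", true) // hypothetical table name
	defer func() {
		if err := pmp.db.Migrator().DropTable("analytics_sketch"); err != nil {
			t.Errorf("Failed to drop table: %v", err)
		}
	}()

	if err := pmp.ensureIndex("analytics_sketch", true); err != nil {
		t.Fatal(err)
	}
	// block until the background index creation has finished
	<-pmp.backgroundIndexCreated

	indexName := pmp.buildIndexName(indexes[0].baseName, "analytics_sketch")
	assert.True(t, pmp.db.Table("analytics_sketch").Migrator().HasIndex("analytics_sketch", indexName))
}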