Skip to content

Commit

Permalink
Rework passing of shards and replicas so they're passed to the indexe…
Browse files Browse the repository at this point in the history
…r constructor rather than the Index method
  • Loading branch information
rowanseymour committed Oct 11, 2022
1 parent 2970fd3 commit 8914509
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 26 deletions.
4 changes: 2 additions & 2 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func main() {
}

idxrs := []indexers.Indexer{
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 500),
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
}

if cfg.Rebuild {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-d.quit:
return
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
if err != nil {
log.WithError(err).Error("error during indexing")
}
Expand Down
18 changes: 15 additions & 3 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ type Stats struct {
// Indexer is base interface for indexers
type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Stats() Stats
}

// IndexDefinition is what we pass to elastic to create an index,
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
type IndexDefinition struct {
Settings struct {
Index struct {
Expand All @@ -45,15 +47,25 @@ type IndexDefinition struct {
Mappings json.RawMessage `json:"mappings"`
}

func newIndexDefinition(base []byte, shards, replicas int) *IndexDefinition {
d := &IndexDefinition{}
jsonx.MustUnmarshal(contactsIndexDef, d)

d.Settings.Index.NumberOfShards = shards
d.Settings.Index.NumberOfReplicas = replicas
return d
}

type baseIndexer struct {
elasticURL string
name string // e.g. contacts, used as the alias
definition *IndexDefinition

stats Stats
}

func newBaseIndexer(elasticURL, name string) baseIndexer {
return baseIndexer{elasticURL: elasticURL, name: name}
func newBaseIndexer(elasticURL, name string, def *IndexDefinition) baseIndexer {
return baseIndexer{elasticURL: elasticURL, name: name, definition: def}
}

func (i *baseIndexer) Name() string {
Expand Down
19 changes: 7 additions & 12 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"fmt"
"time"

"github.com/nyaruka/gocommon/jsonx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

//go:embed contacts.index.json
var contactsIndexDefinition []byte
var contactsIndexDef []byte

// ContactIndexer is an indexer for contacts
type ContactIndexer struct {
Expand All @@ -23,15 +22,17 @@ type ContactIndexer struct {
}

// NewContactIndexer creates a new contact indexer
func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int) *ContactIndexer {
def := newIndexDefinition(contactsIndexDef, shards, replicas)

return &ContactIndexer{
baseIndexer: newBaseIndexer(elasticURL, name),
baseIndexer: newBaseIndexer(elasticURL, name, def),
batchSize: batchSize,
}
}

// Index indexes modified contacts and returns the name of the concrete index
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards, replicas int) (string, error) {
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
var err error

// find our physical index
Expand All @@ -47,13 +48,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards, replic

// doesn't exist or we are rebuilding, create it
if physicalIndex == "" || rebuild {
def := &IndexDefinition{}
jsonx.MustUnmarshal(contactsIndexDefinition, def)

def.Settings.Index.NumberOfShards = shards
def.Settings.Index.NumberOfReplicas = replicas

physicalIndex, err = i.createNewIndex(def)
physicalIndex, err = i.createNewIndex(i.definition)
if err != nil {
return "", errors.Wrap(err, "error creating new index")
}
Expand Down
16 changes: 8 additions & 8 deletions indexers/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ var contactQueryTests = []struct {
func TestContacts(t *testing.T) {
db, es := setup(t)

ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
assert.Equal(t, "indexer_test", ix1.Name())

expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))

indexName, err := ix1.Index(db, false, false, 2, 1)
indexName, err := ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName)

Expand All @@ -217,7 +217,7 @@ func TestContacts(t *testing.T) {
require.NoError(t, err)

// and index again...
indexName, err = ix1.Index(db, false, false, 2, 1)
indexName, err = ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName) // same index used
assertIndexerStats(t, ix1, 10, 1)
Expand All @@ -238,9 +238,9 @@ func TestContacts(t *testing.T) {
require.NoError(t, err)

// and simulate another indexer doing a parallel rebuild
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)

indexName2, err := ix2.Index(db, true, false, 2, 1)
indexName2, err := ix2.Index(db, true, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
assertIndexerStats(t, ix2, 8, 0)
Expand All @@ -254,8 +254,8 @@ func TestContacts(t *testing.T) {
assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2})

// simulate another indexer doing a parallel rebuild with cleanup
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
indexName3, err := ix3.Index(db, true, true, 2, 1)
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
indexName3, err := ix3.Index(db, true, true)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
assertIndexerStats(t, ix3, 8, 0)
Expand All @@ -264,7 +264,7 @@ func TestContacts(t *testing.T) {
assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"})

// check that the original indexer now indexes against the new index
indexName, err = ix1.Index(db, false, false, 2, 1)
indexName, err = ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName)
}

0 comments on commit 8914509

Please sign in to comment.