Skip to content

Commit

Permalink
fix: wal opened too many gorutine (zincsearch#343)
Browse files Browse the repository at this point in the history
* fix: wal opened too many gorutine

* fix: unit test coverage

* fix: unit tests

* fix: DATA RACE of unit tests

* fix: fix LGTM error check

* fix: DATA RACE on unit tests
  • Loading branch information
hengfeiyang authored Jul 20, 2022
1 parent 6d1c40b commit 2f726b9
Show file tree
Hide file tree
Showing 18 changed files with 371 additions and 169 deletions.
1 change: 1 addition & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ jobs:
ZINC_FIRST_ADMIN_PASSWORD: Complexpass#123
ZINC_WAL_SYNC_INTERVAL: 10ms
ZINC_WAL_REDOLOG_NO_SYNC: true
ZINC_ENABLE_TEXT_KEYWORD_MAPPING: true
steps:
- name: Check out repository code
uses: actions/checkout@v3
Expand Down
16 changes: 5 additions & 11 deletions pkg/core/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type Index struct {
meta.Index
Analyzers map[string]*analysis.Analyzer `json:"-"`
lock sync.RWMutex `json:"-"`
open uint32 `json:"-"`
close chan struct{} `json:"-"`
closed bool `json:"-"`
}

func (index *Index) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -182,8 +182,8 @@ func (index *Index) UpdateMetadata() error {
}
// update docTime
s := index.Shards[index.GetLatestShardID()]
atomic.StoreInt64(&s.DocTimeMin, index.DocTimeMin)
atomic.StoreInt64(&s.DocTimeMax, index.DocTimeMax)
atomic.StoreInt64(&s.DocTimeMin, atomic.LoadInt64(&index.DocTimeMin))
atomic.StoreInt64(&s.DocTimeMax, atomic.LoadInt64(&index.DocTimeMax))

return metadata.Index.Set(index.Name, index.Index)
}
Expand Down Expand Up @@ -227,21 +227,15 @@ func (index *Index) Reopen() error {
if _, err := index.GetWriter(); err != nil {
return err
}
index.closed = false
return nil
}

func (index *Index) Close() error {
// update metadata before close
// if err = index.UpdateMetadata(); err != nil {
// return err
// }

if index.closed {
if atomic.LoadUint32(&index.open) == 0 {
return nil
}
index.close <- struct{}{}
index.closed = true
atomic.StoreUint32(&index.open, 0)

index.lock.Lock()
defer index.lock.Unlock()
Expand Down
21 changes: 20 additions & 1 deletion pkg/core/index_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,16 @@ func (index *Index) checkField(mappings *meta.Mappings, data map[string]interfac

// CreateDocument inserts or updates a document in the zinc index
func (index *Index) CreateDocument(docID string, doc map[string]interface{}, update bool) error {
data, err := index.CheckDocument(docID, doc, update, index.GetLatestShardID())
// check WAL
if err := index.OpenWAL(); err != nil {
return err
}

shardID := index.GetLatestShardID()
if update {
shardID = -1
}
data, err := index.CheckDocument(docID, doc, update, shardID)
if err != nil {
return err
}
Expand All @@ -318,6 +327,11 @@ func (index *Index) CreateDocument(docID string, doc map[string]interface{}, upd

// UpdateDocument updates a document in the zinc index
func (index *Index) UpdateDocument(docID string, doc map[string]interface{}, insert bool) error {
// check WAL
if err := index.OpenWAL(); err != nil {
return err
}

update := true
shardID, err := index.FindShardByDocID(docID)
if err != nil {
Expand All @@ -338,6 +352,11 @@ func (index *Index) UpdateDocument(docID string, doc map[string]interface{}, ins

// DeleteDocument deletes a document in the zinc index
func (index *Index) DeleteDocument(docID string) error {
// check WAL
if err := index.OpenWAL(); err != nil {
return err
}

shardID, err := index.FindShardByDocID(docID)
if err != nil {
return err
Expand Down
72 changes: 72 additions & 0 deletions pkg/core/index_document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ func TestIndex_CreateUpdateDocument(t *testing.T) {
},
wantErr: false,
},
{
name: "Document with error date format",
args: args{
docID: "test1",
doc: map[string]interface{}{
"name": "Hello",
"@timestamp": "2020-01-01T00:00:00Z8",
},
update: true,
},
wantErr: true,
},
}

indexName := "TestDocument.index_1"
Expand Down Expand Up @@ -198,6 +210,66 @@ func TestIndex_UpdateDocument(t *testing.T) {
})
}

func TestIndex_DeleteDocument(t *testing.T) {
type args struct {
docID string
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "normal",
args: args{
docID: "1",
},
},
{
name: "normal",
args: args{
docID: "2",
},
wantErr: true,
},
}
indexName := "TestIndex_DeleteDocument.index_1"
var index *Index
var err error
t.Run("prepare", func(t *testing.T) {
index, err = NewIndex(indexName, "disk")
assert.NoError(t, err)
assert.NotNil(t, index)
err = StoreIndex(index)
assert.NoError(t, err)

err = index.CreateDocument("1", map[string]interface{}{
"name": "Hello",
"time": float64(1579098983),
}, false)
assert.NoError(t, err)

// wait for WAL write to index
time.Sleep(time.Second)
})

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := index.DeleteDocument(tt.args.docID)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
})
}

t.Run("cleanup", func(t *testing.T) {
err = DeleteIndex(indexName)
assert.NoError(t, err)
})
}

func TestDateLayoutDetection(t *testing.T) {
type args struct {
layout string
Expand Down
5 changes: 3 additions & 2 deletions pkg/core/index_shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func (index *Index) NewShard() error {
// update current shard
index.UpdateMetadataByShard(index.GetLatestShardID())
index.lock.Lock()
index.Shards[index.ShardNum-1].DocTimeMin = index.DocTimeMin
index.Shards[index.ShardNum-1].DocTimeMax = index.DocTimeMax
shard := index.Shards[index.ShardNum-1]
atomic.StoreInt64(&shard.DocTimeMin, index.DocTimeMin)
atomic.StoreInt64(&shard.DocTimeMax, index.DocTimeMax)
index.DocTimeMin = 0
index.DocTimeMax = 0
// create new shard
Expand Down
15 changes: 9 additions & 6 deletions pkg/core/index_shards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"time"

"github.com/stretchr/testify/assert"

"github.com/zinclabs/zinc/pkg/config"
)

func TestIndex_CheckShards(t *testing.T) {
func TestIndex_Shards(t *testing.T) {
type args struct {
docID string
doc map[string]interface{}
Expand Down Expand Up @@ -61,9 +59,7 @@ func TestIndex_CheckShards(t *testing.T) {
var index *Index
var err error
t.Run("perpare", func(t *testing.T) {
config.Global.Shard.MaxSize = 1024

index, err = NewIndex("TestIndex_CheckShards.index_1", "disk")
index, err = NewIndex("TestIndex_Shards.index_1", "disk")
assert.NoError(t, err)
assert.NotNil(t, index)

Expand All @@ -76,6 +72,13 @@ func TestIndex_CheckShards(t *testing.T) {
err := index.CreateDocument(tt.args.docID, tt.args.doc, false)
assert.NoError(t, err)

// wait for WAL write to index
time.Sleep(time.Second)

if err := index.NewShard(); (err != nil) != tt.wantErr {
t.Errorf("Index.NewShard() error = %v, wantErr %v", err, tt.wantErr)
}

if err := index.CheckShards(); (err != nil) != tt.wantErr {
t.Errorf("Index.CheckShards() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down
Loading

0 comments on commit 2f726b9

Please sign in to comment.