Skip to content

Commit

Permalink
Add index name and persist sort columns (siglens#2018)
Browse files Browse the repository at this point in the history
* Add index name and persist sort columns

* Addressed Comments
  • Loading branch information
sonamgupta21 authored Jan 2, 2025
1 parent 52be560 commit b5a68ce
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 21 deletions.
94 changes: 76 additions & 18 deletions pkg/segment/sortindex/sortindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sort"
"sync"

"github.com/siglens/siglens/pkg/config"
"github.com/siglens/siglens/pkg/segment/reader/segread/segreader"
segutils "github.com/siglens/siglens/pkg/segment/utils"
"github.com/siglens/siglens/pkg/utils"
Expand All @@ -41,13 +42,10 @@ type Block struct {
RecNums []uint16 `json:"recNums"`
}

type SortColumnConfig struct {
columns []string
mu sync.RWMutex
}
var sortConfigMutex sync.RWMutex

var sortConfig = &SortColumnConfig{
columns: make([]string, 0),
type SortColumnsConfig struct {
Indexes map[string][]string `json:"indexes"`
}

var VERSION_SORT_INDEX = []byte{1}
Expand All @@ -67,6 +65,10 @@ const (

var AllSortModes = []SortMode{SortAsAuto, SortAsNumeric, SortAsString}

func getSortColumnsConfigPath() string {
return filepath.Join(config.GetDataPath(), "common", "sort_columns.json")
}

func (sm SortMode) String() string {
switch sm {
case SortAsAuto:
Expand Down Expand Up @@ -658,37 +660,93 @@ func readMetadata(file *os.File) (*metadata, error) {
return meta, nil
}

func SetSortColumns(columnNames []string) error {
sortConfig.mu.Lock()
defer sortConfig.mu.Unlock()
func SetSortColumns(indexName string, columnNames []string) error {
configPath := getSortColumnsConfigPath()

sortConfigMutex.Lock()
defer sortConfigMutex.Unlock()

dir := filepath.Dir(configPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("SetSortColumns: failed to create config directory: %v", err)
}

// TODO: Persist sort column names to disk
for _, col := range columnNames {
if col == "" {
return fmt.Errorf("SetSortColumns: column names must be non-empty strings")
}
}

sortConfig.columns = columnNames
config := SortColumnsConfig{
Indexes: make(map[string][]string),
}

data, err := os.ReadFile(configPath)
if err == nil {
if err := json.Unmarshal(data, &config); err != nil {
return fmt.Errorf("SetSortColumns: failed to parse sort columns config: %v", err)
}
}

config.Indexes[indexName] = columnNames

data, err = json.MarshalIndent(config, "", " ")
if err != nil {
return fmt.Errorf("SetSortColumns: failed to marshal sort columns config: %v", err)
}

err = os.WriteFile(configPath, data, 0644)
if err != nil {
return fmt.Errorf("SetSortColumns: failed to write sort columns config: %v", err)
}

return nil
}

func GetSortColumns() []string {
sortConfig.mu.RLock()
defer sortConfig.mu.RUnlock()
return sortConfig.columns
func GetSortColumnNamesForIndex(indexName string) []string {
configPath := getSortColumnsConfigPath()

sortConfigMutex.RLock()
defer sortConfigMutex.RUnlock()

data, err := os.ReadFile(configPath)
if err != nil {
if os.IsNotExist(err) {
log.Debugf("GetSortColumnNamesForIndex: no sort columns config file found: %v", err)
} else {
log.Errorf("GetSortColumnNamesForIndex: error reading config file: %v", err)
}
return make([]string, 0)
}

var config SortColumnsConfig
if err := json.Unmarshal(data, &config); err != nil {
log.Errorf("GetSortColumnNamesForIndex: failed to parse sort columns config: %v", err)
return make([]string, 0)
}

return config.Indexes[indexName]
}

func SetSortColumnsAPI(ctx *fasthttp.RequestCtx) {
var columns []string
if err := json.Unmarshal(ctx.PostBody(), &columns); err != nil {
var request struct {
IndexName string `json:"indexName"`
Columns []string `json:"columns"`
}

if err := json.Unmarshal(ctx.PostBody(), &request); err != nil {
ctx.SetStatusCode(fasthttp.StatusBadRequest)
ctx.SetBodyString("Invalid request body: " + err.Error())
return
}

if err := SetSortColumns(columns); err != nil {
if request.IndexName == "" {
ctx.SetStatusCode(fasthttp.StatusBadRequest)
ctx.SetBodyString("Missing index name")
return
}

if err := SetSortColumns(request.IndexName, request.Columns); err != nil {
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.SetBodyString("Failed to set sort columns: " + err.Error())
return
Expand Down
6 changes: 3 additions & 3 deletions pkg/segment/writer/segstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ func (segstore *SegStore) checkAndRotateColFiles(streamid string, forceRotate bo
updateRecentlyRotatedSegmentFiles(segstore.SegmentKey)
metadata.AddSegMetaToMetadata(&segmeta)

go writeSortIndexes(segstore.SegmentKey)
go writeSortIndexes(segstore.SegmentKey, segstore.VirtualTableName)

if blobErr == nil {
// upload ingest node dir to s3
Expand All @@ -847,13 +847,13 @@ func (segstore *SegStore) checkAndRotateColFiles(streamid string, forceRotate bo
return nil
}

func writeSortIndexes(segkey string) {
func writeSortIndexes(segkey string, indexName string) {

if config.IsSortIndexEnabled() {
sortedIndexWG.Add(1)
defer sortedIndexWG.Done()

for _, cname := range sortindex.GetSortColumns() {
for _, cname := range sortindex.GetSortColumnNamesForIndex(indexName) {
err := sortindex.WriteSortIndex(segkey, cname, sortindex.AllSortModes)
if err != nil {
log.Errorf("writeSortIndexes: failed to write sort index for segkey=%v, cname=%v; err=%v",
Expand Down

0 comments on commit b5a68ce

Please sign in to comment.