Skip to content

Commit

Permalink
Handle ES endpoint to add single row (siglens#2026)
Browse files Browse the repository at this point in the history
* Handle elastic/<index>/_doc endpoint

* fixup
  • Loading branch information
AndrewHess authored Dec 31, 2024
1 parent b3d4f4f commit d080492
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 0 deletions.
198 changes: 198 additions & 0 deletions pkg/es/writer/esDocIndexingHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package writer

import (
"bytes"
"net/url"
"strings"

"github.com/siglens/siglens/pkg/config"
"github.com/siglens/siglens/pkg/hooks"
segwriter "github.com/siglens/siglens/pkg/segment/writer"
vtable "github.com/siglens/siglens/pkg/virtualtable"

"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"

segutils "github.com/siglens/siglens/pkg/segment/utils"
"github.com/siglens/siglens/pkg/utils"
"github.com/valyala/fasthttp"
)

func ProcessPutPostSingleDocRequest(ctx *fasthttp.RequestCtx, updateArg bool, myid uint64) {
r := bytes.NewReader(ctx.PostBody())
indexNameIn := utils.ExtractParamAsString(ctx.UserValue("indexName"))
tsNow := utils.GetCurrentTimeInMs()
tsKey := config.GetTimeStampKey()

idInUrl := utils.ExtractParamAsString(ctx.UserValue("_id"))
idVal, err := url.QueryUnescape(idInUrl)
if err != nil {
log.Errorf("ProcessPutPostSingleDocRequest: could not decode idVal=%v, err=%v", idInUrl, err)
ctx.SetStatusCode(fasthttp.StatusServiceUnavailable)
utils.SetBadMsg(ctx, "")
return
}

docTypeInUrl := utils.ExtractParamAsString(ctx.UserValue("docType"))
docType, err := url.QueryUnescape(docTypeInUrl)
if err != nil {
log.Errorf("ProcessPutPostSingleDocRequest: could not decode docTypeInUrl=%v, err=%v", docTypeInUrl, err)
ctx.SetStatusCode(fasthttp.StatusServiceUnavailable)
utils.SetBadMsg(ctx, "")
return
}

refreshArg := string(ctx.QueryArgs().Peek("refresh"))
var flush bool
if refreshArg == "" {
flush = false
} else {
flush = true
}

var isKibanaReq bool
if strings.Contains(indexNameIn, ".kibana") {
flush = true
isKibanaReq = true
} else {
isKibanaReq = false
}

log.Debugf("ProcessPutPostSingleDocRequest: got doc with index %s and id %s, flush=%v", indexNameIn, idVal, flush)

request := make(map[string]interface{})
var json = jsoniter.ConfigCompatibleWithStandardLibrary
decoder := json.NewDecoder(r)
decoder.UseNumber()
err = decoder.Decode(&request)
if err != nil {
log.Errorf("ProcessPutPostSingleDocRequest: error un-marshalling JSON: %v", err)
utils.SetBadMsg(ctx, "")
return
}

if indexNameIn == "" {
log.Error("ProcessPutPostSingleDocRequest: error processing request: IndexName is a required parameter.")
utils.SetBadMsg(ctx, "")
return
} else if !vtable.IsVirtualTablePresent(&indexNameIn, myid) {
log.Infof("ProcessPutPostSingleDocRequest: Index name %v does not exist. Adding virtual table name and mapping.", indexNameIn)
body := string(ctx.PostBody())
err := vtable.AddVirtualTable(&indexNameIn, myid)
if err != nil {
log.Errorf("ProcessPutPostSingleDocRequest: Failed to add virtual table for indexName=%v, err=%v", indexNameIn, err)
ctx.SetStatusCode(fasthttp.StatusServiceUnavailable)
utils.SetBadMsg(ctx, "")
return
}
err = vtable.AddMappingFromADoc(&indexNameIn, &body, myid)
if err != nil {
log.Errorf("ProcessPutPostSingleDocRequest: Failed to add mapping from a doc for indexName=%v, err=%v", indexNameIn, err)
ctx.SetStatusCode(fasthttp.StatusServiceUnavailable)
utils.SetBadMsg(ctx, "")
return
}
}

if idVal == "" {
idVal = uuid.New().String()
}

request["_id"] = idVal
if docType != "" {
request["_type"] = docType
}
if len(request) > segutils.MAX_RECORD_SIZE {
var httpResp utils.HttpServerResponse
ctx.SetStatusCode(fasthttp.StatusRequestEntityTooLarge)
httpResp.Message = "Request entity too large"
httpResp.StatusCode = fasthttp.StatusRequestEntityTooLarge
utils.WriteResponse(ctx, httpResp)
return
}
localIndexMap := make(map[string]string)
cnameCacheByteHashToStr := make(map[uint64]string)
idxToStreamIdCache := make(map[string]string)
var jsParsingStackbuf [utils.UnescapeStackBufSize]byte
pleArray := make([]*segwriter.ParsedLogEvent, 0)
defer func() {
segwriter.ReleasePLEs(pleArray)
}()

indexNameConverted := AddAndGetRealIndexName(indexNameIn, localIndexMap, myid)
if isKibanaReq {
if hook := hooks.GlobalHooks.KibanaIngestSingleDocHook; hook != nil {
err := hook(ctx, request, indexNameConverted, updateArg, idVal, tsNow, myid)
if err != nil {
utils.SendError(ctx, "Failed to process kibana ingest request", "", err)
return
}
} else {
utils.SendError(ctx, "Kibana is not supported", "", nil)
}

return
} else {
rawData, _ := json.Marshal(request)
ple, err := segwriter.GetNewPLE(rawData, tsNow, indexNameConverted, &tsKey, jsParsingStackbuf[:])
if err != nil {
log.Errorf("ProcessPutPostSingleDocRequest: failed in GetNewPLE , rawData: %v err: %v", string(rawData), err)
ctx.SetStatusCode(fasthttp.StatusServiceUnavailable)
utils.SetBadMsg(ctx, "")
return
}
pleArray = append(pleArray, ple)
err = ProcessIndexRequestPle(tsNow, indexNameConverted, flush, localIndexMap, myid, 0, idxToStreamIdCache, cnameCacheByteHashToStr, jsParsingStackbuf[:], pleArray)
if err != nil {
log.Errorf("ProcessPutPostSingleDocRequest: Failed to ingest request, err: %v", err)
ctx.SetStatusCode(fasthttp.StatusServiceUnavailable)
utils.SetBadMsg(ctx, "")
return
}
SendIndexSuccess(ctx, request, updateArg)
}
}

func SendIndexSuccess(ctx *fasthttp.RequestCtx, request map[string]interface{}, updateArg bool) {

var docResp utils.DocIndexedResponse

if val, pres := request["_type"]; pres {
docResp.Type = val.(string)
}

if val, pres := request["_index"]; pres {
docResp.Index = val.(string)
}

if val, pres := request["_id"]; pres {
docResp.Id = val.(string)
}

docResp.Version = 1
docResp.SequenceNumber = 1
//todo this val can be "created" or "updated"
if updateArg {
docResp.Result = "updated"
} else {
docResp.Result = "created"
}
docResp.PrimaryTerm = 2

shards := make(map[string]interface{})
shards["total"] = 1
shards["successful"] = 1
shards["skipped"] = 0
shards["failed"] = 0
docResp.Shards = shards

var subField utils.DocIndexedResponseSubFieldGet
subField.SequenceNumber = 1
subField.PrimaryTerm = 2
subField.Found = true
subField.Source = make(map[string]interface{})
docResp.Get = subField

utils.WriteJsonResponse(ctx, docResp)
}
1 change: 1 addition & 0 deletions pkg/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type Hooks struct {
// Ingest server
IngestMiddlewareRecoveryHook func(ctx *fasthttp.RequestCtx) error
KibanaIngestHandlerHook func(ctx *fasthttp.RequestCtx)
KibanaIngestSingleDocHook func(*fasthttp.RequestCtx, map[string]interface{}, string, bool, string, uint64, uint64) error
EsBulkIngestInternalHook func(*fasthttp.RequestCtx, map[string]interface{}, string, bool, string, uint64, uint64) error
GetIdsConditionHook func() (bool, []uint64)
ExtraIngestEndpointsHook func(router *router.Router, recovery func(next func(ctx *fasthttp.RequestCtx)) func(ctx *fasthttp.RequestCtx))
Expand Down
7 changes: 7 additions & 0 deletions pkg/server/ingest/entryHandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func EsPutIndexHandler() func(ctx *fasthttp.RequestCtx) {
}
}

func esPutPostSingleDocHandler(update bool) func(ctx *fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
instrumentation.IncrementInt64Counter(instrumentation.POST_REQUESTS_COUNT, 1)
eswriter.ProcessPutPostSingleDocRequest(ctx, update, 0)
}
}

func otsdbPutMetricsHandler() func(ctx *fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
serverutils.CallWithOrgId(otsdbwriter.PutMetrics, ctx)
Expand Down
32 changes: 32 additions & 0 deletions pkg/server/ingest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ingestserver

import (
"net"
"strings"
"time"

"github.com/siglens/siglens/pkg/hooks"
Expand Down Expand Up @@ -87,6 +88,37 @@ func (hs *ingestionServerCfg) Run() (err error) {
hs.router.GET(server_utils.ELASTIC_PREFIX+"/_xpack", hs.Recovery(esGreetHandler()))
hs.router.POST(server_utils.ELASTIC_PREFIX+"/_bulk", hs.Recovery(esPostBulkHandler()))
hs.router.PUT(server_utils.ELASTIC_PREFIX+"/{indexName}", hs.Recovery(EsPutIndexHandler()))
hs.router.HEAD(server_utils.ELASTIC_PREFIX+"/{indexName}", hs.Recovery(EsPutIndexHandler()))

hs.router.PUT(server_utils.ELASTIC_PREFIX+"/{indexName}/_mapping", hs.Recovery(EsPutIndexHandler()))
hs.router.PUT(server_utils.ELASTIC_PREFIX+"/{indexName}/_mapping/{docType}", hs.Recovery(EsPutIndexHandler()))

// without the doctype (>7.x format)
if strings.HasPrefix(*config.GetESVersion(), "7.") {
hs.router.PUT(server_utils.ELASTIC_PREFIX+"/{indexName}/_doc/{_id}", hs.Recovery(esPutPostSingleDocHandler(false)))
hs.router.POST(server_utils.ELASTIC_PREFIX+"/{indexName}/_doc/{_id?}", hs.Recovery(esPutPostSingleDocHandler(false)))

hs.router.PUT(server_utils.ELASTIC_PREFIX+"/{indexName}/_create/{_id}", hs.Recovery(esPutPostSingleDocHandler(false)))
hs.router.POST(server_utils.ELASTIC_PREFIX+"/{indexName}/_create/{_id}", hs.Recovery(esPutPostSingleDocHandler(false)))

hs.router.POST(server_utils.ELASTIC_PREFIX+"/{indexName}/_update/{_id}", hs.Recovery(esPutPostSingleDocHandler(true)))

} else {
// with the doctype (<7.x format)

hs.router.PUT(server_utils.ELASTIC_PREFIX+"/{indexName}/{docType}/{_id}",
hs.Recovery(esPutPostSingleDocHandler(false)))
hs.router.POST(server_utils.ELASTIC_PREFIX+"/{indexName}/{docType}/{_id?}",
hs.Recovery(esPutPostSingleDocHandler(false)))

hs.router.PUT(server_utils.ELASTIC_PREFIX+"/{indexName}/{docType}/{_id}/_create",
hs.Recovery(esPutPostSingleDocHandler(false)))
hs.router.POST(server_utils.ELASTIC_PREFIX+"/{indexName}/{docType}/{_id}/_create",
hs.Recovery(esPutPostSingleDocHandler(false)))

hs.router.POST(server_utils.ELASTIC_PREFIX+"/{indexName}/{docType}/{_id}/_update",
hs.Recovery(esPutPostSingleDocHandler(true)))
}

// Loki endpoints
hs.router.POST(server_utils.LOKI_PREFIX+"/api/v1/push", hs.Recovery(lokiPostBulkHandler()))
Expand Down

0 comments on commit d080492

Please sign in to comment.