From ba530353999550135c70e5a344812e3fde8fb8d1 Mon Sep 17 00:00:00 2001 From: iguazio-deploy Date: Tue, 18 Aug 2020 13:13:39 +0000 Subject: [PATCH] Updated TSDB to v0.10.11 --- .../vendor/github.com/v3io/v3io-tsdb/go.mod | 2 - .../v3io/v3io-tsdb/pkg/appender/appender.go | 31 ++-- .../v3io/v3io-tsdb/pkg/appender/ingest.go | 159 +++++++++++------- .../v3io/v3io-tsdb/pkg/appender/store.go | 22 +++ .../v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go | 7 +- .../vendor/github.com/v3io/v3io-tsdb/go.mod | 2 - .../v3io/v3io-tsdb/pkg/appender/appender.go | 31 ++-- .../v3io/v3io-tsdb/pkg/appender/ingest.go | 159 +++++++++++------- .../v3io/v3io-tsdb/pkg/appender/store.go | 22 +++ .../v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go | 7 +- 10 files changed, 288 insertions(+), 154 deletions(-) diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/go.mod b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/go.mod index 21255ccf..969b9e8a 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/go.mod +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/go.mod @@ -4,7 +4,6 @@ go 1.14 require ( github.com/cespare/xxhash v1.1.0 - github.com/cpuguy83/go-md2man v1.0.10 // indirect github.com/ghodss/yaml v1.0.0 github.com/imdario/mergo v0.3.7 github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -14,7 +13,6 @@ require ( github.com/pkg/errors v0.8.1 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/spf13/cobra v0.0.3 - github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.4.0 github.com/v3io/frames v0.7.33 github.com/v3io/v3io-go v0.1.9 diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go index 26fd9ea5..24273713 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go @@ -59,17 +59,20 @@ type MetricState struct { retryCount uint8 newName bool isVariant bool + + shouldGetState bool } // Metric store states type storeState uint8 const ( - storeStateInit storeState = 0 - storeStatePreGet storeState = 1 // Need to get state - storeStateGet storeState = 2 // Getting old state from storage - storeStateReady storeState = 3 // Ready to update - storeStateUpdate storeState = 4 // Update/write in progress + storeStateInit storeState = 0 + storeStatePreGet storeState = 1 // Need to get state + storeStateGet storeState = 2 // Getting old state from storage + storeStateReady storeState = 3 // Ready to update + storeStateUpdate storeState = 4 // Update/write in progress + storeStateAboutToUpdate storeState = 5 // Like ready state but with updates pending ) // store is ready to update samples into the DB @@ -77,6 +80,11 @@ func (m *MetricState) isReady() bool { return m.state == storeStateReady } +// Indicates whether the metric has no inflight requests and can send new ones +func (m *MetricState) canSendRequests() bool { + return m.state == storeStateReady || m.state == storeStateAboutToUpdate +} + func (m *MetricState) getState() storeState { return m.state } @@ -118,6 +126,9 @@ type MetricsCache struct { updatesComplete chan int newUpdates chan int + outstandingUpdates int64 + requestsInFlight int64 + lastMetric uint64 // TODO: consider switching to synch.Map (https://golang.org/pkg/sync/#Map) @@ -144,7 +155,6 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config newCache.asyncAppendChan = make(chan *asyncAppend, channelSize) 
newCache.metricQueue = NewElasticQueue() - newCache.updatesComplete = make(chan int, 100) newCache.newUpdates = make(chan int, 1000) newCache.stopChan = make(chan int, 3) @@ -155,10 +165,11 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config } type asyncAppend struct { - metric *MetricState - t int64 - v interface{} - resp chan int + metric *MetricState + t int64 + v interface{} + resp chan int + isCompletion bool } func (mc *MetricsCache) Start() error { diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go index eb349e0e..32d05592 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go @@ -24,6 +24,7 @@ import ( "fmt" "net/http" "reflect" + "sync/atomic" "time" "github.com/pkg/errors" @@ -47,8 +48,6 @@ func (mc *MetricsCache) start() error { func (mc *MetricsCache) metricFeed(index int) { go func() { - inFlight := 0 - gotData := false potentialCompletion := false var completeChan chan int @@ -56,30 +55,17 @@ func (mc *MetricsCache) metricFeed(index int) { select { case _ = <-mc.stopChan: return - case inFlight = <-mc.updatesComplete: - // Handle completion notifications from the update loop - length := mc.metricQueue.Length() - mc.logger.Debug(`Complete update cycle - "in-flight requests"=%d; "metric queue length"=%d\n`, inFlight, length) - - // If data was sent and the queue is empty, mark as completion - if length == 0 && gotData { - switch len(mc.asyncAppendChan) { - case 0: - potentialCompletion = true - if completeChan != nil { - completeChan <- 0 - } - case 1: - potentialCompletion = true - } - } case app := <-mc.asyncAppendChan: newMetrics := 0 dataQueued := 0 numPushed := 0 + gotCompletion := false inLoop: for i := 0; i <= mc.cfg.BatchSize; i++ { - if app.metric == nil { + // Handle completion notifications from the update loop + if app.isCompletion { + gotCompletion = true + } else if app.metric == nil { // Handle update completion requests (metric == nil) completeChan = app.resp if potentialCompletion { @@ -88,7 +74,6 @@ func (mc *MetricsCache) metricFeed(index int) { } else { potentialCompletion = false // Handle append requests (Add / AddFast) - gotData = true metric := app.metric metric.Lock() @@ -103,7 +88,7 @@ func (mc *MetricsCache) metricFeed(index int) { metric.setState(storeStatePreGet) } if metric.isReady() { - metric.setState(storeStateUpdate) + metric.setState(storeStateAboutToUpdate) } length := mc.metricQueue.Push(metric) @@ -124,7 +109,22 @@ func (mc *MetricsCache) metricFeed(index int) { } // Notify the update loop that there are new metrics to process if newMetrics > 0 { + atomic.AddInt64(&mc.outstandingUpdates, 1) mc.newUpdates <- newMetrics + } else if gotCompletion { + inFlight := atomic.LoadInt64(&mc.requestsInFlight) + outstanding := atomic.LoadInt64(&mc.outstandingUpdates) + if outstanding == 0 && inFlight == 0 { + switch len(mc.asyncAppendChan) { + case 0: + potentialCompletion = true + if completeChan != nil { + completeChan <- 0 + } + case 1: + potentialCompletion = true + } + } } // If we have too much work, stall the queue for some time @@ -154,7 +154,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { return case _ = <-mc.newUpdates: // Handle new metric notifications (from metricFeed) - for mc.updatesInFlight < mc.cfg.Workers*2 { //&& newMetrics > 0{ + for mc.updatesInFlight < mc.cfg.Workers*2 { 
freeSlots := mc.cfg.Workers*2 - mc.updatesInFlight metrics := mc.metricQueue.PopN(freeSlots) for _, metric := range metrics { @@ -165,9 +165,11 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { } } - if mc.updatesInFlight == 0 { - mc.logger.Debug("Complete new update cycle - in-flight %d.\n", mc.updatesInFlight) - mc.updatesComplete <- 0 + outstandingUpdates := atomic.AddInt64(&mc.outstandingUpdates, -1) + + if atomic.LoadInt64(&mc.requestsInFlight) == 0 && outstandingUpdates == 0 { + mc.logger.Debug("Return to feed after processing newUpdates") + mc.asyncAppendChan <- &asyncAppend{isCompletion: true} } case resp := <-mc.responseChan: // Handle V3IO async responses @@ -188,6 +190,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { if i < mc.cfg.BatchSize { select { case resp = <-mc.responseChan: + atomic.AddInt64(&mc.requestsInFlight, -1) default: break inLoop } @@ -206,10 +209,12 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { } } + requestsInFlight := atomic.AddInt64(&mc.requestsInFlight, -1) + // Notify the metric feeder when all in-flight tasks are done - if mc.updatesInFlight == 0 { - mc.logger.Debug("Return to feed. Metric queue length: %d", mc.metricQueue.Length()) - mc.updatesComplete <- 0 + if requestsInFlight == 0 && atomic.LoadInt64(&mc.outstandingUpdates) == 0 { + mc.logger.Debug("Return to feed after processing responseChan") + mc.asyncAppendChan <- &asyncAppend{isCompletion: true} } } } @@ -223,45 +228,70 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) { metric.Lock() defer metric.Unlock() var sent bool - var err error - if metric.getState() == storeStatePreGet { - sent, err = metric.store.getChunksState(mc, metric) - if err != nil { - // Count errors - mc.performanceReporter.IncrementCounter("GetChunksStateError", 1) - - mc.logger.ErrorWith("Failed to get item state", "metric", metric.Lset, "err", err) - setError(mc, metric, err) - } else { - metric.setState(storeStateGet) + // In case we are in pre get state or our data spreads across multiple partitions, get the new state for the current partition + if metric.getState() == storeStatePreGet || + (metric.canSendRequests() && metric.shouldGetState) { + sent = mc.sendGetMetricState(metric) + if sent { + mc.updatesInFlight++ } + } else if metric.canSendRequests() { + sent = mc.writeChunksAndGetState(metric) - } else { - sent, err = metric.store.writeChunks(mc, metric) - if err != nil { - // Count errors - mc.performanceReporter.IncrementCounter("WriteChunksError", 1) - - mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err) - setError(mc, metric, errors.Wrap(err, "Chunk write submit failed.")) - } else if sent { - metric.setState(storeStateUpdate) - } if !sent { if metric.store.samplesQueueLength() == 0 { metric.setState(storeStateReady) } else { if mc.metricQueue.length() > 0 { + atomic.AddInt64(&mc.outstandingUpdates, 1) mc.newUpdates <- mc.metricQueue.length() } } } } +} + +func (mc *MetricsCache) sendGetMetricState(metric *MetricState) bool { + // If we are already in a get state, discard + if metric.getState() == storeStateGet { + return false + } + + sent, err := metric.store.getChunksState(mc, metric) + if err != nil { + // Count errors + mc.performanceReporter.IncrementCounter("GetChunksStateError", 1) + + mc.logger.ErrorWith("Failed to get item state", "metric", metric.Lset, "err", err) + setError(mc, metric, err) + } else { + metric.setState(storeStateGet) + } + + return sent +} + +func (mc *MetricsCache) writeChunksAndGetState(metric *MetricState) bool { + 
sent, err := metric.store.writeChunks(mc, metric) + if err != nil { + // Count errors + mc.performanceReporter.IncrementCounter("WriteChunksError", 1) + + mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err) + setError(mc, metric, errors.Wrap(err, "Chunk write submit failed.")) + } else if sent { + metric.setState(storeStateUpdate) + } else if metric.shouldGetState { + // In case we didn't write any data and the metric state needs to be updated, update it straight away + sent = mc.sendGetMetricState(metric) + } + if sent { mc.updatesInFlight++ } + return sent } // Handle DB responses @@ -337,24 +367,18 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response, metric.setState(storeStateReady) var sent bool - var err error - - if canWrite { - sent, err = metric.store.writeChunks(mc, metric) - if err != nil { - // Count errors - mc.performanceReporter.IncrementCounter("WriteChunksError", 1) - mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err) - setError(mc, metric, errors.Wrap(err, "Chunk write submit failed.")) - } else if sent { - metric.setState(storeStateUpdate) + // In case our data spreads across multiple partitions, get the new state for the current partition + if metric.shouldGetState { + sent = mc.sendGetMetricState(metric) + if sent { mc.updatesInFlight++ } - + } else if canWrite { + sent = mc.writeChunksAndGetState(metric) } else if metric.store.samplesQueueLength() > 0 { mc.metricQueue.Push(metric) - metric.setState(storeStateUpdate) + metric.setState(storeStateAboutToUpdate) } return sent @@ -385,6 +409,13 @@ func (mc *MetricsCache) nameUpdateRespLoop() { } resp.Release() + + requestsInFlight := atomic.AddInt64(&mc.requestsInFlight, -1) + + if requestsInFlight == 0 && atomic.LoadInt64(&mc.outstandingUpdates) == 0 { + mc.logger.Debug("Return to feed after processing nameUpdateChan") + mc.asyncAppendChan <- &asyncAppend{isCompletion: true} + } } } }() diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go index c1e324e1..cce59bb1 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go @@ -25,6 +25,7 @@ import ( "fmt" "path/filepath" "sort" + "sync/atomic" "time" "github.com/nuclio/logger" @@ -156,8 +157,10 @@ func (cs *chunkStore) getChunksState(mc *MetricsCache, metric *MetricState) (boo getInput := v3io.GetItemInput{ Path: path, AttributeNames: []string{config.MaxTimeAttrName}} + atomic.AddInt64(&mc.requestsInFlight, 1) request, err := mc.container.GetItem(&getInput, metric, mc.responseChan) if err != nil { + atomic.AddInt64(&mc.requestsInFlight, -1) mc.logger.ErrorWith("Failed to send a GetItem request to the TSDB", "metric", metric.key, "err", err) return false, err } @@ -186,8 +189,10 @@ func (cs *chunkStore) processGetResp(mc *MetricsCache, metric *MetricState, resp path := filepath.Join(mc.cfg.TablePath, config.NamesDirectory, metric.name) putInput := v3io.PutItemInput{Path: path, Attributes: map[string]interface{}{}} + atomic.AddInt64(&mc.requestsInFlight, 1) request, err := mc.container.PutItem(&putInput, metric, mc.nameUpdateChan) if err != nil { + atomic.AddInt64(&mc.requestsInFlight, -1) cs.performanceReporter.IncrementCounter("PutNameError", 1) mc.logger.ErrorWith("Update-name PutItem failed", "metric", metric.key, "err", err) } else { @@ -199,6 +204,8 @@ func (cs *chunkStore) processGetResp(mc 
*MetricsCache, metric *MetricState, resp cs.performanceReporter.IncrementCounter("UpdateMetricError", 1) } + metric.shouldGetState = false + cs.maxTime = 0 // In case of an error, initialize max time back to default return } @@ -217,6 +224,7 @@ func (cs *chunkStore) processGetResp(mc *MetricsCache, metric *MetricState, resp // Set Last TableId - indicate that there is no need to create metric object cs.lastTid = cs.nextTid + metric.shouldGetState = false } // Append data to the right chunk and table based on the time and state @@ -297,6 +305,13 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen hasPendingUpdates = false return } + + // In case the current max time is not up to date, force a get state + if partition.GetStartTime() > cs.maxTime && cs.maxTime > 0 { + metric.shouldGetState = true + hasPendingUpdates = false + return + } if partition.GetStartTime() > cs.lastTid { notInitialized = true cs.lastTid = partition.GetStartTime() @@ -397,6 +412,11 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen pendingSamplesCount++ } + // In case we advanced to a newer partition mark we need to get state again + if pendingSampleIndex < len(cs.pending) && !partition.InRange(cs.pending[pendingSampleIndex].t) { + metric.shouldGetState = true + } + cs.aggrList.Clear() if pendingSampleIndex == len(cs.pending) { cs.pending = cs.pending[:0] @@ -440,9 +460,11 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen } expr += fmt.Sprintf("%v=%d;", config.MaxTimeAttrName, cs.maxTime) // TODO: use max() expr path := partition.GetMetricPath(metric.name, metric.hash, cs.labelNames, cs.isAggr()) + atomic.AddInt64(&mc.requestsInFlight, 1) request, err := mc.container.UpdateItem( &v3io.UpdateItemInput{Path: path, Expression: &expr, Condition: conditionExpr}, metric, mc.responseChan) if err != nil { + atomic.AddInt64(&mc.requestsInFlight, -1) mc.logger.ErrorWith("UpdateItem failed", "err", err) hasPendingUpdates = false } diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go index ed5a9ee3..761c854c 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go @@ -619,7 +619,12 @@ func deleteObjectWorker(container v3io.Container, deleteParams *DeleteParams, lo // Update the partition's max time if needed. 
if deleteParams.From < dbMaxTime && deleteParams.To >= dbMaxTime { if deleteParams.From < newMaxTime { - newMaxTime = deleteParams.From + // Limit the max time to be in the current partition's range + if deleteParams.From < currentPartition.GetStartTime() { + newMaxTime = currentPartition.GetStartTime() + } else { + newMaxTime = deleteParams.From + } } deleteUpdateExpression.WriteString(fmt.Sprintf("%v=%v;", config.MaxTimeAttrName, newMaxTime)) diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/go.mod b/functions/query/vendor/github.com/v3io/v3io-tsdb/go.mod index 21255ccf..969b9e8a 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/go.mod +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/go.mod @@ -4,7 +4,6 @@ go 1.14 require ( github.com/cespare/xxhash v1.1.0 - github.com/cpuguy83/go-md2man v1.0.10 // indirect github.com/ghodss/yaml v1.0.0 github.com/imdario/mergo v0.3.7 github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -14,7 +13,6 @@ require ( github.com/pkg/errors v0.8.1 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/spf13/cobra v0.0.3 - github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.4.0 github.com/v3io/frames v0.7.33 github.com/v3io/v3io-go v0.1.9 diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go index 26fd9ea5..24273713 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go @@ -59,17 +59,20 @@ type MetricState struct { retryCount uint8 newName bool isVariant bool + + shouldGetState bool } // Metric store states type storeState uint8 const ( - storeStateInit storeState = 0 - storeStatePreGet storeState = 1 // Need to get state - storeStateGet storeState = 2 // Getting old state from storage - storeStateReady storeState = 3 // Ready to update - storeStateUpdate storeState = 4 // Update/write in progress + storeStateInit storeState = 0 + storeStatePreGet storeState = 1 // Need to get state + storeStateGet storeState = 2 // Getting old state from storage + storeStateReady storeState = 3 // Ready to update + storeStateUpdate storeState = 4 // Update/write in progress + storeStateAboutToUpdate storeState = 5 // Like ready state but with updates pending ) // store is ready to update samples into the DB @@ -77,6 +80,11 @@ func (m *MetricState) isReady() bool { return m.state == storeStateReady } +// Indicates whether the metric has no inflight requests and can send new ones +func (m *MetricState) canSendRequests() bool { + return m.state == storeStateReady || m.state == storeStateAboutToUpdate +} + func (m *MetricState) getState() storeState { return m.state } @@ -118,6 +126,9 @@ type MetricsCache struct { updatesComplete chan int newUpdates chan int + outstandingUpdates int64 + requestsInFlight int64 + lastMetric uint64 // TODO: consider switching to synch.Map (https://golang.org/pkg/sync/#Map) @@ -144,7 +155,6 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config newCache.asyncAppendChan = make(chan *asyncAppend, channelSize) newCache.metricQueue = NewElasticQueue() - newCache.updatesComplete = make(chan int, 100) newCache.newUpdates = make(chan int, 1000) newCache.stopChan = make(chan int, 3) @@ -155,10 +165,11 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config } type asyncAppend struct { - metric *MetricState - t 
int64 - v interface{} - resp chan int + metric *MetricState + t int64 + v interface{} + resp chan int + isCompletion bool } func (mc *MetricsCache) Start() error { diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go index eb349e0e..32d05592 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go @@ -24,6 +24,7 @@ import ( "fmt" "net/http" "reflect" + "sync/atomic" "time" "github.com/pkg/errors" @@ -47,8 +48,6 @@ func (mc *MetricsCache) start() error { func (mc *MetricsCache) metricFeed(index int) { go func() { - inFlight := 0 - gotData := false potentialCompletion := false var completeChan chan int @@ -56,30 +55,17 @@ func (mc *MetricsCache) metricFeed(index int) { select { case _ = <-mc.stopChan: return - case inFlight = <-mc.updatesComplete: - // Handle completion notifications from the update loop - length := mc.metricQueue.Length() - mc.logger.Debug(`Complete update cycle - "in-flight requests"=%d; "metric queue length"=%d\n`, inFlight, length) - - // If data was sent and the queue is empty, mark as completion - if length == 0 && gotData { - switch len(mc.asyncAppendChan) { - case 0: - potentialCompletion = true - if completeChan != nil { - completeChan <- 0 - } - case 1: - potentialCompletion = true - } - } case app := <-mc.asyncAppendChan: newMetrics := 0 dataQueued := 0 numPushed := 0 + gotCompletion := false inLoop: for i := 0; i <= mc.cfg.BatchSize; i++ { - if app.metric == nil { + // Handle completion notifications from the update loop + if app.isCompletion { + gotCompletion = true + } else if app.metric == nil { // Handle update completion requests (metric == nil) completeChan = app.resp if potentialCompletion { @@ -88,7 +74,6 @@ func (mc *MetricsCache) metricFeed(index int) { } else { potentialCompletion = false // Handle append requests (Add / AddFast) - gotData = true metric := app.metric metric.Lock() @@ -103,7 +88,7 @@ func (mc *MetricsCache) metricFeed(index int) { metric.setState(storeStatePreGet) } if metric.isReady() { - metric.setState(storeStateUpdate) + metric.setState(storeStateAboutToUpdate) } length := mc.metricQueue.Push(metric) @@ -124,7 +109,22 @@ func (mc *MetricsCache) metricFeed(index int) { } // Notify the update loop that there are new metrics to process if newMetrics > 0 { + atomic.AddInt64(&mc.outstandingUpdates, 1) mc.newUpdates <- newMetrics + } else if gotCompletion { + inFlight := atomic.LoadInt64(&mc.requestsInFlight) + outstanding := atomic.LoadInt64(&mc.outstandingUpdates) + if outstanding == 0 && inFlight == 0 { + switch len(mc.asyncAppendChan) { + case 0: + potentialCompletion = true + if completeChan != nil { + completeChan <- 0 + } + case 1: + potentialCompletion = true + } + } } // If we have too much work, stall the queue for some time @@ -154,7 +154,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { return case _ = <-mc.newUpdates: // Handle new metric notifications (from metricFeed) - for mc.updatesInFlight < mc.cfg.Workers*2 { //&& newMetrics > 0{ + for mc.updatesInFlight < mc.cfg.Workers*2 { freeSlots := mc.cfg.Workers*2 - mc.updatesInFlight metrics := mc.metricQueue.PopN(freeSlots) for _, metric := range metrics { @@ -165,9 +165,11 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { } } - if mc.updatesInFlight == 0 { - mc.logger.Debug("Complete new update cycle - in-flight %d.\n", mc.updatesInFlight) - 
mc.updatesComplete <- 0 + outstandingUpdates := atomic.AddInt64(&mc.outstandingUpdates, -1) + + if atomic.LoadInt64(&mc.requestsInFlight) == 0 && outstandingUpdates == 0 { + mc.logger.Debug("Return to feed after processing newUpdates") + mc.asyncAppendChan <- &asyncAppend{isCompletion: true} } case resp := <-mc.responseChan: // Handle V3IO async responses @@ -188,6 +190,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { if i < mc.cfg.BatchSize { select { case resp = <-mc.responseChan: + atomic.AddInt64(&mc.requestsInFlight, -1) default: break inLoop } @@ -206,10 +209,12 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { } } + requestsInFlight := atomic.AddInt64(&mc.requestsInFlight, -1) + // Notify the metric feeder when all in-flight tasks are done - if mc.updatesInFlight == 0 { - mc.logger.Debug("Return to feed. Metric queue length: %d", mc.metricQueue.Length()) - mc.updatesComplete <- 0 + if requestsInFlight == 0 && atomic.LoadInt64(&mc.outstandingUpdates) == 0 { + mc.logger.Debug("Return to feed after processing responseChan") + mc.asyncAppendChan <- &asyncAppend{isCompletion: true} } } } @@ -223,45 +228,70 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) { metric.Lock() defer metric.Unlock() var sent bool - var err error - if metric.getState() == storeStatePreGet { - sent, err = metric.store.getChunksState(mc, metric) - if err != nil { - // Count errors - mc.performanceReporter.IncrementCounter("GetChunksStateError", 1) - - mc.logger.ErrorWith("Failed to get item state", "metric", metric.Lset, "err", err) - setError(mc, metric, err) - } else { - metric.setState(storeStateGet) + // In case we are in pre get state or our data spreads across multiple partitions, get the new state for the current partition + if metric.getState() == storeStatePreGet || + (metric.canSendRequests() && metric.shouldGetState) { + sent = mc.sendGetMetricState(metric) + if sent { + mc.updatesInFlight++ } + } else if metric.canSendRequests() { + sent = mc.writeChunksAndGetState(metric) - } else { - sent, err = metric.store.writeChunks(mc, metric) - if err != nil { - // Count errors - mc.performanceReporter.IncrementCounter("WriteChunksError", 1) - - mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err) - setError(mc, metric, errors.Wrap(err, "Chunk write submit failed.")) - } else if sent { - metric.setState(storeStateUpdate) - } if !sent { if metric.store.samplesQueueLength() == 0 { metric.setState(storeStateReady) } else { if mc.metricQueue.length() > 0 { + atomic.AddInt64(&mc.outstandingUpdates, 1) mc.newUpdates <- mc.metricQueue.length() } } } } +} + +func (mc *MetricsCache) sendGetMetricState(metric *MetricState) bool { + // If we are already in a get state, discard + if metric.getState() == storeStateGet { + return false + } + + sent, err := metric.store.getChunksState(mc, metric) + if err != nil { + // Count errors + mc.performanceReporter.IncrementCounter("GetChunksStateError", 1) + + mc.logger.ErrorWith("Failed to get item state", "metric", metric.Lset, "err", err) + setError(mc, metric, err) + } else { + metric.setState(storeStateGet) + } + + return sent +} + +func (mc *MetricsCache) writeChunksAndGetState(metric *MetricState) bool { + sent, err := metric.store.writeChunks(mc, metric) + if err != nil { + // Count errors + mc.performanceReporter.IncrementCounter("WriteChunksError", 1) + + mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err) + setError(mc, metric, errors.Wrap(err, "Chunk write submit failed.")) + } else if sent { + 
metric.setState(storeStateUpdate) + } else if metric.shouldGetState { + // In case we didn't write any data and the metric state needs to be updated, update it straight away + sent = mc.sendGetMetricState(metric) + } + if sent { mc.updatesInFlight++ } + return sent } // Handle DB responses @@ -337,24 +367,18 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response, metric.setState(storeStateReady) var sent bool - var err error - - if canWrite { - sent, err = metric.store.writeChunks(mc, metric) - if err != nil { - // Count errors - mc.performanceReporter.IncrementCounter("WriteChunksError", 1) - mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err) - setError(mc, metric, errors.Wrap(err, "Chunk write submit failed.")) - } else if sent { - metric.setState(storeStateUpdate) + // In case our data spreads across multiple partitions, get the new state for the current partition + if metric.shouldGetState { + sent = mc.sendGetMetricState(metric) + if sent { mc.updatesInFlight++ } - + } else if canWrite { + sent = mc.writeChunksAndGetState(metric) } else if metric.store.samplesQueueLength() > 0 { mc.metricQueue.Push(metric) - metric.setState(storeStateUpdate) + metric.setState(storeStateAboutToUpdate) } return sent @@ -385,6 +409,13 @@ func (mc *MetricsCache) nameUpdateRespLoop() { } resp.Release() + + requestsInFlight := atomic.AddInt64(&mc.requestsInFlight, -1) + + if requestsInFlight == 0 && atomic.LoadInt64(&mc.outstandingUpdates) == 0 { + mc.logger.Debug("Return to feed after processing nameUpdateChan") + mc.asyncAppendChan <- &asyncAppend{isCompletion: true} + } } } }() diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go index c1e324e1..cce59bb1 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/appender/store.go @@ -25,6 +25,7 @@ import ( "fmt" "path/filepath" "sort" + "sync/atomic" "time" "github.com/nuclio/logger" @@ -156,8 +157,10 @@ func (cs *chunkStore) getChunksState(mc *MetricsCache, metric *MetricState) (boo getInput := v3io.GetItemInput{ Path: path, AttributeNames: []string{config.MaxTimeAttrName}} + atomic.AddInt64(&mc.requestsInFlight, 1) request, err := mc.container.GetItem(&getInput, metric, mc.responseChan) if err != nil { + atomic.AddInt64(&mc.requestsInFlight, -1) mc.logger.ErrorWith("Failed to send a GetItem request to the TSDB", "metric", metric.key, "err", err) return false, err } @@ -186,8 +189,10 @@ func (cs *chunkStore) processGetResp(mc *MetricsCache, metric *MetricState, resp path := filepath.Join(mc.cfg.TablePath, config.NamesDirectory, metric.name) putInput := v3io.PutItemInput{Path: path, Attributes: map[string]interface{}{}} + atomic.AddInt64(&mc.requestsInFlight, 1) request, err := mc.container.PutItem(&putInput, metric, mc.nameUpdateChan) if err != nil { + atomic.AddInt64(&mc.requestsInFlight, -1) cs.performanceReporter.IncrementCounter("PutNameError", 1) mc.logger.ErrorWith("Update-name PutItem failed", "metric", metric.key, "err", err) } else { @@ -199,6 +204,8 @@ func (cs *chunkStore) processGetResp(mc *MetricsCache, metric *MetricState, resp cs.performanceReporter.IncrementCounter("UpdateMetricError", 1) } + metric.shouldGetState = false + cs.maxTime = 0 // In case of an error, initialize max time back to default return } @@ -217,6 +224,7 @@ func (cs *chunkStore) processGetResp(mc *MetricsCache, metric *MetricState, resp 
// Set Last TableId - indicate that there is no need to create metric object cs.lastTid = cs.nextTid + metric.shouldGetState = false } // Append data to the right chunk and table based on the time and state @@ -297,6 +305,13 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen hasPendingUpdates = false return } + + // In case the current max time is not up to date, force a get state + if partition.GetStartTime() > cs.maxTime && cs.maxTime > 0 { + metric.shouldGetState = true + hasPendingUpdates = false + return + } if partition.GetStartTime() > cs.lastTid { notInitialized = true cs.lastTid = partition.GetStartTime() @@ -397,6 +412,11 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen pendingSamplesCount++ } + // In case we advanced to a newer partition mark we need to get state again + if pendingSampleIndex < len(cs.pending) && !partition.InRange(cs.pending[pendingSampleIndex].t) { + metric.shouldGetState = true + } + cs.aggrList.Clear() if pendingSampleIndex == len(cs.pending) { cs.pending = cs.pending[:0] @@ -440,9 +460,11 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen } expr += fmt.Sprintf("%v=%d;", config.MaxTimeAttrName, cs.maxTime) // TODO: use max() expr path := partition.GetMetricPath(metric.name, metric.hash, cs.labelNames, cs.isAggr()) + atomic.AddInt64(&mc.requestsInFlight, 1) request, err := mc.container.UpdateItem( &v3io.UpdateItemInput{Path: path, Expression: &expr, Condition: conditionExpr}, metric, mc.responseChan) if err != nil { + atomic.AddInt64(&mc.requestsInFlight, -1) mc.logger.ErrorWith("UpdateItem failed", "err", err) hasPendingUpdates = false } diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go index ed5a9ee3..761c854c 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go @@ -619,7 +619,12 @@ func deleteObjectWorker(container v3io.Container, deleteParams *DeleteParams, lo // Update the partition's max time if needed. if deleteParams.From < dbMaxTime && deleteParams.To >= dbMaxTime { if deleteParams.From < newMaxTime { - newMaxTime = deleteParams.From + // Limit the max time to be in the current partition's range + if deleteParams.From < currentPartition.GetStartTime() { + newMaxTime = currentPartition.GetStartTime() + } else { + newMaxTime = deleteParams.From + } } deleteUpdateExpression.WriteString(fmt.Sprintf("%v=%v;", config.MaxTimeAttrName, newMaxTime))
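
The core of this update replaces the old updatesComplete channel with two atomic counters, outstandingUpdates and requestsInFlight, plus a sentinel isCompletion message posted back on asyncAppendChan once both counters reach zero. Below is a minimal, self-contained sketch of that scheme; the cache and appendMsg types and the method names are illustrative stand-ins, not the v3io-tsdb API.

package main

import (
	"fmt"
	"sync/atomic"
)

type appendMsg struct {
	value        float64
	isCompletion bool // sentinel: carries no data, just "all work drained"
}

type cache struct {
	appendChan         chan *appendMsg
	outstandingUpdates int64 // update batches queued but not yet processed
	requestsInFlight   int64 // async storage requests awaiting a response
}

// beginRequest is called just before submitting an async storage request.
func (c *cache) beginRequest() { atomic.AddInt64(&c.requestsInFlight, 1) }

// endRequest is called when a response arrives; if nothing else is
// pending, it posts a completion sentinel back to the feed loop.
func (c *cache) endRequest() {
	if atomic.AddInt64(&c.requestsInFlight, -1) == 0 &&
		atomic.LoadInt64(&c.outstandingUpdates) == 0 {
		c.appendChan <- &appendMsg{isCompletion: true}
	}
}

func main() {
	c := &cache{appendChan: make(chan *appendMsg, 1)}
	c.beginRequest()
	c.endRequest() // last response in: sends the sentinel
	fmt.Println("completion:", (<-c.appendChan).isCompletion)
}

Routing completions through the same channel the feed loop already selects on means completions are observed in order with appends, which appears to be the motivation for retiring the separate updatesComplete channel.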
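
The patch also adds a sixth metric state, storeStateAboutToUpdate, so a metric whose samples are queued but whose write has not yet been submitted can still accept new requests. Here is a standalone sketch of the resulting state machine; only the state names and the canSendRequests predicate come from the patch, the rest is illustration.

package main

import "fmt"

type storeState uint8

const (
	storeStateInit          storeState = iota // nothing known yet
	storeStatePreGet                          // need to fetch state
	storeStateGet                             // GetItem in flight
	storeStateReady                           // idle, may send requests
	storeStateUpdate                          // write in flight
	storeStateAboutToUpdate                   // like Ready, but updates pending
)

// canSendRequests mirrors MetricState.canSendRequests in the patch: both
// Ready and AboutToUpdate metrics may issue new storage requests.
func canSendRequests(s storeState) bool {
	return s == storeStateReady || s == storeStateAboutToUpdate
}

func main() {
	for s := storeStateInit; s <= storeStateAboutToUpdate; s++ {
		fmt.Printf("state %d canSendRequests=%v\n", s, canSendRequests(s))
	}
}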
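
In store.go, writeChunks now sets shouldGetState whenever the cached max time predates the target partition, or when pending samples spill past the current partition's range, forcing a fresh GetItem before further writes. The sketch below condenses those two checks into one function; partition and sample are simplified stand-ins for the real partition object, of which only GetStartTime and InRange appear in the diff.

package main

import "fmt"

type sample struct{ t int64 }

type partition struct{ start, end int64 }

func (p partition) InRange(t int64) bool { return t >= p.start && t < p.end }

// needsStateRefresh mirrors the two guards the patch adds: a cached
// maxTime that is stale for the target partition, or pending samples
// that cross into a newer partition.
func needsStateRefresh(p partition, cachedMaxTime int64, pending []sample, next int) bool {
	if p.start > cachedMaxTime && cachedMaxTime > 0 {
		return true // cached max time predates this partition
	}
	return next < len(pending) && !p.InRange(pending[next].t)
}

func main() {
	p := partition{start: 100, end: 200}
	pending := []sample{{t: 150}, {t: 250}} // second sample belongs to the next partition
	fmt.Println(needsStateRefresh(p, 150, pending, 1)) // true: t=250 is out of range
}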
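
Finally, deleteObjectWorker in v3iotsdb.go now clamps the lowered max time to the current partition's start, so a delete range that begins before the partition cannot push the stored MaxTimeAttrName below the partition itself. The same logic as a plain function, with illustrative parameter names.

package main

import "fmt"

// clampNewMaxTime lowers newMaxTime to deleteFrom, but never below the
// partition's own start time.
func clampNewMaxTime(deleteFrom, partitionStart, newMaxTime int64) int64 {
	if deleteFrom < newMaxTime {
		if deleteFrom < partitionStart {
			return partitionStart // keep max time inside this partition
		}
		return deleteFrom
	}
	return newMaxTime
}

func main() {
	fmt.Println(clampNewMaxTime(50, 100, 180))  // 100: clamped to partition start
	fmt.Println(clampNewMaxTime(120, 100, 180)) // 120: delete start is in range
}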