Skip to content

Commit

Permalink
Merge pull request #1636 from orbs-network/fix-restart-notification
Browse files Browse the repository at this point in the history
fix restart notifications
  • Loading branch information
noambergIL authored Nov 3, 2020
2 parents 03a643d + c21df4b commit 73e1a18
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 32 deletions.
15 changes: 14 additions & 1 deletion bootstrap/httpserver/server_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,22 @@ func TestHttpServer_PublicApiGetStatus(t *testing.T) {
res := make(map[string]interface{})
json.Unmarshal(rec.Body.Bytes(), &res)

require.Contains(t, res, "Timestamp")

h.server.metricRegistry.NewGauge("BlockStorage.FileSystemIndex.LastUpdateTime")
req, _ = http.NewRequest("Get", "/status", nil)
rec = httptest.NewRecorder()
h.server.getStatus(rec, req)

require.Equal(t, http.StatusOK, rec.Code, "should succeed")
require.Equal(t, "application/json", rec.Header().Get("Content-Type"), "should have our content type")

res = make(map[string]interface{})
json.Unmarshal(rec.Body.Bytes(), &res)

require.Contains(t, res, "Timestamp")
require.Contains(t, res, "Error")
require.Equal(t, "LeanHelix Service has not committed any blocks yet", res["Status"])
require.Equal(t, "Last successful blockstorage update (including index update on boot) was too long ago", res["Status"])
require.NotEmpty(t, res["Payload"])
})
})
Expand Down
37 changes: 19 additions & 18 deletions bootstrap/httpserver/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,27 @@ func (s *HttpServer) getStatus(w http.ResponseWriter, r *http.Request) {
}

func (s *HttpServer) getStatusWarningMessage() string {
maxTimeSinceLastBlock := s.config.TransactionPoolTimeBetweenEmptyBlocks().Nanoseconds() * 10
if maxTimeSinceLastBlock < 600000000 { // ten minutes
maxTimeSinceLastBlock = 600000000
}
leanHelixLastCommitteed := s.getGaugeValueFromMetrics("ConsensusAlgo.LeanHelix.LastCommitted.TimeNano")
if leanHelixLastCommitteed == 0 {
return "LeanHelix Service has not committed any blocks yet"
}
if leanHelixLastCommitteed + maxTimeSinceLastBlock < time.Now().UnixNano() {
return "Last Successful Committed Block was too long ago"
// Block storage staleness check; it only runs once the index gauge has been
// registered in the metric registry.
//
// The "BlockStorage.FileSystemIndex.LastUpdateTime" gauge is written with
// time.Now().Unix(), i.e. whole seconds since the epoch. The previous version of
// this check added a nanosecond threshold to that value and compared the sum
// with time.Now().UnixNano(), so the warning fired permanently; in addition the
// "ten minutes" floor was 600000000ns, which is only 0.6s. Working with
// time.Time / time.Duration keeps the units explicit.
if metricObj := s.metricRegistry.Get("BlockStorage.FileSystemIndex.LastUpdateTime"); metricObj != nil {
	// Allow ten empty-block intervals without an update, but never less than ten minutes.
	maxTimeSinceLastBlockStorageUpdate := 10 * s.config.TransactionPoolTimeBetweenEmptyBlocks()
	if maxTimeSinceLastBlockStorageUpdate < 10*time.Minute {
		maxTimeSinceLastBlockStorageUpdate = 10 * time.Minute
	}
	// A gauge that was never updated reads 0 (the epoch) and is therefore reported as stale.
	lastBlockStorageUpdateTime := time.Unix(s.getGaugeValueFromMetrics("BlockStorage.FileSystemIndex.LastUpdateTime"), 0)
	if time.Since(lastBlockStorageUpdateTime) > maxTimeSinceLastBlockStorageUpdate {
		return "Last successful blockstorage update (including index update on boot) was too long ago"
	}
}

if len(s.config.ManagementFilePath()) != 0 && s.config.ManagementPollingInterval() > 0 {
maxIntervalSinceLastSuccessfulManagementUpdate := int64(s.config.ManagementPollingInterval().Seconds()) * 20
managementLastSuccessfullUpdate := s.getGaugeValueFromMetrics("Management.Data.LastSuccessfulUpdateTime")
if managementLastSuccessfullUpdate == 0 {
return "Management Service has never successfully updated"
}
if managementLastSuccessfullUpdate + maxIntervalSinceLastSuccessfulManagementUpdate < time.Now().Unix() {
return "Last successful Management Service update was too long ago"
// Management service staleness check. It is skipped entirely when the
// "Management.Data.LastSuccessfulUpdateTime" metric is absent from the registry.
if metricObj := s.metricRegistry.Get("Management.Data.LastSuccessfulUpdateTime"); metricObj != nil {
// Only relevant when a management file path is configured and polling is enabled.
if len(s.config.ManagementFilePath()) != 0 && s.config.ManagementPollingInterval() > 0 {
// Threshold in seconds: twenty polling intervals.
maxIntervalSinceLastSuccessfulManagementUpdate := int64(s.config.ManagementPollingInterval().Seconds()) * 20
managementLastSuccessfullUpdate := s.getGaugeValueFromMetrics("Management.Data.LastSuccessfulUpdateTime")
// A zero gauge value is treated as "no successful update so far".
if managementLastSuccessfullUpdate == 0 {
return "Management Service has never successfully updated"
}
// The gauge value is compared against time.Now().Unix(), i.e. it is handled as Unix seconds.
if managementLastSuccessfullUpdate+maxIntervalSinceLastSuccessfulManagementUpdate < time.Now().Unix() {
return "Last successful Management Service update was too long ago"
}
}
}

Expand Down
19 changes: 13 additions & 6 deletions services/blockstorage/adapter/filesystem/construct_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package filesystem
import (
"bytes"
"github.com/orbs-network/go-mock"
"github.com/orbs-network/orbs-network-go/instrumentation/metric"
"github.com/orbs-network/orbs-network-go/test"
"github.com/orbs-network/orbs-network-go/test/builders"
"github.com/orbs-network/orbs-network-go/test/rand"
Expand Down Expand Up @@ -42,7 +43,8 @@ func TestConstructIndexFromReader(t *testing.T) {
return block, bytes, nil
})

blockHeightIndex, err := buildIndex(rw, 0, harness.Logger, codec)
metrics := generateBlockStorageMetrics(metric.NewRegistry())
blockHeightIndex, err := buildIndex(rw, 0, harness.Logger, codec, metrics)

require.NoError(t, err, "expected index to construct with no error")
require.EqualValues(t, numBlocks, blockHeightIndex.getLastBlockHeight(), "expected index to reach topHeight block height")
Expand Down Expand Up @@ -88,7 +90,8 @@ func TestConstructIndexFromReaderInterleavedOrder(t *testing.T) {
return block, bytes, nil
})

blockHeightIndex, err := buildIndex(rw, 0, harness.Logger, codec)
metrics := generateBlockStorageMetrics(metric.NewRegistry())
blockHeightIndex, err := buildIndex(rw, 0, harness.Logger, codec, metrics)

require.NoError(t, err, "expected index to construct with no error")
require.EqualValues(t, numBlocks, blockHeightIndex.getLastBlockHeight(), "expected index to reach topHeight block height")
Expand Down Expand Up @@ -135,7 +138,8 @@ func TestConstructIndexFromReaderInterleavedOrderWithGap(t *testing.T) {
return block, bytes, nil
})

blockHeightIndex, err := buildIndex(rw, 0, harness.Logger, codec)
metrics := generateBlockStorageMetrics(metric.NewRegistry())
blockHeightIndex, err := buildIndex(rw, 0, harness.Logger, codec, metrics)

require.NoError(t, err, "expected index to construct with no error")
require.EqualValues(t, getBlockHeight(sequentialTopBlock), blockHeightIndex.getLastBlockHeight(), "expected index to reach sequential top height")
Expand Down Expand Up @@ -206,7 +210,8 @@ func TestBuildIndexSucceedsIndexingFromReader(t *testing.T) {
codec := newCodec(1024 * 1024)
r, done := newBlockFileReadStream(t, ctrlRand, numBlocks, maxTransactions, maxStateDiffs, codec)

bhIndex, err := buildIndex(r, 0, harness.Logger, codec)
metrics := generateBlockStorageMetrics(metric.NewRegistry())
bhIndex, err := buildIndex(r, 0, harness.Logger, codec, metrics)

require.NoError(t, err, "expected buildIndex to succeed")
require.Equal(t, bhIndex.getLastBlockHeight(), primitives.BlockHeight(numBlocks), "expected block height to match the encoded block count")
Expand All @@ -231,7 +236,8 @@ func TestBuildIndexHandlesPartialReads(t *testing.T) {
r, done := newBlockFileReadStream(t, ctrlRand, numBlocks, maxTransactions, maxStateDiffs, codec)

rBuffered, done2 := OneByteAtATimeReader(t, r)
bhIndex, err := buildIndex(rBuffered, 0, harness.Logger, codec)
metrics := generateBlockStorageMetrics(metric.NewRegistry())
bhIndex, err := buildIndex(rBuffered, 0, harness.Logger, codec, metrics)

require.NoError(t, err, "expected buildIndex to succeed with a buffered reader")
require.Equal(t, bhIndex.getLastBlockHeight(), primitives.BlockHeight(numBlocks), "expected block height to match the encoded block count")
Expand All @@ -246,7 +252,8 @@ func TestBuildIndexHandlesEmptyFile(t *testing.T) {
codec := newCodec(1024 * 1024)

r := bytes.NewReader(make([]byte, 0, 0))
bhIndex, err := buildIndex(r, 0, harness.Logger, codec)
metrics := generateBlockStorageMetrics(metric.NewRegistry())
bhIndex, err := buildIndex(r, 0, harness.Logger, codec, metrics)

require.NoError(t, err, "expected buildIndex to succeed")
require.Equal(t, bhIndex.getLastBlockHeight(), primitives.BlockHeight(0), "expected block height to be zero")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ import (
"os"
"path/filepath"
"syscall"
"time"
)

type metrics struct {
sizeOnDisk *metric.Gauge
sizeOnDisk *metric.Gauge
indexLastUpdateTime *metric.Gauge
}

const blocksFilename = "blocks"

func newMetrics(m metric.Factory) *metrics {
func generateBlockStorageMetrics(m metric.Factory) *metrics {
return &metrics{
sizeOnDisk: m.NewGauge("BlockStorage.FileSystemSize.Bytes"),
sizeOnDisk: m.NewGauge("BlockStorage.FileSystemSize.Bytes"),
indexLastUpdateTime: m.NewGauge("BlockStorage.FileSystemIndex.LastUpdateTime"),
}
}

Expand Down Expand Up @@ -76,15 +79,15 @@ func (f *BlockPersistence) GracefulShutdown(shutdownContext context.Context) {

func NewBlockPersistence(conf config.FilesystemBlockPersistenceConfig, parent log.Logger, metricFactory metric.Factory) (*BlockPersistence, error) {
logger := parent.WithTags(log.String("adapter", "block-storage"))

metrics := generateBlockStorageMetrics(metricFactory)
codec := newCodec(conf.BlockStorageFileSystemMaxBlockSizeInBytes())

file, blocksOffset, err := openBlocksFile(conf, logger)
if err != nil {
return nil, err
}

bhIndex, err := buildIndex(bufio.NewReaderSize(file, 1024*1024), blocksOffset, logger, codec)
bhIndex, err := buildIndex(bufio.NewReaderSize(file, 1024*1024), blocksOffset, logger, codec, metrics)
if err != nil {
closeSilently(file, logger)
return nil, err
Expand All @@ -100,7 +103,7 @@ func NewBlockPersistence(conf config.FilesystemBlockPersistenceConfig, parent lo
bhIndex: bhIndex,
config: conf,
blockTracker: synchronization.NewBlockTracker(logger, uint64(bhIndex.getLastBlockHeight()), 5),
metrics: newMetrics(metricFactory),
metrics: metrics,
logger: logger,
blockWriter: newTip,
codec: codec,
Expand Down Expand Up @@ -225,7 +228,7 @@ func newFileBlockWriter(file *os.File, codec blockCodec, nextBlockOffset int64)
return result, nil
}

func buildIndex(r io.Reader, firstBlockOffset int64, logger log.Logger, c blockCodec) (*blockHeightIndex, error) {
func buildIndex(r io.Reader, firstBlockOffset int64, logger log.Logger, c blockCodec, metrics *metrics) (*blockHeightIndex, error) {
bhIndex := newBlockHeightIndex(logger, firstBlockOffset)
offset := int64(firstBlockOffset)
for {
Expand All @@ -242,6 +245,7 @@ func buildIndex(r io.Reader, firstBlockOffset int64, logger log.Logger, c blockC
if err != nil {
return nil, errors.Wrap(err, "failed building block height index")
}
metrics.indexLastUpdateTime.Update(time.Now().Unix())
offset = offset + int64(blockSize)
}
return bhIndex, nil
Expand All @@ -268,6 +272,7 @@ func (f *BlockPersistence) WriteNextBlock(blockPair *protocol.BlockPairContainer
return false, f.bhIndex.getLastBlockHeight(), errors.Wrap(err, "failed to update index after writing block")
}

f.metrics.indexLastUpdateTime.Update(time.Now().Unix())
f.metrics.sizeOnDisk.Add(int64(n))
return true, f.bhIndex.getLastBlockHeight(), nil
}
Expand Down

0 comments on commit 73e1a18

Please sign in to comment.