Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvovk/updsync #9134

Merged
merged 5 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ func (d *DownloadSnapshots) Run(ctx *Context) error {
freezeblocks.NewBlockReader(
freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root()),
freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root())),
params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer))
params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer), []string{})
}

type RetrieveHistoricalState struct {
Expand Down
111 changes: 78 additions & 33 deletions diagnostics/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ type DiagnosticClient struct {
metricsMux *http.ServeMux
node *node.ErigonNode

snapshotDownload diaglib.SnapshotDownloadStatistics
syncStats diaglib.SyncStatistics
}

func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) *DiagnosticClient {
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, snapshotDownload: diaglib.SnapshotDownloadStatistics{}}
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, syncStats: diaglib.SyncStatistics{}}
}

func (d *DiagnosticClient) Setup() {
d.runSnapshotListener()
d.runSegmentDownloadingListener()
d.runSegmentIndexingListener()
d.runSegmentIndexingFinishedListener()
d.runCurrentSyncStageListener()
d.runSyncStagesListListener()
}

func (d *DiagnosticClient) runSnapshotListener() {
Expand All @@ -44,19 +46,18 @@ func (d *DiagnosticClient) runSnapshotListener() {
cancel()
return
case info := <-ch:
d.snapshotDownload.Downloaded = info.Downloaded
d.snapshotDownload.Total = info.Total
d.snapshotDownload.TotalTime = info.TotalTime
d.snapshotDownload.DownloadRate = info.DownloadRate
d.snapshotDownload.UploadRate = info.UploadRate
d.snapshotDownload.Peers = info.Peers
d.snapshotDownload.Files = info.Files
d.snapshotDownload.Connections = info.Connections
d.snapshotDownload.Alloc = info.Alloc
d.snapshotDownload.Sys = info.Sys
d.snapshotDownload.DownloadFinished = info.DownloadFinished
d.snapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady
d.snapshotDownload.LogPrefix = info.LogPrefix
d.syncStats.SnapshotDownload.Downloaded = info.Downloaded
d.syncStats.SnapshotDownload.Total = info.Total
d.syncStats.SnapshotDownload.TotalTime = info.TotalTime
d.syncStats.SnapshotDownload.DownloadRate = info.DownloadRate
d.syncStats.SnapshotDownload.UploadRate = info.UploadRate
d.syncStats.SnapshotDownload.Peers = info.Peers
d.syncStats.SnapshotDownload.Files = info.Files
d.syncStats.SnapshotDownload.Connections = info.Connections
d.syncStats.SnapshotDownload.Alloc = info.Alloc
d.syncStats.SnapshotDownload.Sys = info.Sys
d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady

if info.DownloadFinished {
return
Expand All @@ -67,8 +68,8 @@ func (d *DiagnosticClient) runSnapshotListener() {
}()
}

func (d *DiagnosticClient) SnapshotDownload() diaglib.SnapshotDownloadStatistics {
return d.snapshotDownload
func (d *DiagnosticClient) SyncStatistics() diaglib.SyncStatistics {
return d.syncStats
}

func (d *DiagnosticClient) runSegmentDownloadingListener() {
Expand All @@ -85,11 +86,11 @@ func (d *DiagnosticClient) runSegmentDownloadingListener() {
cancel()
return
case info := <-ch:
if d.snapshotDownload.SegmentsDownloading == nil {
d.snapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{}
if d.syncStats.SnapshotDownload.SegmentsDownloading == nil {
d.syncStats.SnapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{}
}

d.snapshotDownload.SegmentsDownloading[info.Name] = info
d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
}
}
}()
Expand Down Expand Up @@ -130,15 +131,15 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
return
case info := <-ch:
found := false
for i := range d.snapshotDownload.SegmentIndexing.Segments {
if d.snapshotDownload.SegmentIndexing.Segments[i].SegmentName == info.SegmentName {
for i := range d.syncStats.SnapshotIndexing.Segments {
if d.syncStats.SnapshotIndexing.Segments[i].SegmentName == info.SegmentName {
found = true
d.snapshotDownload.SegmentIndexing.Segments[i].Percent = 100
d.syncStats.SnapshotIndexing.Segments[i].Percent = 100
}
}

if !found {
d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{
d.syncStats.SnapshotIndexing.Segments = append(d.syncStats.SnapshotIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{
SegmentName: info.SegmentName,
Percent: 100,
Alloc: 0,
Expand All @@ -151,26 +152,70 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
}

func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) {
if d.snapshotDownload.SegmentIndexing.Segments == nil {
d.snapshotDownload.SegmentIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
if d.syncStats.SnapshotIndexing.Segments == nil {
d.syncStats.SnapshotIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
}

for i := range upd.Segments {
found := false
for j := range d.snapshotDownload.SegmentIndexing.Segments {
if d.snapshotDownload.SegmentIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName {
d.snapshotDownload.SegmentIndexing.Segments[j].Percent = upd.Segments[i].Percent
d.snapshotDownload.SegmentIndexing.Segments[j].Alloc = upd.Segments[i].Alloc
d.snapshotDownload.SegmentIndexing.Segments[j].Sys = upd.Segments[i].Sys
for j := range d.syncStats.SnapshotIndexing.Segments {
if d.syncStats.SnapshotIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName {
d.syncStats.SnapshotIndexing.Segments[j].Percent = upd.Segments[i].Percent
d.syncStats.SnapshotIndexing.Segments[j].Alloc = upd.Segments[i].Alloc
d.syncStats.SnapshotIndexing.Segments[j].Sys = upd.Segments[i].Sys
found = true
break
}
}

if !found {
d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, upd.Segments[i])
d.syncStats.SnapshotIndexing.Segments = append(d.syncStats.SnapshotIndexing.Segments, upd.Segments[i])
}
}

d.snapshotDownload.SegmentIndexing.TimeElapsed = upd.TimeElapsed
d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed
}

func (d *DiagnosticClient) runSyncStagesListListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.SyncStagesList](context.Background(), 1)
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SyncStagesList{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.syncStats.SyncStages.StagesList = info.Stages
return
}
}
}()
}

func (d *DiagnosticClient) runCurrentSyncStageListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.CurrentSyncStage](context.Background(), 1)
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.CurrentSyncStage{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.syncStats.SyncStages.CurrentStage = info.Stage
if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
return
}
}
}
}()
}
2 changes: 1 addition & 1 deletion diagnostics/snapshot_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ func SetupStagesAccess(metricsMux *http.ServeMux, diag *DiagnosticClient) {
}

func writeStages(w http.ResponseWriter, diag *DiagnosticClient) {
json.NewEncoder(w).Encode(diag.SnapshotDownload())
json.NewEncoder(w).Encode(diag.SyncStatistics())
}
29 changes: 27 additions & 2 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type PeerStatistics struct {
TypeBytesOut map[string]uint64
}

type SyncStatistics struct {
SyncStages SyncStages `json:"syncStages"`
SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"`
SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"`
}

type SnapshotDownloadStatistics struct {
Downloaded uint64 `json:"downloaded"`
Total uint64 `json:"total"`
Expand All @@ -42,9 +48,7 @@ type SnapshotDownloadStatistics struct {
Sys uint64 `json:"sys"`
DownloadFinished bool `json:"downloadFinished"`
SegmentsDownloading map[string]SegmentDownloadStatistics `json:"segmentsDownloading"`
SegmentIndexing SnapshotIndexingStatistics `json:"segmentsIndexing"`
TorrentMetadataReady int32 `json:"torrentMetadataReady"`
LogPrefix string `json:"logPrefix"`
}

type SegmentDownloadStatistics struct {
Expand Down Expand Up @@ -73,6 +77,19 @@ type SnapshotSegmentIndexingFinishedUpdate struct {
SegmentName string `json:"segmentName"`
}

type SyncStagesList struct {
Stages []string `json:"stages"`
}

type CurrentSyncStage struct {
Stage uint `json:"stage"`
}

type SyncStages struct {
StagesList []string `json:"stagesList"`
CurrentStage uint `json:"currentStage"`
}

func (ti SnapshotDownloadStatistics) Type() Type {
return TypeOf(ti)
}
Expand All @@ -88,3 +105,11 @@ func (ti SnapshotIndexingStatistics) Type() Type {
func (ti SnapshotSegmentIndexingFinishedUpdate) Type() Type {
return TypeOf(ti)
}

func (ti SyncStagesList) Type() Type {
return TypeOf(ti)
}

func (ti CurrentSyncStage) Type() Type {
return TypeOf(ti)
}
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
cfg.notifier.Events.OnNewSnapshot()
}
} else {
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.historyV3, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader); err != nil {
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.historyV3, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
return err
}
}
Expand Down
51 changes: 37 additions & 14 deletions eth/stagedsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
Expand All @@ -21,13 +22,14 @@ type Sync struct {
unwindReason UnwindReason
posTransition *uint64

stages []*Stage
unwindOrder []*Stage
pruningOrder []*Stage
currentStage uint
timings []Timing
logPrefixes []string
logger log.Logger
stages []*Stage
unwindOrder []*Stage
pruningOrder []*Stage
currentStage uint
timings []Timing
logPrefixes []string
logger log.Logger
stagesIdsList []string
}

type Timing struct {
Expand Down Expand Up @@ -86,6 +88,11 @@ func (s *Sync) NextStage() {
return
}
s.currentStage++

isDiagEnabled := diagnostics.TypeOf(diagnostics.CurrentSyncStage{}).Enabled()
if isDiagEnabled {
diagnostics.Send(diagnostics.CurrentSyncStage{Stage: s.currentStage})
}
}

// IsBefore returns true if stage1 goes before stage2 in staged sync
Expand Down Expand Up @@ -144,10 +151,22 @@ func (s *Sync) LogPrefix() string {
return s.logPrefixes[s.currentStage]
}

func (s *Sync) StagesIdsList() []string {
if s == nil {
return []string{}
}
return s.stagesIdsList
}

func (s *Sync) SetCurrentStage(id stages.SyncStage) error {
for i, stage := range s.stages {
if stage.ID == id {
s.currentStage = uint(i)
isDiagEnabled := diagnostics.TypeOf(diagnostics.CurrentSyncStage{}).Enabled()
if isDiagEnabled {
diagnostics.Send(diagnostics.CurrentSyncStage{Stage: s.currentStage})
}

return nil
}
}
Expand All @@ -173,19 +192,23 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune
}
}
}

logPrefixes := make([]string, len(stagesList))
stagesIdsList := make([]string, len(stagesList))
for i := range stagesList {
logPrefixes[i] = fmt.Sprintf("%d/%d %s", i+1, len(stagesList), stagesList[i].ID)
stagesIdsList[i] = string(stagesList[i].ID)
}

return &Sync{
cfg: cfg,
stages: stagesList,
currentStage: 0,
unwindOrder: unwindStages,
pruningOrder: pruneStages,
logPrefixes: logPrefixes,
logger: logger,
cfg: cfg,
stages: stagesList,
currentStage: 0,
unwindOrder: unwindStages,
pruningOrder: pruneStages,
logPrefixes: logPrefixes,
logger: logger,
stagesIdsList: stagesIdsList,
}
}

Expand Down
5 changes: 2 additions & 3 deletions turbo/snapshotsync/snapshotsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func RequestSnapshotsDownload(ctx context.Context, downloadRequest []services.Do

// WaitForDownloader - wait for Downloader service to download all expected snapshots
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(ctx context.Context, logPrefix string, histV3 bool, caplin CaplinMode, agg *state.AggregatorV3, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient) error {
func WaitForDownloader(ctx context.Context, logPrefix string, histV3 bool, caplin CaplinMode, agg *state.AggregatorV3, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
snapshots := blockReader.Snapshots()
borSnapshots := blockReader.BorSnapshots()
if blockReader.FreezingCfg().NoDownloader {
Expand Down Expand Up @@ -157,12 +157,12 @@ Loop:
Alloc: m.Alloc,
Sys: m.Sys,
DownloadFinished: stats.Completed,
LogPrefix: logPrefix,
})

log.Info(fmt.Sprintf("[%s] download finished", logPrefix), "time", time.Since(downloadStartTime).String())
break Loop
} else {
diagnostics.Send(diagnostics.SyncStagesList{Stages: stagesIdsList})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don’t understand meaning of this line

diagnostics.Send(diagnostics.SnapshotDownloadStatistics{
Downloaded: stats.BytesCompleted,
Total: stats.BytesTotal,
Expand All @@ -176,7 +176,6 @@ Loop:
Sys: m.Sys,
DownloadFinished: stats.Completed,
TorrentMetadataReady: stats.MetadataReady,
LogPrefix: logPrefix,
})

if stats.MetadataReady < stats.FilesTotal {
Expand Down
Loading