Skip to content

Commit

Permalink
fix: awaiting publish confirmation count (#1778)
Browse files Browse the repository at this point in the history
* fix awaiting publish confirmation count

* update devnet versions

* fix lint error

* fix nil pointer error

* fix comparison
  • Loading branch information
LexLuthr authored Oct 26, 2023
1 parent 7b92e23 commit fc598a9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ docsgen-openrpc-boost: docsgen-openrpc-bin

## DOCKER IMAGES
docker_user?=filecoin
lotus_version?=v1.23.4-rc1
lotus_version?=v1.25.0-rc1
ffi_from_source?=0
build_lotus?=0
build_boost?=1
boost_version?=v2.1.0-rc1
boost_version?=v2.1.0-rc2
ifeq ($(build_boost),1)
#v1: build boost images currently checked out branch
boost_build_cmd=docker/boost
Expand Down
28 changes: 20 additions & 8 deletions gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (r *resolver) Deal(ctx context.Context, args struct{ ID graphql.ID }) (*dea
return nil, err
}

return newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi), nil
return newDealResolver(r.mpool, deal, r.provider, r.dealsDB, r.logsDB, r.spApi), nil
}

type filterArgs struct {
Expand Down Expand Up @@ -178,7 +178,7 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver
resolvers := make([]*dealResolver, 0, len(deals))
for _, deal := range deals {
deal.NBytesReceived = int64(r.provider.NBytesReceived(deal.DealUuid))
resolvers = append(resolvers, newDealResolver(&deal, r.provider, r.dealsDB, r.logsDB, r.spApi))
resolvers = append(resolvers, newDealResolver(r.mpool, &deal, r.provider, r.dealsDB, r.logsDB, r.spApi))
}

return &dealListResolver{
Expand Down Expand Up @@ -211,7 +211,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
}

net := make(chan *dealResolver, 1)
net <- newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi)
net <- newDealResolver(r.mpool, deal, r.provider, r.dealsDB, r.logsDB, r.spApi)

// Updates to deal state are broadcast on pubsub. Pipe these updates to the
// client
Expand All @@ -223,7 +223,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
}
return nil, fmt.Errorf("%s: subscribing to deal updates: %w", args.ID, err)
}
sub := &subLastUpdate{sub: dealUpdatesSub, provider: r.provider, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
sub := &subLastUpdate{sub: dealUpdatesSub, provider: r.provider, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi, mpool: r.mpool}
go func() {
sub.Pipe(ctx, net) // blocks until connection is closed
close(net)
Expand Down Expand Up @@ -262,7 +262,7 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealNewResolver, error)
case evti := <-sub.Out():
// Pipe the deal to the new deal channel
di := evti.(types.ProviderDealState)
rsv := newDealResolver(&di, r.provider, r.dealsDB, r.logsDB, r.spApi)
rsv := newDealResolver(r.mpool, &di, r.provider, r.dealsDB, r.logsDB, r.spApi)
totalCount, err := r.dealsDB.Count(ctx, "", nil)
if err != nil {
log.Errorf("getting total deal count: %w", err)
Expand Down Expand Up @@ -399,6 +399,7 @@ func (r *resolver) dealList(ctx context.Context, query string, filter *db.Filter
}

type dealResolver struct {
mpool *mpoolmonitor.MpoolMonitor
types.ProviderDealState
provider *storagemarket.Provider
transferred uint64
Expand All @@ -407,8 +408,9 @@ type dealResolver struct {
spApi sealingpipeline.API
}

func newDealResolver(deal *types.ProviderDealState, provider *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
func newDealResolver(mpool *mpoolmonitor.MpoolMonitor, deal *types.ProviderDealState, provider *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
return &dealResolver{
mpool: mpool,
ProviderDealState: *deal,
provider: provider,
transferred: uint64(deal.NBytesReceived),
Expand Down Expand Up @@ -612,7 +614,16 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.
case dealcheckpoints.Transferred:
return "Ready to Publish"
case dealcheckpoints.Published:
elapsedEpochs := uint64(time.Since(checkpointAt).Seconds()) / build.BlockDelaySecs
if *dr.PublishCID == cid.Undef {
return "Awaiting Message CID"
}
found, elapsedEpochs, err := dr.mpool.MsgExecElapsedEpochs(ctx, *dr.PublishCID)
if found {
return "Awaiting Message Execution"
}
if err != nil {
return fmt.Sprint(err)
}
confidenceEpochs := build.MessageConfidence * 2
return fmt.Sprintf("Awaiting Publish Confirmation (%d/%d epochs)", elapsedEpochs, confidenceEpochs)
case dealcheckpoints.PublishConfirmed:
Expand Down Expand Up @@ -719,6 +730,7 @@ type subLastUpdate struct {
dealsDB *db.DealsDB
logsDB *db.LogsDB
spApi sealingpipeline.API
mpool *mpoolmonitor.MpoolMonitor
}

func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
Expand Down Expand Up @@ -757,7 +769,7 @@ func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
loop:
for {
di := lastUpdate.(types.ProviderDealState)
rsv := newDealResolver(&di, s.provider, s.dealsDB, s.logsDB, s.spApi)
rsv := newDealResolver(s.mpool, &di, s.provider, s.dealsDB, s.logsDB, s.spApi)

select {
case <-ctx.Done():
Expand Down
23 changes: 23 additions & 0 deletions lib/mpoolmonitor/mpoolmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,26 @@ func (mm *MpoolMonitor) Stop(ctx context.Context) error {
mm.cancel()
return nil
}

func (mm *MpoolMonitor) MsgInMpool(msgCid cid.Cid) bool {
mm.lk.Lock()
defer mm.lk.Unlock()
_, ok := mm.msgs[msgCid]
return ok
}

func (mm *MpoolMonitor) MsgExecElapsedEpochs(ctx context.Context, msgCid cid.Cid) (bool, abi.ChainEpoch, error) {
found := mm.MsgInMpool(msgCid)
if found {
return found, 0, nil
}
x, err := mm.fullNode.StateSearchMsg(ctx, types.EmptyTSK, msgCid, abi.ChainEpoch(20), true)
if err != nil {
return found, 0, fmt.Errorf("searching message: %w", err)
}
c, err := mm.fullNode.ChainHead(ctx)
if err != nil {
return found, 0, fmt.Errorf("getting chain head: %w", err)
}
return found, c.Height() - x.Height, nil
}

0 comments on commit fc598a9

Please sign in to comment.