-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LogPoller e2e test fixes #1046
LogPoller e2e test fixes #1046
Changes from all commits
942f1da
4ad5f72
0aedb57
2e217cd
39491b9
ff42fa4
6275202
4d1d043
d8b7bc7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,8 +30,8 @@ type filters struct { | |
filtersToDelete map[int64]Filter | ||
filtersMutex sync.RWMutex | ||
loadedFilters atomic.Bool | ||
knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters | ||
knownDiscriminators map[string]uint // fast lookup based on raw discriminator bytes as string | ||
knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters | ||
knownDiscriminators map[EventSignature]uint // fast lookup based on raw discriminator bytes as string | ||
seqNums map[int64]int64 | ||
decoders map[int64]Decoder | ||
discriminatorExtractor codec.DiscriminatorExtractor | ||
|
@@ -179,7 +179,7 @@ func (fl *filters) addToIndices(filter Filter, decoder Decoder) error { | |
|
||
programID := filter.Address.ToSolana().String() | ||
fl.knownPrograms[programID]++ | ||
fl.knownDiscriminators[filter.DiscriminatorRawBytes()]++ | ||
fl.knownDiscriminators[filter.EventSig]++ | ||
return nil | ||
} | ||
|
||
|
@@ -255,13 +255,12 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) { | |
} | ||
} | ||
|
||
discriminator := filter.DiscriminatorRawBytes() | ||
if refcount, ok := fl.knownDiscriminators[discriminator]; ok { | ||
if refcount, ok := fl.knownDiscriminators[filter.EventSig]; ok { | ||
refcount-- | ||
if refcount > 0 { | ||
fl.knownDiscriminators[discriminator] = refcount | ||
fl.knownDiscriminators[filter.EventSig] = refcount | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're renaming |
||
} else { | ||
delete(fl.knownDiscriminators, discriminator) | ||
delete(fl.knownDiscriminators, filter.EventSig) | ||
} | ||
} | ||
} | ||
|
@@ -338,7 +337,7 @@ func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[F | |
return ok | ||
} | ||
|
||
_, ok = fl.knownDiscriminators[string(discriminator)] | ||
_, ok = fl.knownDiscriminators[discriminator] | ||
return ok | ||
} | ||
|
||
|
@@ -352,7 +351,7 @@ func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[F | |
return nil | ||
} | ||
|
||
return fl.matchingFilters(PublicKey(addr), EventSignature(discriminator)) | ||
return fl.matchingFilters(PublicKey(addr), discriminator) | ||
} | ||
|
||
// GetFiltersToBackfill - returns copy of backfill queue | ||
|
@@ -413,7 +412,7 @@ func (fl *filters) LoadFilters(ctx context.Context) error { | |
fl.filtersToBackfill = make(map[int64]struct{}) | ||
fl.filtersToDelete = make(map[int64]Filter) | ||
fl.knownPrograms = make(map[string]uint) | ||
fl.knownDiscriminators = make(map[string]uint) | ||
fl.knownDiscriminators = make(map[EventSignature]uint) | ||
|
||
filters, err := fl.orm.SelectFilters(ctx) | ||
if err != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -112,14 +112,14 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots | |
blocks := make(chan Block) | ||
getBlockJobs := make([]*getBlockJob, len(slots)) | ||
for i, slot := range slots { | ||
getBlockJobs[i] = newGetBlockJob(c.client, blocks, c.lggr, slot) | ||
getBlockJobs[i] = newGetBlockJob(ctx.Done(), c.client, blocks, c.lggr, slot) | ||
err := c.workers.Do(ctx, getBlockJobs[i]) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not schedule job to fetch blocks for slot: %w", err) | ||
} | ||
} | ||
|
||
c.engine.Go(func(ctx context.Context) { | ||
go func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to cancel this goroutine if iteration gets aborted |
||
for _, job := range getBlockJobs { | ||
select { | ||
case <-ctx.Done(): | ||
|
@@ -129,7 +129,7 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots | |
} | ||
} | ||
close(blocks) | ||
}) | ||
}() | ||
|
||
return blocks, nil | ||
} | ||
|
@@ -142,6 +142,13 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse | |
|
||
c.lggr.Debugw("Got all slots that need fetching for backfill operations", "addresses", PublicKeysToString(addresses), "fromSlot", fromSlot, "toSlot", toSlot, "slotsToFetch", slotsToFetch) | ||
|
||
ctx, cancelJobs := context.WithCancel(ctx) | ||
defer func() { | ||
// if failed to start backfill process - cancel jobs | ||
if err != nil { | ||
cancelJobs() | ||
} | ||
}() | ||
unorderedBlocks, err := c.scheduleBlocksFetching(ctx, slotsToFetch) | ||
if err != nil { | ||
return nil, func() {}, fmt.Errorf("failed to schedule blocks to fetch: %w", err) | ||
|
@@ -153,6 +160,7 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse | |
} | ||
|
||
cleanUp = func() { | ||
cancelJobs() | ||
err := blocksSorter.Close() | ||
if err != nil { | ||
blocksSorter.lggr.Errorw("Failed to close blocks sorter", "err", err) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.