-
-
Notifications
You must be signed in to change notification settings - Fork 679
Use IsBlacklistedOrBackingOff
to determine if we should try to fetch devices
#3254
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #3254 +/- ##
==========================================
- Coverage 65.51% 65.51% -0.01%
==========================================
Files 507 507
Lines 57217 57245 +28
==========================================
+ Hits 37484 37502 +18
- Misses 15892 15899 +7
- Partials 3841 3844 +3
Flags with carried forward coverage won't be shown. Click here to find out more.
☔ View full report in Codecov by Sentry. |
query the given server or not
IsBlacklistedOrBackingOff
to determine if we should try to fetch devices
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly good, few things.
var federationClientError *fedsenderapi.FederationClientError | ||
if errors.As(err, &federationClientError) { | ||
if federationClientError.Blacklisted { | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs unit tests.
hash := fnv.New32a() | ||
_, _ = hash.Write([]byte(remoteServer)) | ||
index := int(int64(hash.Sum32()) % int64(len(u.workerChans))) | ||
|
||
ch := u.assignChannel(userID) | ||
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc() | ||
defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove? Using a defer
is much cleaner than what we have now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just because we sent the work to the worker doesn't mean the back pressure is gone, the worker might still be processing the server. So decrementing doesn't make sense here.
We're incrementing right before trying to send the work and the worker decrements it once it is done processing the server. (either when skipping or actually done)
// The channel is at capacity, don't try to send more work | ||
if len(ch) == cap(ch) { | ||
continue | ||
} | ||
serversToRetry = serversToRetry[:0] // reuse memory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is super unclear that it's nuking the serversToRetry
slice, and why. Can we add more comments here please?
@@ -431,6 +460,7 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) { | |||
_, exists := retries[serverName] | |||
retriesMu.Unlock() | |||
if exists { | |||
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am unconvinced that this is going to track counts correctly. It is incremented for every call to notifyWorkers(userID)
but only decremented under very certain scenarios (when not full, etc).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above.
…vice-list-updater-again
…vice-list-updater-again
case "localhost": | ||
delete(expectedServers, serverName) | ||
aliceCh <- true // unblock notifyWorkers | ||
case "notlocalhost": // this should not happen as it is "filtered" away by the blacklist |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case "notlocalhost": // this should not happen as it is "filtered" away by the blacklist | |
case unreachableServer: // this should not happen as it is "filtered" away by the blacklist |
userIDToChan: make(map[string]chan bool), | ||
userIDToMutex: make(map[string]*sync.Mutex), | ||
} | ||
workerCh := make(chan spec.ServerName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defer close(workerCh)?
Use
IsBlacklistedOrBackingOff
from the federation API to check if we should fetch devices.To reduce back pressure, we now only queue retrying servers if there's space in the channel.