Feature/delete sensor #7523
Conversation
Thanks!
Fairly minor but important changes requested.
pkg/config/config.go
Outdated
@@ -287,6 +287,7 @@ type Config struct {
Graveler struct {
EnsureReadableRootNamespace bool `mapstructure:"ensure_readable_root_namespace"`
BatchDBIOTransactionMarkers bool `mapstructure:"batch_dbio_transaction_markers"`
TriggerDeleteSensorAt int `mapstructure:"trigger_delete_sensor_at"`
I don't know what this parameter does. Can you document it, or perhaps add units (does num_deletes_to_trigger_delete_sensor make sense?), or change the name?
pkg/graveler/delete_sensor.go
Outdated
stagingTokenID StagingToken
}

func (s *stagingTokenData) CombinedKey() string {
Please document this function. Why do we need it? Go doesn't need a string key to index a map, so if it's just for that -- you can use a struct of strings as a map key and it will work.
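For illustration, a minimal sketch of the struct-key approach (the types here are stand-ins for the graveler types, not the PR's code):

package main

import "fmt"

// Stand-ins for graveler.RepositoryID / graveler.BranchID.
type RepositoryID string
type BranchID string

// stagingTokenKey is comparable, so it can index a map directly;
// no string CombinedKey() is needed.
type stagingTokenKey struct {
	RepositoryID RepositoryID
	BranchID     BranchID
}

func main() {
	counters := map[stagingTokenKey]int{}
	counters[stagingTokenKey{RepositoryID: "repo1", BranchID: "main"}]++
	fmt.Println(counters)
}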
pkg/graveler/delete_sensor.go
Outdated
}

type DeleteSensor struct {
ctx context.Context
Every task should probably have its own context. For instance, that allows logs to carry a request ID.
Thanks! Looks like leftovers; it isn't used.
pkg/graveler/delete_sensor.go
Outdated
triggerAt: triggerAt,
stopped: 0,
graceDuration: graceDuration,
mutex: &sync.RWMutex{},
Not sure why not

Suggested change:
- mutex: &sync.RWMutex{},
+ mutex: &sync.Mutex{},
pkg/graveler/delete_sensor.go
Outdated
// stopped used as flag that the sensor has stopped. stop processing CountDelete.
stopped int32
graceDuration time.Duration
branchTombstoneCounter map[string]*StagingTokenCounter
Please document the key.
Changed to map[RepositoryID]map[BranchID]*StagingTokenCounter. Please comment again if you still think documentation is needed
if stCounter.StagingTokenID != st.stagingTokenID {
stCounter.StagingTokenID = st.stagingTokenID
stCounter.Counter = 1
return
}
Why drop the count of the sealed token? Its entries will still need to be skipped.
I think that the assumption here is that if the token was sealed then a commit is in progress, meaning we will soon be rid of those entries. But then please document this decision -- we may have other reasons to seal in future.
This was done to:
- Avoid the need to expose a reset method and call it across the code (for now)
- Provide an "easy" way to reset across lakeFS instances when a commit or compaction is done.

It isn't the best way and has many downsides, but I think it will do for the log implementation, and will give us valid information about how we would like to go on.
Adding documentation
You might have a weird order of staging tokens for a branch; a transition of staging tokens might look like st1,st1,st1,st2,st1,st2,st2,st2. How will this affect the sensor logic?
I'm fine with opening an issue to refactor this and revisit at a later date.
At such a later date, I would probably try to fill out this sketch: Keep a sync.Map of branch or staging token triggers (I'm fine with either, because we don't care about losing a few counts -- in any case if we need to compact more than once a second, then we need to do something better than compaction!). Whenever we have to, atomically increment the counter on the trigger. If the count-after-increment is (exactly) 1000:
- If we count per branch, atomically decrement it by 1000 and trigger compaction or whatever.
- If we count per staging token, atomically replace the staging token trigger, and we're happy to lose the excess counts!
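A minimal sketch of the per-branch variant of that idea, assuming a threshold of 1000 and a hypothetical trigger hook (none of these names are from the PR):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// branchKey stands in for a (RepositoryID, BranchID) pair.
type branchKey struct {
	RepositoryID string
	BranchID     string
}

const triggerAt = 1000

// counters maps branchKey -> *int64; sync.Map avoids a single global mutex.
var counters sync.Map

// countDelete atomically increments the per-branch counter. At most one caller
// observes the value triggerAt per crossing; it decrements the counter back and
// triggers, so excess concurrent counts are simply carried over.
func countDelete(key branchKey) {
	v, _ := counters.LoadOrStore(key, new(int64))
	c := v.(*int64)
	if atomic.AddInt64(c, 1) == triggerAt {
		atomic.AddInt64(c, -triggerAt)
		fmt.Println("trigger compaction for", key) // stand-in for the real callback
	}
}

func main() {
	for i := 0; i < 2500; i++ {
		countDelete(branchKey{"repo1", "main"})
	}
}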
pkg/graveler/delete_sensor.go
Outdated
stCounter.Counter = 1
return
}
if stCounter.Counter >= s.triggerAt-1 {
if stCounter.Counter >= s.triggerAt-1 {
This is odd. If triggerAt is 2, I would not expect the sensor to fire at the first deletion.
It won't, but I agree it makes the code unclear; I did this to avoid another if.
Changed
pkg/graveler/delete_sensor.go
Outdated
if !atomic.CompareAndSwapInt32(&s.stopped, 0, 1) {
return
}
s.stop()
Probably worth logging if stop() timed out and returned: there may be errors during shutdown, and they will be less scary and easier to understand if Close() explains what happens.
pkg/graveler/delete_sensor_test.go
Outdated
triggredBranches[string(repositoryID)+"-"+string(branchID)]++
}
sensor := graveler.NewDeleteSensor(ctx, tc.triggerAt, cb)
//defer sensor.Close()
?
removed, it's closed later
pkg/graveler/graveler.go
Outdated
@@ -2382,10 +2384,21 @@ func (g *Graveler) Reset(ctx context.Context, repository *RepositoryRecord, bran
return nil
}

func (g *Graveler) deleteAndNotify(ctx context.Context, repositoryID RepositoryID, branchID BranchID, branch *Branch, key Key, requireExists bool) error {
Please add a godoc. Especially the "notify" bit :-)
Moar comments about stopping.
pkg/graveler/delete_sensor.go
Outdated
}

func (s *DeleteSensor) CountDelete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken) {
if s.isStopped() {
This doesn't run very often: it means that stopping the program has to wait for a delete before the sensor stops running, so Close will often wait and then time out!
pkg/graveler/delete_sensor.go
Outdated
ctx: ctx,
cb: cb,
triggerAt: triggerAt,
stopped: 0,
This is racy and slow. Consider just closing the callbacks chan to stop.
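A sketch of the close-to-stop pattern this suggests (hypothetical names, not the PR's types); the worker exits as soon as the channel is closed and drained, with no atomic flag:

package main

import (
	"fmt"
	"sync"
)

type event struct{ branchID string }

type sensor struct {
	events chan event
	wg     sync.WaitGroup
	once   sync.Once
}

func newSensor() *sensor {
	s := &sensor{events: make(chan event, 1000)}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		for e := range s.events { // ends when the channel is closed and drained
			fmt.Println("handle", e.branchID)
		}
	}()
	return s
}

// Close stops the worker. It assumes producers have already stopped sending
// (a send on a closed channel panics), which is the part that still needs care.
func (s *sensor) Close() {
	s.once.Do(func() { close(s.events) })
	s.wg.Wait()
}

func main() {
	s := newSensor()
	s.events <- event{branchID: "main"}
	s.Close()
}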
Thanks,
Not blocking so as not to delay you, but please note the comments.
pkg/config/config.go
Outdated
RepositoryCache struct {
EnsureReadableRootNamespace bool `mapstructure:"ensure_readable_root_namespace"`
BatchDBIOTransactionMarkers bool `mapstructure:"batch_dbio_transaction_markers"`
NumDeletesToTriggerDeleteSensor int `mapstructure:"num_deletes_to_trigger_delete_sensor"`
I don't think this is a good name :)
Consider CompactionSensorThreshold or DeleteSensorThreshold.

Suggested change:
- NumDeletesToTriggerDeleteSensor int `mapstructure:"num_deletes_to_trigger_delete_sensor"`
+ CompactionSensorThreshold int `mapstructure:"compaction_sensor_threshold"`
pkg/config/config.go
Outdated
RepositoryCache struct {
EnsureReadableRootNamespace bool `mapstructure:"ensure_readable_root_namespace"`
BatchDBIOTransactionMarkers bool `mapstructure:"batch_dbio_transaction_markers"`
NumDeletesToTriggerDeleteSensor int `mapstructure:"num_deletes_to_trigger_delete_sensor"`
We should have a minimal value for it, otherwise we might risk damaging performance instead of improving it.
We can set it now for an arbitrary value (1000) and tune it once the compaction mechanism is in place.
I'm not sure what value we get from protecting with a minimum number. If for some reason (e.g. we add more lakeFS instances) we decide to use a smaller number, why would we want to be blocked by an arbitrary value?
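For reference, if a minimum were adopted, the guard could be a small check at config load time; a sketch, with the constant and function name assumed rather than taken from the PR:

package config

import "fmt"

// minCompactionSensorThreshold is an arbitrary floor (1000), to be tuned once compaction exists.
const minCompactionSensorThreshold = 1000

// validateSensorThreshold is a hypothetical check run after unmarshalling the config;
// zero keeps the sensor disabled, so only positive values are validated.
func validateSensorThreshold(threshold int) error {
	if threshold > 0 && threshold < minCompactionSensorThreshold {
		return fmt.Errorf("compaction sensor threshold %d is below the minimum %d", threshold, minCompactionSensorThreshold)
	}
	return nil
}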
pkg/graveler/delete_sensor.go
Outdated
s.branchTombstoneCounter[st.repositoryID][st.branchID] = stCounter
return
}
// Reset the counter if the staging token has changed, under the assumption that staging tokens are updated only during sealing processes, which occur following a commit or compaction.
Suggested change:
- // Reset the counter if the staging token has changed, under the assumption that staging tokens are updated only during sealing processes, which occur following a commit or compaction.
+ // Reset the counter if the staging token has changed, under the assumption that staging tokens are updated only during sealing processes, which occur following a failed commit or compaction.
why failed?
Sealed tokens are created only on a failed commit. On a successful commit the tokens are cleared. So either "during a commit" or "following a failed commit"
"during a commit" makes sense to me, changing.
type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)

type StagingTokenCounter struct {
Please add documentation to all the structs
var deleteSensor *graveler.DeleteSensor
if cfg.Config.Graveler.NumDeletesToTriggerDeleteSensor > 0 {
cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
logging.FromContext(ctx).WithFields(logging.Fields{
We should consider what kind of experiment we're trying to do here. What would we like to see, how will it change our assumptions.
As part of this we should also consider the visibility of the data. Is logging enough? Do we want to add this to the statistics?
These are all questions, no concrete action requested.
Currently leaving it with logs; I'm not expecting too much, and it's enough to get the information. It's probably not as good as metrics, but I prefer leaving that for later and adding it if we see it's needed.
return atomic.LoadInt32(&s.stopped) == 1
}

func (s *DeleteSensor) triggerTombstone(ctx context.Context, st stagingTokenData) {
I think this is a potential major bottleneck.
The call to CountDelete is synchronous, and using a lock for triggerTombstone could lower performance under delete load.
I would expect the request to go into some channel which handles triggerTombstone asynchronously (or at least this block).
How can that happen? It only updates a structure; in some cases it writes to a channel, but even in that case it will not wait.
I thought about using channels at first; the two downsides were:
- If a channel is full, I need to throw the trigger away (or block, which is unacceptable)
- It's slower with channels
This causes adding tombstones to be unavoidably serialized, since all of them have to go via this method. And if I'm not mistaken, this is true for the lakeFS server context, not just for a specific branch or repository
I fear @N-o-Z is right, which is both expected and unfortunate. I dug into AWS smithy and clients are not serialized per host, you could get multiple clients to the same DynamoDB server. So without this code deletes are not serialized.
However, this is actually not a particularly expensive piece of code, especially in all the cases where it does not enqueue an event. I suggest that we remain aware of it, profile it under load, and if necessary replace this map with a sync.Map. That has a method LoadOrStore, which would probably do a lot of what we need here. At worst we would need to get a bit sloppy with the count :-/
So I would like us to agree to open an issue to profile this and replace with sync.Map.
pkg/graveler/graveler.go
Outdated
@@ -1802,7 +1804,7 @@ func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, bra
log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "delete"})
err = g.safeBranchWrite(ctx, log, repository, branchID,
safeBranchWriteOptions{}, func(branch *Branch) error {
- return g.deleteUnsafe(ctx, repository, branch, key, nil)
+ return g.deleteUnsafe(ctx, repository, branchID, branch, key, nil)
Suggestion: Use BranchRecord instead
// resetKey resets given key on branch
// Since we cannot (will not) modify sealed tokens data, we overwrite changes done on entry on a new staging token, effectively reverting it
// to the current state in the branch committed data. If entry is not committed return an error
- func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, branch *Branch, key Key, stagedValue *Value, st StagingToken) error {
+ func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, branchID BranchID, branch *Branch, key Key, stagedValue *Value, st StagingToken) error {
BranchRecord
case s.callbacks <- st:
stCounter.Counter = 0
default:
logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Info("delete sensor callback channel is full, dropping delete event")
I believe all related logs should be Debug.

Suggested change:
- logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Info("delete sensor callback channel is full, dropping delete event")
+ logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Debug("Delete sensor callback channel is full, dropping delete event")
But we want to get this log; this is basically logging. If there were compaction, I would do it now.
I agree with @guy-har that it's an interesting log, so 👍🏽 .
But...
if the chan is full, clearly there are too many deletes -- and we start spamming logs. Not sure we can do this without some antiflooding, and I cannot find a package to do this. Let's go with Debug and open a "good first issue"; anyone can pick it up later, it should be 1-2 days' work?
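One lightweight antiflooding option, sketched with golang.org/x/time/rate (not wired into this PR; the limiter period is arbitrary):

package main

import (
	"log"
	"time"

	"golang.org/x/time/rate"
)

// Allow the "channel full" message at most once every 30 seconds, with a burst of 1.
var droppedEventLogLimiter = rate.NewLimiter(rate.Every(30*time.Second), 1)

func logDroppedEvent(branchID string) {
	if droppedEventLogLimiter.Allow() {
		log.Printf("delete sensor callback channel is full, dropping delete event (branch=%s)", branchID)
	}
}

func main() {
	for i := 0; i < 5; i++ {
		logDroppedEvent("main") // only the first call actually logs
	}
}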
return
}
// Reset the counter if the staging token has changed, under the assumption that staging tokens are updated only during sealing processes, which occur following a commit or compaction.
if stCounter.StagingTokenID != st.stagingTokenID {
Do you want to log something before the swap?
pkg/catalog/catalog.go
Outdated
"branchID": branchID, | ||
"stagingTokenID": stagingTokenID, | ||
"inGrace": inGrace, | ||
}).Info("delete sensor callback") |
}).Info("delete sensor callback") | |
}).Info("Delete sensor callback") |
Thanks for addressing the comments. I have only one concern which I think we have to fix before this could be merged
I would like to open issues and defer on most of the remaining issues.
I do want us to use a struct as a map key.¹ This is always easier to read and more efficient IME. And it will make it clearer that we can move to sync.Map if there are performance issues.

Footnotes
1. This is non-blocking, because I accept that right now this is a matter of taste.
callbackChannelSize = 1000
)

type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)
Please add a godoc. It should say that e.g. if inGrace, lakeFS is terminating and the callback should return ASAP.
Alternatively, remove this parameter entirely and just log.Info() instead of calling the callback when inGrace.
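A possible godoc along those lines (suggested wording only, on the signature quoted above):

// DeleteSensorCB is called when the number of uncommitted deletes counted for a
// branch's staging token crosses the configured threshold. If inGrace is true,
// lakeFS is shutting down and remaining events are being flushed during the grace
// period, so the callback should return as quickly as possible and must not start
// long-running work.
type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)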
mutex sync.Mutex
// stopped used as flag that the sensor has stopped. stop processing CountDelete.
stopped bool
branchTombstoneCounter map[RepositoryID]map[BranchID]*StagingTokenCounter
This seems less useful than

Suggested change:
- branchTombstoneCounter map[RepositoryID]map[BranchID]*StagingTokenCounter
+ branchTombstoneCounter map[RepositoryBranch]*StagingTokenCounter

where

type RepositoryBranch struct {
RepositoryID RepositoryID
BranchID BranchID
}
@N-o-Z @arielshaqed, I am adding here some information from runs I did:
- Running with and without the sensor gave the same results.
- Pushed it a bit by setting the callback channel size to 1 and adding a 1-second sleep in the callback.
- Same as above but without the log when the channel is full (looks the same as the first).

Thanks, I think there is no difference between any of the versions that you showed.
I'm convinced 😃
Thanks!!