Feature/delete sensor #7523
@@ -0,0 +1,74 @@
package cmd

import (
	"fmt"
	"math/rand"
	"net/http"
	"os"
	"syscall"
	"time"

	"github.com/spf13/cobra"
	"github.com/treeverse/lakefs/pkg/api/apigen"
	"github.com/treeverse/lakefs/pkg/api/helpers"
	"github.com/treeverse/lakefs/pkg/testutil/stress"
)

var abuseRandomDeletesCmd = &cobra.Command{
	Use:               "random-delete <source ref URI>",
	Short:             "Read keys from a file and generate random delete requests from the source ref for those keys.",
	Hidden:            false,
	Args:              cobra.ExactArgs(1),
	ValidArgsFunction: ValidArgsRepository,
	Run: func(cmd *cobra.Command, args []string) {
		u := MustParseRefURI("source ref URI", args[0])
		amount := Must(cmd.Flags().GetInt("amount"))
		parallelism := Must(cmd.Flags().GetInt("parallelism"))
		fromFile := Must(cmd.Flags().GetString("from-file"))

		fmt.Println("Source ref:", u)
		// read the input file
		keys, err := readLines(fromFile)
		if err != nil {
			DieErr(err)
		}
		fmt.Printf("read a total of %d keys from key file\n", len(keys))

		generator := stress.NewGenerator("delete", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM))

		// generate randomly selected keys as input
		generator.Setup(func(add stress.GeneratorAddFn) {
			for i := 0; i < amount; i++ {
				//nolint:gosec
				add(keys[rand.Intn(len(keys))])
			}
		})

		// execute the things!
		generator.Run(func(input chan string, output chan stress.Result) {
			ctx := cmd.Context()
			client := getClient()
			for work := range input {
				start := time.Now()
				resp, err := client.DeleteObject(ctx, u.Repository, u.Ref, &apigen.DeleteObjectParams{
					Path: work,
				})
				if err == nil && resp.StatusCode != http.StatusOK {
					err = helpers.ResponseAsError(resp)
				}
				output <- stress.Result{
					Error: err,
					Took:  time.Since(start),
				}
			}
		})
	},
}

//nolint:gochecknoinits
func init() {
	abuseCmd.AddCommand(abuseRandomDeletesCmd)
	abuseRandomDeletesCmd.Flags().String("from-file", "", "read keys from this file (\"-\" for stdin)")
	abuseRandomDeletesCmd.Flags().Int("amount", abuseDefaultAmount, "amount of deletes to do")
	abuseRandomDeletesCmd.Flags().Int("parallelism", abuseDefaultParallelism, "amount of deletes to do in parallel")
}
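The command relies on a readLines helper defined elsewhere in the cmd package. A minimal sketch of what such a helper might look like (an illustration only, not the actual implementation; it would need bufio, io, and strings in addition to the file's imports):

// Illustrative sketch only: read non-empty lines from a file, or from stdin
// when path is "-".
func readLines(path string) ([]string, error) {
	reader := io.Reader(os.Stdin)
	if path != "-" {
		f, err := os.Open(path)
		if err != nil {
			return nil, err
		}
		defer f.Close()
		reader = f
	}
	var lines []string
	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		if line := strings.TrimSpace(scanner.Text()); line != "" {
			lines = append(lines, line)
		}
	}
	return lines, scanner.Err()
}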
@@ -0,0 +1,136 @@
package graveler

import (
	"context"
	"sync"

	"github.com/treeverse/lakefs/pkg/logging"
)

const (
	callbackChannelSize = 1000
)

type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)
Review comment (on DeleteSensorCB): Please add a godoc. It should say, e.g., what inGrace means. Alternatively, remove this parameter entirely and just log.Info() instead of calling the callback when inGrace.
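For reference, a possible godoc along the lines requested; the wording below is an assumption about the intended semantics, not text from the PR:

// DeleteSensorCB is called when the number of tombstones counted for a branch's
// staging token reaches the configured threshold. inGrace reports that the sensor
// is shutting down and only draining already-queued events, so the callback should
// avoid starting long-running work. (Illustrative wording only.)
type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)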
// StagingTokenCounter holds a counter for a specific staging token.
type StagingTokenCounter struct {
	StagingTokenID StagingToken
	Counter        int
}

Review comment: Please add documentation to all the structs.
// stagingTokenData holds data regarding a staging token.
type stagingTokenData struct {
	repositoryID   RepositoryID
	branchID       BranchID
	stagingTokenID StagingToken
}

type DeleteSensor struct {
	cb        DeleteSensorCB
	triggerAt int
	callbacks chan stagingTokenData
	wg        sync.WaitGroup
	mutex     sync.Mutex
	// stopped is used as a flag that the sensor has stopped and should stop processing CountDelete.
	stopped                bool
	branchTombstoneCounter map[RepositoryID]map[BranchID]*StagingTokenCounter
}

Review comment (on branchTombstoneCounter): This seems less useful than a single map keyed by repository and branch (suggested change: branchTombstoneCounter map[RepositoryBranch]*StagingTokenCounter), where

	type RepositoryBranch struct {
		RepositoryID RepositoryID
		BranchID     BranchID
	}
type DeleteSensorOpts func(s *DeleteSensor)

func WithCBBufferSize(bufferSize int) DeleteSensorOpts {
	return func(s *DeleteSensor) {
		s.callbacks = make(chan stagingTokenData, bufferSize)
	}
}

func NewDeleteSensor(triggerAt int, cb DeleteSensorCB, opts ...DeleteSensorOpts) *DeleteSensor {
	ds := &DeleteSensor{
		cb:                     cb,
		triggerAt:              triggerAt,
		stopped:                false,
		mutex:                  sync.Mutex{},
		branchTombstoneCounter: make(map[RepositoryID]map[BranchID]*StagingTokenCounter),
		callbacks:              make(chan stagingTokenData, callbackChannelSize),
	}
	for _, opt := range opts {
		opt(ds)
	}
	ds.wg.Add(1)
	go ds.processCallbacks()
	return ds
}
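For example, an illustrative construction (not taken from this PR) of a sensor that fires after 1,000 tombstones and uses a larger callback buffer via the functional option:

	// Illustrative usage only; the threshold and buffer size are arbitrary.
	sensor := NewDeleteSensor(1000, func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool) {
		// e.g. log the event or schedule compaction for this branch
	}, WithCBBufferSize(5000))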
// triggerTombstone triggers a tombstone event for a specific staging token. If the sensor is stopped, the event is not triggered.
// If the staging token has changed, the counter is reset. If the counter reaches the triggerAt value, the event is triggered.
// In case the callback channel is full, the event is dropped and will be retried on the next call.
func (s *DeleteSensor) triggerTombstone(ctx context.Context, st stagingTokenData) {

Review thread (on triggerTombstone):
- I think this is a potential major bottleneck.
- How can that happen? It only updates a structure; in some cases it will write to a channel, but even in that case it will not wait.
- Thought about using channels at first; the two downsides were
- This causes adding tombstones to be unavoidably serialized, since all of them have to go via this method. And if I'm not mistaken, this is true for the lakeFS server context, not just for a specific branch or repository.
- I fear @N-o-Z is right, which is both expected and unfortunate. I dug into AWS smithy and clients are not serialized per host; you could get multiple clients to the same DynamoDB server. So without this code, deletes are not serialized. However, this is actually not a particularly expensive piece of code, especially in all the cases where it does not enqueue an event. I suggest that we remain aware of it, profile it under load, and if necessary replace this map with a sync.Map. That has a method LoadOrStore, which would probably do a lot of what we need here. At worst we would need to get a bit sloppy with the count :-/ So I would like us to agree to open an issue to profile this and replace with sync.Map.
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if s.stopped {
		return
	}
	if _, ok := s.branchTombstoneCounter[st.repositoryID]; !ok {
		s.branchTombstoneCounter[st.repositoryID] = make(map[BranchID]*StagingTokenCounter)
	}
	stCounter, ok := s.branchTombstoneCounter[st.repositoryID][st.branchID]
	if !ok {
		stCounter = &StagingTokenCounter{
			StagingTokenID: st.stagingTokenID,
			Counter:        1,
		}
		s.branchTombstoneCounter[st.repositoryID][st.branchID] = stCounter
		return
	}
	// Reset the counter if the staging token has changed, under the assumption that staging tokens
	// are updated only during sealing processes, which occur during a commit or compaction.
	if stCounter.StagingTokenID != st.stagingTokenID {
		stCounter.StagingTokenID = st.stagingTokenID
		stCounter.Counter = 1
		return
	}

Review comment (on the staging-token reset): Do you want to log something before the swap?
Review thread (on lines +87 to +91, the staging-token reset):
- Why drop the count of the sealed token? Its entries will still need to be skipped. I think that the assumption here is that if the token was sealed then a commit is in progress, meaning we will soon be rid of those entries. But then please document this decision -- we may have other reasons to seal in future.
- This was done to:
  It isn't the best way and has many downsides, but I think it will do for the log implementation, and will give us valid information for how we would like to go on. Adding documentation.
- You might have a weird order of staging tokens for a branch; a transition of staging tokens might look like:
- I'm fine with opening an issue to refactor this and revisit at a later date. At such a later date, I would probably try to fill out this sketch: keep a sync.Map of branch or staging token triggers (I'm fine with either, because we don't care about losing a few counts -- in any case, if we need to compact more than once a second, then we need to do something better than compaction!). Whenever we have to, atomically increment the counter on the trigger. If the count-after-increment is (exactly) 1000:
	if stCounter.Counter < s.triggerAt {
		stCounter.Counter++
	}
	if stCounter.Counter >= s.triggerAt {
		select {
		case s.callbacks <- st:
			stCounter.Counter = 0
		default:
			logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Info("delete sensor callback channel is full, dropping delete event")
		}
		return
	}
}

Review thread (on the "channel is full" log):
- I believe all related logs should be Debug. (Suggested change: log at Debug instead of Info.)
- But we want to get this log; this is basically the logging. If there were compaction, I would do it now.
- I agree with @guy-har that it's an interesting log, so 👍🏽. But... if the chan is full, clearly there are too many deletes -- and we start spamming logs. Not sure we can do this without some antiflooding, and I cannot find a package to do this. Let's go with Debug and open a "good first issue"; anyone can pick it up later, it should be 1-2 days' work?
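Following the threads above about profiling and possibly replacing the locked map with sync.Map, here is a rough sketch of what a LoadOrStore-based counter could look like. This is an assumption about a possible refactor, not code from this PR; the names repoBranchKey, tombstoneCounters, and add are illustrative.

// Illustrative sketch only (not part of this PR). Requires "sync" and
// "sync/atomic" (atomic.Int64, Go 1.19+) in addition to the file's imports.
type repoBranchKey struct {
	repositoryID RepositoryID
	branchID     BranchID
}

type tombstoneCounters struct {
	counters sync.Map // repoBranchKey -> *atomic.Int64
}

// add increments the per-branch counter and reports whether the trigger
// threshold was just reached. The reset is best-effort: under heavy
// concurrency a few counts may be lost, which the thread above treats as
// an acceptable trade-off.
func (t *tombstoneCounters) add(key repoBranchKey, triggerAt int64) bool {
	v, _ := t.counters.LoadOrStore(key, new(atomic.Int64))
	c := v.(*atomic.Int64)
	if c.Add(1) >= triggerAt {
		c.Store(0)
		return true
	}
	return false
}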
func (s *DeleteSensor) processCallbacks() {
	s.mutex.Lock()
	isStopped := s.stopped
	s.mutex.Unlock()
	defer s.wg.Done()
	for cb := range s.callbacks {
		s.cb(cb.repositoryID, cb.branchID, cb.stagingTokenID, isStopped)
	}
}
func (s *DeleteSensor) CountDelete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken) {
	st := stagingTokenData{
		repositoryID:   repositoryID,
		branchID:       branchID,
		stagingTokenID: stagingTokenID,
	}
	s.triggerTombstone(ctx, st)
}
func (s *DeleteSensor) Close() {
	s.mutex.Lock()
	if s.stopped {
		s.mutex.Unlock()
		return
	}
	s.stopped = true
	s.mutex.Unlock()

	close(s.callbacks)
	s.wg.Wait()
}
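A sketch of how the sensor is meant to be wired end to end; identifiers such as triggerAt, ctx, repositoryID, branchID, and stagingTokenID are placeholders here, not code from this PR:

	// Illustrative lifecycle only: construct, count deletes, and close on shutdown.
	sensor := NewDeleteSensor(triggerAt, func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool) {
		// react to the threshold being crossed, e.g. log or trigger compaction
	})

	// in the object-delete path:
	sensor.CountDelete(ctx, repositoryID, branchID, stagingTokenID)

	// on shutdown, stop counting and drain queued callbacks:
	sensor.Close()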
Review thread (on the PR as a whole):
- We should consider what kind of experiment we're trying to do here. What would we like to see, and how will it change our assumptions? As part of this we should also consider the visibility of the data. Is logging enough? Do we want to add this to the statistics? These are all questions, no concrete action requested.
- Currently leaving it with logs. I'm not expecting too much, and it's enough to get the information. Probably not as good as metrics, but I prefer leaving that for later and adding it if we see it's needed.
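If metrics are added later, a minimal sketch of a counter for triggered tombstone events might look like the following. This assumes the prometheus/client_golang library; the metric name, labels, and placement are hypothetical and not part of this PR.

// Illustrative sketch only.
package graveler

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// deleteSensorTriggers counts how many times the delete sensor threshold was
// crossed, labeled by repository and branch.
var deleteSensorTriggers = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "graveler_delete_sensor_triggers_total", // hypothetical metric name
	Help: "Number of times the delete sensor tombstone threshold was crossed",
}, []string{"repository", "branch"})

// The callback passed to NewDeleteSensor could then record the event:
//
//	func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool) {
//		deleteSensorTriggers.WithLabelValues(string(repositoryID), string(branchID)).Inc()
//	}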