Feature/delete sensor #7523

Merged: 12 commits merged on Mar 12, 2024
74 changes: 74 additions & 0 deletions cmd/lakectl/cmd/abuse_random_delete.go
@@ -0,0 +1,74 @@
package cmd

import (
"fmt"
"math/rand"
"net/http"
"os"
"syscall"
"time"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/helpers"
"github.com/treeverse/lakefs/pkg/testutil/stress"
)

var abuseRandomDeletesCmd = &cobra.Command{
Use: "random-delete <source ref URI>",
Short: "Read keys from a file and generate random deletes from the source ref for those keys.",
Hidden: false,
Args: cobra.ExactArgs(1),
ValidArgsFunction: ValidArgsRepository,
Run: func(cmd *cobra.Command, args []string) {
u := MustParseRefURI("source ref URI", args[0])
amount := Must(cmd.Flags().GetInt("amount"))
parallelism := Must(cmd.Flags().GetInt("parallelism"))
fromFile := Must(cmd.Flags().GetString("from-file"))

fmt.Println("Source ref:", u)
// read the input file
keys, err := readLines(fromFile)
if err != nil {
DieErr(err)
}
fmt.Printf("read a total of %d keys from key file\n", len(keys))

generator := stress.NewGenerator("delete", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM))

// generate randomly selected keys as input
generator.Setup(func(add stress.GeneratorAddFn) {
for i := 0; i < amount; i++ {
//nolint:gosec
add(keys[rand.Intn(len(keys))])
}
})

// execute the things!
generator.Run(func(input chan string, output chan stress.Result) {
ctx := cmd.Context()
client := getClient()
for work := range input {
start := time.Now()
resp, err := client.DeleteObject(ctx, u.Repository, u.Ref, &apigen.DeleteObjectParams{
Path: work,
})
if err == nil && resp.StatusCode != http.StatusOK {
err = helpers.ResponseAsError(resp)
}
output <- stress.Result{
Error: err,
Took: time.Since(start),
}
}
})
},
}

//nolint:gochecknoinits
func init() {
abuseCmd.AddCommand(abuseRandomDeletesCmd)
abuseRandomDeletesCmd.Flags().String("from-file", "", "read keys from this file (\"-\" for stdin)")
abuseRandomDeletesCmd.Flags().Int("amount", abuseDefaultAmount, "amount of reads to do")
abuseRandomDeletesCmd.Flags().Int("parallelism", abuseDefaultParallelism, "amount of reads to do in parallel")
}
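The --from-file input is read line by line (readLines itself is not shown in this diff), so the key file is presumably one object key per line, with "-" reading keys from stdin. An illustrative input file (paths are made up):

```
datasets/2024/01/part-00000.parquet
datasets/2024/01/part-00001.parquet
logs/2024/02/events-00042.json
```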
20 changes: 20 additions & 0 deletions docs/reference/cli.md
@@ -229,6 +229,26 @@ lakectl abuse list <source ref URI> [flags]



### lakectl abuse random-delete

Read keys from a file and generate random deletes from the source ref for those keys.

```
lakectl abuse random-delete <source ref URI> [flags]
```

#### Options
{:.no_toc}

```
--amount int amount of deletes to do (default 1000000)
--from-file string read keys from this file ("-" for stdin)
-h, --help help for random-delete
--parallelism int amount of deletes to do in parallel (default 100)
```
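For example, an invocation might look like the following; the repository URI, key file, and flag values are illustrative:

```
lakectl abuse random-delete lakefs://example-repo/main \
  --from-file keys.txt \
  --amount 50000 \
  --parallelism 64
```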



### lakectl abuse random-read

Read keys from a file and generate random reads from the source ref for those keys.
19 changes: 18 additions & 1 deletion pkg/catalog/catalog.go
@@ -222,6 +222,7 @@ type Catalog struct {
KVStore kv.Store
KVStoreLimited kv.Store
addressProvider *ident.HexAddressProvider
deleteSensor *graveler.DeleteSensor
UGCPrepareMaxFileSize int64
UGCPrepareInterval time.Duration
}
@@ -366,7 +367,19 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {

protectedBranchesManager := branch.NewProtectionManager(settingManager)
stagingManager := staging.NewManager(ctx, cfg.KVStore, storeLimiter, cfg.Config.Graveler.BatchDBIOTransactionMarkers, executor)
gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager)
var deleteSensor *graveler.DeleteSensor
if cfg.Config.Graveler.CompactionSensorThreshold > 0 {
cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
logging.FromContext(ctx).WithFields(logging.Fields{
Member:

We should consider what kind of experiment we're trying to do here. What would we like to see, and how will it change our assumptions?
As part of this we should also consider the visibility of the data. Is logging enough? Do we want to add this to the statistics?
These are all questions; no concrete action requested.

Contributor Author:

Currently leaving it with logs. I'm not expecting too much, and logs are enough to get the information. Probably not as good as metrics, but I prefer leaving that for later and adding metrics if we see they're needed.

"repositoryID": repositoryID,
"branchID": branchID,
"stagingTokenID": stagingTokenID,
"inGrace": inGrace,
}).Info("Delete sensor callback")
}
deleteSensor = graveler.NewDeleteSensor(cfg.Config.Graveler.CompactionSensorThreshold, cb)
}
gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, deleteSensor)

// The size of the workPool is determined by the number of workers and the number of desired pending tasks for each worker.
workPool := pond.New(sharedWorkers, sharedWorkers*pendingTasksPerWorker, pond.Context(ctx))
@@ -384,6 +397,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
managers: []io.Closer{sstableManager, sstableMetaManager, &ctxCloser{cancelFn}},
KVStoreLimited: storeLimiter,
addressProvider: addressProvider,
deleteSensor: deleteSensor,
}, nil
}
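The review discussion above asks whether logging is enough or whether the trigger should also feed the statistics. As a minimal sketch of the metrics option (assuming the standard prometheus/client_golang API; the metric name, labels, and function name are illustrative and not part of this PR), a callback with the DeleteSensorCB signature could increment a counter instead of logging:

```
package catalog

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/treeverse/lakefs/pkg/graveler"
)

// deleteSensorTriggers counts threshold crossings per repository/branch.
var deleteSensorTriggers = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "graveler_delete_sensor_triggers_total",
	Help: "Number of times a branch crossed the delete-sensor tombstone threshold",
}, []string{"repository", "branch"})

// metricsDeleteSensorCB matches graveler.DeleteSensorCB and could back (or
// accompany) the logging callback wired up in New above.
func metricsDeleteSensorCB(repositoryID graveler.RepositoryID, branchID graveler.BranchID, _ graveler.StagingToken, _ bool) {
	deleteSensorTriggers.WithLabelValues(string(repositoryID), string(branchID)).Inc()
}
```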

@@ -2763,6 +2777,9 @@ func (c *Catalog) Close() error {
}
}
c.workPool.StopAndWaitFor(workersMaxDrainDuration)
if c.deleteSensor != nil {
c.deleteSensor.Close()
}
return errs
}

1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -287,6 +287,7 @@ type Config struct {
Graveler struct {
EnsureReadableRootNamespace bool `mapstructure:"ensure_readable_root_namespace"`
BatchDBIOTransactionMarkers bool `mapstructure:"batch_dbio_transaction_markers"`
CompactionSensorThreshold int `mapstructure:"compaction_sensor_threshold"`
RepositoryCache struct {
Size int `mapstructure:"size"`
Expiry time.Duration `mapstructure:"expiry"`
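With this field in place, the threshold would be set in the lakeFS configuration file; a sketch, assuming the Graveler block maps to the top-level graveler key and using an arbitrary example value:

```
graveler:
  compaction_sensor_threshold: 1000
```

A threshold of 0 (the default) leaves the sensor disabled, since the Catalog only constructs it when CompactionSensorThreshold > 0.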
136 changes: 136 additions & 0 deletions pkg/graveler/delete_sensor.go
@@ -0,0 +1,136 @@
package graveler

import (
"context"
"sync"

"github.com/treeverse/lakefs/pkg/logging"
)

const (
callbackChannelSize = 1000
)

type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)
Contributor:

Please add a godoc. It should say, e.g., that if inGrace is true, lakeFS is terminating and the callback should return ASAP.

Alternatively, remove this parameter entirely and just log.Info() instead of calling the callback when inGrace.
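A possible godoc along the lines the reviewer asks for (the wording is a suggestion, not the merged text):

```
// DeleteSensorCB is called when the number of tombstones written to a branch's
// staging token crosses the configured threshold. If inGrace is true, lakeFS is
// terminating and the callback should return as soon as possible.
type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)
```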


// StagingTokenCounter holds a counter for a specific staging token.
type StagingTokenCounter struct {
Member:

Please add documentation to all the structs

StagingTokenID StagingToken
Counter int
}

// stagingTokenData holds data regarding a staging token.
type stagingTokenData struct {
repositoryID RepositoryID
branchID BranchID
stagingTokenID StagingToken
}

type DeleteSensor struct {
cb DeleteSensorCB
triggerAt int
callbacks chan stagingTokenData
wg sync.WaitGroup
mutex sync.Mutex
// stopped is used as a flag that the sensor has stopped; once set, CountDelete calls are no longer processed.
stopped bool
branchTombstoneCounter map[RepositoryID]map[BranchID]*StagingTokenCounter
Contributor:

This seems less useful than:

Suggested change:
- branchTombstoneCounter map[RepositoryID]map[BranchID]*StagingTokenCounter
+ branchTombstoneCounter map[RepositoryBranch]*StagingTokenCounter

where

type RepositoryBranch struct {
  RepositoryID RepositoryID
  BranchID BranchID
}

}

type DeleteSensorOpts func(s *DeleteSensor)

func WithCBBufferSize(bufferSize int) DeleteSensorOpts {
return func(s *DeleteSensor) {
s.callbacks = make(chan stagingTokenData, bufferSize)
}
}

func NewDeleteSensor(triggerAt int, cb DeleteSensorCB, opts ...DeleteSensorOpts) *DeleteSensor {
ds := &DeleteSensor{
cb: cb,
triggerAt: triggerAt,
stopped: false,
mutex: sync.Mutex{},
branchTombstoneCounter: make(map[RepositoryID]map[BranchID]*StagingTokenCounter),
callbacks: make(chan stagingTokenData, callbackChannelSize),
}
for _, opt := range opts {
opt(ds)
}
ds.wg.Add(1)
go ds.processCallbacks()
return ds
}

// triggerTombstone triggers a tombstone event for a specific staging token. If the sensor is stopped, the event is not triggered.
// If the staging token has changed, the counter is reset. If the counter reaches the triggerAt value, the event is triggered.
// In case the callback channel is full, the event is dropped and will be retried on the next call.
func (s *DeleteSensor) triggerTombstone(ctx context.Context, st stagingTokenData) {
Member:

I think this is a potential major bottleneck.
The call to CountDelete is synchronous, and taking a lock in triggerTombstone could lower performance under delete load.
I would expect the request to go into some channel which handles triggerTombstone asynchronously (or at least this block).

Contributor Author:

How can that happen? It only updates a structure; in some cases it writes to a channel, but even then it does not wait.

Contributor Author:

I thought about using channels at first; the two downsides were:

  1. If a channel is full, I need to throw the trigger away (or block, which is unacceptable).
  2. It's slower with channels.

Member:

This causes adding tombstones to be unavoidably serialized, since all of them have to go through this method. And if I'm not mistaken, this is true for the whole lakeFS server, not just for a specific branch or repository.

Contributor:

I fear @N-o-Z is right, which is both expected and unfortunate. I dug into AWS smithy, and clients are not serialized per host; you could get multiple clients to the same DynamoDB server. So without this code, deletes are not serialized.

However, this is actually not a particularly expensive piece of code, especially in all the cases where it does not enqueue an event. I suggest that we remain aware of it, profile it under load, and if necessary replace this map with a sync.Map. That has a LoadOrStore method, which would probably do a lot of what we need here. At worst we would need to get a bit sloppy with the count :-/

So I would like us to agree to open an issue to profile this and replace it with sync.Map.

s.mutex.Lock()
defer s.mutex.Unlock()
if s.stopped {
return
}
if _, ok := s.branchTombstoneCounter[st.repositoryID]; !ok {
s.branchTombstoneCounter[st.repositoryID] = make(map[BranchID]*StagingTokenCounter)
}
stCounter, ok := s.branchTombstoneCounter[st.repositoryID][st.branchID]
if !ok {
stCounter = &StagingTokenCounter{
StagingTokenID: st.stagingTokenID,
Counter: 1,
}
s.branchTombstoneCounter[st.repositoryID][st.branchID] = stCounter
return
}
// Reset the counter if the staging token has changed, under the assumption that staging tokens are updated only during sealing processes, which occur during a commit or compaction.
if stCounter.StagingTokenID != st.stagingTokenID {
Contributor:

Do you want to log something before the swap?

stCounter.StagingTokenID = st.stagingTokenID
stCounter.Counter = 1
return
}
Comment on lines +87 to +91
Contributor:

Why drop the count of the sealed token? Its entries will still need to be skipped.

I think that the assumption here is that if the token was sealed then a commit is in progress, meaning we will soon be rid of those entries. But then please document this decision -- we may have other reasons to seal in future.

Contributor Author:

This was done to:

  1. Avoid the need for exposing a reset method and calling it across the code (for now).
  2. Provide an "easy" way to reset across lakeFS instances when a commit or compaction is done.

It isn't the best way and has many downsides, but I think it will do for the logging implementation and will give us valid information for how we would like to go on.

Adding documentation.

Contributor:

You might have a weird order of staging tokens for a branch; a transition of staging tokens might look like:
st1, st1, st1, st2, st1, st2, st2, st2. How will this affect the sensor logic?

Contributor:

I'm fine with opening an issue to refactor this and revisit at a later date.

At such a later date, I would probably try to fill out this sketch: Keep a sync.Map of branch or staging token triggers (I'm fine with either, because we don't care about losing a few counts -- in any case if we need to compact more than once a second, then we need to do something better than compaction!). Whenever we have to, atomically increment the counter on the trigger. If the count-after-increment is (exactly) 1000:

  • If we count per branch, atomically decrement it by 1000 and trigger compaction or whatever.
  • If we count per staging token, atomically replace the staging token trigger, and we're happy to lose the excess counts!
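To make the sketch above concrete, a minimal lock-free variant using sync.Map and atomic counters might look like this (type and function names are illustrative, not part of the PR; it assumes the pkg/graveler package with sync and sync/atomic imported):

```
// repositoryBranch is the composite key suggested earlier in the review.
type repositoryBranch struct {
	RepositoryID RepositoryID
	BranchID     BranchID
}

// atomicDeleteCounter counts deletes per branch without a global mutex.
type atomicDeleteCounter struct {
	counters  sync.Map // repositoryBranch -> *int64
	triggerAt int64
	trigger   func(repositoryBranch)
}

func (c *atomicDeleteCounter) CountDelete(key repositoryBranch) {
	// LoadOrStore returns a shared counter without holding a global lock.
	v, _ := c.counters.LoadOrStore(key, new(int64))
	n := atomic.AddInt64(v.(*int64), 1)
	if n == c.triggerAt {
		// Start the next window; as noted above, losing a few counts under
		// contention is acceptable here.
		atomic.AddInt64(v.(*int64), -c.triggerAt)
		c.trigger(key)
	}
}
```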

if stCounter.Counter < s.triggerAt {
stCounter.Counter++
}
if stCounter.Counter >= s.triggerAt {
select {
case s.callbacks <- st:
stCounter.Counter = 0
default:
logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Info("delete sensor callback channel is full, dropping delete event")
Contributor:

I believe all related logs should be Debug.

Suggested change:
- logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Info("delete sensor callback channel is full, dropping delete event")
+ logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Debug("Delete sensor callback channel is full, dropping delete event")

Contributor Author:

But we want to get this log; this feature is basically just logging. If there were compaction, I would trigger it now.

Contributor:

I agree with @guy-har that it's an interesting log, so 👍🏽 .

But...

if the chan is full, clearly there are too many deletes -- and we start spamming logs. Not sure we can do this without some antiflooding, and I cannot find a package to do this. Let's go with Debug and open a "good first issue"; anyone can pick it up later, it should be 1-2 days' work?

}
return
}
}

func (s *DeleteSensor) processCallbacks() {
s.mutex.Lock()
isStopped := s.stopped
s.mutex.Unlock()
defer s.wg.Done()
for cb := range s.callbacks {
s.cb(cb.repositoryID, cb.branchID, cb.stagingTokenID, isStopped)
}
}

func (s *DeleteSensor) CountDelete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken) {
st := stagingTokenData{
repositoryID: repositoryID,
branchID: branchID,
stagingTokenID: stagingTokenID,
}
s.triggerTombstone(ctx, st)
}
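The call sites are not part of this excerpt; presumably the graveler delete path notifies the sensor after writing a tombstone, guarded by a nil check since the Catalog only creates the sensor when the threshold is configured. A hypothetical helper illustrating that pattern:

```
// notifyDeleteSensor is a hypothetical helper, not part of this diff: the
// sensor is optional, so callers must tolerate a nil *DeleteSensor.
func notifyDeleteSensor(ctx context.Context, ds *DeleteSensor, repositoryID RepositoryID, branchID BranchID, st StagingToken) {
	if ds == nil {
		return
	}
	ds.CountDelete(ctx, repositoryID, branchID, st)
}
```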

func (s *DeleteSensor) Close() {
s.mutex.Lock()
if s.stopped {
s.mutex.Unlock()
return
}
s.stopped = true
s.mutex.Unlock()

close(s.callbacks)
s.wg.Wait()
}
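Putting the new API together, a minimal usage sketch (the threshold, IDs, and the Example function itself are illustrative; assumes the pkg/graveler package with context and fmt imported):

```
func ExampleDeleteSensor() {
	ds := NewDeleteSensor(3, func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool) {
		fmt.Println("threshold reached for", repositoryID, branchID)
	})
	ctx := context.Background()
	// The third delete on the same staging token reaches the threshold and
	// enqueues a single callback.
	for i := 0; i < 3; i++ {
		ds.CountDelete(ctx, "example-repo", "main", "st-1")
	}
	// Close drains the callback channel and waits for the processing goroutine.
	ds.Close()
}
```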