Skip to content

Commit

Permalink
feat(graphdb): split deletion transactions. (#303)
Browse files Browse the repository at this point in the history
* feat(graphdb): split deletion transactions.

* feat(graphdb): use a const for deletion batch size.
  • Loading branch information
Zenithar authored Dec 9, 2024
1 parent 560e11b commit 5672467
Showing 1 changed file with 84 additions and 27 deletions.
111 changes: 84 additions & 27 deletions pkg/kubehound/storage/graphdb/janusgraph_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
channelSizeBatchFactor = 4 // TODO maybe move that into a config file?
StorageProviderName = "janusgraph"
deleteBatchSize = 10000
)

var (
Expand Down Expand Up @@ -70,19 +71,47 @@ func (jgp *JanusGraphProvider) Prepare(ctx context.Context) error {
tx := g.Tx()
defer tx.Close()

gtx, err := tx.Begin()
if err != nil {
return err
}

err = <-gtx.V().Drop().Iterate()
if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return err
for {
// Begin a new transaction.
gtx, err := tx.Begin()
if err != nil {
return err
}

// Retrieve the number of vertices in the graph.
page, err := gtx.V().Count().Next()
if err != nil {
return err
}

// Decode the number of vertices from the page.
count, err := page.GetInt()
if err != nil {
return err
}

// If there are no more vertices to delete, break the loop.
if count == 0 {
break
}

// Delete the vertices in the graph.
err = <-gtx.V().Limit(deleteBatchSize).Drop().Iterate()
if err != nil {
return err
}

// Commit the transaction.
if err := tx.Commit(); err != nil {
return err
}

// Check context for cancellation.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}

return nil
Expand Down Expand Up @@ -154,7 +183,7 @@ func (jgp *JanusGraphProvider) Close(ctx context.Context) error {
return nil
}

// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
// Clean removes all vertices in the graph for the given cluster.
func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error {
var err error
span, ctx := span.SpanRunFromContext(ctx, span.IngestorClean)
Expand All @@ -165,19 +194,47 @@ func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error
tx := g.Tx()
defer tx.Close()

gtx, err := tx.Begin()
if err != nil {
return err
}

err = <-gtx.V().Has("cluster", cluster).Drop().Iterate()
if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return err
for {
// Begin a new transaction.
gtx, err := tx.Begin()
if err != nil {
return err
}

// Retrieve the number of vertices in the graph for the given cluster.
page, err := gtx.V().Has("cluster", cluster).Count().Next()
if err != nil {
return err
}

// Decode the number of vertices from the page.
count, err := page.GetInt()
if err != nil {
return err
}

// If there are no more vertices to delete, break the loop.
if count == 0 {
break
}

// Delete the vertices in the graph for the given cluster.
err = <-gtx.V().Has("cluster", cluster).Limit(deleteBatchSize).Drop().Iterate()
if err != nil {
return err
}

// Commit the transaction.
if err := tx.Commit(); err != nil {
return err
}

// Check context for cancellation.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}

return nil
Expand Down

0 comments on commit 5672467

Please sign in to comment.