Skip to content

Commit

Permalink
fix: replace time.Sleep with timer.Wait to support cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
suzuki-shunsuke committed Jan 11, 2025
1 parent abd0bf4 commit 07fcc26
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 68 deletions.
4 changes: 2 additions & 2 deletions pkg/cli/vacuum/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func (i *command) action(c *cli.Context) error {
ctrl := controller.InitializeVacuumCommandController(c.Context, param, http.DefaultClient, i.r.Runtime)

if c.Command.Name == "show" {
if err := ctrl.ListPackages(i.r.LogE, c.Bool("expired")); err != nil {
if err := ctrl.ListPackages(c.Context, i.r.LogE, c.Bool("expired")); err != nil {
return fmt.Errorf("show packages: %w", err)
}
return nil
}

if c.Command.Name == "run" {
if err := ctrl.Vacuum(i.r.LogE); err != nil {
if err := ctrl.Vacuum(c.Context, i.r.LogE); err != nil {
return fmt.Errorf("run: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/install/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ packages:
}
downloader := download.NewDownloader(nil, download.NewHTTPDownloader(http.DefaultClient))
executor := &osexec.Mock{}
vacuum := vacuum.New(d.param, fs)
vacuum := vacuum.New(ctx, d.param, fs)
pkgInstaller := installpackage.New(d.param, downloader, d.rt, fs, linker, nil, &checksum.Calculator{}, unarchive.New(executor, fs), &cosign.MockVerifier{}, &slsa.MockVerifier{}, &minisign.MockVerifier{}, &ghattestation.MockVerifier{}, &installpackage.MockGoInstallInstaller{}, &installpackage.MockGoBuildInstaller{}, &installpackage.MockCargoPackageInstaller{}, vacuum)
policyFinder := policy.NewConfigFinder(fs)
policyReader := policy.NewReader(fs, &policy.MockValidator{}, policyFinder, policy.NewConfigReader(fs))
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/vacuum/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vacuum

import (
"context"
"fmt"
"io"
"os"
Expand All @@ -26,13 +27,13 @@ type Controller struct {
}

// New initializes a Controller with the given context, parameters, and dependencies.
func New(param *config.Param, fs afero.Fs) *Controller {
func New(ctx context.Context, param *config.Param, fs afero.Fs) *Controller {
vc := &Controller{
stdout: os.Stdout,
Param: param,
fs: fs,
}
vc.storeQueue = newStoreQueue(vc)
vc.storeQueue = newStoreQueue(ctx, vc)
return vc
}

Expand Down Expand Up @@ -73,7 +74,7 @@ func (vc *Controller) getDB() (*bolt.DB, error) {
}

// withDBRetry retries a database operation with exponential backoff.
func (vc *Controller) withDBRetry(logE *logrus.Entry, fn func(*bolt.Tx) error, dbAccessType DBAccessType) error {
func (vc *Controller) withDBRetry(ctx context.Context, logE *logrus.Entry, fn func(*bolt.Tx) error, dbAccessType DBAccessType) error {

Check failure on line 77 in pkg/controller/vacuum/controller.go

View workflow job for this annotation

GitHub Actions / test / test

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
const (
retries = 2
initialBackoff = 100 * time.Millisecond
Expand Down
11 changes: 6 additions & 5 deletions pkg/controller/vacuum/queue_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vacuum

import (
"context"
"sync"

"github.com/sirupsen/logrus"
Expand All @@ -23,27 +24,27 @@ type StoreQueue struct {
}

// newStoreQueue initializes the task queue with a single worker.
func newStoreQueue(vc *Controller) *StoreQueue {
func newStoreQueue(ctx context.Context, vc *Controller) *StoreQueue {
const maxTasks = 100
sq := &StoreQueue{
taskQueue: make(chan StoreRequest, maxTasks),
done: make(chan struct{}),
vc: vc,
}

go sq.worker()
go sq.worker(ctx)
return sq
}

// worker processes tasks from the queue.
func (sq *StoreQueue) worker() {
func (sq *StoreQueue) worker(ctx context.Context) {
for {
select {
case task, ok := <-sq.taskQueue:
if !ok {
return
}
err := sq.vc.storePackageInternal(task.logE, task.pkg)
err := sq.vc.storePackageInternal(ctx, task.logE, task.pkg)
if err != nil {
logerr.WithError(task.logE, err).Error("store package asynchronously")
}
Expand All @@ -52,7 +53,7 @@ func (sq *StoreQueue) worker() {
// Process remaining tasks
for len(sq.taskQueue) > 0 {
task := <-sq.taskQueue
err := sq.vc.storePackageInternal(task.logE, task.pkg)
err := sq.vc.storePackageInternal(ctx, task.logE, task.pkg)
if err != nil {
logerr.WithError(task.logE, err).Error("store package asynchronously during shutdown")
}
Expand Down
53 changes: 27 additions & 26 deletions pkg/controller/vacuum/vacuum.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vacuum

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -46,26 +47,26 @@ type Package struct {
}

// Vacuum performs the vacuuming process if it is enabled.
func (vc *Controller) Vacuum(logE *logrus.Entry) error {
func (vc *Controller) Vacuum(ctx context.Context, logE *logrus.Entry) error {
if !vc.IsVacuumEnabled(logE) {
return nil
}
return vc.vacuumExpiredPackages(logE)
return vc.vacuumExpiredPackages(ctx, logE)
}

// ListPackages lists the packages based on the provided arguments.
// If the expired flag is set to true, it lists the expired packages.
// Otherwise, it lists all packages.
func (vc *Controller) ListPackages(logE *logrus.Entry, expired bool, args ...string) error {
func (vc *Controller) ListPackages(ctx context.Context, logE *logrus.Entry, expired bool, args ...string) error {
if expired {
return vc.handleListExpiredPackages(logE, args...)
return vc.handleListExpiredPackages(ctx, logE, args...)
}
return vc.handleListPackages(logE, args...)
return vc.handleListPackages(ctx, logE, args...)
}

// handleListPackages retrieves a list of packages and displays them using a fuzzy search.
func (vc *Controller) handleListPackages(logE *logrus.Entry, args ...string) error {
pkgs, err := vc.listPackages(logE)
func (vc *Controller) handleListPackages(ctx context.Context, logE *logrus.Entry, args ...string) error {
pkgs, err := vc.listPackages(ctx, logE)
if err != nil {
return err
}
Expand All @@ -74,8 +75,8 @@ func (vc *Controller) handleListPackages(logE *logrus.Entry, args ...string) err

// handleListExpiredPackages handles the process of listing expired packages
// and displaying them using a fuzzy search.
func (vc *Controller) handleListExpiredPackages(logE *logrus.Entry, args ...string) error {
expiredPkgs, err := vc.listExpiredPackages(logE)
func (vc *Controller) handleListExpiredPackages(ctx context.Context, logE *logrus.Entry, args ...string) error {
expiredPkgs, err := vc.listExpiredPackages(ctx, logE)
if err != nil {
return err
}
Expand Down Expand Up @@ -135,8 +136,8 @@ func (vc *Controller) IsVacuumEnabled(logE *logrus.Entry) bool {
}

// listExpiredPackages lists all packages that have expired based on the vacuum configuration.
func (vc *Controller) listExpiredPackages(logE *logrus.Entry) ([]*PackageVacuumEntry, error) {
pkgs, err := vc.listPackages(logE)
func (vc *Controller) listExpiredPackages(ctx context.Context, logE *logrus.Entry) ([]*PackageVacuumEntry, error) {
pkgs, err := vc.listPackages(ctx, logE)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,7 +166,7 @@ func (vc *Controller) isPackageExpired(pkg *PackageVacuumEntry) bool {
}

// listPackages lists all stored package entries.
func (vc *Controller) listPackages(logE *logrus.Entry) ([]*PackageVacuumEntry, error) {
func (vc *Controller) listPackages(ctx context.Context, logE *logrus.Entry) ([]*PackageVacuumEntry, error) {
db, err := vc.getDB()
if err != nil {
return nil, err
Expand All @@ -177,7 +178,7 @@ func (vc *Controller) listPackages(logE *logrus.Entry) ([]*PackageVacuumEntry, e

var pkgs []*PackageVacuumEntry

err = vc.withDBRetry(logE, func(tx *bbolt.Tx) error {
err = vc.withDBRetry(ctx, logE, func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bucketNamePkgs))
if b == nil {
return nil
Expand Down Expand Up @@ -279,8 +280,8 @@ func (vc *Controller) displayPackagesFuzzyInteractive(pkgs []*PackageVacuumEntry
}

// vacuumExpiredPackages performs cleanup of expired packages.
func (vc *Controller) vacuumExpiredPackages(logE *logrus.Entry) error {
expiredPackages, err := vc.listExpiredPackages(logE)
func (vc *Controller) vacuumExpiredPackages(ctx context.Context, logE *logrus.Entry) error {
expiredPackages, err := vc.listExpiredPackages(ctx, logE)
if err != nil {
return err
}
Expand All @@ -303,7 +304,7 @@ func (vc *Controller) vacuumExpiredPackages(logE *logrus.Entry) error {

defer vc.Close(logE)
if len(successfulRemovals) > 0 {
if err := vc.removePackages(logE, successfulRemovals); err != nil {
if err := vc.removePackages(ctx, logE, successfulRemovals); err != nil {
return fmt.Errorf("remove packages from database: %w", err)
}
}
Expand Down Expand Up @@ -374,12 +375,12 @@ func (vc *Controller) processExpiredPackages(logE *logrus.Entry, expired []*Pack
}

// storePackageInternal stores package entries in the database.
func (vc *Controller) storePackageInternal(logE *logrus.Entry, pkg *Package, dateTime ...time.Time) error {
func (vc *Controller) storePackageInternal(ctx context.Context, logE *logrus.Entry, pkg *Package, dateTime ...time.Time) error {
lastUsedTime := time.Now()
if len(dateTime) > 0 {
lastUsedTime = dateTime[0]
}
return vc.withDBRetry(logE, func(tx *bbolt.Tx) error {
return vc.withDBRetry(ctx, logE, func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bucketNamePkgs))
if b == nil {
return errors.New("bucket not found")
Expand Down Expand Up @@ -416,8 +417,8 @@ func (vc *Controller) storePackageInternal(logE *logrus.Entry, pkg *Package, dat
}

// removePackages removes package entries from the database.
func (vc *Controller) removePackages(logE *logrus.Entry, pkgs []string) error {
return vc.withDBRetry(logE, func(tx *bbolt.Tx) error {
func (vc *Controller) removePackages(ctx context.Context, logE *logrus.Entry, pkgs []string) error {
return vc.withDBRetry(ctx, logE, func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bucketNamePkgs))
if b == nil {
return errors.New("bucket not found")
Expand Down Expand Up @@ -462,26 +463,26 @@ func decodePackageEntry(data []byte) (*PackageEntry, error) {
}

// GetPackageLastUsed retrieves the last used time of a package. for testing purposes.
func (vc *Controller) GetPackageLastUsed(logE *logrus.Entry, pkgPath string) *time.Time {
func (vc *Controller) GetPackageLastUsed(ctx context.Context, logE *logrus.Entry, pkgPath string) *time.Time {
var lastUsedTime time.Time
pkgEntry, _ := vc.retrievePackageEntry(logE, pkgPath)
pkgEntry, _ := vc.retrievePackageEntry(ctx, logE, pkgPath)
if pkgEntry != nil {
lastUsedTime = pkgEntry.LastUsageTime
}
return &lastUsedTime
}

// SetTimeStampPackage permit define a Timestamp for a package Manually. for testing purposes.
func (vc *Controller) SetTimestampPackage(logE *logrus.Entry, pkg *config.Package, pkgPath string, datetime time.Time) error {
func (vc *Controller) SetTimestampPackage(ctx context.Context, logE *logrus.Entry, pkg *config.Package, pkgPath string, datetime time.Time) error {
vacuumPkg := vc.getVacuumPackage(pkg, pkgPath)
return vc.storePackageInternal(logE, vacuumPkg, datetime)
return vc.storePackageInternal(ctx, logE, vacuumPkg, datetime)
}

// retrievePackageEntry retrieves a package entry from the database by key. for testing purposes.
func (vc *Controller) retrievePackageEntry(logE *logrus.Entry, key string) (*PackageEntry, error) {
func (vc *Controller) retrievePackageEntry(ctx context.Context, logE *logrus.Entry, key string) (*PackageEntry, error) {
var pkgEntry *PackageEntry
key = generatePackageKey(vc.Param.RootDir, key)
err := vc.withDBRetry(logE, func(tx *bbolt.Tx) error {
err := vc.withDBRetry(ctx, logE, func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bucketNamePkgs))
if b == nil {
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/vacuum/vacuum_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vacuum

import (
"context"
"encoding/json"
"testing"
"time"
Expand All @@ -14,7 +15,7 @@ func TestHandleAsyncStorePackage_NilPackage(t *testing.T) {
t.Parallel()
logE := logrus.NewEntry(logrus.New())

vacuumCtrl := New(nil, nil)
vacuumCtrl := New(context.Background(), nil, nil)

// Test
err := vacuumCtrl.handleAsyncStorePackage(logE, nil)
Expand Down
Loading

0 comments on commit 07fcc26

Please sign in to comment.