Skip to content
This repository has been archived by the owner on Mar 22, 2024. It is now read-only.

CVM-1861 #87

Open
wants to merge 3 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ require (
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.3.2
go.etcd.io/bbolt v1.3.2
golang.org/x/sync v0.0.0-20190423024810-112230192c58
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
Expand Down
3 changes: 2 additions & 1 deletion internal/gateway/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type ActionController interface {
CancelLease(ctx context.Context, tokenStr string) error
CommitLease(ctx context.Context, tokenStr, oldRootHash, newRootHash string, tag gw.RepositoryTag) error
SubmitPayload(ctx context.Context, token string, payload io.Reader, digest string, headerSize int) error
RunGC(ctx context.Context, options GCOptions) (string, error)
StartGC(ctx context.Context, token string, options GCOptions) error
IsDoingGC(ctx context.Context, token string) bool
PublishManifest(ctx context.Context, repository string, message []byte) error
SubscribeToNotifications(ctx context.Context, repository string) SubscriberHandle
UnsubscribeFromNotifications(ctx context.Context, repository string, handle SubscriberHandle) error
Expand Down
82 changes: 65 additions & 17 deletions internal/gateway/backend/gc_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package backend

import (
"context"
"fmt"
"os"
"os/exec"
"strconv"
"sync"
"time"

gw "github.com/cvmfs/gateway/internal/gateway"
)

// GCOptions represents the different options supplied for a garbace collection run
Expand All @@ -17,18 +22,42 @@ type GCOptions struct {
}

// RunGC triggers garbage collection on the specified repository
func (s *Services) RunGC(ctx context.Context, options GCOptions) (string, error) {
func (s *Services) StartGC(ctx context.Context, token string, options GCOptions) error {
t0 := time.Now()

outcome := "success"
defer logAction(ctx, "garbage_collection", &outcome, t0)

baseArgs := []string{"gc", "-f"}
leasePath, lease, err := s.Leases.GetLease(ctx, token)
if err != nil {
outcome = err.Error()
return err
}

if err := CheckToken(token, lease.Token.Secret); err != nil {
outcome = err.Error()
return err
}

if leasePath != fmt.Sprintf("%s%s", options.Repository, "/") {
err = fmt.Errorf("necessary lease on root of the repository `/` to run Garbage Collection")
outcome = err.Error()
return err
}

defer func() {
if err = s.Leases.CancelLease(ctx, token); err != nil {
// we are not really worried if something goes bad here but it still worth to log it
gw.LogC(ctx, "gc", gw.LogInfo).Msg(fmt.Sprintf("error in cancelling the lease: %s", err.Error()))
}
}()

baseArgs := []string{"gc", "-f", "-@"}
if options.NumRevisions != 0 {
baseArgs = append(baseArgs, "-r", strconv.Itoa(options.NumRevisions))
}
if !options.Timestamp.IsZero() {
baseArgs = append(baseArgs, "-t", options.Timestamp.String())
baseArgs = append(baseArgs, "-t", fmt.Sprintf("@%d", options.Timestamp.Unix()))
}
if options.DryRun {
baseArgs = append(baseArgs, "-d")
Expand All @@ -37,20 +66,39 @@ func (s *Services) RunGC(ctx context.Context, options GCOptions) (string, error)
baseArgs = append(baseArgs, "-l")
}

var output string
args := append(baseArgs, options.Repository)
if err := s.Leases.WithLock(ctx, options.Repository, func() error {
cmd := exec.Command("cvmfs_server", args...)
out, err := cmd.CombinedOutput()
if err != nil {
return err
}
output = string(out)
return nil
}); err != nil {
outcome = err.Error()
return "", err
}
gctime := 24 * time.Hour
// don't care about the cancel function
gcctx, _ := context.WithTimeout(context.Background(), gctime)

// we just start the GC and hold the lock on the token
// we wait untill the GC does not finish
// TODO set the results somewhere
// we should keep the logs and other stuff
go func() {
s.Leases.WithLock(gcctx, options.Repository, func() error {
var token_wg sync.WaitGroup
token_wg.Add(1)
defer token_wg.Done()
go s.Leases.WithLock(ctx, token, func() error {
// this is for the IsDoingGC check
token_wg.Wait()
return nil
})
fmt.Fprintf(os.Stdout, "Executing GC\n")
cmd := exec.Command("cvmfs_server", args...)
out, err := cmd.CombinedOutput()
fmt.Fprintf(os.Stdout, "Executing GC: out: %s\n", out)
if err != nil {
return err
}
return nil
})
}()

return nil
}

return output, nil
func (s *Services) IsDoingGC(ctx context.Context, token string) bool {
return s.Leases.IsLocked(token)
}
1 change: 1 addition & 0 deletions internal/gateway/backend/leasedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type LeaseDB interface {
CancelLeases(ctx context.Context, repoPath string) error
CancelLease(ctx context.Context, tokenStr string) error
WithLock(ctx context.Context, name string, task func() error) error
IsLocked(name string) bool
SetRepositoryEnabled(ctx context.Context, repository string, enable bool) error
GetRepositoryEnabled(ctx context.Context, repository string) bool
}
Expand Down
7 changes: 6 additions & 1 deletion internal/gateway/backend/leasedb_bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,12 @@ func (db *BoltLeaseDB) CancelLease(ctx context.Context, tokenStr string) error {

// WithLock runs the given task while holding a commit lock for the repository
func (db *BoltLeaseDB) WithLock(ctx context.Context, repository string, task func() error) error {
return db.locks.WithLock(repository, task)
return db.locks.WithLock(ctx, repository, task)
}

// Check if it is possible to acquire a lock on the repository
func (db *BoltLeaseDB) IsLocked(repository string) bool {
return db.locks.IsLocked(repository)
}

// SetRepositoryEnabled sets the enabled/disabled status for a given repository
Expand Down
5 changes: 5 additions & 0 deletions internal/gateway/backend/leasedb_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (db *EtcdLeaseDB) WithLock(ctx context.Context, repository string, task fun
return nil
}

// Check if it is possible to acquire a lock on the repository
func (db *EtcdLeaseDB) IsLocked(repository string) bool {
return false
}

// SetRepositoryEnabled sets the enabled/disabled status for a given repository
func (db *EtcdLeaseDB) SetRepositoryEnabled(
ctx context.Context, repository string, enable bool) error {
Expand Down
7 changes: 6 additions & 1 deletion internal/gateway/backend/leasedb_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,12 @@ func (db *SqliteLeaseDB) CancelLease(ctx context.Context, tokenStr string) error

// WithLock runs the given task while holding a commit lock for the repository
func (db *SqliteLeaseDB) WithLock(ctx context.Context, repository string, task func() error) error {
return db.locks.WithLock(repository, task)
return db.locks.WithLock(ctx, repository, task)
}

// Check if it is possible to acquire a lock on the repository
func (db *SqliteLeaseDB) IsLocked(repository string) bool {
return db.locks.IsLocked(repository)
}

// SetRepositoryEnabled sets the enabled/disabled status for a given repository
Expand Down
33 changes: 27 additions & 6 deletions internal/gateway/backend/locks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package backend

import "sync"
import (
"context"
"sync"

"golang.org/x/sync/semaphore"
)

// NamedLocks provides a thread-safe map of named locks, used for locking
// repositories during critical operations (commits, GC, etc.)
Expand All @@ -10,11 +15,27 @@ type NamedLocks struct {

// WithLock runs the given task, locking the "name" mutex for the
// duration of the task
func (l *NamedLocks) WithLock(name string, task func() error) error {
m, _ := l.locks.LoadOrStore(name, &sync.Mutex{})
mtx := m.(*sync.Mutex)
mtx.Lock()
defer mtx.Unlock()
func (l *NamedLocks) WithLock(ctx context.Context, name string, task func() error) error {
s := semaphore.NewWeighted(1)
m, _ := l.locks.LoadOrStore(name, s)
sem := m.(*semaphore.Weighted)
sem.Acquire(ctx, 1)
defer sem.Release(1)

return task()
}

func (l *NamedLocks) IsLocked(name string) bool {
m, ok := l.locks.Load(name)
if !ok {
return false
}
sem := m.(*semaphore.Weighted)
couldAcquire := sem.TryAcquire(1)
if couldAcquire {
// we unlock after returning the value
defer sem.Release(1)
return false
}
return true
}
2 changes: 1 addition & 1 deletion internal/gateway/frontend/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func WithAuthz(ac be.ActionController, next httprouter.Handle) httprouter.Handle
return
}
}
} else if strings.HasPrefix(req.URL.Path, APIRoot+"/payloads") {
} else if strings.HasPrefix(req.URL.Path, APIRoot+"/payloads") || strings.HasPrefix(req.URL.Path, APIRoot+"/gc") {
token := ps.ByName("token")
if token != "" {
// For the new style of payload submission requests, use the token to compute HMAC
Expand Down
5 changes: 4 additions & 1 deletion internal/gateway/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ func NewFrontend(services be.ActionController, port int, timeout time.Duration)
router.POST(APIRoot+"/notifications/publish", tag(MakeNotificationsHandler(services)))
router.GET(APIRoot+"/notifications/subscribe", tag(MakeNotificationsHandler(services)))

// Garbage collection
router.POST(APIRoot+"/gc/:token", mw(GCStartHandler(services)))
router.GET(APIRoot+"/gc/:token/check", mw(GCCheckHandler(services)))

// Admin routes
router.POST(APIRoot+"/repos/:name", amw(MakeAdminReposHandler(services)))
router.DELETE(APIRoot+"/leases-by-path/*path", amw(MakeAdminLeasesHandler(services)))
router.POST(APIRoot+"/gc", amw(MakeGCHandler(services)))

// Configure and start the HTTP server
srv := &http.Server{
Expand Down
26 changes: 23 additions & 3 deletions internal/gateway/frontend/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
)

// MakeGCHandler creates an HTTP handler for the "/gc" endpoint
func MakeGCHandler(services be.ActionController) httprouter.Handle {
func GCStartHandler(services be.ActionController) httprouter.Handle {
return func(w http.ResponseWriter, h *http.Request, ps httprouter.Params) {
token := ps.ByName("token")

ctx := h.Context()

var options be.GCOptions
Expand All @@ -21,11 +23,29 @@ func MakeGCHandler(services be.ActionController) httprouter.Handle {
}

msg := map[string]interface{}{"status": "ok"}
if output, err := services.RunGC(ctx, options); err != nil {
if err := services.StartGC(ctx, token, options); err != nil {
msg["status"] = "error"
msg["reason"] = err.Error()
} else {
msg["output"] = output
msg["output"] = "GC started"
}
gw.LogC(ctx, "http", gw.LogInfo).Msg("request processed")

replyJSON(ctx, w, msg)
}
}

// MakeGCHandler creates an HTTP handler for the "/gc" endpoint
func GCCheckHandler(services be.ActionController) httprouter.Handle {
return func(w http.ResponseWriter, h *http.Request, ps httprouter.Params) {
token := ps.ByName("token")

ctx := h.Context()

msg := map[string]interface{}{"status": "in_progress"}
if services.IsDoingGC(ctx, token) {
} else {
msg["status"] = "done"
}

gw.LogC(ctx, "http", gw.LogInfo).Msg("request processed")
Expand Down
3 changes: 3 additions & 0 deletions vendor/golang.org/x/sync/AUTHORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/golang.org/x/sync/CONTRIBUTORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions vendor/golang.org/x/sync/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/golang.org/x/sync/PATENTS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading