Skip to content

Commit

Permalink
Adding a parameter on servers to delete artifacts that have not been …
Browse files Browse the repository at this point in the history
…accessed for more than a certain period of time (defaults to roughly 1 month)
  • Loading branch information
peterebden committed Sep 8, 2016
1 parent a0300a4 commit f6a2247
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 8 deletions.
3 changes: 2 additions & 1 deletion src/cache/http_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"runtime"
"testing"
"time"

"cache/server"
"core"
Expand All @@ -26,7 +27,7 @@ func init() {
target = core.NewBuildTarget(label)

// Arbitrary large numbers so the cleaner never needs to run.
cache := server.NewCache("src/cache/test_data", 100000, 100000000, 1000000000)
cache := server.NewCache("src/cache/test_data", 20*time.Hour, 100000, 100000000, 1000000000)
key, _ = ioutil.ReadFile("src/cache/test_data/testfile")
testServer := httptest.NewServer(server.BuildRouter(cache))

Expand Down
2 changes: 1 addition & 1 deletion src/cache/rpc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func init() {

func startServer(port int, keyFile, certFile, caCertFile string) *grpc.Server {
// Arbitrary large numbers so the cleaner never needs to run.
cache := server.NewCache("src/cache/test_data", 100000, 100000000, 1000000000)
cache := server.NewCache("src/cache/test_data", 20*time.Hour, 100000, 100000000, 1000000000)
s, lis := server.BuildGrpcServer(port, cache, keyFile, certFile, caCertFile, "", "")
go s.Serve(lis)
return s
Expand Down
31 changes: 26 additions & 5 deletions src/cache/server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type Cache struct {
// NewCache initialises the cache and fires off a background cleaner goroutine which runs once
// every cleanFrequency. Each run first removes artifacts that have not been read for longer than
// maxArtifactAge; the high and low water marks then control a (soft) max size and a (harder)
// minimum size.
func NewCache(path string, cleanFrequency time.Duration, lowWaterMark, highWaterMark uint64) *Cache {
log.Notice("Initialising cache with settings:\n Path: %s\n Clean frequency: %s\n Low water mark: %s\n High water mark: %s",
path, cleanFrequency, humanize.Bytes(lowWaterMark), humanize.Bytes(highWaterMark))
func NewCache(path string, cleanFrequency, maxArtifactAge time.Duration, lowWaterMark, highWaterMark uint64) *Cache {
log.Notice("Initialising cache with settings:\n Path: %s\n Clean frequency: %s\n Max artifact age: %s\n Low water mark: %s\n High water mark: %s",
path, cleanFrequency, maxArtifactAge, humanize.Bytes(lowWaterMark), humanize.Bytes(highWaterMark))
cache := newCache(path)
go cache.clean(cleanFrequency, int64(lowWaterMark), int64(highWaterMark))
go cache.clean(cleanFrequency, maxArtifactAge, int64(lowWaterMark), int64(highWaterMark))
return cache
}

Expand Down Expand Up @@ -273,12 +273,33 @@ func (cache *Cache) DeleteAllArtifacts() error {
}

// clean implements a periodic clean of the cache to remove old artifacts.
func (cache *Cache) clean(cleanFrequency time.Duration, lowWaterMark, highWaterMark int64) {
func (cache *Cache) clean(cleanFrequency, maxArtifactAge time.Duration, lowWaterMark, highWaterMark int64) {
for range time.NewTicker(cleanFrequency).C {
cache.cleanOldFiles(maxArtifactAge)
cache.singleClean(lowWaterMark, highWaterMark)
}
}

// cleanOldFiles walks every entry in the cache index and removes those whose last read time
// is more than maxArtifactAge in the past. It returns true if at least one file was removed.
func (cache *Cache) cleanOldFiles(maxArtifactAge time.Duration) bool {
	log.Debug("Searching for old files...")
	// Anything last read before this instant is considered expired.
	cutoff := time.Now().Add(-maxArtifactAge)
	removed := 0
	for entry := range cache.cachedFiles.IterBuffered() {
		file := entry.Val.(*cachedFile)
		if !file.lastReadTime.Before(cutoff) {
			// Read recently enough; keep it.
			continue
		}
		// Hold the file's lock for the duration of the removal.
		// NOTE(review): the `true` argument presumably requests a write lock - confirm against lockFile.
		mutex := cache.lockFile(entry.Key, true, file.size)
		cache.removeAndDeleteFile(entry.Key, file)
		mutex.Unlock()
		removed++
	}
	if removed == 0 {
		return false
	}
	log.Notice("Removed %d old files", removed)
	return true
}

// singleClean runs a single clean of the cache. It's split out for testing purposes.
func (cache *Cache) singleClean(lowWaterMark, highWaterMark int64) bool {
log.Debug("Total size: %d High water mark: %d", cache.totalSize, highWaterMark)
Expand Down
22 changes: 22 additions & 0 deletions src/cache/server/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@ func TestFilesToClean(t *testing.T) {
assert.Equal(t, 2, len(paths))
}

// TestCleanOldFiles registers three artifacts with different last-read times and checks that
// cleanOldFiles with a 72 hour limit reports a removal and leaves two entries in the index.
func TestCleanOldFiles(t *testing.T) {
	c := newCache("test_clean_old_files")
	// Days since each artifact was last read; only the 5-day-old one exceeds the 72h limit below.
	artifacts := []struct {
		key     string
		daysAgo int
	}{
		{"test/artifact/1", 2},
		{"test/artifact/2", 5},
		{"test/artifact/3", 1},
	}
	for _, a := range artifacts {
		c.cachedFiles.Set(a.key, &cachedFile{
			lastReadTime: time.Now().AddDate(0, 0, -a.daysAgo),
			readCount:    0,
			size:         1000,
		})
	}
	// Three artifacts of 1000 bytes each.
	c.totalSize = 3000
	removedAny := c.cleanOldFiles(72 * time.Hour)
	assert.True(t, removedAny)
	assert.Equal(t, 2, c.cachedFiles.Count())
}

func TestRetrieve(t *testing.T) {
artifact, err := cache.RetrieveArtifact("darwin_amd64/pack/label/hash/label.ext")
assert.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions src/cache/server/http_server_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var opts struct {
LowWaterMark output.ByteSize `short:"l" long:"low_water_mark" description:"Size of cache to clean down to" default:"18G"`
HighWaterMark output.ByteSize `short:"i" long:"high_water_mark" description:"Max size of cache to clean at" default:"20G"`
CleanFrequency output.Duration `short:"f" long:"clean_frequency" description:"Frequency to clean cache at" default:"10m"`
MaxArtifactAge output.Duration `short:"m" long:"max_artifact_age" description:"Clean any artifact that's not been read in this long" default:"720h"`
} `group:"Options controlling when to clean the cache"`
}

Expand All @@ -31,6 +32,7 @@ func main() {
output.InitLogging(opts.Verbosity, opts.LogFile, opts.Verbosity)
log.Notice("Initialising cache server...")
cache := server.NewCache(opts.Dir, time.Duration(opts.CleanFlags.CleanFrequency),
time.Duration(opts.CleanFlags.MaxArtifactAge),
uint64(opts.CleanFlags.LowWaterMark), uint64(opts.CleanFlags.HighWaterMark))
log.Notice("Starting up http cache server on port %d...", opts.Port)
router := server.BuildRouter(cache)
Expand Down
2 changes: 2 additions & 0 deletions src/cache/server/rpc_server_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var opts struct {
LowWaterMark output.ByteSize `short:"l" long:"low_water_mark" description:"Size of cache to clean down to" default:"18G"`
HighWaterMark output.ByteSize `short:"i" long:"high_water_mark" description:"Max size of cache to clean at" default:"20G"`
CleanFrequency output.Duration `short:"f" long:"clean_frequency" description:"Frequency to clean cache at" default:"10m"`
MaxArtifactAge output.Duration `short:"m" long:"max_artifact_age" description:"Clean any artifact that's not been read in this long" default:"720h"`
} `group:"Options controlling when to clean the cache"`

TLSFlags struct {
Expand All @@ -42,6 +43,7 @@ func main() {
}
log.Notice("Scanning existing cache directory %s...", opts.Dir)
cache := server.NewCache(opts.Dir, time.Duration(opts.CleanFlags.CleanFrequency),
time.Duration(opts.CleanFlags.MaxArtifactAge),
uint64(opts.CleanFlags.LowWaterMark), uint64(opts.CleanFlags.HighWaterMark))
log.Notice("Starting up RPC cache server on port %d...", opts.Port)
server.ServeGrpcForever(opts.Port, cache, opts.TLSFlags.KeyFile, opts.TLSFlags.CertFile,
Expand Down
2 changes: 1 addition & 1 deletion src/cache/server/rpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
)

func startServer(port int, auth bool, readonlyCerts, writableCerts string) *grpc.Server {
cache := NewCache(testDir, 100, 1000000, 1000000)
cache := NewCache(testDir, 20*time.Hour, 100, 1000000, 1000000)
if !auth {
s, lis := BuildGrpcServer(port, cache, "", "", "", readonlyCerts, writableCerts)
go s.Serve(lis)
Expand Down

0 comments on commit f6a2247

Please sign in to comment.