diff --git a/src/cache/http_cache_test.go b/src/cache/http_cache_test.go index 74a436c72c..c10da25f86 100644 --- a/src/cache/http_cache_test.go +++ b/src/cache/http_cache_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "runtime" "testing" + "time" "cache/server" "core" @@ -26,7 +27,7 @@ func init() { target = core.NewBuildTarget(label) // Arbitrary large numbers so the cleaner never needs to run. - cache := server.NewCache("src/cache/test_data", 100000, 100000000, 1000000000) + cache := server.NewCache("src/cache/test_data", 20*time.Hour, 100000, 100000000, 1000000000) key, _ = ioutil.ReadFile("src/cache/test_data/testfile") testServer := httptest.NewServer(server.BuildRouter(cache)) diff --git a/src/cache/rpc_cache_test.go b/src/cache/rpc_cache_test.go index aa9ea401ca..57731242c9 100644 --- a/src/cache/rpc_cache_test.go +++ b/src/cache/rpc_cache_test.go @@ -38,7 +38,7 @@ func init() { func startServer(port int, keyFile, certFile, caCertFile string) *grpc.Server { // Arbitrary large numbers so the cleaner never needs to run. - cache := server.NewCache("src/cache/test_data", 100000, 100000000, 1000000000) + cache := server.NewCache("src/cache/test_data", 20*time.Hour, 100000, 100000000, 1000000000) s, lis := server.BuildGrpcServer(port, cache, keyFile, certFile, caCertFile, "", "") go s.Serve(lis) return s diff --git a/src/cache/server/cache.go b/src/cache/server/cache.go index ec666551d1..908ea41af3 100644 --- a/src/cache/server/cache.go +++ b/src/cache/server/cache.go @@ -42,11 +42,11 @@ type Cache struct { // NewCache initialises the cache and fires off a background cleaner goroutine which runs every // cleanFrequency seconds. The high and low water marks control a (soft) max size and a (harder) // minimum size. -func NewCache(path string, cleanFrequency time.Duration, lowWaterMark, highWaterMark uint64) *Cache { - log.Notice("Initialising cache with settings:\n Path: %s\n Clean frequency: %s\n Low water mark: %s\n High water mark: %s", - path, cleanFrequency, humanize.Bytes(lowWaterMark), humanize.Bytes(highWaterMark)) +func NewCache(path string, cleanFrequency, maxArtifactAge time.Duration, lowWaterMark, highWaterMark uint64) *Cache { + log.Notice("Initialising cache with settings:\n Path: %s\n Clean frequency: %s\n Max artifact age: %s\n Low water mark: %s\n High water mark: %s", + path, cleanFrequency, maxArtifactAge, humanize.Bytes(lowWaterMark), humanize.Bytes(highWaterMark)) cache := newCache(path) - go cache.clean(cleanFrequency, int64(lowWaterMark), int64(highWaterMark)) + go cache.clean(cleanFrequency, maxArtifactAge, int64(lowWaterMark), int64(highWaterMark)) return cache } @@ -273,12 +273,33 @@ func (cache *Cache) DeleteAllArtifacts() error { } // clean implements a periodic clean of the cache to remove old artifacts. -func (cache *Cache) clean(cleanFrequency time.Duration, lowWaterMark, highWaterMark int64) { +func (cache *Cache) clean(cleanFrequency, maxArtifactAge time.Duration, lowWaterMark, highWaterMark int64) { for range time.NewTicker(cleanFrequency).C { + cache.cleanOldFiles(maxArtifactAge) cache.singleClean(lowWaterMark, highWaterMark) } } +// cleanOldFiles cleans any files whose last access time is older than the given duration. +func (cache *Cache) cleanOldFiles(maxArtifactAge time.Duration) bool { + log.Debug("Searching for old files...") + oldestTime := time.Now().Add(-maxArtifactAge) + cleaned := 0 + for t := range cache.cachedFiles.IterBuffered() { + f := t.Val.(*cachedFile) + if f.lastReadTime.Before(oldestTime) { + lock := cache.lockFile(t.Key, true, f.size) + cache.removeAndDeleteFile(t.Key, f) + lock.Unlock() + cleaned++ + } + } + if cleaned > 0 { + log.Notice("Removed %d old files", cleaned) + } + return cleaned > 0 +} + // singleClean runs a single clean of the cache. It's split out for testing purposes. func (cache *Cache) singleClean(lowWaterMark, highWaterMark int64) bool { log.Debug("Total size: %d High water mark: %d", cache.totalSize, highWaterMark) diff --git a/src/cache/server/cache_test.go b/src/cache/server/cache_test.go index 99c39922c8..3ad1b2338a 100644 --- a/src/cache/server/cache_test.go +++ b/src/cache/server/cache_test.go @@ -46,6 +46,28 @@ func TestFilesToClean(t *testing.T) { assert.Equal(t, 2, len(paths)) } +func TestCleanOldFiles(t *testing.T) { + c := newCache("test_clean_old_files") + c.cachedFiles.Set("test/artifact/1", &cachedFile{ + lastReadTime: time.Now().AddDate(0, 0, -2), + readCount: 0, + size: 1000, + }) + c.cachedFiles.Set("test/artifact/2", &cachedFile{ + lastReadTime: time.Now().AddDate(0, 0, -5), + readCount: 0, + size: 1000, + }) + c.cachedFiles.Set("test/artifact/3", &cachedFile{ + lastReadTime: time.Now().AddDate(0, 0, -1), + readCount: 0, + size: 1000, + }) + c.totalSize = 3000 + assert.True(t, c.cleanOldFiles(72*time.Hour)) + assert.Equal(t, 2, c.cachedFiles.Count()) +} + func TestRetrieve(t *testing.T) { artifact, err := cache.RetrieveArtifact("darwin_amd64/pack/label/hash/label.ext") assert.NoError(t, err) diff --git a/src/cache/server/http_server_main.go b/src/cache/server/http_server_main.go index 5c68fbe576..2fb528a164 100644 --- a/src/cache/server/http_server_main.go +++ b/src/cache/server/http_server_main.go @@ -23,6 +23,7 @@ var opts struct { LowWaterMark output.ByteSize `short:"l" long:"low_water_mark" description:"Size of cache to clean down to" default:"18G"` HighWaterMark output.ByteSize `short:"i" long:"high_water_mark" description:"Max size of cache to clean at" default:"20G"` CleanFrequency output.Duration `short:"f" long:"clean_frequency" description:"Frequency to clean cache at" default:"10m"` + MaxArtifactAge output.Duration `short:"m" long:"max_artifact_age" description:"Clean any artifact that's not been read in this long" default:"720h"` } `group:"Options controlling when to clean the cache"` } @@ -31,6 +32,7 @@ func main() { output.InitLogging(opts.Verbosity, opts.LogFile, opts.Verbosity) log.Notice("Initialising cache server...") cache := server.NewCache(opts.Dir, time.Duration(opts.CleanFlags.CleanFrequency), + time.Duration(opts.CleanFlags.MaxArtifactAge), uint64(opts.CleanFlags.LowWaterMark), uint64(opts.CleanFlags.HighWaterMark)) log.Notice("Starting up http cache server on port %d...", opts.Port) router := server.BuildRouter(cache) diff --git a/src/cache/server/rpc_server_main.go b/src/cache/server/rpc_server_main.go index edb0a74841..a34d653154 100644 --- a/src/cache/server/rpc_server_main.go +++ b/src/cache/server/rpc_server_main.go @@ -21,6 +21,7 @@ var opts struct { LowWaterMark output.ByteSize `short:"l" long:"low_water_mark" description:"Size of cache to clean down to" default:"18G"` HighWaterMark output.ByteSize `short:"i" long:"high_water_mark" description:"Max size of cache to clean at" default:"20G"` CleanFrequency output.Duration `short:"f" long:"clean_frequency" description:"Frequency to clean cache at" default:"10m"` + MaxArtifactAge output.Duration `short:"m" long:"max_artifact_age" description:"Clean any artifact that's not been read in this long" default:"720h"` } `group:"Options controlling when to clean the cache"` TLSFlags struct { @@ -42,6 +43,7 @@ func main() { } log.Notice("Scanning existing cache directory %s...", opts.Dir) cache := server.NewCache(opts.Dir, time.Duration(opts.CleanFlags.CleanFrequency), + time.Duration(opts.CleanFlags.MaxArtifactAge), uint64(opts.CleanFlags.LowWaterMark), uint64(opts.CleanFlags.HighWaterMark)) log.Notice("Starting up RPC cache server on port %d...", opts.Port) server.ServeGrpcForever(opts.Port, cache, opts.TLSFlags.KeyFile, opts.TLSFlags.CertFile, diff --git a/src/cache/server/rpc_server_test.go b/src/cache/server/rpc_server_test.go index c10893a57d..5a77c66460 100644 --- a/src/cache/server/rpc_server_test.go +++ b/src/cache/server/rpc_server_test.go @@ -26,7 +26,7 @@ const ( ) func startServer(port int, auth bool, readonlyCerts, writableCerts string) *grpc.Server { - cache := NewCache(testDir, 100, 1000000, 1000000) + cache := NewCache(testDir, 20*time.Hour, 100, 1000000, 1000000) if !auth { s, lis := BuildGrpcServer(port, cache, "", "", "", readonlyCerts, writableCerts) go s.Serve(lis)