
meta/session: exit bg tasks when close
Signed-off-by: jiefenghuang <[email protected]>
jiefenghuang committed Dec 31, 2024
1 parent beaa0d0 commit b3f7ca2
Showing 8 changed files with 167 additions and 68 deletions.
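In short, NewSession now keeps a cancellable session Context and registers every background goroutine it starts in a WaitGroup, and CloseSession cancels that context and waits for those goroutines before returning. A hedged sketch of how an embedder might drive the lifecycle after this change (the client URI, the empty Config, and the exact meta.NewClient call are illustrative assumptions, not part of this diff):

package main

import (
	"log"

	"github.com/juicedata/juicefs/pkg/meta"
)

func main() {
	// Placeholder URI and empty Config: real callers populate fields such as
	// Heartbeat, MaxDeletes and NoBGJob before creating the client.
	m := meta.NewClient("redis://127.0.0.1:6379/1", &meta.Config{})

	if err := m.NewSession(true); err != nil { // starts refresh/flush/cleanup goroutines
		log.Fatalf("new session: %v", err)
	}
	// After this commit, CloseSession also cancels the session context and
	// waits for every registered background goroutine to exit.
	defer m.CloseSession()

	// CleanStaleSessions now takes an explicit meta.Context (see interface.go below).
	m.CleanStaleSessions(meta.Background())
}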
2 changes: 1 addition & 1 deletion cmd/destroy.go
@@ -130,7 +130,7 @@ func destroy(ctx *cli.Context) error {
}

if !ctx.Bool("force") {
m.CleanStaleSessions()
m.CleanStaleSessions(meta.Background())
sessions, err := m.ListSessions()
if err != nil {
logger.Fatalf("list sessions: %s", err)
153 changes: 111 additions & 42 deletions pkg/meta/base.go
@@ -83,7 +83,7 @@ type engine interface {
doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
doDeleteFileData(inode Ino, length uint64)
doCleanupSlices()
doCleanupDelayedSlices(edge int64) (int, error)
doCleanupDelayedSlices(ctx Context, edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error

doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, ino Ino, attr *Attr, cmode uint8, cumask uint16, top bool) syscall.Errno
@@ -182,10 +182,17 @@ func (symCache *symlinkCache) Store(inode Ino, path []byte) {
}
}

func (symCache *symlinkCache) clean() {
func (symCache *symlinkCache) clean(ctx Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
time.Sleep(time.Minute)
symCache.doClean()
select {
case <-ctx.Done():
return
case <-ticker.C:
symCache.doClean()
}
}
}
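The symlink-cache cleaner above swaps its unconditional time.Sleep for a ticker plus a select on ctx.Done(), so it returns as soon as the session context is cancelled. A standalone sketch of that pattern, using the standard-library context in place of the juicefs meta.Context:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// clean mirrors the new symlinkCache.clean shape: a ticker plus a select on
// ctx.Done(), instead of an unconditional one-minute sleep.
func clean(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done(): // session closed: stop immediately
			return
		case <-ticker.C:
			fmt.Println("doClean would run here")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go clean(ctx, &wg)

	cancel()  // what CloseSession does via m.sessCtx.Cancel()
	wg.Wait() // what CloseSession does via m.sessWG.Wait()
}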

@@ -226,6 +233,10 @@ type baseMeta struct {
sesMu sync.Mutex
aclCache aclAPI.Cache

sessCtx Context
sessWG sync.WaitGroup
dSliceWG sync.WaitGroup

dirStatsLock sync.Mutex
dirStats map[Ino]dirStat

@@ -249,9 +260,9 @@ type baseMeta struct {
totalInodesG prometheus.Gauge
txDist prometheus.Histogram
txRestart prometheus.Counter
opDist prometheus.Histogram
opCount *prometheus.CounterVec
opDuration *prometheus.CounterVec
opDist prometheus.Histogram
opCount *prometheus.CounterVec
opDuration *prometheus.CounterVec

en engine
}
@@ -447,9 +458,11 @@ func (m *baseMeta) newSessionInfo() []byte {
}

func (m *baseMeta) NewSession(record bool) error {
go m.refresh()
m.sessCtx = Background()
ctx := m.sessCtx
go m.refresh(ctx)

if err := m.en.cacheACLs(Background()); err != nil {
if err := m.en.cacheACLs(ctx); err != nil {
return err
}

@@ -477,27 +490,56 @@ func (m *baseMeta) NewSession(record bool) error {
}

m.loadQuotas()
go m.flushStats()
go m.flushDirStat()
go m.flushQuotas()

if m.conf.MaxDeletes > 0 {
m.dslices = make(chan Slice, m.conf.MaxDeletes*10240)
for i := 0; i < m.conf.MaxDeletes; i++ {
go func() {
for s := range m.dslices {

m.sessWG.Add(3)
go m.flushStats(ctx)
go m.flushDirStat(ctx)
go m.flushQuotas(ctx)
m.startDeleteSliceTasks() // start MaxDeletes tasks

if !m.conf.NoBGJob {
m.sessWG.Add(4)
go m.cleanupDeletedFiles(ctx)
go m.cleanupSlices(ctx)
go m.cleanupTrash(ctx)
go m.symlinks.clean(ctx, &m.sessWG)
}
return nil
}

func (m *baseMeta) startDeleteSliceTasks() {
if m.conf.MaxDeletes <= 0 || m.dslices != nil {
return
}
m.sessWG.Add(m.conf.MaxDeletes)
m.dSliceWG.Add(m.conf.MaxDeletes)
m.dslices = make(chan Slice, m.conf.MaxDeletes*10240)
for i := 0; i < m.conf.MaxDeletes; i++ {
go func() {
defer m.sessWG.Done()
defer m.dSliceWG.Done()
for {
select {
case <-m.sessCtx.Done():
return
case s, ok := <-m.dslices:
if !ok {
return
}
m.deleteSlice_(s.Id, s.Size)
}
}()
}
}
}()
}
if !m.conf.NoBGJob {
go m.cleanupDeletedFiles()
go m.cleanupSlices()
go m.cleanupTrash()
go m.symlinks.clean()
}

func (m *baseMeta) stopDeleteSliceTasks() {
if m.conf.MaxDeletes <= 0 || m.dslices == nil {
return
}
return nil
close(m.dslices)
m.dSliceWG.Wait()
m.dslices = nil
}
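startDeleteSliceTasks/stopDeleteSliceTasks above bracket a bounded worker pool: each worker exits either when the session context is cancelled or when the channel is closed, and stop waits for all of them via a dedicated WaitGroup before clearing the channel. A simplified, self-contained sketch of that start/stop pairing (plain context and channels, not the juicefs types):

package main

import (
	"context"
	"fmt"
	"sync"
)

type pool struct {
	ctx   context.Context
	tasks chan int
	wg    sync.WaitGroup
}

func (p *pool) start(workers int) {
	p.tasks = make(chan int, workers*10240) // buffered, like m.dslices
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.ctx.Done(): // session context cancelled
					return
				case t, ok := <-p.tasks:
					if !ok { // channel closed by stop()
						return
					}
					fmt.Println("would delete slice", t)
				}
			}
		}()
	}
}

func (p *pool) stop() {
	close(p.tasks) // let workers drain what is already queued
	p.wg.Wait()
	p.tasks = nil
}

func main() {
	p := &pool{ctx: context.Background()}
	p.start(2)
	for i := 0; i < 5; i++ {
		p.tasks <- i
	}
	p.stop()
}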

func (m *baseMeta) expireTime() int64 {
@@ -516,8 +558,11 @@ func (m *baseMeta) OnReload(fn func(f *Format)) {

const UmountCode = 11

func (m *baseMeta) refresh() {
func (m *baseMeta) refresh(ctx Context) {
for {
if ctx.Canceled() {
return
}
if m.conf.Heartbeat > 0 {
utils.SleepWithJitter(m.conf.Heartbeat)
} else { // use default value
@@ -575,18 +620,21 @@ func (m *baseMeta) refresh() {
if ok, err := m.en.setIfSmall("lastCleanupSessions", time.Now().Unix(), int64((m.conf.Heartbeat * 9 / 10).Seconds())); err != nil {
logger.Warnf("checking counter lastCleanupSessions: %s", err)
} else if ok {
go m.CleanStaleSessions()
go m.CleanStaleSessions(ctx)
}
}
}

func (m *baseMeta) CleanStaleSessions() {
func (m *baseMeta) CleanStaleSessions(ctx Context) {
sids, err := m.en.doFindStaleSessions(1000)
if err != nil {
logger.Warnf("scan stale sessions: %s", err)
return
}
for _, sid := range sids {
if ctx.Canceled() {
return
}
s, err := m.en.GetSession(sid, false)
if err != nil {
logger.Warnf("Get session info %d: %v", sid, err)
@@ -605,6 +653,9 @@ func (m *baseMeta) CloseSession() error {
if m.sid > 0 {
err = m.en.doCleanStaleSession(m.sid)
}
m.sessCtx.Cancel()
m.sessWG.Wait()
m.stopDeleteSliceTasks()
logger.Infof("close session %d: %v", m.sid, err)
return err
}
@@ -623,9 +674,14 @@ func (m *baseMeta) Init(format *Format, force bool) error {
return m.en.doInit(format, force)
}

func (m *baseMeta) cleanupDeletedFiles() {
func (m *baseMeta) cleanupDeletedFiles(ctx Context) {
defer m.sessWG.Done()
for {
utils.SleepWithJitter(time.Hour)
select {
case <-ctx.Done():
return
case <-time.After(utils.JitterIt(time.Hour)):
}
if ok, err := m.en.setIfSmall("lastCleanupFiles", time.Now().Unix(), int64(time.Hour.Seconds())*9/10); err != nil {
logger.Warnf("checking counter lastCleanupFiles: %s", err)
} else if ok {
@@ -646,9 +702,14 @@ func (m *baseMeta) cleanupDeletedFiles() {
}
}
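The hourly cleanup loops (cleanupDeletedFiles above, cleanupSlices and cleanupTrash below) all take the same shape: the jittered wait itself becomes cancellable by selecting ctx.Done() against time.After. A runnable sketch of the idea; the jitter helper here only approximates what utils.JitterIt presumably does, since its implementation is not part of this diff:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// jitter stands in for utils.JitterIt (not shown in this diff); the assumption is
// that it spreads wake-ups around the nominal period so replicas don't fire in lockstep.
func jitter(d time.Duration) time.Duration {
	return d*9/10 + time.Duration(rand.Int63n(int64(d/5)))
}

// cleanupLoop has the shape of the new hourly loops: the wait itself is cancellable.
func cleanupLoop(ctx context.Context, period time.Duration, work func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(jitter(period)):
			work()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go cleanupLoop(ctx, 10*time.Millisecond, func() { fmt.Println("cleanup pass") })
	time.Sleep(50 * time.Millisecond)
	cancel() // stops the loop even while it is waiting
	time.Sleep(10 * time.Millisecond)
}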

func (m *baseMeta) cleanupSlices() {
func (m *baseMeta) cleanupSlices(ctx Context) {
defer m.sessWG.Done()
for {
utils.SleepWithJitter(time.Hour)
select {
case <-ctx.Done():
return
case <-time.After(utils.JitterIt(time.Hour)):
}
if ok, err := m.en.setIfSmall("nextCleanupSlices", time.Now().Unix(), int64(time.Hour.Seconds())*9/10); err != nil {
logger.Warnf("checking counter nextCleanupSlices: %s", err)
} else if ok {
@@ -2400,10 +2461,15 @@ func (m *baseMeta) trashEntry(parent, inode Ino, name string) string {
return s
}

func (m *baseMeta) cleanupTrash() {
func (m *baseMeta) cleanupTrash(ctx Context) {
defer m.sessWG.Done()
for {
utils.SleepWithJitter(time.Hour)
if st := m.en.doGetAttr(Background(), TrashInode, nil); st != 0 {
select {
case <-ctx.Done():
return
case <-time.After(utils.JitterIt(time.Hour)):
}
if st := m.en.doGetAttr(ctx, TrashInode, nil); st != 0 {
if st != syscall.ENOENT {
logger.Warnf("getattr inode %d: %s", TrashInode, st)
}
Expand All @@ -2413,8 +2479,8 @@ func (m *baseMeta) cleanupTrash() {
logger.Warnf("checking counter lastCleanupTrash: %s", err)
} else if ok {
days := m.getFormat().TrashDays
go m.doCleanupTrash(days, false)
go m.cleanupDelayedSlices(days)
go m.doCleanupTrash(ctx, days, false)
go m.cleanupDelayedSlices(ctx, days)
}
}
}
@@ -2451,6 +2517,9 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress
}()
batch := 1000000
for len(entries) > 0 {
if ctx.Canceled() {
return
}
e := entries[0]
ts, err := time.Parse("2006-01-02-15", string(e.Name))
if err != nil {
@@ -2533,19 +2602,19 @@ func (m *baseMeta) scanTrashFiles(ctx Context, scan trashFileScan) error {
return nil
}

func (m *baseMeta) doCleanupTrash(days int, force bool) {
func (m *baseMeta) doCleanupTrash(ctx Context, days int, force bool) {
edge := time.Now().Add(-time.Duration(24*days+2) * time.Hour)
if force {
edge = time.Now()
}
m.CleanupTrashBefore(Background(), edge, nil)
m.CleanupTrashBefore(ctx, edge, nil)
}

func (m *baseMeta) cleanupDelayedSlices(days int) {
func (m *baseMeta) cleanupDelayedSlices(ctx Context, days int) {
now := time.Now()
edge := now.Unix() - int64(days)*24*3600
logger.Debugf("Cleanup delayed slices: started with edge %d", edge)
if count, err := m.en.doCleanupDelayedSlices(edge); err != nil {
if count, err := m.en.doCleanupDelayedSlices(ctx, edge); err != nil {
logger.Warnf("Cleanup delayed slices: deleted %d slices in %v, but got error: %s", count, time.Since(now), err)
} else if count > 0 {
logger.Infof("Cleanup delayed slices: deleted %d slices in %v", count, time.Since(now))
22 changes: 15 additions & 7 deletions pkg/meta/base_test.go
@@ -35,13 +35,12 @@ import (
"testing"
"time"

"xorm.io/xorm"

aclAPI "github.com/juicedata/juicefs/pkg/acl"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"xorm.io/xorm"
)

func testConfig() *Config {
@@ -429,7 +428,7 @@ func testMetaClient(t *testing.T, m Meta) {
if base.sid != ses[0].Sid {
t.Fatalf("my sid %d != registered sid %d", base.sid, ses[0].Sid)
}
go m.CleanStaleSessions()
go m.CleanStaleSessions(Background())

var parent, inode, dummyInode Ino
if st := m.Mkdir(ctx, 1, "d", 0640, 022, 0, &parent, attr); st != 0 {
@@ -1406,6 +1405,11 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
} else {
_ = m.Init(testFormat(), false)
}

if err := m.NewSession(false); err != nil {
t.Fatalf("new session: %v", err)
}
defer m.CloseSession()
var l sync.Mutex
deleted := make(map[uint64]int)
m.OnMsg(DeleteSlice, func(args ...interface{}) error {
@@ -1500,15 +1504,16 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
if len(sliceMap[1]) < 200 {
t.Fatalf("list delayed slices %d is less than 200", len(sliceMap[1]))
}
m.(engine).doCleanupDelayedSlices(time.Now().Unix() + 1)
m.(engine).doCleanupDelayedSlices(ctx, time.Now().Unix()+1)
}
time.Sleep(time.Second * 3) // wait for all slices deleted
m.getBase().stopDeleteSliceTasks()
l.Lock()
deletes := len(deleted)
l.Unlock()
if deletes < 200 {
t.Fatalf("deleted slices %d is less than 200", deletes)
}
m.getBase().startDeleteSliceTasks()

// truncate to 0
if st := m.Truncate(ctx, inode, 0, 0, attr, false); st != 0 {
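testCompaction above replaces the fixed three-second sleep with stopDeleteSliceTasks/startDeleteSliceTasks around the assertion, so every queued slice deletion is drained before the count is checked. A minimal, generic sketch of that drain-before-assert idea (plain channels and a counter, not the juicefs test fixtures):

package main

import (
	"fmt"
	"sync"
)

func main() {
	work := make(chan int, 100) // stands in for m.dslices
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0

	wg.Add(1)
	go func() { // worker, analogous to one delete-slice task
		defer wg.Done()
		for range work {
			mu.Lock()
			processed++
			mu.Unlock()
		}
	}()

	for i := 0; i < 10; i++ {
		work <- i
	}

	close(work) // like stopDeleteSliceTasks closing the channel
	wg.Wait()   // every queued item is now handled
	fmt.Println("processed:", processed) // safe to assert on the count here
}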
@@ -2013,7 +2018,7 @@ func testTrash(t *testing.T, m Meta) {
if st := m.Rename(ctx2, TrashInode+1, "d", 1, "f", 0, &inode, attr); st != syscall.EPERM {
t.Fatalf("rename d -> f: %s", st)
}
m.getBase().doCleanupTrash(format.TrashDays, true)
m.getBase().doCleanupTrash(Background(), format.TrashDays, true)
if st := m.GetAttr(ctx2, TrashInode+1, attr); st != syscall.ENOENT {
t.Fatalf("getattr: %s", st)
}
@@ -2531,7 +2536,10 @@ func testDirStat(t *testing.T, m Meta) {
if st := m.Mkdir(Background(), RootInode, testDir, 0640, 022, 0, &testInode, nil); st != 0 {
t.Fatalf("mkdir: %s", st)
}

if err := m.NewSession(true); err != nil {
t.Fatalf("new session: %s", err)
}
defer m.CloseSession()
stat, st := m.GetDirStat(Background(), testInode)
checkResult := func(length, space, inodes int64) {
if st != 0 {
2 changes: 1 addition & 1 deletion pkg/meta/interface.go
@@ -333,7 +333,7 @@ type Meta interface {
// ListLocks returns all locks of an inode.
ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error)
// CleanStaleSessions cleans up sessions not active for more than 5 minutes
CleanStaleSessions()
CleanStaleSessions(ctx Context)
// CleanupTrashBefore deletes all files in trash before the given time.
CleanupTrashBefore(ctx Context, edge time.Time, increProgress func(int))
// CleanupDetachedNodesBefore deletes all detached nodes before the given time.
