Skip to content

Commit

Permalink
Disable jetstream if disk permission error while writing raft state
Browse files Browse the repository at this point in the history
  • Loading branch information
souravagrawal committed Jan 16, 2025
1 parent 5dc283f commit 9b4d96d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 21 deletions.
32 changes: 13 additions & 19 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
err := fs.expireMsgsOnRecover()
if err != nil && err == errFileSystemPermissionDenied {
fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
if isPermissionError(err) {
return nil, err
}
fs.startAgeChk()
Expand Down Expand Up @@ -2127,7 +2123,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
return true
})
err := mb.dirtyCloseWithRemove(true)
if err != nil && err == errFileSystemPermissionDenied {
if isPermissionError(err) {
return err
}
deleted++
Expand All @@ -2146,11 +2142,10 @@ func (fs *fileStore) expireMsgsOnRecover() error {
purged += mb.msgs
bytes += mb.bytes
err := deleteEmptyBlock(mb)
if err != nil && err == errFileSystemPermissionDenied {
mb.mu.Unlock()
mb.mu.Unlock()
if isPermissionError(err) {
return err
}
mb.mu.Unlock()
continue
}

Expand Down Expand Up @@ -3703,7 +3698,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
dios <- struct{}{}

if err != nil {
if os.IsPermission(err) {
if isPermissionError(err) {
return nil, err
}
mb.dirtyCloseWithRemove(true)
Expand Down Expand Up @@ -6640,7 +6635,6 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
errNoBlkData = errors.New("message block data missing")
errStateTooBig = errors.New("store state too big for optional write")
errFileSystemPermissionDenied = errors.New("storage directory not writeable")
)

const (
Expand Down Expand Up @@ -8132,15 +8126,15 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
mb.fss = nil
if mb.mfn != _EMPTY_ {
err := os.Remove(mb.mfn)
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
if isPermissionError(err) {
return err
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
err := os.Remove(mb.kfn)
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
if isPermissionError(err) {
return err
}
}
}
Expand Down Expand Up @@ -8562,8 +8556,8 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
select {
case <-t.C:
err := fs.writeFullState()
if err != nil && os.IsPermission(err) {
fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err)
if isPermissionError(err) {
fs.warn("File system permission denied when flushing stream state, disabling jetstream %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
Expand Down Expand Up @@ -8778,8 +8772,8 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Protect with dios.
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// if file system is not writable os.IsPermission is set to true
if err != nil && os.IsPermission(err) {
// if file system is not writable isPermissionError is set to true
if isPermissionError(err) {
return err
}
dios <- struct{}{}
Expand Down
11 changes: 11 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3034,3 +3034,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
cfg.Duplicates = 0
}
}

// handleWritePermissionError is invoked after a write has failed with a file
// system permission error. When JetStream is currently enabled it logs the
// condition and disables JetStream from a separate goroutine; otherwise it
// does nothing.
func (s *Server) handleWritePermissionError() {
	// TODO: Check if we should add s.jetStreamOOSPending in condition.
	if !s.JetStreamEnabled() {
		return
	}

	s.Errorf("file system permission denied while writing, disabling jetstream")
	go s.DisableJetStream()

	// TODO: Send respective advisory if needed, same as in handleOutOfSpace.
}
4 changes: 4 additions & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3978,6 +3978,10 @@ func (n *raft) setWriteErrLocked(err error) {
n.error("Critical write error: %v", err)
n.werr = err

if isPermissionError(err) {
go n.s.handleWritePermissionError()
}

if isOutOfSpaceErr(err) {
// For now since this can be happening all under the covers, we will call up and disable JetStream.
go n.s.handleOutOfSpace(nil)
Expand Down
5 changes: 5 additions & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"unsafe"
Expand Down Expand Up @@ -784,3 +785,7 @@ func copyString(s string) string {
copy(b, s)
return bytesToString(b)
}

// isPermissionError reports whether err indicates that a file system
// operation was rejected because of insufficient permissions.
//
// A nil error yields false. The check is done with errors.Is against
// os.ErrPermission rather than os.IsPermission, so a permission error is
// still recognized after a caller has wrapped it with extra context (for
// example fmt.Errorf with %w). os.IsPermission only understands errors
// returned directly by the os package and would miss wrapped ones.
func isPermissionError(err error) bool {
	return errors.Is(err, os.ErrPermission)
}
4 changes: 2 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5098,12 +5098,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

if err != nil {
if os.IsPermission(err){
if isPermissionError(err) {
mset.mu.Unlock()
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
mset.srv.DisableJetStream()
mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err)
mset.srv.Warnf("File system permission denied while writing msg, disabling jetstream: %v", err)
return err
}
// If we did not succeed put those values back and increment clfs in case we are clustered.
Expand Down

0 comments on commit 9b4d96d

Please sign in to comment.