Skip to content

Commit

Permalink
xrootd: improve read performances 10-fold
Browse files Browse the repository at this point in the history
This CL reduces the lock contention on xrootd.File operations.

```
  $> benchstat ./ref.txt ./new.txt
  name    old time/op    new time/op    delta
  Read-8     67.2s ± 1%      5.4s ± 7%  -91.93%  (p=0.000 n=9+28)

  name    old alloc/op   new alloc/op   delta
  Read-8     343MB ± 0%     341MB ± 0%   -0.78%  (p=0.000 n=8+30)

  name    old allocs/op  new allocs/op  delta
  Read-8      277k ± 0%      288k ± 0%   +3.84%  (p=0.000 n=7+29)
```

and now:
```
  $> time root-dump root://ccxrootdgotest.in2p3.fr:9001/tmp/rootio/testdata/SMHiggsToZZTo4L.root > /dev/null

  real	0m7.279s
  user	0m8.221s
  sys	0m1.256s
```

compared to:

```
  $> time root-dump https://cern.ch/binet/big-file.root > /dev/null

  real	0m5.454s
  user	0m6.156s
  sys	0m0.228s
```

Updates #920.
  • Loading branch information
sbinet committed Mar 8, 2022
1 parent c38575f commit 6f88941
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 64 deletions.
53 changes: 53 additions & 0 deletions xrootd/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright ©2022 The go-hep Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package xrootd_test

import (
"testing"

"go-hep.org/x/hep/groot"
_ "go-hep.org/x/hep/groot/riofs/plugin/xrootd"
"go-hep.org/x/hep/groot/rtree"
)

func BenchmarkRead(b *testing.B) {
const (
fname = "root://ccxrootdgotest.in2p3.fr:9001/tmp/rootio/testdata/SMHiggsToZZTo4L.root"
tname = "Events"
)

f, err := groot.Open(fname)
if err != nil {
b.Fatal(err)
}
defer f.Close()

o, err := f.Get(tname)
if err != nil {
b.Fatal(err)
}

tree := o.(rtree.Tree)
read := func(tree rtree.Tree) error {
r, err := rtree.NewReader(tree, rtree.NewReadVars(tree))
if err != nil {
b.Fatal(err)
}
defer r.Close()

return r.Read(func(rctx rtree.RCtx) error {
_ = rctx.Entry
return nil
})
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
err := read(tree)
if err != nil {
b.Fatal(err)
}
}
}
110 changes: 46 additions & 64 deletions xrootd/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type file struct {
handle xrdfs.FileHandle
compression *xrdfs.FileCompression

mu rsync.Mutex
mu rsync.RWMutex
info *xrdfs.EntryStat
sessionID string
}
Expand All @@ -47,56 +47,36 @@ func (f *file) Handle() xrdfs.FileHandle {

// Close closes the file.
func (f *file) Close(ctx context.Context) error {
f.mu.Lock()
defer f.mu.Unlock()

newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, nil, &xrdclose.Request{Handle: f.handle})
if err != nil {
return err
}
f.sessionID = newSessionID
return nil
return f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, nil, &xrdclose.Request{Handle: f.handle})
})
}

// CloseVerify closes the file and checks whether the file has the provided size.
// A zero size suppresses the verification.
func (f *file) CloseVerify(ctx context.Context, size int64) error {
f.mu.Lock()
defer f.mu.Unlock()

newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, nil, &xrdclose.Request{Handle: f.handle, Size: size})
if err != nil {
return err
}
f.sessionID = newSessionID
return nil
return f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, nil, &xrdclose.Request{Handle: f.handle, Size: size})
})
}

// Sync commits all pending writes to an open file.
func (f *file) Sync(ctx context.Context) error {
f.mu.Lock()
defer f.mu.Unlock()

newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, nil, &sync.Request{Handle: f.handle})
if err != nil {
return err
}
f.sessionID = newSessionID
return nil
return f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, nil, &sync.Request{Handle: f.handle})
})
}

// ReadAtContext reads len(p) bytes into p starting at offset off.
func (f *file) ReadAtContext(ctx context.Context, p []byte, off int64) (n int, err error) {
f.mu.Lock()
defer f.mu.Unlock()

resp := read.Response{Data: p}
req := &read.Request{Handle: f.handle, Offset: off, Length: int32(len(p))}
newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, &resp, req)
err = f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, &resp, req)
})
if err != nil {
return 0, err
}
f.sessionID = newSessionID
return len(resp.Data), nil
}

Expand All @@ -107,15 +87,9 @@ func (f *file) ReadAt(p []byte, off int64) (n int, err error) {

// WriteAtContext writes len(p) bytes from p to the file at offset off.
func (f *file) WriteAtContext(ctx context.Context, p []byte, off int64) error {
f.mu.Lock()
defer f.mu.Unlock()

newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, nil, &write.Request{Handle: f.handle, Offset: off, Data: p})
if err != nil {
return err
}
f.sessionID = newSessionID
return nil
return f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, nil, &write.Request{Handle: f.handle, Offset: off, Data: p})
})
}

// WriteAt writes len(p) bytes from p to the file at offset off.
Expand All @@ -129,47 +103,44 @@ func (f *file) WriteAt(p []byte, off int64) (n int, err error) {

// Truncate changes the size of the named file.
func (f *file) Truncate(ctx context.Context, size int64) error {
f.mu.Lock()
defer f.mu.Unlock()

newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, nil, &truncate.Request{Handle: f.handle, Size: size})
if err != nil {
return err
}
f.sessionID = newSessionID
return nil
return f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, nil, &truncate.Request{Handle: f.handle, Size: size})
})
}

// StatVirtualFS fetches the virtual fs stat info from the XRootD server.
// TODO: note that calling stat with vfs and handle may be invalid.
// See https://github.com/xrootd/xrootd/issues/728 for the details.
func (f *file) StatVirtualFS(ctx context.Context) (xrdfs.VirtualFSStat, error) {
f.mu.Lock()
defer f.mu.Unlock()

var resp stat.VirtualFSResponse
newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, &resp, &stat.Request{FileHandle: f.handle, Options: stat.OptionsVFS})
err := f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, &resp, &stat.Request{FileHandle: f.handle, Options: stat.OptionsVFS})
})
if err != nil {
return xrdfs.VirtualFSStat{}, err
}
f.sessionID = newSessionID
return resp.VirtualFSStat, nil
}

// Stat fetches the stat info of this file from the XRootD server.
// Note that Stat re-fetches value returned by the Info, so after the call to Stat
// calls to Info may return different value than before.
func (f *file) Stat(ctx context.Context) (xrdfs.EntryStat, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.RLock()
sid := f.sessionID
f.mu.RUnlock()

var resp stat.DefaultResponse
newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, &resp, &stat.Request{FileHandle: f.handle})
sid, err := f.fs.c.sendSession(ctx, sid, &resp, &stat.Request{FileHandle: f.handle})
if err != nil {
return xrdfs.EntryStat{}, err
}

f.mu.Lock()
f.sessionID = sid
f.info = &resp.EntryStat
f.sessionID = newSessionID
f.mu.Unlock()

return resp.EntryStat, nil
}

Expand All @@ -178,14 +149,25 @@ func (f *file) Stat(ctx context.Context) (xrdfs.EntryStat, error) {
// TODO: note that verifyw is not supported by the XRootD server.
// See https://github.com/xrootd/xrootd/issues/738 for the details.
func (f *file) VerifyWriteAt(ctx context.Context, p []byte, off int64) error {
f.mu.Lock()
defer f.mu.Unlock()
return f.do(ctx, func(ctx context.Context, sid string) (string, error) {
return f.fs.c.sendSession(ctx, sid, nil, verifyw.NewRequestCRC32(f.handle, off, p))
})
}

func (f *file) do(ctx context.Context, fct func(ctx context.Context, sid string) (string, error)) error {
f.mu.RLock()
sid := f.sessionID
f.mu.RUnlock()

newSessionID, err := f.fs.c.sendSession(ctx, f.sessionID, nil, verifyw.NewRequestCRC32(f.handle, off, p))
id, err := fct(ctx, sid)
if err != nil {
return err
}
f.sessionID = newSessionID

f.mu.Lock()
f.sessionID = id
f.mu.Unlock()

return nil
}

Expand Down

0 comments on commit 6f88941

Please sign in to comment.