From ef42fba25a5a52bce38e5d9bbea39ce957992ebc Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 3 Nov 2022 15:21:27 +0200 Subject: [PATCH] Support file operations for FUSE --- pkg/controlplane/http/session.go | 7 +- pkg/dataplane/container.go | 7 + pkg/dataplane/http/container.go | 31 +++- pkg/dataplane/http/context.go | 249 +++++++++++++++++++++++++++++-- pkg/dataplane/http/headers.go | 29 ++++ pkg/dataplane/types.go | 67 ++++++++- pkg/errors/errors.go | 13 +- 7 files changed, 380 insertions(+), 23 deletions(-) diff --git a/pkg/controlplane/http/session.go b/pkg/controlplane/http/session.go index 7aa39ce..6de22b9 100644 --- a/pkg/controlplane/http/session.go +++ b/pkg/controlplane/http/session.go @@ -25,10 +25,11 @@ import ( "net/http" "net/url" "strconv" + "syscall" "time" - "github.com/v3io/v3io-go/pkg/controlplane" - "github.com/v3io/v3io-go/pkg/errors" + v3ioc "github.com/v3io/v3io-go/pkg/controlplane" + v3ioerrors "github.com/v3io/v3io-go/pkg/errors" "github.com/nuclio/errors" "github.com/nuclio/logger" @@ -839,7 +840,7 @@ func (s *session) sendRequest(ctx context.Context, request *request, timeout tim return nil, v3ioerrors.NewErrorWithStatusCode( fmt.Errorf("Failed to execute HTTP request %s/%s.\nResponse code: %d", s.endpoints[0], request.path, responseInstance.statusCode), - responseInstance.statusCode) + responseInstance.statusCode, int(syscall.EINVAL)) } } diff --git a/pkg/dataplane/container.go b/pkg/dataplane/container.go index 214ca02..3dc8160 100644 --- a/pkg/dataplane/container.go +++ b/pkg/dataplane/container.go @@ -159,4 +159,11 @@ type Container interface { // PutOOSObjectSync PutOOSObjectSync(*PutOOSObjectInput) error + + GetFileAttributesSync(*GetFileAttributesInput, *GetFileAttributesOutput) error + OpenFileSync(*OpenFileInput, *OpenFileOutput) error + CloseFileSync(*CloseFileInput) error + TruncateFileSync(*TruncateFileInput) error + SymlinkSync(*SymlinkInput) error + GetWorkerDedicatedPortsSync(*DataPlaneInput) ([]string, error) } diff --git a/pkg/dataplane/http/container.go b/pkg/dataplane/http/container.go index 364f721..0674da3 100644 --- a/pkg/dataplane/http/container.go +++ b/pkg/dataplane/http/container.go @@ -20,7 +20,7 @@ such restriction. package v3iohttp import ( - "github.com/v3io/v3io-go/pkg/dataplane" + v3io "github.com/v3io/v3io-go/pkg/dataplane" "github.com/nuclio/logger" ) @@ -314,3 +314,32 @@ func (c *container) PutOOSObjectSync(putOOSObjectInput *v3io.PutOOSObjectInput) c.populateInputFields(&putOOSObjectInput.DataPlaneInput) return c.session.context.PutOOSObjectSync(putOOSObjectInput) } + +func (c *container) GetFileAttributesSync(input *v3io.GetFileAttributesInput, out *v3io.GetFileAttributesOutput) error { + c.populateInputFields(&input.DataPlaneInput) + return c.session.context.GetFileAttributesSync(input, out) +} + +func (c *container) OpenFileSync(input *v3io.OpenFileInput, out *v3io.OpenFileOutput) error { + c.populateInputFields(&input.DataPlaneInput) + return c.session.context.OpenFileSync(input, out) +} + +func (c *container) CloseFileSync(input *v3io.CloseFileInput) error { + c.populateInputFields(&input.DataPlaneInput) + return c.session.context.CloseFileSync(input) +} + +func (c *container) TruncateFileSync(input *v3io.TruncateFileInput) error { + c.populateInputFields(&input.DataPlaneInput) + return c.session.context.TruncateFileSync(input) +} + +func (c *container) SymlinkSync(input *v3io.SymlinkInput) error { + c.populateInputFields(&input.DataPlaneInput) + return c.session.context.SymlinkSync(input) +} + +func (c *container) GetWorkerDedicatedPortsSync(in *v3io.DataPlaneInput) ([]string, error) { + return c.session.context.GetWorkerDedicatedPortsSync(in) +} diff --git a/pkg/dataplane/http/context.go b/pkg/dataplane/http/context.go index 2eabf03..c2efe4b 100755 --- a/pkg/dataplane/http/context.go +++ b/pkg/dataplane/http/context.go @@ -28,15 +28,18 @@ import ( "encoding/xml" "fmt" "io" + "math/rand" "net" "net/http" "net/url" + "os" "path" "reflect" "regexp" "strconv" "strings" "sync/atomic" + "syscall" "time" v3io "github.com/v3io/v3io-go/pkg/dataplane" @@ -541,7 +544,10 @@ func (c *context) GetObjectSync(getObjectInput *v3io.GetObjectInput) (*v3io.Resp // Range header is inclusive in both 'start' and 'end', thus reducing 1 headers["Range"] = fmt.Sprintf("bytes=%v-%v", getObjectInput.Offset, getObjectInput.Offset+getObjectInput.NumBytes-1) } - + if getObjectInput.Handle.Fh != "" { + headers["x-v3io-handle"] = getObjectInput.Handle.Fh + getObjectInput.DataPlaneInput.URLAlternativePorts = []string{getObjectInput.Handle.URLPort} + } if getObjectInput.CtimeSec > 0 { if headers == nil { headers = make(map[string]string) @@ -569,12 +575,22 @@ func (c *context) PutObject(putObjectInput *v3io.PutObjectInput, // PutObjectSync func (c *context) PutObjectSync(putObjectInput *v3io.PutObjectInput) error { - var headers map[string]string + headers := make(map[string]string) + if putObjectInput.Offset != 0 { + headers["Range"] = strconv.Itoa(putObjectInput.Offset) + } if putObjectInput.Append { - headers = make(map[string]string) headers["Range"] = "-1" } + if putObjectInput.Mode != 0 { + headers["fs-open-mode"] = fmt.Sprintf("%d", putObjectInput.Mode) + } + if putObjectInput.Handle.Fh != "" { + headers["x-v3io-handle"] = putObjectInput.Handle.Fh + putObjectInput.DataPlaneInput.URLAlternativePorts = []string{putObjectInput.Handle.URLPort} + } + _, err := c.sendRequest(&putObjectInput.DataPlaneInput, http.MethodPut, putObjectInput.Path, @@ -589,7 +605,12 @@ func (c *context) PutObjectSync(putObjectInput *v3io.PutObjectInput) error { // UpdateObjectSync func (c *context) UpdateObjectSync(updateObjectInput *v3io.UpdateObjectInput) error { headers := map[string]string{ - "X-v3io-function": "DirSetAttr", + "X-v3io-function": "DirSetAttrs", + } + + if updateObjectInput.Handle.Fh != "" { + headers["x-v3io-handle"] = updateObjectInput.Handle.Fh + updateObjectInput.DataPlaneInput.URLAlternativePorts = []string{updateObjectInput.Handle.URLPort} } marshaledDirAttributes, err := json.Marshal(updateObjectInput.DirAttributes) @@ -1064,7 +1085,7 @@ func (c *context) sendRequest(dataPlaneInput *v3io.DataPlaneInput, request := fasthttp.AcquireRequest() response := c.allocateResponse() - uri, err := c.buildRequestURI(dataPlaneInput.URL, dataPlaneInput.ContainerName, query, path) + uri, err := c.buildRequestURI(dataPlaneInput.URL, dataPlaneInput.URLAlternativePorts, dataPlaneInput.ContainerName, query, path) if err != nil { return nil, err } @@ -1079,9 +1100,12 @@ func (c *context) sendRequest(dataPlaneInput *v3io.DataPlaneInput, if len(dataPlaneInput.AuthenticationToken) > 0 { request.Header.Set("Authorization", dataPlaneInput.AuthenticationToken) } - - if len(dataPlaneInput.AccessKey) > 0 { - request.Header.Set("X-v3io-session-key", dataPlaneInput.AccessKey) + if strings.Contains(dataPlaneInput.AccessKey, "Basic") { + request.Header.Set("Authorization", dataPlaneInput.AccessKey) + } else { + if len(dataPlaneInput.AccessKey) > 0 { + request.Header.Set("X-v3io-session-key", dataPlaneInput.AccessKey) + } } for headerName, headerValue := range headers { @@ -1144,13 +1168,25 @@ func (c *context) sendRequest(dataPlaneInput *v3io.DataPlaneInput, sanitizedRequest := re.ReplaceAllString(request.String(), "X-V3io-Session-Key: SANITIZED") _err := fmt.Errorf("Expected a 2xx response status code: %s\nRequest details:\n%s", response.HTTPResponse.String(), sanitizedRequest) + responseBodyParsed := struct { + ErrorCode int + }{} + var errnoCode int + if json.Unmarshal(response.Body(), &responseBodyParsed) == nil { + errnoCode = -responseBodyParsed.ErrorCode + if errnoCode == 0 || errnoCode > 300 { + errnoCode = int(syscall.EINVAL) + } + } else { + errnoCode = int(syscall.EINVAL) + } // Include response in error only if caller has requested it // Otherwise it will be released automatically if dataPlaneInput.IncludeResponseInError { - err = v3ioerrors.NewErrorWithStatusCodeAndResponse(_err, statusCode, response) + err = v3ioerrors.NewErrorWithStatusCodeAndResponse(_err, statusCode, errnoCode, response) } else { - err = v3ioerrors.NewErrorWithStatusCode(_err, statusCode) + err = v3ioerrors.NewErrorWithStatusCode(_err, statusCode, errnoCode) } goto cleanup @@ -1178,7 +1214,7 @@ cleanup: return response, nil } -func (c *context) buildRequestURI(urlString string, containerName string, query string, pathStr string) (*url.URL, error) { +func (c *context) buildRequestURI(urlString string, alternativePorts []string, containerName string, query string, pathStr string) (*url.URL, error) { uri, err := url.Parse(urlString) if err != nil { return nil, errors.Wrapf(err, "Failed to parse cluster endpoint URL %s", urlString) @@ -1188,6 +1224,11 @@ func (c *context) buildRequestURI(urlString string, containerName string, query uri.Path += "/" // retain trailing slash } uri.RawQuery = strings.Replace(query, " ", "%20", -1) + if len(alternativePorts) != 0 { + alternativePort := alternativePorts[rand.Intn(len(alternativePorts))] + uri.Host = uri.Hostname() + ":" + alternativePort + } + return uri, nil } @@ -1706,6 +1747,26 @@ func trimAndParseInt(str string) (int, error) { return strconv.Atoi(trimmed) } +func trimAndParseUInt(str string) (uint64, error) { + trimmed := strings.TrimSpace(str) + return strconv.ParseUint(trimmed, 10, 64) +} + +func trimAndParsePairUInt(str string) (uint64, uint64, error) { + var err error + var firstInt, secondInt uint64 + parts := strings.Split(str, ",") + firstInt, err = trimAndParseUInt(parts[0]) + if err != nil { + return 0, 0, err + } + secondInt, err = trimAndParseUInt(parts[1]) + if err != nil { + return 0, 0, err + } + return firstInt, secondInt, nil +} + // PutOOSObject func (c *context) PutOOSObject(putOOSObjectInput *v3io.PutOOSObjectInput, context interface{}, @@ -1755,3 +1816,169 @@ func (c *context) PutOOSObjectSync(putOOSObjectInput *v3io.PutOOSObjectInput) er return err } + +// checkPathExistsSync +func (c *context) GetFileAttributesSync(getAttrInput *v3io.GetFileAttributesInput, out *v3io.GetFileAttributesOutput) error { + var headers map[string]string + if getAttrInput.Handle.Fh != "" { + headers = map[string]string{ + "x-v3io-handle": getAttrInput.Handle.Fh, + } + getAttrInput.DataPlaneInput.URLAlternativePorts = []string{getAttrInput.Handle.URLPort} + + } + response, err := c.sendRequest(&getAttrInput.DataPlaneInput, + http.MethodHead, + getAttrInput.Path, + "", + headers, + nil, + false) + + if err == nil { + out.Ino, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-inode"))) + out.Size, _ = trimAndParseUInt(string(response.HeaderPeek("Content-Length"))) + out.Atime, out.Atimensec, _ = trimAndParsePairUInt(string(response.HeaderPeek("x-v3io-atime"))) + out.Ctime, out.Ctimensec, _ = trimAndParsePairUInt(string(response.HeaderPeek("x-v3io-ctime"))) + out.Mtime, out.Mtimensec, _ = trimAndParsePairUInt(string(response.HeaderPeek("x-v3io-mtime"))) + out.Mode, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-mode"))) + out.OwnerUid, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-uid"))) + out.OwnerGid, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-gid"))) + response.Release() + } + return err +} + +func (c *context) OpenFileSync(input *v3io.OpenFileInput, out *v3io.OpenFileOutput) error { + openFlags := 0 + if (int(input.Flags) & 0xF) == os.O_RDONLY { + openFlags = 1 + } + if (int(input.Flags) & 0xF) == os.O_WRONLY { + openFlags = 2 + } + if (int(input.Flags) & 0xF) == os.O_RDWR { + openFlags = 3 + } + if input.Flags&uint32(os.O_TRUNC) != 0 { + openFlags |= 4 // Truncate + } + + createFlags := 0 // No creation + if (input.Flags & uint32(os.O_CREATE)) != 0 { + if (input.Flags & uint32(os.O_EXCL)) != 0 { + createFlags = 1 + } else { + createFlags = 2 + } + if openFlags < 2 { + openFlags = 2 + } + } + + headers := map[string]string{ + "Content-Type": OpenFileHeaders["Content-Type"], + "X-v3io-function": OpenFileHeaders["X-v3io-function"], + "fs-open-mode": fmt.Sprintf("%d", input.Mode), + "fs-open-flags": fmt.Sprintf("%d", openFlags), + "fs-creation": fmt.Sprintf("%d", createFlags), + } + var alternativePort string + if len(input.DataPlaneInput.URLAlternativePorts) != 0 { + alternativePort = input.DataPlaneInput.URLAlternativePorts[rand.Intn(len(input.DataPlaneInput.URLAlternativePorts))] + input.DataPlaneInput.URLAlternativePorts = []string{alternativePort} + } + + response, err := c.sendRequest(&input.DataPlaneInput, + http.MethodPut, + input.Path, + "", + headers, + nil, + false) + if err == nil { + out.Handle = v3io.FileHandle{ + Fh: string(response.HeaderPeek("x-v3io-handle")), + URLPort: alternativePort, + } + response.Release() + } + return err +} + +func (c *context) CloseFileSync(input *v3io.CloseFileInput) error { + headers := map[string]string{ + "x-v3io-handle": input.Handle.Fh, + "Content-Type": CloseFileHeaders["Content-Type"], + "X-v3io-function": CloseFileHeaders["X-v3io-function"], + } + input.DataPlaneInput.URLAlternativePorts = []string{input.Handle.URLPort} + + _, err := c.sendRequest(&input.DataPlaneInput, + http.MethodPut, + input.Handle.Fh, + "", + headers, + nil, + true) + + return err +} + +func (c *context) TruncateFileSync(input *v3io.TruncateFileInput) error { + headers := map[string]string{ + "x-v3io-handle": input.Handle.Fh, + "Content-Type": TruncateFileHeaders["Content-Type"], + "X-v3io-function": TruncateFileHeaders["X-v3io-function"], + "range": fmt.Sprintf("%v", input.Size), + } + input.DataPlaneInput.URLAlternativePorts = []string{input.Handle.URLPort} + + _, err := c.sendRequest(&input.DataPlaneInput, + http.MethodPut, + input.Handle.Fh, + "", + headers, + nil, + true) + return err +} + +func (c *context) SymlinkSync(input *v3io.SymlinkInput) error { + headers := map[string]string{ + "Content-Type": SymlinkHeaders["Content-Type"], + "X-v3io-function": SymlinkHeaders["X-v3io-function"], + } + _, err := c.sendRequest(&input.DataPlaneInput, + http.MethodPut, + input.Path, + "", + headers, + []byte(input.TargetPath), + true) + return err +} + +func (c *context) GetWorkerDedicatedPortsSync(in *v3io.DataPlaneInput) ([]string, error) { + response, err := c.sendRequest(in, + http.MethodPut, + "", + "", + nil, + nil, + false) + if err == nil { + responseBodyParsed := struct { + DedicatedWorkerPorts []string + }{} + err = json.Unmarshal(response.Body(), &responseBodyParsed) + response.Release() + if err == nil { + return responseBodyParsed.DedicatedWorkerPorts, nil + } else { + return nil, err + } + } + return nil, err + +} diff --git a/pkg/dataplane/http/headers.go b/pkg/dataplane/http/headers.go index 2da1e49..18f21bc 100755 --- a/pkg/dataplane/http/headers.go +++ b/pkg/dataplane/http/headers.go @@ -33,6 +33,11 @@ const ( getClusterMDFunctionName = "GetClusterMD" putOOSObjectFunctionName = "OosRun" PutChunkFunctionName = "PutChunk" + OpenFileFunctionName = "OpenFile" + CloseFileFunctionName = "CloseFile" + TruncateFileFunctionName = "Truncate" + AllocateFileFunctionName = "Allocate" + SymlinkFunctionName = "CreateSymlink" ) // headers for put item @@ -121,3 +126,27 @@ var seekShardsInputTypeToString = [...]string{ "LATEST", "EARLIEST", } + +// headers for OOS put object +var OpenFileHeaders = map[string]string{ + "Content-Type": "application/json", + "X-v3io-function": OpenFileFunctionName, + "fs-open-mode": "", // File open mode in decimal. 511 is octal 0777 + "fs-open-flags": "", // Bitmask of Read=1, write=2, truncate=4 + "fs-creation": "", // Validate the object on which the request has been received was already created=0, Validate the object on which the request has been received was not already created, Eqv to O_CREAT and O_EXCL=1, No need to validate the exsistance of the object, Eq to O_CREAT =2 +} + +var CloseFileHeaders = map[string]string{ + "Content-Type": "application/json", + "X-v3io-function": CloseFileFunctionName, +} + +var TruncateFileHeaders = map[string]string{ + "Content-Type": "application/json", + "X-v3io-function": TruncateFileFunctionName, + "range": "", // range=size , for example range=100 means truncate to 100 bytes +} +var SymlinkHeaders = map[string]string{ + "Content-Type": "application/json", + "X-v3io-function": SymlinkFunctionName, +} diff --git a/pkg/dataplane/types.go b/pkg/dataplane/types.go index e850d92..ff8c15c 100755 --- a/pkg/dataplane/types.go +++ b/pkg/dataplane/types.go @@ -52,6 +52,7 @@ type DataPlaneInput struct { AccessKey string Timeout time.Duration IncludeResponseInError bool + URLAlternativePorts []string } type DataPlaneOutput struct { @@ -183,6 +184,10 @@ type ContainerInfo struct { // Object // +type FileHandle struct { + Fh string + URLPort string +} type GetObjectInput struct { DataPlaneInput Path string @@ -190,14 +195,17 @@ type GetObjectInput struct { NumBytes int CtimeSec int CtimeNsec int + Handle FileHandle } type PutObjectInput struct { DataPlaneInput Path string + Handle FileHandle Offset int Body []byte Append bool + Mode int } type DeleteObjectInput struct { @@ -208,19 +216,20 @@ type DeleteObjectInput struct { type UpdateObjectInput struct { DataPlaneInput Path string + Handle FileHandle DirAttributes *DirAttributes } type DirAttributes struct { Mode int `json:"mode,omitempty"` - UID int `json:"uid"` - GID int `json:"gid"` + UID int `json:"uid,omitempty"` + GID int `json:"gid,omitempty"` AtimeSec int `json:"atime.sec,omitempty"` - AtimeNSec int `json:"atime.nsec"` + AtimeNSec int `json:"atime.nsec,omitempty"` CtimeSec int `json:"ctime.sec,omitempty"` - CtimeNSec int `json:"ctime.nsec"` + CtimeNSec int `json:"ctime.nsec,omitempty"` MtimeSec int `json:"mtime.sec,omitempty"` - MtimeNSec int `json:"mtime.nsec"` + MtimeNSec int `json:"mtime.nsec,omitempty"` } // @@ -341,6 +350,54 @@ type CheckPathExistsInput struct { Path string } +type GetFileAttributesInput struct { + DataPlaneInput + Path string + Handle FileHandle +} + +type GetFileAttributesOutput struct { + Ino uint64 + Size uint64 + Atime uint64 + Mtime uint64 + Ctime uint64 + Atimensec uint64 + Mtimensec uint64 + Ctimensec uint64 + Mode uint64 + OwnerUid uint64 + OwnerGid uint64 +} + +type OpenFileInput struct { + DataPlaneInput + Path string + Mode uint32 + Flags uint32 +} + +type CloseFileInput struct { + DataPlaneInput + Handle FileHandle +} + +type TruncateFileInput struct { + DataPlaneInput + Handle FileHandle + Size uint64 +} + +type SymlinkInput struct { + DataPlaneInput + Path string + TargetPath string +} + +type OpenFileOutput struct { + Handle FileHandle +} + type DescribeStreamInput struct { DataPlaneInput Path string diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 30aac5e..03ceb6d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -21,6 +21,7 @@ package v3ioerrors import ( "errors" + "syscall" ) var ErrInvalidTypeConversion = errors.New("Invalid type conversion") @@ -31,6 +32,7 @@ var ErrTimeout = errors.New("Timed out") type ErrorWithStatusCode struct { error statusCode int + errnoCode int } type ErrorWithStatusCodeAndResponse struct { @@ -38,10 +40,11 @@ type ErrorWithStatusCodeAndResponse struct { response interface{} } -func NewErrorWithStatusCode(err error, statusCode int) ErrorWithStatusCode { +func NewErrorWithStatusCode(err error, statusCode int, errnoCode int) ErrorWithStatusCode { return ErrorWithStatusCode{ error: err, statusCode: statusCode, + errnoCode: errnoCode, } } @@ -49,16 +52,20 @@ func (e ErrorWithStatusCode) StatusCode() int { return e.statusCode } +func (e ErrorWithStatusCode) Errno() syscall.Errno { + return syscall.Errno(e.errnoCode) +} + func (e ErrorWithStatusCode) Error() string { return e.error.Error() } func NewErrorWithStatusCodeAndResponse(err error, - statusCode int, + statusCode int, errnoCode int, response interface{}) ErrorWithStatusCodeAndResponse { return ErrorWithStatusCodeAndResponse{ - ErrorWithStatusCode: NewErrorWithStatusCode(err, statusCode), + ErrorWithStatusCode: NewErrorWithStatusCode(err, statusCode, errnoCode), response: response, } }