From ef5e192c3bb1376a839bebd5eb60b44fc4e4a7e4 Mon Sep 17 00:00:00 2001 From: YangXu <47767754+Three-taile-dragon@users.noreply.github.com> Date: Thu, 22 Aug 2024 00:35:52 +0800 Subject: [PATCH] fix(pikpak): webdav upload issue (#7050) --- drivers/pikpak/driver.go | 52 ++++---- drivers/pikpak/types.go | 24 ++-- drivers/pikpak/util.go | 249 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 282 insertions(+), 43 deletions(-) diff --git a/drivers/pikpak/driver.go b/drivers/pikpak/driver.go index 13b9843015d..e2a2b82e7a7 100644 --- a/drivers/pikpak/driver.go +++ b/drivers/pikpak/driver.go @@ -4,24 +4,18 @@ import ( "context" "encoding/json" "fmt" - "github.com/alist-org/alist/v3/internal/op" - "golang.org/x/oauth2" - "io" - "net/http" - "strconv" - "strings" - "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/pkg/utils" hash_extend "github.com/alist-org/alist/v3/pkg/utils/hash" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/go-resty/resty/v2" log "github.com/sirupsen/logrus" + "golang.org/x/oauth2" + "net/http" + "strconv" + "strings" ) type PikPak struct { @@ -123,10 +117,16 @@ func (d *PikPak) Init(ctx context.Context) (err error) { d.AccessToken = token.AccessToken // 获取CaptchaToken - err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Common.UserID) + err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Username) if err != nil { return err } + + // 获取用户ID + userID := token.Extra("sub").(string) + if userID != "" { + d.Common.SetUserID(userID) + } // 更新UserAgent if d.Platform == "android" { d.Common.UserAgent = BuildCustomUserAgent(utils.GetMD5EncodeStr(d.Username+d.Password), AndroidClientID, AndroidPackageName, AndroidSdkVersion, AndroidClientVersion, AndroidPackageName, d.Common.UserID) @@ -271,27 +271,17 @@ func (d *PikPak) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr } params := resp.Resumable.Params - endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".") - cfg := &aws.Config{ - Credentials: credentials.NewStaticCredentials(params.AccessKeyID, params.AccessKeySecret, params.SecurityToken), - Region: aws.String("pikpak"), - Endpoint: &endpoint, + //endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".") + // web 端上传 返回的endpoint 为 `mypikpak.com` | android 端上传 返回的endpoint 为 `vip-lixian-07.mypikpak.com`· + if d.Addition.Platform == "android" { + params.Endpoint = "mypikpak.com" } - ss, err := session.NewSession(cfg) - if err != nil { - return err - } - uploader := s3manager.NewUploader(ss) - if stream.GetSize() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize { - uploader.PartSize = stream.GetSize() / (s3manager.MaxUploadParts - 1) - } - input := &s3manager.UploadInput{ - Bucket: ¶ms.Bucket, - Key: ¶ms.Key, - Body: io.TeeReader(stream, driver.NewProgress(stream.GetSize(), up)), + + if stream.GetSize() <= 10*utils.MB { // 文件大小 小于10MB,改用普通模式上传 + return d.UploadByOSS(¶ms, stream, up) } - _, err = uploader.UploadWithContext(ctx, input) - return err + // 分片上传 + return d.UploadByMultipart(¶ms, stream.GetSize(), stream, up) } // 离线下载文件 diff --git a/drivers/pikpak/types.go b/drivers/pikpak/types.go index b27b905568a..2a959ebf05d 100644 --- a/drivers/pikpak/types.go +++ b/drivers/pikpak/types.go @@ -80,22 +80,24 @@ type UploadTaskData struct { UploadType string `json:"upload_type"` //UPLOAD_TYPE_RESUMABLE Resumable *struct { - Kind string `json:"kind"` - Params struct { - AccessKeyID string `json:"access_key_id"` - AccessKeySecret string `json:"access_key_secret"` - Bucket string `json:"bucket"` - Endpoint string `json:"endpoint"` - Expiration time.Time `json:"expiration"` - Key string `json:"key"` - SecurityToken string `json:"security_token"` - } `json:"params"` - Provider string `json:"provider"` + Kind string `json:"kind"` + Params S3Params `json:"params"` + Provider string `json:"provider"` } `json:"resumable"` File File `json:"file"` } +type S3Params struct { + AccessKeyID string `json:"access_key_id"` + AccessKeySecret string `json:"access_key_secret"` + Bucket string `json:"bucket"` + Endpoint string `json:"endpoint"` + Expiration time.Time `json:"expiration"` + Key string `json:"key"` + SecurityToken string `json:"security_token"` +} + // 添加离线下载响应 type OfflineDownloadResp struct { File *string `json:"file"` diff --git a/drivers/pikpak/util.go b/drivers/pikpak/util.go index 902a9d32191..d3371a2558a 100644 --- a/drivers/pikpak/util.go +++ b/drivers/pikpak/util.go @@ -1,17 +1,24 @@ package pikpak import ( + "bytes" "crypto/md5" "crypto/sha1" "encoding/hex" - "errors" "fmt" + "github.com/alist-org/alist/v3/internal/driver" + "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/pkg/utils" + "github.com/aliyun/aliyun-oss-go-sdk/oss" jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "io" "net/http" + "path/filepath" "regexp" "strings" + "sync" "time" "github.com/alist-org/alist/v3/drivers/base" @@ -56,6 +63,12 @@ var WebAlgorithms = []string{ "NhXXU9rg4XXdzo7u5o", } +const ( + OSSUserAgent = "aliyun-sdk-android/2.9.13(Linux/Android 14/M2004j7ac;UKQ1.231108.001)" + OssSecurityTokenHeaderName = "X-OSS-Security-Token" + ThreadsNum = 10 +) + const ( AndroidClientID = "YNxT9w7GMdWvEOKa" AndroidClientSecret = "dbw2OtmVEeuUvIptb1Coyg" @@ -393,3 +406,237 @@ func (d *PikPak) refreshCaptchaToken(action string, metas map[string]string) err d.Common.SetCaptchaToken(resp.CaptchaToken) return nil } + +func (d *PikPak) UploadByOSS(params *S3Params, stream model.FileStreamer, up driver.UpdateProgress) error { + ossClient, err := oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret) + if err != nil { + return err + } + bucket, err := ossClient.Bucket(params.Bucket) + if err != nil { + return err + } + + err = bucket.PutObject(params.Key, stream, OssOption(params)...) + if err != nil { + return err + } + return nil +} + +func (d *PikPak) UploadByMultipart(params *S3Params, fileSize int64, stream model.FileStreamer, up driver.UpdateProgress) error { + var ( + chunks []oss.FileChunk + parts []oss.UploadPart + imur oss.InitiateMultipartUploadResult + ossClient *oss.Client + bucket *oss.Bucket + err error + ) + + tmpF, err := stream.CacheFullInTempFile() + if err != nil { + return err + } + + if ossClient, err = oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret); err != nil { + return err + } + + if bucket, err = ossClient.Bucket(params.Bucket); err != nil { + return err + } + + ticker := time.NewTicker(time.Hour * 12) + defer ticker.Stop() + // 设置超时 + timeout := time.NewTimer(time.Hour * 24) + + if chunks, err = SplitFile(fileSize); err != nil { + return err + } + + if imur, err = bucket.InitiateMultipartUpload(params.Key, + oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken), + oss.UserAgentHeader(OSSUserAgent), + ); err != nil { + return err + } + + wg := sync.WaitGroup{} + wg.Add(len(chunks)) + + chunksCh := make(chan oss.FileChunk) + errCh := make(chan error) + UploadedPartsCh := make(chan oss.UploadPart) + quit := make(chan struct{}) + + // producer + go chunksProducer(chunksCh, chunks) + go func() { + wg.Wait() + quit <- struct{}{} + }() + + // consumers + for i := 0; i < ThreadsNum; i++ { + go func(threadId int) { + defer func() { + if r := recover(); r != nil { + errCh <- fmt.Errorf("recovered in %v", r) + } + }() + for chunk := range chunksCh { + var part oss.UploadPart // 出现错误就继续尝试,共尝试3次 + for retry := 0; retry < 3; retry++ { + select { + case <-ticker.C: + errCh <- errors.Wrap(err, "ossToken 过期") + default: + } + + buf := make([]byte, chunk.Size) + if _, err = tmpF.ReadAt(buf, chunk.Offset); err != nil && !errors.Is(err, io.EOF) { + continue + } + + b := bytes.NewBuffer(buf) + if part, err = bucket.UploadPart(imur, b, chunk.Size, chunk.Number, OssOption(params)...); err == nil { + break + } + } + if err != nil { + errCh <- errors.Wrap(err, fmt.Sprintf("上传 %s 的第%d个分片时出现错误:%v", stream.GetName(), chunk.Number, err)) + } + UploadedPartsCh <- part + } + }(i) + } + + go func() { + for part := range UploadedPartsCh { + parts = append(parts, part) + wg.Done() + } + }() +LOOP: + for { + select { + case <-ticker.C: + // ossToken 过期 + return err + case <-quit: + break LOOP + case <-errCh: + return err + case <-timeout.C: + return fmt.Errorf("time out") + } + } + + // EOF错误是xml的Unmarshal导致的,响应其实是json格式,所以实际上上传是成功的 + if _, err = bucket.CompleteMultipartUpload(imur, parts, OssOption(params)...); err != nil && !errors.Is(err, io.EOF) { + // 当文件名含有 &< 这两个字符之一时响应的xml解析会出现错误,实际上上传是成功的 + if filename := filepath.Base(stream.GetName()); !strings.ContainsAny(filename, "&<") { + return err + } + } + return nil +} + +func chunksProducer(ch chan oss.FileChunk, chunks []oss.FileChunk) { + for _, chunk := range chunks { + ch <- chunk + } +} + +func SplitFile(fileSize int64) (chunks []oss.FileChunk, err error) { + for i := int64(1); i < 10; i++ { + if fileSize < i*utils.GB { // 文件大小小于iGB时分为i*100片 + if chunks, err = SplitFileByPartNum(fileSize, int(i*100)); err != nil { + return + } + break + } + } + if fileSize > 9*utils.GB { // 文件大小大于9GB时分为1000片 + if chunks, err = SplitFileByPartNum(fileSize, 1000); err != nil { + return + } + } + // 单个分片大小不能小于1MB + if chunks[0].Size < 1*utils.MB { + if chunks, err = SplitFileByPartSize(fileSize, 1*utils.MB); err != nil { + return + } + } + return +} + +// SplitFileByPartNum splits big file into parts by the num of parts. +// Split the file with specified parts count, returns the split result when error is nil. +func SplitFileByPartNum(fileSize int64, chunkNum int) ([]oss.FileChunk, error) { + if chunkNum <= 0 || chunkNum > 10000 { + return nil, errors.New("chunkNum invalid") + } + + if int64(chunkNum) > fileSize { + return nil, errors.New("oss: chunkNum invalid") + } + + var chunks []oss.FileChunk + chunk := oss.FileChunk{} + chunkN := (int64)(chunkNum) + for i := int64(0); i < chunkN; i++ { + chunk.Number = int(i + 1) + chunk.Offset = i * (fileSize / chunkN) + if i == chunkN-1 { + chunk.Size = fileSize/chunkN + fileSize%chunkN + } else { + chunk.Size = fileSize / chunkN + } + chunks = append(chunks, chunk) + } + + return chunks, nil +} + +// SplitFileByPartSize splits big file into parts by the size of parts. +// Splits the file by the part size. Returns the FileChunk when error is nil. +func SplitFileByPartSize(fileSize int64, chunkSize int64) ([]oss.FileChunk, error) { + if chunkSize <= 0 { + return nil, errors.New("chunkSize invalid") + } + + chunkN := fileSize / chunkSize + if chunkN >= 10000 { + return nil, errors.New("Too many parts, please increase part size") + } + + var chunks []oss.FileChunk + chunk := oss.FileChunk{} + for i := int64(0); i < chunkN; i++ { + chunk.Number = int(i + 1) + chunk.Offset = i * chunkSize + chunk.Size = chunkSize + chunks = append(chunks, chunk) + } + + if fileSize%chunkSize > 0 { + chunk.Number = len(chunks) + 1 + chunk.Offset = int64(len(chunks)) * chunkSize + chunk.Size = fileSize % chunkSize + chunks = append(chunks, chunk) + } + + return chunks, nil +} + +// OssOption get options +func OssOption(params *S3Params) []oss.Option { + options := []oss.Option{ + oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken), + oss.UserAgentHeader(OSSUserAgent), + } + return options +}