Skip to content

Commit 425a25f

Browse files
committed
uplog
1 parent 219b063 commit 425a25f

File tree

97 files changed

+4075
-537
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

97 files changed

+4075
-537
lines changed

client/client.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ func SetAppName(userApp string) error {
4949
}
5050

5151
func getUserAgentWithAppName(userApp string) string {
52-
return fmt.Sprintf("QiniuGo/%s (%s; %s; %s) %s",
53-
conf.Version, runtime.GOOS, runtime.GOARCH, userApp, runtime.Version())
52+
userAgentPrefix := "QiniuGo/"
53+
if testRuntime {
54+
userAgentPrefix = "QiniuGo_Debug/"
55+
}
56+
return fmt.Sprintf("%s%s (%s; %s; %s) %s",
57+
userAgentPrefix, conf.Version, runtime.GOOS, runtime.GOARCH, userApp, runtime.Version())
5458
}
5559

5660
// --------------------------------------------------------------------

client/client_prod_env.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
//go:build !unit && !integration
2+
// +build !unit,!integration
3+
4+
package client
5+
6+
const testRuntime = false

client/client_test_env.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
//go:build unit || integration
2+
// +build unit integration
3+
4+
package client
5+
6+
const testRuntime = true

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/gofrs/flock v0.8.1
1313
github.com/iancoleman/strcase v0.3.0
1414
github.com/kr/pretty v0.3.0 // indirect
15+
github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f
1516
github.com/qiniu/dyn v1.3.0
1617
github.com/rogpeppe/go-internal v1.8.0 // indirect
1718
github.com/stretchr/testify v1.6.1

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3232
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
3333
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
3434
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
35+
github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f h1:B0OD7nYl2FPQEVrw8g2uyc1lGEzNbvrKh7fspGZcbvY=
36+
github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f/go.mod h1:aEt7p9Rvh67BYApmZwNDPpgircTO2kgdmDUoF/1QmwA=
3537
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
3638
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3739
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

internal/clientv2/interceptor.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import (
55
)
66

77
const (
8-
InterceptorPriorityDefault InterceptorPriority = 100
9-
InterceptorPriorityRetryHosts InterceptorPriority = 200
10-
InterceptorPriorityRetrySimple InterceptorPriority = 300
11-
InterceptorPrioritySetHeader InterceptorPriority = 400
12-
InterceptorPriorityNormal InterceptorPriority = 500
13-
InterceptorPriorityAuth InterceptorPriority = 600
14-
InterceptorPriorityDebug InterceptorPriority = 700
8+
InterceptorPriorityDefault InterceptorPriority = 100
9+
InterceptorPriorityRetryHosts InterceptorPriority = 200
10+
InterceptorPriorityRetrySimple InterceptorPriority = 300
11+
InterceptorPriorityUplog InterceptorPriority = 310
12+
InterceptorPriorityBufferResponse InterceptorPriority = 320
13+
InterceptorPrioritySetHeader InterceptorPriority = 400
14+
InterceptorPriorityNormal InterceptorPriority = 500
15+
InterceptorPriorityAuth InterceptorPriority = 600
16+
InterceptorPriorityDebug InterceptorPriority = 700
1517
)
1618

1719
type InterceptorPriority int
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package clientv2
2+
3+
import "net/http"
4+
5+
type bufferResponseInterceptor struct {
6+
}
7+
8+
func NewBufferResponseInterceptor() Interceptor {
9+
return bufferResponseInterceptor{}
10+
}
11+
12+
func (interceptor bufferResponseInterceptor) Priority() InterceptorPriority {
13+
return InterceptorPriorityBufferResponse
14+
}
15+
16+
func (interceptor bufferResponseInterceptor) Intercept(req *http.Request, handler Handler) (resp *http.Response, err error) {
17+
toBufferResponse := req.Context().Value(contextKeyBufferResponse{}) != nil
18+
resp, err = handler(req)
19+
if err == nil && toBufferResponse {
20+
err = bufferResponse(resp)
21+
}
22+
return
23+
}

internal/clientv2/interceptor_retry_simple.go

+5-8
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,12 @@ func (c *SimpleRetryConfig) getRetryDecision(req *http.Request, resp *http.Respo
7070
} else {
7171
return retrier.DontRetry
7272
}
73-
} else if c.Retrier != nil {
74-
return c.Retrier.Retry(req, resp, err, &retrier.RetrierOptions{Attempts: attempts})
7573
} else {
76-
return errorRetrier.Retry(req, resp, err, &retrier.RetrierOptions{Attempts: attempts})
74+
r := errorRetrier
75+
if c.Retrier != nil {
76+
r = c.Retrier
77+
}
78+
return r.Retry(req, resp, err, &retrier.RetrierOptions{Attempts: attempts})
7779
}
7880
}
7981

@@ -91,7 +93,6 @@ func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler
9193
if interceptor == nil || req == nil {
9294
return interceptor.callHandler(req, &retrier.RetrierOptions{Attempts: 0}, handler)
9395
}
94-
toBufferResponse := req.Context().Value(contextKeyBufferResponse{}) != nil
9596

9697
interceptor.config.init()
9798

@@ -105,10 +106,6 @@ func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler
105106
reqBefore := cloneReq(req)
106107
resp, err = interceptor.callHandler(req, &retrier.RetrierOptions{Attempts: i}, handler)
107108

108-
if err == nil && toBufferResponse {
109-
err = bufferResponse(resp)
110-
}
111-
112109
retryDecision := interceptor.config.getRetryDecision(reqBefore, resp, err, i)
113110
if retryDecision == retrier.DontRetry {
114111
interceptor.feedbackGood(req, hostname, chosenIPs)

internal/io/compatible.go

+4
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,7 @@ func (r *sizedReadSeekCloserFromReader) Seek(offset int64, whence int) (int64, e
5656
func (r *sizedReadSeekCloserFromReader) Close() error {
5757
return nil
5858
}
59+
60+
func (r *sizedReadSeekCloserFromReader) DetectLength() (int64, error) {
61+
return r.size, nil
62+
}

internal/io/nopcloser.go

+27
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import (
55
"io"
66
)
77

8+
type KnownLength interface {
9+
DetectLength() (int64, error)
10+
}
11+
812
type ReadSeekableNopCloser struct {
913
r io.ReadSeeker
1014
}
@@ -21,6 +25,25 @@ func (nc ReadSeekableNopCloser) Seek(offset int64, whence int) (int64, error) {
2125
return nc.r.Seek(offset, whence)
2226
}
2327

28+
func (nc ReadSeekableNopCloser) DetectLength() (int64, error) {
29+
if kl, ok := nc.r.(KnownLength); ok {
30+
return kl.DetectLength()
31+
}
32+
cur, err := nc.r.Seek(0, io.SeekCurrent)
33+
if err != nil {
34+
return 0, err
35+
}
36+
length, err := nc.r.Seek(0, io.SeekEnd)
37+
if err != nil {
38+
return 0, err
39+
}
40+
_, err = nc.r.Seek(cur, io.SeekStart)
41+
if err != nil {
42+
return 0, err
43+
}
44+
return length, nil
45+
}
46+
2447
func (nc ReadSeekableNopCloser) Close() error {
2548
return nil
2649
}
@@ -46,6 +69,10 @@ func (nc *BytesNopCloser) Seek(offset int64, whence int) (int64, error) {
4669
return nc.r.Seek(offset, whence)
4770
}
4871

72+
func (nc *BytesNopCloser) DetectLength() (int64, error) {
73+
return nc.Size(), nil
74+
}
75+
4976
func (nc *BytesNopCloser) Size() int64 {
5077
return nc.r.Size()
5178
}

storage/resume_uploader.go

+42-12
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/qiniu/go-sdk/v7/storagev2/apis"
1717
"github.com/qiniu/go-sdk/v7/storagev2/http_client"
1818
"github.com/qiniu/go-sdk/v7/storagev2/region"
19+
"github.com/qiniu/go-sdk/v7/storagev2/uplog"
1920
)
2021

2122
// ResumeUploader 表示一个分片上传的对象
@@ -135,17 +136,22 @@ func (p *ResumeUploader) rput(ctx context.Context, ret interface{}, upToken stri
135136
extra.init()
136137

137138
var (
139+
bucket string
138140
recorderKey string
139141
fileInfo os.FileInfo = nil
140142
)
141143
if fileDetails != nil {
142144
fileInfo = fileDetails.fileInfo
143145
}
144146

147+
if _, bucket, err = getAkBucketFromUploadToken(upToken); err != nil {
148+
return
149+
}
150+
145151
recorderKey = getRecorderKey(extra.Recorder, upToken, key, "v1", blockSize, fileDetails)
146152

147153
return uploadByWorkers(
148-
newResumeUploaderImpl(p, key, hasKey, upToken, makeEndpointsFromUpHost(extra.UpHost), fileInfo, extra, ret, recorderKey),
154+
newResumeUploaderImpl(p, bucket, key, hasKey, upToken, makeEndpointsFromUpHost(extra.UpHost), fileInfo, extra, ret, recorderKey),
149155
ctx, newSizedChunkReader(f, fsize, blockSize))
150156
}
151157

@@ -155,8 +161,14 @@ func (p *ResumeUploader) rputWithoutSize(ctx context.Context, ret interface{}, u
155161
}
156162
extra.init()
157163

164+
var bucket string
165+
166+
if _, bucket, err = getAkBucketFromUploadToken(upToken); err != nil {
167+
return
168+
}
169+
158170
return uploadByWorkers(
159-
newResumeUploaderImpl(p, key, hasKey, upToken, makeEndpointsFromUpHost(extra.UpHost), nil, extra, ret, ""),
171+
newResumeUploaderImpl(p, bucket, key, hasKey, upToken, makeEndpointsFromUpHost(extra.UpHost), nil, extra, ret, ""),
160172
ctx, newUnsizedChunkReader(r, 1<<blockBits))
161173
}
162174

@@ -211,6 +223,7 @@ type (
211223
resumeUploaderImpl struct {
212224
cfg *Config
213225
storage *apis.Storage
226+
bucket string
214227
key string
215228
hasKey bool
216229
upToken string
@@ -241,7 +254,7 @@ type (
241254
}
242255
)
243256

244-
func newResumeUploaderImpl(resumeUploader *ResumeUploader, key string, hasKey bool, upToken string, upEndpoints region.EndpointsProvider, fileInfo os.FileInfo, extra *RputExtra, ret interface{}, recorderKey string) *resumeUploaderImpl {
257+
func newResumeUploaderImpl(resumeUploader *ResumeUploader, bucket, key string, hasKey bool, upToken string, upEndpoints region.EndpointsProvider, fileInfo os.FileInfo, extra *RputExtra, ret interface{}, recorderKey string) *resumeUploaderImpl {
245258
opts := http_client.Options{
246259
BasicHTTPClient: resumeUploader.Client.Client,
247260
UseInsecureProtocol: !resumeUploader.Cfg.UseHTTPS,
@@ -261,6 +274,7 @@ func newResumeUploaderImpl(resumeUploader *ResumeUploader, key string, hasKey bo
261274
}
262275
return &resumeUploaderImpl{
263276
cfg: resumeUploader.Cfg,
277+
bucket: bucket,
264278
key: key,
265279
hasKey: hasKey,
266280
upToken: upToken,
@@ -279,17 +293,16 @@ func newResumeUploaderImpl(resumeUploader *ResumeUploader, key string, hasKey bo
279293
}
280294
}
281295

282-
func (impl *resumeUploaderImpl) initUploader(ctx context.Context) ([]int64, error) {
283-
var recovered []int64
296+
func (impl *resumeUploaderImpl) initUploader(ctx context.Context) (recovered []int64, recoveredSize int64, err error) {
284297
if impl.extra.Recorder != nil && len(impl.recorderKey) > 0 {
285298
if recorderData, err := impl.extra.Recorder.Get(impl.recorderKey); err == nil {
286-
recovered = impl.recover(ctx, recorderData)
299+
recovered, recoveredSize = impl.recover(ctx, recorderData)
287300
if len(recovered) == 0 {
288301
impl.deleteUploadRecordIfNeed(nil, true)
289302
}
290303
}
291304
}
292-
return recovered, nil
305+
return
293306
}
294307

295308
func (impl *resumeUploaderImpl) uploadChunk(ctx context.Context, c chunk) error {
@@ -384,21 +397,37 @@ func (impl *resumeUploaderImpl) final(ctx context.Context) error {
384397
return err
385398
}
386399

400+
func (impl *resumeUploaderImpl) version() uplog.UpApiVersion {
401+
return uplog.UpApiVersionV1
402+
}
403+
404+
func (impl *resumeUploaderImpl) getUpToken() string {
405+
return impl.upToken
406+
}
407+
408+
func (impl *resumeUploaderImpl) getBucket() string {
409+
return impl.bucket
410+
}
411+
412+
func (impl *resumeUploaderImpl) getKey() (string, bool) {
413+
return impl.key, impl.hasKey
414+
}
415+
387416
func (impl *resumeUploaderImpl) deleteUploadRecordIfNeed(err error, force bool) {
388417
// 无效删除之前的记录
389418
if force || (isContextExpiredError(err) && impl.extra.Recorder != nil && len(impl.recorderKey) > 0) {
390419
_ = impl.extra.Recorder.Delete(impl.recorderKey)
391420
}
392421
}
393422

394-
func (impl *resumeUploaderImpl) recover(ctx context.Context, recoverData []byte) (recovered []int64) {
423+
func (impl *resumeUploaderImpl) recover(ctx context.Context, recoverData []byte) (recovered []int64, recoveredSize int64) {
395424
var recoveryInfo resumeUploaderRecoveryInfo
396425
if err := json.Unmarshal(recoverData, &recoveryInfo); err != nil {
397-
return nil
426+
return
398427
}
399428
if impl.fileInfo == nil || recoveryInfo.FileSize != impl.fileInfo.Size() ||
400429
recoveryInfo.ModTimeStamp != impl.fileInfo.ModTime().UnixNano() {
401-
return nil
430+
return
402431
}
403432
if recoveryInfo.RecorderVersion != uploadRecordVersion {
404433
return
@@ -407,17 +436,18 @@ func (impl *resumeUploaderImpl) recover(ctx context.Context, recoverData []byte)
407436
for _, c := range recoveryInfo.Contexts {
408437
if isUploadContextExpired(c.ExpiredAt) {
409438
// 有一个过期,最终其实都会无效,重传最后之前没过期的也可能会过期
410-
return nil
439+
return nil, 0
411440
}
412441

413442
impl.fileSize += int64(c.ChunkSize)
443+
recoveredSize += int64(c.ChunkSize)
414444
impl.extra.Progresses = append(impl.extra.Progresses, BlkputRet{
415445
blkIdx: c.Idx, fileOffset: c.Offset, chunkSize: c.ChunkSize, Ctx: c.Ctx, ExpiredAt: c.ExpiredAt,
416446
})
417447
recovered = append(recovered, c.Offset)
418448
}
419449

420-
return recovered
450+
return recovered, recoveredSize
421451
}
422452

423453
func (impl *resumeUploaderImpl) save(ctx context.Context) {

0 commit comments

Comments
 (0)