Skip to content

Commit b6cd429

Browse files
*: migrate from NeoFS SEARCH V1 to SEARCH V2
Close #3825 Signed-off-by: Ekaterina Pavlova <[email protected]>
1 parent b10c973 commit b6cd429

File tree

5 files changed

+131
-181
lines changed

5 files changed

+131
-181
lines changed

cli/util/upload_bin.go

Lines changed: 43 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func uploadBin(ctx *cli.Context) error {
6767
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
6868
}
6969
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
70-
i, buf, err := searchIndexFile(ctx, p, containerID, acc.PrivateKey(), signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug)
70+
i, buf, err := searchIndexFile(ctx, p, containerID, acc.PrivateKey(), indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug)
7171
if err != nil {
7272
return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1)
7373
}
@@ -227,33 +227,44 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p pool.Pool, rpc *rpcclient.Cli
227227
}
228228

229229
// searchIndexFile returns the ID and buffer for the next index file to be uploaded.
230-
func searchIndexFile(ctx *cli.Context, p pool.Pool, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
230+
func searchIndexFile(ctx *cli.Context, p pool.Pool, containerID cid.ID, privKeys *keys.PrivateKey, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
231231
var (
232232
// buf is used to store OIDs of the uploaded blocks.
233233
buf = make([]byte, indexFileSize*oid.Size)
234234
doneCh = make(chan struct{})
235235
errCh = make(chan error)
236236

237-
existingIndexCount = uint(0)
237+
existingIndexCount = 0
238238
filters = object.NewSearchFilters()
239239
)
240240
go func() {
241241
defer close(doneCh)
242242
// Search for existing index files.
243+
filters.AddFilter(attributeKey, fmt.Sprintf("%d", existingIndexCount), object.MatchNumGE)
243244
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
244-
for i := 0; ; i++ {
245-
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters)
246-
resOIDs := make([]oid.ID, 0, 1)
247-
for id := range indexIDs {
248-
resOIDs = append(resOIDs, id)
249-
}
250-
if len(resOIDs) == 0 {
251-
break
245+
246+
var indexObsj []client.SearchResultItem
247+
err := retry(func() error {
248+
var errBlockSearch error
249+
indexObsj, errBlockSearch = neofs.ObjectSearch(ctx.Context, &p, privKeys, containerID.String(), filters, []string{attributeKey})
250+
return errBlockSearch
251+
}, maxRetries, debug)
252+
if err != nil {
253+
select {
254+
case errCh <- fmt.Errorf("failed to search for index file: %w", err):
255+
default:
252256
}
253-
if len(resOIDs) > 1 {
254-
fmt.Fprintf(ctx.App.Writer, "WARN: %d duplicated index files with index %d found: %s\n", len(resOIDs), i, resOIDs)
257+
return
258+
}
259+
if len(indexObsj) != 0 {
260+
existingIndexCount, err = strconv.Atoi(indexObsj[len(indexObsj)-1].Attributes[0])
261+
if err != nil {
262+
select {
263+
case errCh <- fmt.Errorf("failed to parse index file ID: %w", err):
264+
default:
265+
}
266+
return
255267
}
256-
existingIndexCount++
257268
}
258269
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d\n", existingIndexCount)
259270

@@ -263,64 +274,51 @@ func searchIndexFile(ctx *cli.Context, p pool.Pool, containerID cid.ID, privKeys
263274
// It prevents duplicates.
264275
processedIndices sync.Map
265276
wg sync.WaitGroup
266-
oidCh = make(chan oid.ID, 2*maxParallelSearches)
277+
objCh = make(chan client.SearchResultItem, 2*maxParallelSearches)
267278
)
268279
wg.Add(int(maxParallelSearches))
269280
for range maxParallelSearches {
270281
go func() {
271282
defer wg.Done()
272-
for id := range oidCh {
273-
var obj *object.Object
274-
errRetr := retry(func() error {
275-
var errGet error
276-
obj, errGet = p.ObjectHead(ctx.Context, containerID, id, signer, client.PrmObjectHead{})
277-
return errGet
278-
}, maxRetries, debug)
279-
if errRetr != nil {
280-
select {
281-
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
282-
default:
283-
}
284-
return
285-
}
286-
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
283+
for obj := range objCh {
284+
blockIndex, err := strconv.Atoi(obj.Attributes[0])
287285
if err != nil {
288286
select {
289-
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
287+
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", obj.ID.String(), err):
290288
default:
291289
}
292290
return
293291
}
294292
pos := uint(blockIndex) % indexFileSize
295293
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
296-
copy(buf[pos*oid.Size:], id[:])
294+
copy(buf[pos*oid.Size:], obj.ID[:])
297295
}
298296
}
299297
}()
300298
}
301299

302300
// Search for blocks within the index file range.
303-
objIDs := searchObjects(ctx.Context, p, containerID, privKeys, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh)
304-
for id := range objIDs {
305-
oidCh <- id
301+
blkObjs := searchObjects(ctx.Context, p, containerID, privKeys, blockAttributeKey, uint(existingIndexCount)*indexFileSize, uint(existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh)
302+
for id := range blkObjs {
303+
objCh <- id
306304
}
307-
close(oidCh)
305+
close(objCh)
308306
wg.Wait()
309307
}()
310308

311309
select {
312310
case err := <-errCh:
313-
return existingIndexCount, nil, err
311+
return uint(existingIndexCount), nil, err
314312
case <-doneCh:
315-
return existingIndexCount, buf, nil
313+
return uint(existingIndexCount), buf, nil
316314
}
317315
}
318316

319317
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
320318
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
321319
// OID search is finished. Errors are sent to errCh in a non-blocking way.
322-
func searchObjects(ctx context.Context, p pool.Pool, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
323-
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
320+
func searchObjects(ctx context.Context, p pool.Pool, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan client.SearchResultItem {
321+
var res = make(chan client.SearchResultItem, 2*neofs.DefaultSearchBatchSize)
324322
go func() {
325323
var wg sync.WaitGroup
326324
defer close(res)
@@ -341,23 +339,21 @@ func searchObjects(ctx context.Context, p pool.Pool, containerID cid.ID, privKey
341339
go func(start, end uint) {
342340
defer wg.Done()
343341

344-
prm := client.PrmObjectSearch{}
345342
filters := object.NewSearchFilters()
346-
if len(additionalFilters) != 0 {
347-
filters = additionalFilters[0]
348-
}
349343
if end == start+1 {
350344
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchStringEqual)
351345
} else {
352346
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
353347
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
354348
}
355-
prm.SetFilters(filters)
349+
if len(additionalFilters) != 0 {
350+
filters = additionalFilters[0]
351+
}
356352

357-
var objIDs []oid.ID
353+
var objIDs []client.SearchResultItem
358354
err := retry(func() error {
359355
var errBlockSearch error
360-
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, &p, privKeys, containerID.String(), prm)
356+
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, &p, privKeys, containerID.String(), filters, []string{blockAttributeKey})
361357
return errBlockSearch
362358
}, maxRetries, debug)
363359
if err != nil {
@@ -410,20 +406,6 @@ func uploadObj(ctx context.Context, p pool.Pool, signer user.Signer, containerID
410406
return resOID, nil
411407
}
412408

413-
func getBlockIndex(header *object.Object, attribute string) (int, error) {
414-
for _, attr := range header.UserAttributes() {
415-
if attr.Key() == attribute {
416-
value := attr.Value()
417-
blockIndex, err := strconv.Atoi(value)
418-
if err != nil {
419-
return -1, fmt.Errorf("attribute %s has invalid value: %s, error: %w", attribute, value, err)
420-
}
421-
return blockIndex, nil
422-
}
423-
}
424-
return -1, fmt.Errorf("attribute %s not found", attribute)
425-
}
426-
427409
// getContainer gets container by ID and checks its magic.
428410
func getContainer(ctx *cli.Context, p pool.Pool, expectedMagic string, maxRetries uint, debug bool) (cid.ID, error) {
429411
var (

cli/util/upload_state.go

Lines changed: 19 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,11 @@ import (
1010
"github.com/nspcc-dev/neo-go/cli/server"
1111
"github.com/nspcc-dev/neo-go/pkg/config"
1212
"github.com/nspcc-dev/neo-go/pkg/core"
13-
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
1413
gio "github.com/nspcc-dev/neo-go/pkg/io"
14+
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
1515
"github.com/nspcc-dev/neo-go/pkg/util"
1616
"github.com/nspcc-dev/neofs-sdk-go/client"
17-
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1817
"github.com/nspcc-dev/neofs-sdk-go/object"
19-
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
20-
"github.com/nspcc-dev/neofs-sdk-go/pool"
2118
"github.com/urfave/cli/v2"
2219
"go.uber.org/zap"
2320
)
@@ -74,15 +71,26 @@ func uploadState(ctx *cli.Context) error {
7471
if err != nil {
7572
return cli.Exit(err, 1)
7673
}
77-
78-
stateObjCount, err := searchStateIndex(ctx, p, containerID, acc.PrivateKey(), attr, syncInterval, maxRetries, debug)
79-
if err != nil {
80-
return cli.Exit(fmt.Sprintf("failed searching existing states: %v", err), 1)
81-
}
8274
stateModule := chain.GetStateModule()
8375
currentHeight := int(stateModule.CurrentLocalHeight())
8476
currentStateIndex := currentHeight / syncInterval
85-
if currentStateIndex < stateObjCount {
77+
78+
filters := object.NewSearchFilters()
79+
filters.AddFilter(attr, fmt.Sprintf("%d", 0), object.MatchNumGE)
80+
items, err := neofs.ObjectSearch(ctx.Context, &p, acc.PrivateKey(), containerID.String(), filters, []string{attr})
81+
if err != nil {
82+
return cli.Exit(fmt.Sprintf("failed to search state object: %v", err), 1)
83+
}
84+
stateObjHeight := 0
85+
if len(items) != 0 {
86+
stateObjHeight, err = strconv.Atoi(items[len(items)-1].Attributes[0])
87+
}
88+
89+
if err != nil {
90+
return cli.Exit(fmt.Sprintf("failed to parse state object height: %v", err), 1)
91+
}
92+
stateObjCount := stateObjHeight / syncInterval
93+
if currentStateIndex <= stateObjCount {
8694
log.Info("no new states to upload",
8795
zap.Int("number of uploaded state objects", stateObjCount),
8896
zap.Int("latest state is uploaded for block", (stateObjCount-1)*syncInterval),
@@ -96,7 +104,7 @@ func uploadState(ctx *cli.Context) error {
96104
zap.Int("current height", currentHeight),
97105
zap.Int("StateSyncInterval", syncInterval),
98106
zap.Int("number of states to upload", currentStateIndex-stateObjCount))
99-
for state := stateObjCount; state <= currentStateIndex; state++ {
107+
for state := stateObjCount + 1; state <= currentStateIndex; state++ {
100108
height := uint32(state * syncInterval)
101109
stateRoot, err := stateModule.GetStateRoot(height)
102110
if err != nil {
@@ -156,41 +164,6 @@ func uploadState(ctx *cli.Context) error {
156164
return nil
157165
}
158166

159-
func searchStateIndex(ctx *cli.Context, p pool.Pool, containerID cid.ID, privKeys *keys.PrivateKey,
160-
attributeKey string, syncInterval int, maxRetries uint, debug bool,
161-
) (int, error) {
162-
var (
163-
doneCh = make(chan struct{})
164-
errCh = make(chan error)
165-
objCount = 0
166-
)
167-
168-
go func() {
169-
defer close(doneCh)
170-
for i := 0; ; i++ {
171-
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys,
172-
attributeKey, uint(i*syncInterval), uint(i*syncInterval)+1, 1, maxRetries, debug, errCh)
173-
resOIDs := make([]oid.ID, 0, 1)
174-
for id := range indexIDs {
175-
resOIDs = append(resOIDs, id)
176-
}
177-
if len(resOIDs) == 0 {
178-
break
179-
}
180-
if len(resOIDs) > 1 {
181-
fmt.Fprintf(ctx.App.Writer, "WARN: %d duplicated state objects with %s: %d found: %s\n", len(resOIDs), attributeKey, i, resOIDs)
182-
}
183-
objCount++
184-
}
185-
}()
186-
select {
187-
case err := <-errCh:
188-
return objCount, err
189-
case <-doneCh:
190-
return objCount, nil
191-
}
192-
}
193-
194167
func traverseMPT(root util.Uint256, stateModule core.StateRoot, writer *gio.BinWriter) error {
195168
stateModule.SeekStates(root, []byte{}, func(k, v []byte) bool {
196169
writer.WriteVarBytes(k)

pkg/services/blockfetcher/blockfetcher.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -291,14 +291,12 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
291291
case <-bfs.exiterToOIDDownloader:
292292
return nil
293293
default:
294-
prm := client.PrmObjectSearch{}
295294
filters := object.NewSearchFilters()
296295
filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
297296
filters.AddFilter("IndexSize", fmt.Sprintf("%d", bfs.cfg.IndexFileSize), object.MatchStringEqual)
298-
prm.SetFilters(filters)
299297

300298
ctx, cancel := context.WithTimeout(bfs.Ctx, bfs.cfg.Timeout)
301-
blockOidsObject, err := bfs.objectSearch(ctx, prm)
299+
blockOidsObject, err := bfs.objectSearch(ctx, filters, []string{bfs.cfg.IndexFileAttribute})
302300
cancel()
303301
if err != nil {
304302
if isContextCanceledErr(err) {
@@ -313,7 +311,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
313311

314312
blockCtx, blockCancel := context.WithTimeout(bfs.Ctx, bfs.cfg.Timeout)
315313
defer blockCancel()
316-
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1)
314+
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].ID.String(), -1)
317315
if err != nil {
318316
if isContextCanceledErr(err) {
319317
return nil
@@ -385,17 +383,15 @@ func (bfs *Service) fetchOIDsBySearch() error {
385383
case <-bfs.exiterToOIDDownloader:
386384
return nil
387385
default:
388-
prm := client.PrmObjectSearch{}
389386
filters := object.NewSearchFilters()
390387
if startIndex == startIndex+batchSize-1 {
391388
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
392389
} else {
393390
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
394391
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
395392
}
396-
prm.SetFilters(filters)
397393
ctx, cancel := context.WithTimeout(bfs.Ctx, bfs.cfg.Timeout)
398-
blockOids, err := bfs.objectSearch(ctx, prm)
394+
blockObjs, err := bfs.objectSearch(ctx, filters, []string{bfs.cfg.BlockAttribute})
399395
cancel()
400396
if err != nil {
401397
if isContextCanceledErr(err) {
@@ -404,18 +400,21 @@ func (bfs *Service) fetchOIDsBySearch() error {
404400
return err
405401
}
406402

407-
if len(blockOids) == 0 {
403+
if len(blockObjs) == 0 {
408404
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
409405
return nil
410406
}
411-
index := int(startIndex)
412-
for _, oid := range blockOids {
407+
var index int
408+
for _, obj := range blockObjs {
409+
index, err = strconv.Atoi(obj.Attributes[0])
410+
if err != nil {
411+
return fmt.Errorf("failed to parse block index: %w", err)
412+
}
413413
select {
414414
case <-bfs.exiterToOIDDownloader:
415415
return nil
416-
case bfs.oidsCh <- indexedOID{Index: index, OID: oid}:
416+
case bfs.oidsCh <- indexedOID{Index: index, OID: obj.ID}:
417417
}
418-
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.
419418
}
420419
startIndex += batchSize
421420
}
@@ -549,16 +548,16 @@ func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int)
549548
return rc, err
550549
}
551550

552-
func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
551+
func (bfs *Service) objectSearch(ctx context.Context, filters object.SearchFilters, attrs []string) ([]client.SearchResultItem, error) {
553552
var (
554-
oids []oid.ID
555-
err error
553+
items []client.SearchResultItem
554+
err error
556555
)
557556
err = bfs.Retry(func() error {
558-
oids, err = neofs.ObjectSearch(ctx, bfs.Pool, bfs.Account.PrivateKey(), bfs.cfg.ContainerID, prm)
557+
items, err = neofs.ObjectSearch(ctx, bfs.Pool, bfs.Account.PrivateKey(), bfs.cfg.ContainerID, filters, attrs)
559558
return err
560559
})
561-
return oids, err
560+
return items, err
562561
}
563562

564563
// isContextCanceledErr returns whether error is a wrapped [context.Canceled].

0 commit comments

Comments
 (0)