diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b0d2d35ccb..29b62f53a4 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -40,8 +40,10 @@ import ( objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" + protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session" apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/version" "go.uber.org/zap" ) @@ -244,6 +246,7 @@ func initObjectService(c *cfg) { mNumber, err := c.shared.basics.cli.MagicNumber() fatalOnErr(err) + os := &objectSource{get: sGet} sPut := putsvc.NewService(&transport{clients: putConstructor}, c, putsvc.WithNetworkMagic(mNumber), putsvc.WithKeyStorage(keyStorage), @@ -257,7 +260,7 @@ func initObjectService(c *cfg) { putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), putsvc.WithSplitChainVerifier(split.NewVerifier(sGet)), - putsvc.WithTombstoneVerifier(tombstone.NewVerifier(objectSource{sGet, sSearch})), + putsvc.WithTombstoneVerifier(tombstone.NewVerifier(os)), ) sDelete := deletesvc.New( @@ -314,6 +317,7 @@ func initObjectService(c *cfg) { keys: keyStorage, } server := objectService.New(objSvc, mNumber, fsChain, storage, c.metaService, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor) + os.server = server for _, srv := range c.cfgGRPC.servers { protoobject.RegisterObjectServiceServer(srv, server) @@ -639,16 +643,9 @@ func (x storageForObjectService) GetSessionPrivateKey(usr user.ID, uid uuid.UUID type objectSource struct { get *getsvc.Service - search *searchsvc.Service -} - -type searchWriter struct { - ids []oid.ID -} - -func (w *searchWriter) WriteIDs(ids []oid.ID) error { - w.ids = append(w.ids, ids...) - return nil + server interface { + ProcessSearch(ctx context.Context, req *protoobject.SearchV2Request) ([]client.SearchResultItem, []byte, error) + } } func (o objectSource) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { @@ -664,20 +661,34 @@ func (o objectSource) Head(ctx context.Context, addr oid.Address) (*objectSDK.Ob return hw.h, err } -func (o objectSource) Search(ctx context.Context, cnr cid.ID, filters objectSDK.SearchFilters) ([]oid.ID, error) { - var sw searchWriter - - var sPrm searchsvc.Prm - sPrm.SetWriter(&sw) - sPrm.WithSearchFilters(filters) - sPrm.WithContainerID(cnr) +func (o objectSource) Search(ctx context.Context, cnr cid.ID, filters objectSDK.SearchFilters, childV2 bool) ([]client.SearchResultItem, error) { + var attrs []string + count := uint32(1) + if !childV2 { + count = 1000 + attrs = []string{objectSDK.FilterPayloadSize} + } - err := o.search.Search(ctx, sPrm) + req := &protoobject.SearchV2Request{ + Body: &protoobject.SearchV2Request_Body{ + ContainerId: cnr.ProtoMessage(), + Version: 1, + Filters: filters.ProtoMessage(), + Cursor: "", + Count: count, + Attributes: attrs, + }, + MetaHeader: &protosession.RequestMetaHeader{ + Version: version.Current().ProtoMessage(), + Ttl: 2, + }, + } + res, _, err := o.server.ProcessSearch(ctx, req) if err != nil { return nil, err } - return sw.ids, nil + return res, nil } // IsLocalNodePublicKey checks whether given binary-encoded public key is diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 27d332e1e8..2279b9ed0b 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -192,7 +192,7 @@ type server struct { } // New provides protoobject.ObjectServiceServer for the given parameters. -func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, metaSvc *metasvc.Meta, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor, cs searchsvc.ClientConstructor) protoobject.ObjectServiceServer { +func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, metaSvc *metasvc.Meta, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor, cs searchsvc.ClientConstructor) *server { // TODO: configurable capacity sp, err := ants.NewPool(100, ants.WithNonblocking(true)) if err != nil { @@ -1866,10 +1866,6 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear if body.ContainerId == nil { return nil, errors.New("missing container ID") } - var cID cid.ID - if err := cID.FromProtoMessage(body.ContainerId); err != nil { - return nil, fmt.Errorf("invalid container ID: %w", err) - } if body.Version != 1 { return nil, errors.New("unsupported query version") } @@ -1900,25 +1896,36 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear if len(body.Attributes) > 0 && (len(body.Filters) == 0 || body.Filters[0].Key != body.Attributes[0]) { return nil, errors.New("primary attribute must be filtered 1st") } - ttl := req.MetaHeader.GetTtl() - if ttl == 0 { - return nil, errors.New("zero TTL") + + res, newCursor, err := s.ProcessSearch(ctx, req) + if err != nil { + return nil, err } - var fs object.SearchFilters - if err := fs.FromProtoMessage(body.Filters); err != nil { - return nil, fmt.Errorf("invalid filters: %w", err) + + resBody := &protoobject.SearchV2Response_Body{ + Result: make([]*protoobject.SearchV2Response_OIDWithMeta, len(res)), } - ofs, cursor, err := objectcore.PreprocessSearchQuery(fs, body.Attributes, body.Cursor) - if err != nil { - if errors.Is(err, objectcore.ErrUnreachableQuery) { - return nil, nil + for i := range res { + resBody.Result[i] = &protoobject.SearchV2Response_OIDWithMeta{ + Id: res[i].ID.ProtoMessage(), + Attributes: res[i].Attributes, } - return nil, err } + if newCursor != nil { + resBody.Cursor = base64.StdEncoding.EncodeToString(newCursor) + } + return resBody, nil +} +func (s *server) ProcessSearch(ctx context.Context, req *protoobject.SearchV2Request) ([]sdkclient.SearchResultItem, []byte, error) { + body := req.GetBody() + var cID cid.ID + if err := cID.FromProtoMessage(body.ContainerId); err != nil { + return nil, nil, fmt.Errorf("invalid container ID: %w", err) + } cnr, err := s.fsChain.Get(cID) if err != nil { - return nil, fmt.Errorf("fetching container: %w", err) + return nil, nil, fmt.Errorf("fetching container: %w", err) } var handleWithMetaService bool const metaOnChainAttr = "__NEOFS__METAINFO_CONSISTENCY" @@ -1928,18 +1935,34 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear default: } + ttl := req.MetaHeader.GetTtl() + if ttl == 0 { + return nil, nil, errors.New("zero TTL") + } + var fs object.SearchFilters + if err = fs.FromProtoMessage(body.Filters); err != nil { + return nil, nil, fmt.Errorf("invalid filters: %w", err) + } + ofs, cursor, err := objectcore.PreprocessSearchQuery(fs, body.Attributes, body.Cursor) + if err != nil { + if errors.Is(err, objectcore.ErrUnreachableQuery) { + return nil, nil, nil + } + return nil, nil, err + } + var res []sdkclient.SearchResultItem var newCursor []byte count := uint16(body.Count) // legit according to the limit switch { case ttl == 1: if res, newCursor, err = s.storage.SearchObjects(cID, ofs, body.Attributes, cursor, count); err != nil { - return nil, err + return nil, nil, err } case handleWithMetaService: res, newCursor, err = s.meta.Search(cID, ofs, body.Attributes, cursor, count) if err != nil { - return nil, err + return nil, nil, err } default: var signed bool @@ -1973,7 +1996,7 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear } if !signed { req.MetaHeader = &protosession.RequestMetaHeader{Ttl: 1, Origin: req.MetaHeader} - if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(s.signer), req, nil); err != nil { + if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer[*protoobject.SearchV2Request_Body](neofsecdsa.Signer(s.signer), req, nil); err != nil { resErr = fmt.Errorf("sign request: %w", err) return false } @@ -1995,41 +2018,29 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear err = resErr } if err != nil { - return nil, err + return nil, nil, err } var ( firstAttr string firstFilter *object.SearchFilter ) if len(body.Attributes) > 0 { - firstAttr = body.Filters[0].Key + firstAttr = fs[0].Header() firstFilter = &ofs[0].SearchFilter } cmpInt := firstAttr != "" && objectcore.IsIntegerSearchOp(fs[0].Operation()) var more bool if res, more, err = objectcore.MergeSearchResults(count, firstAttr, cmpInt, sets, mores); err != nil { - return nil, fmt.Errorf("merge results from container nodes: %w", err) + return nil, nil, fmt.Errorf("merge results from container nodes: %w", err) } if more { if newCursor, err = objectcore.CalculateCursor(firstFilter, res[len(res)-1]); err != nil { - return nil, fmt.Errorf("recalculate cursor: %w", err) + return nil, nil, fmt.Errorf("recalculate cursor: %w", err) } } } - resBody := &protoobject.SearchV2Response_Body{ - Result: make([]*protoobject.SearchV2Response_OIDWithMeta, len(res)), - } - for i := range res { - resBody.Result[i] = &protoobject.SearchV2Response_OIDWithMeta{ - Id: res[i].ID.ProtoMessage(), - Attributes: res[i].Attributes, - } - } - if newCursor != nil { - resBody.Cursor = base64.StdEncoding.EncodeToString(newCursor) - } - return resBody, nil + return res, newCursor, nil } func (s *server) searchOnRemoteNode(ctx context.Context, node sdknetmap.NodeInfo, req *protoobject.SearchV2Request) ([]sdkclient.SearchResultItem, bool, error) { diff --git a/pkg/services/object/tombstone/verify.go b/pkg/services/object/tombstone/verify.go index 033f886681..4aab7bb82a 100644 --- a/pkg/services/object/tombstone/verify.go +++ b/pkg/services/object/tombstone/verify.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "strconv" + "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -26,7 +28,7 @@ type ObjectSource interface { // Search returns objects that satisfy provided search filters and // any error that does not allow processing operation. - Search(ctx context.Context, cnr cid.ID, filter object.SearchFilters) ([]oid.ID, error) + Search(ctx context.Context, cnr cid.ID, filter object.SearchFilters, childV2 bool) ([]client.SearchResultItem, error) } // Verifier implements [object.TombVerifier] interface. @@ -130,9 +132,10 @@ func (v *Verifier) verifyMember(ctx context.Context, cnr cid.ID, member oid.ID) func (v *Verifier) verifyV1Child(ctx context.Context, cnr cid.ID, sID object.SplitID) error { filters := object.SearchFilters{} + filters.AddPayloadSizeFilter(object.MatchNumGT, 0) filters.AddSplitIDFilter(object.MatchStringEqual, sID) - ids, err := v.objs.Search(ctx, cnr, filters) + res, err := v.objs.Search(ctx, cnr, filters, false) if err != nil { return fmt.Errorf("searching objects: %w", err) } @@ -140,18 +143,15 @@ func (v *Verifier) verifyV1Child(ctx context.Context, cnr cid.ID, sID object.Spl var addr oid.Address addr.SetContainer(cnr) - for _, child := range ids { - addr.SetObject(child) + for _, child := range res { + addr.SetObject(child.ID) - header, err := v.objs.Head(ctx, addr) + payload, err := strconv.Atoi(child.Attributes[0]) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) { // see similar call - return nil - } - return fmt.Errorf("heading %s object that was searched: %w", addr, err) + return fmt.Errorf("parsing payload size: %w", err) } - if len(header.Children()) != 0 { + if payload != 0 { return fmt.Errorf("found link object %s", addr) } } @@ -164,7 +164,7 @@ func (v *Verifier) verifyV2Child(ctx context.Context, cnr cid.ID, firstObject oi filters.AddFirstSplitObjectFilter(object.MatchStringEqual, firstObject) filters.AddTypeFilter(object.MatchStringEqual, object.TypeLink) - ids, err := v.objs.Search(ctx, cnr, filters) + ids, err := v.objs.Search(ctx, cnr, filters, true) if err != nil { return fmt.Errorf("searching objects: %w", err) } @@ -174,7 +174,7 @@ func (v *Verifier) verifyV2Child(ctx context.Context, cnr cid.ID, firstObject oi // no link object, child can be deleted return nil case 1: - return fmt.Errorf("found link object %s", ids[0]) + return fmt.Errorf("found link object %s", ids[0].ID) default: // more than one link object somehow, sad but // nothing can be done here diff --git a/pkg/services/object/tombstone/verify_test.go b/pkg/services/object/tombstone/verify_test.go index 8ae8e00107..b403b7bc60 100644 --- a/pkg/services/object/tombstone/verify_test.go +++ b/pkg/services/object/tombstone/verify_test.go @@ -2,9 +2,10 @@ package tombstone import ( "context" + "fmt" "testing" - objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" @@ -21,8 +22,8 @@ type headRes struct { } type testObjectSource struct { - searchV1 map[object.SplitID][]oid.ID - searchV2 map[oid.ID][]oid.ID + searchV1 map[object.SplitID][]client.SearchResultItem + searchV2 map[oid.ID][]client.SearchResultItem head map[oid.Address]headRes } @@ -31,36 +32,31 @@ func (t *testObjectSource) Head(_ context.Context, addr oid.Address) (*object.Ob return res.h, res.err } -func (t *testObjectSource) Search(_ context.Context, _ cid.ID, ff object.SearchFilters) ([]oid.ID, error) { - f := ff[0] - - switch f.Header() { - case object.FilterSplitID: - if t.searchV1 == nil { +func (t *testObjectSource) Search(_ context.Context, _ cid.ID, ff object.SearchFilters, childV2 bool) ([]client.SearchResultItem, error) { + if childV2 { + if t.searchV2 == nil { return nil, nil } - var splitID object.SplitID - err := splitID.Parse(f.Value()) + var firstObject oid.ID + err := firstObject.DecodeString(ff[0].Value()) if err != nil { panic(err) } - return t.searchV1[splitID], nil - case object.FilterFirstSplitObject: - if t.searchV2 == nil { + return t.searchV2[firstObject], nil + } else { + if t.searchV1 == nil { return nil, nil } - var firstObject oid.ID - err := firstObject.DecodeString(f.Value()) + var splitID object.SplitID + err := splitID.Parse(ff[1].Value()) if err != nil { panic(err) } - return t.searchV2[firstObject], nil - default: - panic("unexpected search call") + return t.searchV1[splitID], nil } } @@ -129,19 +125,16 @@ func TestVerifier_VerifyTomb(t *testing.T) { link.SetChildren(childID) linkID := link.GetID() - objectcore.AddressOf(&link) - *os = testObjectSource{ head: map[oid.Address]headRes{ addr: { h: &child, }, - objectcore.AddressOf(&link): { - h: &link, - }, }, - searchV1: map[object.SplitID][]oid.ID{ - *splitID: {linkID}, + searchV1: map[object.SplitID][]client.SearchResultItem{ + *splitID: {client.SearchResultItem{ + ID: linkID, + Attributes: []string{fmt.Sprint(len(link.Payload()))}}}, }, } @@ -161,8 +154,8 @@ func TestVerifier_VerifyTomb(t *testing.T) { h: &child, }, }, - searchV2: map[oid.ID][]oid.ID{ - childID: {oidtest.ID()}, // the first object is a chain ID in itself + searchV2: map[oid.ID][]client.SearchResultItem{ + childID: {client.SearchResultItem{ID: oidtest.ID()}}, // the first object is a chain ID in itself }, } @@ -193,8 +186,8 @@ func TestVerifier_VerifyTomb(t *testing.T) { h: &child, }, }, - searchV2: map[oid.ID][]oid.ID{ - firstObject: {oidtest.ID()}, + searchV2: map[oid.ID][]client.SearchResultItem{ + firstObject: {client.SearchResultItem{ID: oidtest.ID()}}, }, } @@ -249,12 +242,15 @@ func TestVerifier_VerifyTomb(t *testing.T) { os.head[oid.NewAddress(cnr, rootV1ID)] = headRes{h: &rootV1Hdr} v1Children := oidtest.IDs(3) + v1ChildrenItems := make([]client.SearchResultItem, len(v1Children)) for i := range v1Children { - os.head[oid.NewAddress(cnr, v1Children[i])] = headRes{err: apistatus.ErrObjectAlreadyRemoved} + v1ChildrenItems[i] = client.SearchResultItem{ + ID: v1Children[i], + Attributes: []string{"0"}} } - os.searchV1 = make(map[object.SplitID][]oid.ID) - os.searchV1[splitID] = v1Children + os.searchV1 = make(map[object.SplitID][]client.SearchResultItem) + os.searchV1[splitID] = v1ChildrenItems require.NoError(t, v.VerifyTomb(ctx, cnr, tomb)) })