diff --git a/internal/host/static/host.go b/internal/host/static/host.go index 7b967944b5..1fe420c3a3 100644 --- a/internal/host/static/host.go +++ b/internal/host/static/host.go @@ -162,3 +162,13 @@ func (agg *hostAgg) getSetIds() []string { } return ids } + +type deletedHost struct { + PublicId string `gorm:"primary_key"` + DeleteTime *timestamp.Timestamp +} + +// TableName returns the tablename to override the default gorm table name +func (s *deletedHost) TableName() string { + return "static_host_deleted" +} diff --git a/internal/host/static/query.go b/internal/host/static/query.go index 8f1868f27e..1daaeb6d69 100644 --- a/internal/host/static/query.go +++ b/internal/host/static/query.go @@ -45,5 +45,172 @@ final (action, host_id) as ( ) select * from final order by action, host_id; +` + + estimateCountHosts = ` +select sum(reltuples::bigint) as estimate from pg_class where oid in ('static_host'::regclass) +` + + listHostsTemplate = ` +with hosts as ( + select public_id, + create_time, + update_time, + name, + description, + catalog_id, + address, + version + from static_host + where catalog_id = @catalog_id + order by create_time desc, public_id asc + limit %d +), +host_set_ids as ( + select string_agg(distinct set_id, '|') as set_ids, + host_id + from static_host_set_member + where host_id in (select public_id from hosts) + group by host_id +), +final as ( + select h.public_id, + h.create_time, + h.update_time, + h.name, + h.description, + h.catalog_id, + h.address, + h.version, + hsi.set_ids + from hosts h + left outer join host_set_ids hsi on hsi.host_id = h.public_id +) + select * + from final +order by create_time desc, public_id asc; +` + + listHostsPageTemplate = ` +with hosts as ( + select public_id, + create_time, + update_time, + name, + description, + catalog_id, + address, + version + from static_host + where catalog_id = @catalog_id + and (create_time, public_id) < (@last_item_create_time, @last_item_id) + order by create_time desc, public_id asc + limit %d +), +host_set_ids as ( + select string_agg(distinct set_id, '|') as set_ids, + host_id + from static_host_set_member + where host_id in (select public_id from hosts) + group by host_id +), +final as ( + select h.public_id, + h.create_time, + h.update_time, + h.name, + h.description, + h.catalog_id, + h.address, + h.version, + hsi.set_ids + from hosts h + left outer join host_set_ids hsi on hsi.host_id = h.public_id +) + select * + from final +order by create_time desc, public_id asc; +` + + listHostsRefreshTemplate = ` +with hosts as ( + select public_id, + create_time, + update_time, + name, + description, + catalog_id, + address, + version + from static_host + where catalog_id = @catalog_id + and update_time > @updated_after_time + order by update_time desc, public_id asc + limit %d +), +host_set_ids as ( + select string_agg(distinct set_id, '|') as set_ids, + host_id + from static_host_set_member + where host_id in (select public_id from hosts) + group by host_id +), +final as ( + select h.public_id, + h.create_time, + h.update_time, + h.name, + h.description, + h.catalog_id, + h.address, + h.version, + hsi.set_ids + from hosts h + left outer join host_set_ids hsi on hsi.host_id = h.public_id +) + select * + from final +order by update_time desc, public_id asc; +` + listHostsRefreshPageTemplate = ` +with hosts as ( + select public_id, + create_time, + update_time, + name, + description, + catalog_id, + address, + version + from static_host + where catalog_id = @catalog_id + and update_time > @updated_after_time + and (update_time, public_id) < (@last_item_update_time, @last_item_id) + order by update_time desc, public_id asc + limit %d +), +host_set_ids as ( + select string_agg(distinct set_id, '|') as set_ids, + host_id + from static_host_set_member + where host_id in (select public_id from hosts) + group by host_id +), +final as ( + select h.public_id, + h.create_time, + h.update_time, + h.name, + h.description, + h.catalog_id, + h.address, + h.version, + hsi.set_ids + from hosts h + left outer join host_set_ids hsi on hsi.host_id = h.public_id +) + select * + from final +order by update_time desc, public_id asc; ` ) diff --git a/internal/host/static/repository_host.go b/internal/host/static/repository_host.go index fba6391a7a..9997f4c9d1 100644 --- a/internal/host/static/repository_host.go +++ b/internal/host/static/repository_host.go @@ -5,8 +5,10 @@ package static import ( "context" + "database/sql" "fmt" "strings" + "time" "github.com/hashicorp/boundary/globals" "github.com/hashicorp/boundary/internal/db" @@ -226,12 +228,14 @@ func (r *Repository) LookupHost(ctx context.Context, publicId string, opt ...Opt return ha.toHost(), nil } -// ListHosts returns a slice of Hosts for the catalogId. -// WithLimit is the only option supported. -func (r *Repository) ListHosts(ctx context.Context, catalogId string, opt ...Option) ([]*Host, error) { - const op = "static.(Repository).ListHosts" +// listHosts returns a slice of Hosts for the catalogId. +// Supported options: +// - WithLimit which overrides the limit set in the Repository object +// - WithStartPageAfterItem which sets where to start listing from +func (r *Repository) listHosts(ctx context.Context, catalogId string, opt ...Option) ([]*Host, time.Time, error) { + const op = "static.(Repository).listHosts" if catalogId == "" { - return nil, errors.New(ctx, errors.InvalidParameter, op, "no catalog id") + return nil, time.Time{}, errors.New(ctx, errors.InvalidParameter, op, "no catalog id") } opts := getOpts(opt...) limit := r.defaultLimit @@ -239,17 +243,80 @@ func (r *Repository) ListHosts(ctx context.Context, catalogId string, opt ...Opt // non-zero signals an override of the default limit for the repo. limit = opts.withLimit } - var aggs []*hostAgg - err := r.reader.SearchWhere(ctx, &aggs, "catalog_id = ?", []any{catalogId}, db.WithLimit(limit)) - if err != nil { - return nil, errors.Wrap(ctx, err, op) + query := fmt.Sprintf(listHostsTemplate, limit) + args := []any{sql.Named("catalog_id", catalogId)} + if opts.withStartPageAfterItem != nil { + query = fmt.Sprintf(listHostsPageTemplate, limit) + args = append(args, + sql.Named("last_item_create_time", opts.withStartPageAfterItem.GetCreateTime()), + sql.Named("last_item_id", opts.withStartPageAfterItem.GetPublicId()), + ) + } + + return r.queryHosts(ctx, query, args) +} + +// listHostsRefresh returns a slice of Hosts for the catalogId. +// Supported options: +// - WithLimit which overrides the limit set in the Repository object +// - WithStartPageAfterItem which sets where to start listing from +func (r *Repository) listHostsRefresh(ctx context.Context, catalogId string, updatedAfter time.Time, opt ...Option) ([]*Host, time.Time, error) { + const op = "static.(Repository).listHostsRefresh" + switch { + case catalogId == "": + return nil, time.Time{}, errors.New(ctx, errors.InvalidParameter, op, "no catalog id") + case updatedAfter.IsZero(): + return nil, time.Time{}, errors.New(ctx, errors.InvalidParameter, op, "missing updated after time") + } + opts := getOpts(opt...) + limit := r.defaultLimit + if opts.withLimit != 0 { + // non-zero signals an override of the default limit for the repo. + limit = opts.withLimit } - hosts := make([]*Host, 0, len(aggs)) - for _, ha := range aggs { - hosts = append(hosts, ha.toHost()) + query := fmt.Sprintf(listHostsRefreshTemplate, limit) + args := []any{ + sql.Named("catalog_id", catalogId), + sql.Named("updated_after_time", updatedAfter), + } + if opts.withStartPageAfterItem != nil { + query = fmt.Sprintf(listHostsRefreshPageTemplate, limit) + args = append(args, + sql.Named("last_item_update_time", opts.withStartPageAfterItem.GetUpdateTime()), + sql.Named("last_item_id", opts.withStartPageAfterItem.GetPublicId()), + ) } - return hosts, nil + return r.queryHosts(ctx, query, args) +} + +func (r *Repository) queryHosts(ctx context.Context, query string, args []any) ([]*Host, time.Time, error) { + const op = "static.(Repository).queryHosts" + + var hosts []*Host + var transactionTimestamp time.Time + if _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { + rows, err := r.Query(ctx, query, args) + if err != nil { + return err + } + defer rows.Close() + var foundHosts []*hostAgg + for rows.Next() { + if err := r.ScanRows(ctx, rows, &foundHosts); err != nil { + return err + } + } + hosts = make([]*Host, 0, len(foundHosts)) + for _, ha := range foundHosts { + hosts = append(hosts, ha.toHost()) + } + transactionTimestamp, err = r.Now(ctx) + return err + }); err != nil { + return nil, time.Time{}, errors.Wrap(ctx, err, op) + } + return hosts, transactionTimestamp, nil } // DeleteHost deletes the host for the provided id from the repository @@ -290,3 +357,45 @@ func (r *Repository) DeleteHost(ctx context.Context, projectId string, publicId return rowsDeleted, nil } + +// listDeletedHostIds lists the public IDs of any hosts deleted since the timestamp provided, +// and the timestamp of the transaction within which the hosts were listed. +func (r *Repository) listDeletedHostIds(ctx context.Context, since time.Time) ([]string, time.Time, error) { + const op = "static.(Repository).listDeletedHostIds" + var deleteHosts []*deletedHost + var transactionTimestamp time.Time + if _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, _ db.Writer) error { + if err := r.SearchWhere(ctx, &deleteHosts, "delete_time >= ?", []any{since}); err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("failed to query deleted hosts")) + } + var err error + transactionTimestamp, err = r.Now(ctx) + if err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("failed to get transaction timestamp")) + } + return nil + }); err != nil { + return nil, time.Time{}, err + } + var hostIds []string + for _, t := range deleteHosts { + hostIds = append(hostIds, t.PublicId) + } + return hostIds, transactionTimestamp, nil +} + +// estimatedHostCount returns an estimate of the total number of static hosts. +func (r *Repository) estimatedHostCount(ctx context.Context) (int, error) { + const op = "static.(Repository).estimatedHostCount" + rows, err := r.reader.Query(ctx, estimateCountHosts, nil) + if err != nil { + return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static hosts")) + } + var count int + for rows.Next() { + if err := r.reader.ScanRows(ctx, rows, &count); err != nil { + return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static hosts")) + } + } + return count, nil +} diff --git a/internal/host/static/repository_host_test.go b/internal/host/static/repository_host_test.go index ed6f4f5315..aee131d18e 100644 --- a/internal/host/static/repository_host_test.go +++ b/internal/host/static/repository_host_test.go @@ -923,7 +923,7 @@ func TestRepository_ListHosts(t *testing.T) { repo, err := NewRepository(ctx, rw, rw, kms) assert.NoError(err) require.NotNil(repo) - got, err := repo.ListHosts(ctx, tt.in, tt.opts...) + got, ttime, err := repo.listHosts(ctx, tt.in, tt.opts...) if tt.wantIsErr != 0 { assert.Truef(errors.Match(errors.T(tt.wantIsErr), err), "want err: %q got: %q", tt.wantIsErr, err) assert.Nil(got) @@ -935,6 +935,9 @@ func TestRepository_ListHosts(t *testing.T) { protocmp.Transform(), } assert.Empty(cmp.Diff(tt.want, got, opts...)) + // Transaction timestamp should be within ~10 seconds of now + assert.True(time.Now().Before(ttime.Add(10 * time.Second))) + assert.True(time.Now().After(ttime.Add(-10 * time.Second))) }) } } @@ -997,7 +1000,7 @@ func TestRepository_ListHosts_HostSets(t *testing.T) { repo, err := NewRepository(ctx, rw, rw, kms) assert.NoError(err) require.NotNil(repo) - got, err := repo.ListHosts(ctx, tt.in) + got, ttime, err := repo.listHosts(ctx, tt.in) require.NoError(err) assert.Empty( cmp.Diff( @@ -1010,6 +1013,9 @@ func TestRepository_ListHosts_HostSets(t *testing.T) { }), ), ) + // Transaction timestamp should be within ~10 seconds of now + assert.True(time.Now().Before(ttime.Add(10 * time.Second))) + assert.True(time.Now().After(ttime.Add(-10 * time.Second))) }) } } @@ -1042,21 +1048,11 @@ func TestRepository_ListHosts_Limits(t *testing.T) { repoOpts: []Option{WithLimit(3)}, wantLen: 3, }, - { - name: "With negative repo limit", - repoOpts: []Option{WithLimit(-1)}, - wantLen: count, - }, { name: "With List limit", listOpts: []Option{WithLimit(3)}, wantLen: 3, }, - { - name: "With negative List limit", - listOpts: []Option{WithLimit(-1)}, - wantLen: count, - }, { name: "With repo smaller than list limit", repoOpts: []Option{WithLimit(2)}, @@ -1078,13 +1074,176 @@ func TestRepository_ListHosts_Limits(t *testing.T) { repo, err := NewRepository(ctx, rw, rw, kms, tt.repoOpts...) assert.NoError(err) require.NotNil(repo) - got, err := repo.ListHosts(ctx, hosts[0].CatalogId, tt.listOpts...) + got, ttime, err := repo.listHosts(ctx, hosts[0].CatalogId, tt.listOpts...) require.NoError(err) assert.Len(got, tt.wantLen) + // Transaction timestamp should be within ~10 seconds of now + assert.True(time.Now().Before(ttime.Add(10 * time.Second))) + assert.True(time.Now().After(ttime.Add(-10 * time.Second))) }) } } +func Test_listDeletedHostIds(t *testing.T) { + t.Parallel() + ctx := context.Background() + conn, _ := db.TestSetup(t, "postgres") + wrapper := db.TestWrapper(t) + testKms := kms.TestKms(t, conn, wrapper) + iamRepo := iam.TestRepo(t, conn, wrapper) + _, proj1 := iam.TestScopes(t, iamRepo) + catalog := TestCatalogs(t, conn, proj1.GetPublicId(), 1)[0] + + rw := db.New(conn) + repo, err := NewRepository(ctx, rw, rw, testKms) + require.NoError(t, err) + + // Expect no entries at the start + deletedIds, ttime, err := repo.listDeletedHostIds(ctx, time.Now().AddDate(-1, 0, 0)) + require.NoError(t, err) + require.Empty(t, deletedIds) + // Transaction timestamp should be within ~10 seconds of now + assert.True(t, time.Now().Before(ttime.Add(10*time.Second))) + assert.True(t, time.Now().After(ttime.Add(-10*time.Second))) + + // Delete a host + h := TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + _, err = repo.DeleteHost(ctx, proj1.GetPublicId(), h.GetPublicId()) + require.NoError(t, err) + + // Expect a single entry + deletedIds, ttime, err = repo.listDeletedHostIds(ctx, time.Now().AddDate(-1, 0, 0)) + require.NoError(t, err) + require.Equal(t, []string{h.GetPublicId()}, deletedIds) + assert.True(t, time.Now().Before(ttime.Add(10*time.Second))) + assert.True(t, time.Now().After(ttime.Add(-10*time.Second))) + + // Try again with the time set to now, expect no entries + deletedIds, ttime, err = repo.listDeletedHostIds(ctx, time.Now()) + require.NoError(t, err) + require.Empty(t, deletedIds) + assert.True(t, time.Now().Before(ttime.Add(10*time.Second))) + assert.True(t, time.Now().After(ttime.Add(-10*time.Second))) +} + +func Test_estimatedHostCount(t *testing.T) { + t.Parallel() + ctx := context.Background() + conn, _ := db.TestSetup(t, "postgres") + sqlDb, err := conn.SqlDB(ctx) + require.NoError(t, err) + wrapper := db.TestWrapper(t) + testKms := kms.TestKms(t, conn, wrapper) + iamRepo := iam.TestRepo(t, conn, wrapper) + _, proj1 := iam.TestScopes(t, iamRepo) + catalog := TestCatalogs(t, conn, proj1.GetPublicId(), 1)[0] + + rw := db.New(conn) + repo, err := NewRepository(ctx, rw, rw, testKms) + require.NoError(t, err) + + // Check total entries at start, expect 0 + numItems, err := repo.estimatedHostCount(ctx) + require.NoError(t, err) + assert.Equal(t, 0, numItems) + + // Create a host, expect 1 entries + h := TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + // Run analyze to update estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + numItems, err = repo.estimatedHostCount(ctx) + require.NoError(t, err) + assert.Equal(t, 1, numItems) + + // Delete the host, expect 0 again + _, err = repo.DeleteHost(ctx, proj1.GetPublicId(), h.GetPublicId()) + require.NoError(t, err) + // Run analyze to update estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + numItems, err = repo.estimatedHostCount(ctx) + require.NoError(t, err) + assert.Equal(t, 0, numItems) +} + +func TestRepository_ListHosts_Pagination(t *testing.T) { + t.Parallel() + ctx := context.Background() + conn, _ := db.TestSetup(t, "postgres") + wrapper := db.TestWrapper(t) + testKms := kms.TestKms(t, conn, wrapper) + iamRepo := iam.TestRepo(t, conn, wrapper) + _, proj1 := iam.TestScopes(t, iamRepo) + catalog := TestCatalogs(t, conn, proj1.GetPublicId(), 1)[0] + + total := 5 + TestHosts(t, conn, catalog.GetPublicId(), total) + + rw := db.New(conn) + repo, err := NewRepository(ctx, rw, rw, testKms) + require.NoError(t, err) + + t.Run("no-options", func(t *testing.T) { + got, ttime, err := repo.listHosts(ctx, catalog.GetPublicId()) + require.NoError(t, err) + assert.Equal(t, total, len(got)) + assert.True(t, time.Now().Before(ttime.Add(10*time.Second))) + assert.True(t, time.Now().After(ttime.Add(-10*time.Second))) + }) + + t.Run("withStartPageAfter", func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + page1, ttime, err := repo.listHosts( + ctx, + catalog.GetPublicId(), + WithLimit(2), + ) + require.NoError(err) + require.Len(page1, 2) + assert.True(time.Now().Before(ttime.Add(10 * time.Second))) + assert.True(time.Now().After(ttime.Add(-10 * time.Second))) + page2, ttime, err := repo.listHosts( + ctx, + catalog.GetPublicId(), + WithLimit(2), + WithStartPageAfterItem(page1[1]), + ) + require.NoError(err) + require.Len(page2, 2) + for _, item := range page1 { + assert.NotEqual(item.GetPublicId(), page2[0].GetPublicId()) + assert.NotEqual(item.GetPublicId(), page2[1].GetPublicId()) + } + assert.True(time.Now().Before(ttime.Add(10 * time.Second))) + assert.True(time.Now().After(ttime.Add(-10 * time.Second))) + page3, ttime, err := repo.listHosts( + ctx, + catalog.GetPublicId(), + WithLimit(2), + WithStartPageAfterItem(page2[1]), + ) + require.NoError(err) + require.Len(page3, 1) + for _, item := range page2 { + assert.NotEqual(item.GetPublicId(), page3[0].GetPublicId()) + } + assert.True(time.Now().Before(ttime.Add(10 * time.Second))) + assert.True(time.Now().After(ttime.Add(-10 * time.Second))) + page4, ttime, err := repo.listHosts( + ctx, + catalog.GetPublicId(), + WithLimit(2), + WithStartPageAfterItem(page3[0]), + ) + require.NoError(err) + require.Empty(page4, 2) + assert.True(time.Now().Before(ttime.Add(10 * time.Second))) + assert.True(time.Now().After(ttime.Add(-10 * time.Second))) + }) +} + func TestRepository_DeleteHost(t *testing.T) { ctx := context.Background() conn, _ := db.TestSetup(t, "postgres") diff --git a/internal/host/static/service_list_hosts.go b/internal/host/static/service_list_hosts.go new file mode 100644 index 0000000000..ba8b7d7049 --- /dev/null +++ b/internal/host/static/service_list_hosts.go @@ -0,0 +1,62 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package static + +import ( + "context" + "time" + + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/host" + "github.com/hashicorp/boundary/internal/pagination" +) + +// ListHosts lists up to page size hosts, filtering out entries that +// do not pass the filter item function. It will automatically request +// more hosts from the database, at page size chunks, to fill the page. +// It returns a new list token used to continue pagination or refresh items. +// Hosts are ordered by create time descending (most recently created first). +func ListHosts( + ctx context.Context, + grantsHash []byte, + pageSize int, + filterItemFn pagination.ListFilterFunc[host.Host], + repo *Repository, + hostCatalogId string, +) (*pagination.ListResponse[host.Host], error) { + const op = "plugin.ListHosts" + + switch { + case len(grantsHash) == 0: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash") + case pageSize < 1: + return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1") + case filterItemFn == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback") + case repo == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing repo") + case hostCatalogId == "": + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing host catalog ID") + } + + listItemsFn := func(ctx context.Context, lastPageItem host.Host, limit int) ([]host.Host, time.Time, error) { + opts := []Option{ + WithLimit(limit), + } + if lastPageItem != nil { + opts = append(opts, WithStartPageAfterItem(lastPageItem)) + } + sHosts, listTime, err := repo.listHosts(ctx, hostCatalogId, opts...) + if err != nil { + return nil, time.Time{}, err + } + var hosts []host.Host + for _, host := range sHosts { + hosts = append(hosts, host) + } + return hosts, listTime, nil + } + + return pagination.List(ctx, grantsHash, pageSize, filterItemFn, listItemsFn, repo.estimatedHostCount) +} diff --git a/internal/host/static/service_list_hosts_ext_test.go b/internal/host/static/service_list_hosts_ext_test.go new file mode 100644 index 0000000000..753a6bfdff --- /dev/null +++ b/internal/host/static/service_list_hosts_ext_test.go @@ -0,0 +1,612 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package static_test + +import ( + "context" + "slices" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/db/timestamp" + "github.com/hashicorp/boundary/internal/host" + "github.com/hashicorp/boundary/internal/host/static" + "github.com/hashicorp/boundary/internal/host/static/store" + "github.com/hashicorp/boundary/internal/iam" + "github.com/hashicorp/boundary/internal/kms" + "github.com/hashicorp/boundary/internal/listtoken" + "github.com/hashicorp/boundary/internal/types/resource" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestService_ListHosts(t *testing.T) { + // Set database read timeout to avoid duplicates in response + oldReadTimeout := globals.RefreshReadLookbackDuration + globals.RefreshReadLookbackDuration = 0 + t.Cleanup(func() { + globals.RefreshReadLookbackDuration = oldReadTimeout + }) + ctx := context.Background() + conn, _ := db.TestSetup(t, "postgres") + sqlDb, err := conn.SqlDB(ctx) + require.NoError(t, err) + wrapper := db.TestWrapper(t) + testKms := kms.TestKms(t, conn, wrapper) + iamRepo := iam.TestRepo(t, conn, wrapper) + _, proj1 := iam.TestScopes(t, iamRepo) + fiveDaysAgo := time.Now().AddDate(0, 0, -5) + catalog := static.TestCatalogs(t, conn, proj1.GetPublicId(), 1)[0] + var hosts []host.Host + for _, host := range static.TestHosts(t, conn, catalog.GetPublicId(), 5) { + hosts = append(hosts, host) + } + // since we sort by create time descending, we need to reverse the slice + slices.Reverse(hosts) + + // Run analyze to update host estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + + rw := db.New(conn) + repo, err := static.NewRepository(ctx, rw, rw, testKms) + require.NoError(t, err) + + cmpOpts := []cmp.Option{ + cmpopts.IgnoreUnexported( + static.Host{}, + store.Host{}, + timestamp.Timestamp{}, + timestamppb.Timestamp{}, + ), + } + + t.Run("ListHosts validation", func(t *testing.T) { + t.Parallel() + t.Run("missing grants hash", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err := static.ListHosts(ctx, nil, 1, filterFunc, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing grants hash") + }) + t.Run("zero page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err := static.ListHosts(ctx, []byte("some hash"), 0, filterFunc, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("negative page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err := static.ListHosts(ctx, []byte("some hash"), -1, filterFunc, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("nil filter func", func(t *testing.T) { + t.Parallel() + _, err := static.ListHosts(ctx, []byte("some hash"), 1, nil, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing filter item callback") + }) + t.Run("nil repo", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err := static.ListHosts(ctx, []byte("some hash"), 1, filterFunc, nil, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing repo") + }) + t.Run("missing host catalog ID", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err := static.ListHosts(ctx, []byte("some hash"), 1, filterFunc, repo, "") + require.ErrorContains(t, err, "missing host catalog ID") + }) + }) + t.Run("ListHostsPage validation", func(t *testing.T) { + t.Parallel() + t.Run("missing grants hash", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, nil, 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing grants hash") + }) + t.Run("zero page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, []byte("some hash"), 0, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("negative page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, []byte("some hash"), -1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("nil filter func", func(t *testing.T) { + t.Parallel() + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, []byte("some hash"), 1, nil, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing filter item callback") + }) + t.Run("nil token", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err = static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, nil, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing token") + }) + t.Run("wrong token type", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "token did not have a pagination token component") + }) + t.Run("nil repo", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, tok, nil, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing repo") + }) + t.Run("missing host catalog ID", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, tok, repo, "") + require.ErrorContains(t, err, "missing host catalog ID") + }) + t.Run("wrong token resource type", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Target, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "token did not have a host resource type") + }) + }) + t.Run("ListHostsRefresh validation", func(t *testing.T) { + t.Parallel() + t.Run("missing grants hash", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefresh(ctx, nil, 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing grants hash") + }) + t.Run("zero page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefresh(ctx, []byte("some hash"), 0, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("negative page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefresh(ctx, []byte("some hash"), -1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("nil filter func", func(t *testing.T) { + t.Parallel() + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefresh(ctx, []byte("some hash"), 1, nil, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing filter item callback") + }) + t.Run("nil token", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err = static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, nil, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing token") + }) + t.Run("nil repo", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, tok, nil, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing repo") + }) + t.Run("missing host catalog ID", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, tok, repo, "") + require.ErrorContains(t, err, "missing host catalog ID") + }) + t.Run("wrong token resource type", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewStartRefresh(ctx, fiveDaysAgo, resource.Target, []byte("some hash"), fiveDaysAgo, fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "token did not have a host resource type") + }) + }) + t.Run("ListHostsRefreshPage validation", func(t *testing.T) { + t.Parallel() + t.Run("missing grants hash", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo, fiveDaysAgo, "some other id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, nil, 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing grants hash") + }) + t.Run("zero page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo, fiveDaysAgo, "some other id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), 0, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("negative page size", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo, fiveDaysAgo, "some other id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), -1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "page size must be at least 1") + }) + t.Run("nil filter func", func(t *testing.T) { + t.Parallel() + tok, err := listtoken.NewRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo, fiveDaysAgo, "some other id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, nil, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing filter item callback") + }) + t.Run("nil token", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, filterFunc, nil, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing token") + }) + t.Run("wrong token type", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewPagination(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), "some-id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "token did not have a refresh token component") + }) + t.Run("nil repo", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo, fiveDaysAgo, "some other id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, filterFunc, tok, nil, catalog.GetPublicId()) + require.ErrorContains(t, err, "missing repo") + }) + t.Run("missing credential store id", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewRefresh(ctx, fiveDaysAgo, resource.Host, []byte("some hash"), fiveDaysAgo, fiveDaysAgo, fiveDaysAgo, "some other id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, filterFunc, tok, repo, "") + require.ErrorContains(t, err, "missing host catalog ID") + }) + t.Run("wrong token resource type", func(t *testing.T) { + t.Parallel() + filterFunc := func(_ context.Context, h host.Host) (bool, error) { + return true, nil + } + tok, err := listtoken.NewRefresh(ctx, fiveDaysAgo, resource.Target, []byte("some hash"), fiveDaysAgo, fiveDaysAgo, fiveDaysAgo, "some other id", fiveDaysAgo) + require.NoError(t, err) + _, err = static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, filterFunc, tok, repo, catalog.GetPublicId()) + require.ErrorContains(t, err, "token did not have a host resource type") + }) + }) + + t.Run("simple pagination", func(t *testing.T) { + filterFunc := func(context.Context, host.Host) (bool, error) { + return true, nil + } + resp, err := static.ListHosts(ctx, []byte("some hash"), 1, filterFunc, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.NotNil(t, resp.ListToken) + require.Equal(t, resp.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp.CompleteListing) + require.Equal(t, resp.EstimatedItemCount, 5) + require.Empty(t, resp.DeletedIds) + require.Len(t, resp.Items, 1) + require.Empty(t, cmp.Diff(resp.Items[0], hosts[0], cmpOpts...)) + + resp2, err := static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, resp.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp2.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp2.CompleteListing) + require.Equal(t, resp2.EstimatedItemCount, 5) + require.Empty(t, resp2.DeletedIds) + require.Len(t, resp2.Items, 1) + require.Empty(t, cmp.Diff(resp2.Items[0], hosts[1], cmpOpts...)) + + resp3, err := static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, resp2.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp3.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp3.CompleteListing) + require.Equal(t, resp3.EstimatedItemCount, 5) + require.Empty(t, resp3.DeletedIds) + require.Len(t, resp3.Items, 1) + require.Empty(t, cmp.Diff(resp3.Items[0], hosts[2], cmpOpts...)) + + resp4, err := static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, resp3.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp4.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp4.CompleteListing) + require.Equal(t, resp4.EstimatedItemCount, 5) + require.Empty(t, resp4.DeletedIds) + require.Len(t, resp4.Items, 1) + require.Empty(t, cmp.Diff(resp4.Items[0], hosts[3], cmpOpts...)) + + resp5, err := static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, resp4.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp5.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp5.CompleteListing) + require.Equal(t, resp5.EstimatedItemCount, 5) + require.Empty(t, resp5.DeletedIds) + require.Len(t, resp5.Items, 1) + require.Empty(t, cmp.Diff(resp5.Items[0], hosts[4], cmpOpts...)) + + // Finished initial pagination phase, request refresh + // Expect no results. + resp6, err := static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, resp5.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp6.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp6.CompleteListing) + require.Equal(t, resp6.EstimatedItemCount, 5) + require.Empty(t, resp6.DeletedIds) + require.Empty(t, resp6.Items) + + // Create some new hosts + host1 := static.TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + host2 := static.TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + t.Cleanup(func() { + _, err := repo.DeleteHost(ctx, proj1.GetPublicId(), host1.GetPublicId()) + require.NoError(t, err) + _, err = repo.DeleteHost(ctx, proj1.GetPublicId(), host2.GetPublicId()) + require.NoError(t, err) + // Run analyze to update count estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + }) + + // Run analyze to update count estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + + // Refresh again, should get host2 + resp7, err := static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, resp6.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp7.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp7.CompleteListing) + require.Equal(t, resp7.EstimatedItemCount, 7) + require.Empty(t, resp7.DeletedIds) + require.Len(t, resp7.Items, 1) + require.Empty(t, cmp.Diff(resp7.Items[0], host2, cmpOpts...)) + + // Refresh again, should get host1 + resp8, err := static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, filterFunc, resp7.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp8.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp8.CompleteListing) + require.Equal(t, resp8.EstimatedItemCount, 7) + require.Empty(t, resp8.DeletedIds) + require.Len(t, resp8.Items, 1) + require.Empty(t, cmp.Diff(resp8.Items[0], host1, cmpOpts...)) + + // Refresh again, should get no results + resp9, err := static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, resp8.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp9.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp9.CompleteListing) + require.Equal(t, resp9.EstimatedItemCount, 7) + require.Empty(t, resp9.DeletedIds) + require.Empty(t, resp9.Items) + }) + + t.Run("simple pagination with aggressive filtering", func(t *testing.T) { + filterFunc := func(ctx context.Context, h host.Host) (bool, error) { + return h.GetPublicId() == hosts[1].GetPublicId() || + h.GetPublicId() == hosts[len(hosts)-1].GetPublicId(), nil + } + resp, err := static.ListHosts(ctx, []byte("some hash"), 1, filterFunc, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.NotNil(t, resp.ListToken) + require.Equal(t, resp.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp.CompleteListing) + require.Equal(t, resp.EstimatedItemCount, 5) + require.Empty(t, resp.DeletedIds) + require.Len(t, resp.Items, 1) + require.Empty(t, cmp.Diff(resp.Items[0], hosts[1], cmpOpts...)) + + resp2, err := static.ListHostsPage(ctx, []byte("some hash"), 1, filterFunc, resp.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.NotNil(t, resp2.ListToken) + require.Equal(t, resp2.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp2.CompleteListing) + require.Equal(t, resp2.EstimatedItemCount, 5) + require.Empty(t, resp2.DeletedIds) + require.Len(t, resp2.Items, 1) + require.Empty(t, cmp.Diff(resp2.Items[0], hosts[len(hosts)-1], cmpOpts...)) + + // request a refresh, nothing should be returned + resp3, err := static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, resp.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp3.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp3.CompleteListing) + require.Equal(t, resp3.EstimatedItemCount, 5) + require.Empty(t, resp3.DeletedIds) + require.Empty(t, resp3.Items) + + // Create some new hosts + host1 := static.TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + host2 := static.TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + host3 := static.TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + host4 := static.TestHosts(t, conn, catalog.GetPublicId(), 1)[0] + t.Cleanup(func() { + _, err := repo.DeleteHost(ctx, proj1.GetPublicId(), host1.GetPublicId()) + require.NoError(t, err) + _, err = repo.DeleteHost(ctx, proj1.GetPublicId(), host2.GetPublicId()) + require.NoError(t, err) + _, err = repo.DeleteHost(ctx, proj1.GetPublicId(), host3.GetPublicId()) + require.NoError(t, err) + _, err = repo.DeleteHost(ctx, proj1.GetPublicId(), host4.GetPublicId()) + require.NoError(t, err) + // Run analyze to update count estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + }) + + // Run analyze to update count estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + + filterFunc = func(_ context.Context, h host.Host) (bool, error) { + return h.GetPublicId() == host3.GetPublicId() || + h.GetPublicId() == host1.GetPublicId(), nil + } + // Refresh again, should get host3 + resp4, err := static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, resp3.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp4.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp4.CompleteListing) + require.Equal(t, resp4.EstimatedItemCount, 9) + require.Empty(t, resp4.DeletedIds) + require.Len(t, resp4.Items, 1) + require.Empty(t, cmp.Diff(resp4.Items[0], host3, cmpOpts...)) + + // Refresh again, should get host1 + resp5, err := static.ListHostsRefreshPage(ctx, []byte("some hash"), 1, filterFunc, resp4.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp5.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp5.CompleteListing) + require.Equal(t, resp5.EstimatedItemCount, 9) + require.Empty(t, resp5.DeletedIds) + require.Len(t, resp5.Items, 1) + require.Empty(t, cmp.Diff(resp5.Items[0], host1, cmpOpts...)) + }) + + t.Run("simple pagination with deletion", func(t *testing.T) { + filterFunc := func(context.Context, host.Host) (bool, error) { + return true, nil + } + deletedHostId := hosts[0].GetPublicId() + _, err := repo.DeleteHost(ctx, proj1.PublicId, deletedHostId) + require.NoError(t, err) + hosts = hosts[1:] + + // Run analyze to update count estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + + resp, err := static.ListHosts(ctx, []byte("some hash"), 1, filterFunc, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.NotNil(t, resp.ListToken) + require.Equal(t, resp.ListToken.GrantsHash, []byte("some hash")) + require.False(t, resp.CompleteListing) + require.Equal(t, resp.EstimatedItemCount, 4) + require.Empty(t, resp.DeletedIds) + require.Len(t, resp.Items, 1) + require.Empty(t, cmp.Diff(resp.Items[0], hosts[0], cmpOpts...)) + + // request remaining results + resp2, err := static.ListHostsPage(ctx, []byte("some hash"), 3, filterFunc, resp.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp2.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp2.CompleteListing) + require.Equal(t, resp2.EstimatedItemCount, 4) + require.Empty(t, resp2.DeletedIds) + require.Len(t, resp2.Items, 3) + require.Empty(t, cmp.Diff(resp2.Items, hosts[1:], cmpOpts...)) + + deletedHostId = hosts[0].GetPublicId() + _, err = repo.DeleteHost(ctx, proj1.PublicId, deletedHostId) + require.NoError(t, err) + hosts = hosts[1:] + + // Run analyze to update count estimate + _, err = sqlDb.ExecContext(ctx, "analyze") + require.NoError(t, err) + + // request a refresh, nothing should be returned except the deleted id + resp3, err := static.ListHostsRefresh(ctx, []byte("some hash"), 1, filterFunc, resp2.ListToken, repo, catalog.GetPublicId()) + require.NoError(t, err) + require.Equal(t, resp3.ListToken.GrantsHash, []byte("some hash")) + require.True(t, resp3.CompleteListing) + require.Equal(t, resp3.EstimatedItemCount, 3) + require.Contains(t, resp3.DeletedIds, deletedHostId) + require.Empty(t, resp3.Items) + }) +} diff --git a/internal/host/static/service_list_hosts_page.go b/internal/host/static/service_list_hosts_page.go new file mode 100644 index 0000000000..b10fe15f30 --- /dev/null +++ b/internal/host/static/service_list_hosts_page.go @@ -0,0 +1,79 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package static + +import ( + "context" + "time" + + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/host" + "github.com/hashicorp/boundary/internal/listtoken" + "github.com/hashicorp/boundary/internal/pagination" + "github.com/hashicorp/boundary/internal/types/resource" +) + +// ListHostsPage lists up to page size hosts, filtering out entries that +// do not pass the filter item function. It will automatically request +// more hosts from the database, at page size chunks, to fill the page. +// It will start its paging based on the information in the token. +// It returns a new list token used to continue pagination or refresh items. +// Hosts are ordered by create time descending (most recently created first). +func ListHostsPage( + ctx context.Context, + grantsHash []byte, + pageSize int, + filterItemFn pagination.ListFilterFunc[host.Host], + tok *listtoken.Token, + repo *Repository, + hostCatalogId string, +) (*pagination.ListResponse[host.Host], error) { + const op = "plugin.ListHostsPage" + + switch { + case len(grantsHash) == 0: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash") + case pageSize < 1: + return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1") + case filterItemFn == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback") + case tok == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing token") + case repo == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing repo") + case hostCatalogId == "": + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing host catalog ID") + case tok.ResourceType != resource.Host: + return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a host resource type") + } + if _, ok := tok.Subtype.(*listtoken.PaginationToken); !ok { + return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a pagination token component") + } + + listItemsFn := func(ctx context.Context, lastPageItem host.Host, limit int) ([]host.Host, time.Time, error) { + opts := []Option{ + WithLimit(limit), + } + if lastPageItem != nil { + opts = append(opts, WithStartPageAfterItem(lastPageItem)) + } else { + lastItem, err := tok.LastItem(ctx) + if err != nil { + return nil, time.Time{}, err + } + opts = append(opts, WithStartPageAfterItem(lastItem)) + } + sHosts, listTime, err := repo.listHosts(ctx, hostCatalogId, opts...) + if err != nil { + return nil, time.Time{}, err + } + var hosts []host.Host + for _, host := range sHosts { + hosts = append(hosts, host) + } + return hosts, listTime, nil + } + + return pagination.ListPage(ctx, grantsHash, pageSize, filterItemFn, listItemsFn, repo.estimatedHostCount, tok) +} diff --git a/internal/host/static/service_list_hosts_refresh.go b/internal/host/static/service_list_hosts_refresh.go new file mode 100644 index 0000000000..aa126432f2 --- /dev/null +++ b/internal/host/static/service_list_hosts_refresh.go @@ -0,0 +1,81 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package static + +import ( + "context" + "time" + + "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/host" + "github.com/hashicorp/boundary/internal/listtoken" + "github.com/hashicorp/boundary/internal/pagination" + "github.com/hashicorp/boundary/internal/types/resource" +) + +// ListHostsRefresh lists hosts according to the page size +// and list token, filtering out entries that do not +// pass the filter item fn. It returns a new list token +// based on the old one, the grants hash, and the returned +// hosts. It also returns the plugin associated with the host catalog. +func ListHostsRefresh( + ctx context.Context, + grantsHash []byte, + pageSize int, + filterItemFn pagination.ListFilterFunc[host.Host], + tok *listtoken.Token, + repo *Repository, + hostCatalogId string, +) (*pagination.ListResponse[host.Host], error) { + const op = "plugin.ListHostsRefresh" + + switch { + case len(grantsHash) == 0: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash") + case pageSize < 1: + return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1") + case filterItemFn == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback") + case tok == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing token") + case repo == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing repo") + case hostCatalogId == "": + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing host catalog ID") + case tok.ResourceType != resource.Host: + return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a host resource type") + } + rt, ok := tok.Subtype.(*listtoken.StartRefreshToken) + if !ok { + return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a start-refresh token component") + } + + listItemsFn := func(ctx context.Context, lastPageItem host.Host, limit int) ([]host.Host, time.Time, error) { + opts := []Option{ + WithLimit(limit), + } + if lastPageItem != nil { + opts = append(opts, WithStartPageAfterItem(lastPageItem)) + } + // Add the database read timeout to account for any creations missed due to concurrent + // transactions in the initial pagination phase. + sHosts, listTime, err := repo.listHostsRefresh(ctx, hostCatalogId, rt.PreviousPhaseUpperBound.Add(-globals.RefreshReadLookbackDuration), opts...) + if err != nil { + return nil, time.Time{}, err + } + var hosts []host.Host + for _, host := range sHosts { + hosts = append(hosts, host) + } + return hosts, listTime, nil + } + listDeletedIdsFn := func(ctx context.Context, since time.Time) ([]string, time.Time, error) { + // Add the database read timeout to account for any deletes missed due to concurrent + // transactions in the original list pagination phase. + return repo.listDeletedHostIds(ctx, since.Add(-globals.RefreshReadLookbackDuration)) + } + + return pagination.ListRefresh(ctx, grantsHash, pageSize, filterItemFn, listItemsFn, repo.estimatedHostCount, listDeletedIdsFn, tok) +} diff --git a/internal/host/static/service_list_hosts_refresh_page.go b/internal/host/static/service_list_hosts_refresh_page.go new file mode 100644 index 0000000000..0172b76117 --- /dev/null +++ b/internal/host/static/service_list_hosts_refresh_page.go @@ -0,0 +1,91 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package static + +import ( + "context" + "time" + + "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/host" + "github.com/hashicorp/boundary/internal/listtoken" + "github.com/hashicorp/boundary/internal/pagination" + "github.com/hashicorp/boundary/internal/types/resource" +) + +// ListHostsRefreshPage lists up to page size hosts, filtering out entries that +// do not pass the filter item function. It will automatically request +// more hosts from the database, at page size chunks, to fill the page. +// It will start its paging based on the information in the token. +// It returns a new list token used to continue pagination or refresh items. +// Hosts are ordered by update time descending (most recently updated first). +// Hosts may contain items that were already returned during the initial +// pagination phase. It also returns a list of any hosts deleted since the +// last response. +func ListHostsRefreshPage( + ctx context.Context, + grantsHash []byte, + pageSize int, + filterItemFn pagination.ListFilterFunc[host.Host], + tok *listtoken.Token, + repo *Repository, + hostCatalogId string, +) (*pagination.ListResponse[host.Host], error) { + const op = "credential.ListHostsRefreshPage" + + switch { + case len(grantsHash) == 0: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash") + case pageSize < 1: + return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1") + case filterItemFn == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback") + case tok == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing token") + case repo == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing repo") + case hostCatalogId == "": + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing host catalog ID") + case tok.ResourceType != resource.Host: + return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a host resource type") + } + rt, ok := tok.Subtype.(*listtoken.RefreshToken) + if !ok { + return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a refresh token component") + } + + listItemsFn := func(ctx context.Context, lastPageItem host.Host, limit int) ([]host.Host, time.Time, error) { + opts := []Option{ + WithLimit(limit), + } + if lastPageItem != nil { + opts = append(opts, WithStartPageAfterItem(lastPageItem)) + } else { + lastItem, err := tok.LastItem(ctx) + if err != nil { + return nil, time.Time{}, err + } + opts = append(opts, WithStartPageAfterItem(lastItem)) + } + // Add the database read timeout to account for any creations missed due to concurrent + // transactions in the original list pagination phase. + sHosts, listTime, err := repo.listHostsRefresh(ctx, hostCatalogId, rt.PhaseLowerBound.Add(-globals.RefreshReadLookbackDuration), opts...) + if err != nil { + return nil, time.Time{}, err + } + var hosts []host.Host + for _, host := range sHosts { + hosts = append(hosts, host) + } + return hosts, listTime, nil + } + listDeletedIdsFn := func(ctx context.Context, since time.Time) ([]string, time.Time, error) { + // Add the database read timeout to account for any deletes missed due to concurrent + // transactions in the original list pagination phase. + return repo.listDeletedHostIds(ctx, since.Add(-globals.RefreshReadLookbackDuration)) + } + + return pagination.ListRefreshPage(ctx, grantsHash, pageSize, filterItemFn, listItemsFn, repo.estimatedHostCount, listDeletedIdsFn, tok) +}