Skip to content

Commit

Permalink
internal/host: add static host pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
johanbrandhorst committed Dec 18, 2023
1 parent 1e1941c commit 4407abd
Show file tree
Hide file tree
Showing 9 changed files with 1,396 additions and 26 deletions.
10 changes: 10 additions & 0 deletions internal/host/static/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,13 @@ func (agg *hostAgg) getSetIds() []string {
}
return ids
}

type deletedHost struct {
PublicId string `gorm:"primary_key"`
DeleteTime *timestamp.Timestamp
}

// TableName returns the tablename to override the default gorm table name
func (s *deletedHost) TableName() string {
return "static_host_deleted"
}
167 changes: 167 additions & 0 deletions internal/host/static/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,172 @@ final (action, host_id) as (
)
select * from final
order by action, host_id;
`

estimateCountHosts = `
select sum(reltuples::bigint) as estimate from pg_class where oid in ('static_host'::regclass)
`

listHostsTemplate = `
with hosts as (
select public_id,
create_time,
update_time,
name,
description,
catalog_id,
address,
version
from static_host
where catalog_id = @catalog_id
order by create_time desc, public_id asc
limit %d
),
host_set_ids as (
select string_agg(distinct set_id, '|') as set_ids,
host_id
from static_host_set_member
where host_id in (select public_id from hosts)
group by host_id
),
final as (
select h.public_id,
h.create_time,
h.update_time,
h.name,
h.description,
h.catalog_id,
h.address,
h.version,
hsi.set_ids
from hosts h
left outer join host_set_ids hsi on hsi.host_id = h.public_id
)
select *
from final
order by create_time desc, public_id asc;
`

listHostsPageTemplate = `
with hosts as (
select public_id,
create_time,
update_time,
name,
description,
catalog_id,
address,
version
from static_host
where catalog_id = @catalog_id
and (create_time, public_id) < (@last_item_create_time, @last_item_id)
order by create_time desc, public_id asc
limit %d
),
host_set_ids as (
select string_agg(distinct set_id, '|') as set_ids,
host_id
from static_host_set_member
where host_id in (select public_id from hosts)
group by host_id
),
final as (
select h.public_id,
h.create_time,
h.update_time,
h.name,
h.description,
h.catalog_id,
h.address,
h.version,
hsi.set_ids
from hosts h
left outer join host_set_ids hsi on hsi.host_id = h.public_id
)
select *
from final
order by create_time desc, public_id asc;
`

listHostsRefreshTemplate = `
with hosts as (
select public_id,
create_time,
update_time,
name,
description,
catalog_id,
address,
version
from static_host
where catalog_id = @catalog_id
and update_time > @updated_after_time
order by update_time desc, public_id asc
limit %d
),
host_set_ids as (
select string_agg(distinct set_id, '|') as set_ids,
host_id
from static_host_set_member
where host_id in (select public_id from hosts)
group by host_id
),
final as (
select h.public_id,
h.create_time,
h.update_time,
h.name,
h.description,
h.catalog_id,
h.address,
h.version,
hsi.set_ids
from hosts h
left outer join host_set_ids hsi on hsi.host_id = h.public_id
)
select *
from final
order by update_time desc, public_id asc;
`
listHostsRefreshPageTemplate = `
with hosts as (
select public_id,
create_time,
update_time,
name,
description,
catalog_id,
address,
version
from static_host
where catalog_id = @catalog_id
and update_time > @updated_after_time
and (update_time, public_id) < (@last_item_update_time, @last_item_id)
order by update_time desc, public_id asc
limit %d
),
host_set_ids as (
select string_agg(distinct set_id, '|') as set_ids,
host_id
from static_host_set_member
where host_id in (select public_id from hosts)
group by host_id
),
final as (
select h.public_id,
h.create_time,
h.update_time,
h.name,
h.description,
h.catalog_id,
h.address,
h.version,
hsi.set_ids
from hosts h
left outer join host_set_ids hsi on hsi.host_id = h.public_id
)
select *
from final
order by update_time desc, public_id asc;
`
)
135 changes: 122 additions & 13 deletions internal/host/static/repository_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package static

import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/hashicorp/boundary/globals"
"github.com/hashicorp/boundary/internal/db"
Expand Down Expand Up @@ -226,30 +228,95 @@ func (r *Repository) LookupHost(ctx context.Context, publicId string, opt ...Opt
return ha.toHost(), nil
}

// ListHosts returns a slice of Hosts for the catalogId.
// WithLimit is the only option supported.
func (r *Repository) ListHosts(ctx context.Context, catalogId string, opt ...Option) ([]*Host, error) {
const op = "static.(Repository).ListHosts"
// listHosts returns a slice of Hosts for the catalogId.
// Supported options:
// - WithLimit which overrides the limit set in the Repository object
// - WithStartPageAfterItem which sets where to start listing from
func (r *Repository) listHosts(ctx context.Context, catalogId string, opt ...Option) ([]*Host, time.Time, error) {
const op = "static.(Repository).listHosts"
if catalogId == "" {
return nil, errors.New(ctx, errors.InvalidParameter, op, "no catalog id")
return nil, time.Time{}, errors.New(ctx, errors.InvalidParameter, op, "no catalog id")
}
opts := getOpts(opt...)
limit := r.defaultLimit
if opts.withLimit != 0 {
// non-zero signals an override of the default limit for the repo.
limit = opts.withLimit
}
var aggs []*hostAgg
err := r.reader.SearchWhere(ctx, &aggs, "catalog_id = ?", []any{catalogId}, db.WithLimit(limit))
if err != nil {
return nil, errors.Wrap(ctx, err, op)
query := fmt.Sprintf(listHostsTemplate, limit)
args := []any{sql.Named("catalog_id", catalogId)}
if opts.withStartPageAfterItem != nil {
query = fmt.Sprintf(listHostsPageTemplate, limit)
args = append(args,
sql.Named("last_item_create_time", opts.withStartPageAfterItem.GetCreateTime()),
sql.Named("last_item_id", opts.withStartPageAfterItem.GetPublicId()),
)
}

return r.queryHosts(ctx, query, args)
}

// listHostsRefresh returns a slice of Hosts for the catalogId.
// Supported options:
// - WithLimit which overrides the limit set in the Repository object
// - WithStartPageAfterItem which sets where to start listing from
func (r *Repository) listHostsRefresh(ctx context.Context, catalogId string, updatedAfter time.Time, opt ...Option) ([]*Host, time.Time, error) {
const op = "static.(Repository).listHostsRefresh"
switch {
case catalogId == "":
return nil, time.Time{}, errors.New(ctx, errors.InvalidParameter, op, "no catalog id")
case updatedAfter.IsZero():
return nil, time.Time{}, errors.New(ctx, errors.InvalidParameter, op, "missing updated after time")
}
opts := getOpts(opt...)
limit := r.defaultLimit
if opts.withLimit != 0 {
// non-zero signals an override of the default limit for the repo.
limit = opts.withLimit
}
hosts := make([]*Host, 0, len(aggs))
for _, ha := range aggs {
hosts = append(hosts, ha.toHost())
query := fmt.Sprintf(listHostsRefreshTemplate, limit)
args := []any{
sql.Named("catalog_id", catalogId),
sql.Named("updated_after_time", updatedAfter),
}
if opts.withStartPageAfterItem != nil {
query = fmt.Sprintf(listHostsRefreshPageTemplate, limit)
args = append(args,
sql.Named("last_item_update_time", opts.withStartPageAfterItem.GetUpdateTime()),
sql.Named("last_item_id", opts.withStartPageAfterItem.GetPublicId()),
)
}

return hosts, nil
return r.queryHosts(ctx, query, args)
}

func (r *Repository) queryHosts(ctx context.Context, query string, args []any) ([]*Host, time.Time, error) {
const op = "static.(Repository).queryHosts"

var hosts []*Host
var transactionTimestamp time.Time
if _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error {
rows, err := r.Query(ctx, query, args)
if err != nil {
return err
}
defer rows.Close()
var foundHosts []*hostAgg
for rows.Next() {
if err := r.ScanRows(ctx, rows, &foundHosts); err != nil {
return err
}
}
hosts = make([]*Host, 0, len(foundHosts))
for _, ha := range foundHosts {
hosts = append(hosts, ha.toHost())
}
transactionTimestamp, err = r.Now(ctx)
return err
}); err != nil {
return nil, time.Time{}, errors.Wrap(ctx, err, op)
}
return hosts, transactionTimestamp, nil
}

// DeleteHost deletes the host for the provided id from the repository
Expand Down Expand Up @@ -290,3 +357,45 @@ func (r *Repository) DeleteHost(ctx context.Context, projectId string, publicId

return rowsDeleted, nil
}

// listDeletedHostIds lists the public IDs of any hosts deleted since the timestamp provided,
// and the timestamp of the transaction within which the hosts were listed.
func (r *Repository) listDeletedHostIds(ctx context.Context, since time.Time) ([]string, time.Time, error) {
const op = "static.(Repository).listDeletedHostIds"
var deleteHosts []*deletedHost
var transactionTimestamp time.Time
if _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, _ db.Writer) error {
if err := r.SearchWhere(ctx, &deleteHosts, "delete_time >= ?", []any{since}); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to query deleted hosts"))
}
var err error
transactionTimestamp, err = r.Now(ctx)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to get transaction timestamp"))
}
return nil
}); err != nil {
return nil, time.Time{}, err
}
var hostIds []string
for _, t := range deleteHosts {
hostIds = append(hostIds, t.PublicId)
}
return hostIds, transactionTimestamp, nil
}

// estimatedHostCount returns an estimate of the total number of static hosts.
func (r *Repository) estimatedHostCount(ctx context.Context) (int, error) {
const op = "static.(Repository).estimatedHostCount"
rows, err := r.reader.Query(ctx, estimateCountHosts, nil)
if err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static hosts"))
}
var count int
for rows.Next() {
if err := r.reader.ScanRows(ctx, rows, &count); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static hosts"))
}
}
return count, nil
}
Loading

0 comments on commit 4407abd

Please sign in to comment.