Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDKQE-3419: Support load-sample buckets for cloud columnar and clusters #66

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 33 additions & 28 deletions deployment/clouddeploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,12 @@ func (p *Deployer) listClusters(ctx context.Context) ([]*clusterInfo, error) {
continue
}

clusters := getColumnarsForProject(project.Data.ID)
columnars := getColumnarsForProject(project.Data.ID)

if len(clusters) == 0 {
out = append(out, &clusterInfo{
Meta: meta,
Project: project.Data,
Cluster: nil,
IsCorrupted: false,
})
if len(columnars) == 0 {
// Operational cluster
continue
} else if len(clusters) > 1 {
} else if len(columnars) > 1 {
out = append(out, &clusterInfo{
Meta: meta,
Project: project.Data,
Expand All @@ -197,12 +192,12 @@ func (p *Deployer) listClusters(ctx context.Context) ([]*clusterInfo, error) {
continue
}

cluster := clusters[0]
columnar := columnars[0]

out = append(out, &clusterInfo{
Meta: meta,
Project: project.Data,
Columnar: cluster,
Columnar: columnar,
IsCorrupted: false,
})
}
Expand Down Expand Up @@ -674,7 +669,7 @@ func (p *Deployer) deployNewCluster(ctx context.Context, def *clusterdef.Cluster

p.logger.Debug("waiting for cluster creation to complete")

err = p.mgr.WaitForClusterState(ctx, p.tenantID, cloudClusterID, "healthy")
err = p.mgr.WaitForClusterState(ctx, p.tenantID, cloudClusterID, "healthy", false)
if err != nil {
return nil, errors.Wrap(err, "failed to wait for cluster deployment")
}
Expand Down Expand Up @@ -822,7 +817,7 @@ func (p *Deployer) createNewCluster(ctx context.Context, def *clusterdef.Cluster

p.logger.Debug("waiting for cluster creation to complete")

err = p.mgr.WaitForClusterState(ctx, p.tenantID, cloudClusterID, "healthy")
err = p.mgr.WaitForClusterState(ctx, p.tenantID, cloudClusterID, "healthy", false)
if err != nil {
return nil, errors.Wrap(err, "failed to wait for cluster deployment")
}
Expand Down Expand Up @@ -891,7 +886,7 @@ func (p *Deployer) createNewCluster(ctx context.Context, def *clusterdef.Cluster

p.logger.Debug("waiting for columnar creation to complete")

err = p.mgr.WaitForClusterState(ctx, p.tenantID, cloudClusterID, "healthy")
err = p.mgr.WaitForClusterState(ctx, p.tenantID, cloudClusterID, "healthy", true)
if err != nil {
return nil, errors.Wrap(err, "failed to wait for columnar deployment")
}
Expand Down Expand Up @@ -979,7 +974,7 @@ func (d *Deployer) ModifyCluster(ctx context.Context, clusterID string, def *clu
}

if clusterInfo.Columnar != nil {
d.logger.Warn("can only modify the node count for a columnar cluster")
d.logger.Debug("can/will only modify the node count for a columnar cluster")

newSpec := &capellacontrol.UpdateColumnarInstanceRequest{
Name: clusterInfo.Columnar.Name,
Expand All @@ -993,14 +988,14 @@ func (d *Deployer) ModifyCluster(ctx context.Context, clusterID string, def *clu

d.logger.Debug("waiting for columnar modification to begin")

err = d.mgr.WaitForClusterState(ctx, d.tenantID, clusterInfo.Columnar.ID, "scaling")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, clusterInfo.Columnar.ID, "scaling", true)
if err != nil {
return errors.Wrap(err, "failed to wait for columnar modification to begin")
}

d.logger.Debug("waiting for columnar to be healthy")

err = d.mgr.WaitForClusterState(ctx, d.tenantID, clusterInfo.Columnar.ID, "healthy")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, clusterInfo.Columnar.ID, "healthy", true)
if err != nil {
return errors.Wrap(err, "failed to wait for columnar to be healthy")
}
Expand Down Expand Up @@ -1037,14 +1032,14 @@ func (d *Deployer) ModifyCluster(ctx context.Context, clusterID string, def *clu

d.logger.Debug("waiting for cluster modification to begin")

err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "scaling")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "scaling", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster modification to begin")
}

d.logger.Debug("waiting for cluster to be healthy")

err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "healthy")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "healthy", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster to be healthy")
}
Expand Down Expand Up @@ -1085,12 +1080,12 @@ func (d *Deployer) ModifyCluster(ctx context.Context, clusterID string, def *clu
return errors.Wrap(err, "failed to update server version")
}
//time.Sleep(30 * time.Second)
err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "upgrading")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "upgrading", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster upgrade to begin")
}

err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "healthy")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, cloudClusterID, "healthy", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster returns to healthy")
}
Expand Down Expand Up @@ -1118,7 +1113,7 @@ func (p *Deployer) removeCluster(ctx context.Context, clusterInfo *clusterInfo)

p.logger.Debug("waiting for cluster deletion to finish")

err = p.mgr.WaitForClusterState(ctx, p.tenantID, clusterInfo.Cluster.Id, "")
err = p.mgr.WaitForClusterState(ctx, p.tenantID, clusterInfo.Cluster.Id, "", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster destruction")
}
Expand All @@ -1130,7 +1125,7 @@ func (p *Deployer) removeCluster(ctx context.Context, clusterInfo *clusterInfo)

p.logger.Debug("waiting for cluster deletion to finish")

err = p.mgr.WaitForClusterState(ctx, p.tenantID, clusterInfo.Columnar.ID, "")
err = p.mgr.WaitForClusterState(ctx, p.tenantID, clusterInfo.Columnar.ID, "", true)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster destruction")
}
Expand Down Expand Up @@ -1416,7 +1411,7 @@ func (p *Deployer) RemoveAll(ctx context.Context) error {
for _, cluster := range clustersToRemove {
p.logger.Info("waiting for cluster removal to complete", zap.String("cluster-id", cluster.Id))

err := p.mgr.WaitForClusterState(ctx, p.tenantID, cluster.Id, "")
err := p.mgr.WaitForClusterState(ctx, p.tenantID, cluster.Id, "", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster removal to finish")
}
Expand Down Expand Up @@ -1459,7 +1454,7 @@ func (p *Deployer) RemoveAll(ctx context.Context) error {
for _, columnar := range columnarsToRemove {
p.logger.Info("waiting for cluster columnar to complete", zap.String("cluster-id", columnar.ID))

err := p.mgr.WaitForClusterState(ctx, p.tenantID, columnar.ID, "")
err := p.mgr.WaitForClusterState(ctx, p.tenantID, columnar.ID, "", true)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster removal to finish")
}
Expand Down Expand Up @@ -1797,7 +1792,17 @@ func (p *Deployer) DeleteBucket(ctx context.Context, clusterID string, bucketNam
}

func (d *Deployer) LoadSampleBucket(ctx context.Context, clusterID string, bucketName string) error {
return errors.New("clouddeploy does not support loading sample buckets")
clusterInfo, err := d.getCluster(ctx, clusterID)
if err != nil {
return err
}

if clusterInfo.Columnar == nil {
req := &capellacontrol.LoadSampleBucketRequest{Name: bucketName}
return d.mgr.Client.LoadClusterSampleBucket(ctx, clusterInfo.Cluster.TenantId, clusterInfo.Cluster.Project.Id, clusterInfo.Cluster.Id, req)
}
req := &capellacontrol.LoadColumnarSampleBucketRequest{SampleName: bucketName}
return d.mgr.Client.LoadColumnarSampleBucket(ctx, clusterInfo.Columnar.TenantID, clusterInfo.Columnar.ProjectID, clusterInfo.Columnar.ID, req)
}

func (p *Deployer) GetCertificate(ctx context.Context, clusterID string) (string, error) {
Expand Down Expand Up @@ -1904,14 +1909,14 @@ func (d *Deployer) RedeployCluster(ctx context.Context, clusterID string) error

d.logger.Debug("waiting for redeploy cluster to begin")

err = d.mgr.WaitForClusterState(ctx, d.tenantID, cluster.Cluster.Id, "rebalancing")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, cluster.Cluster.Id, "rebalancing", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster modification to begin")
}

d.logger.Debug("waiting for cluster to be healthy")

err = d.mgr.WaitForClusterState(ctx, d.tenantID, cluster.Cluster.Id, "healthy")
err = d.mgr.WaitForClusterState(ctx, d.tenantID, cluster.Cluster.Id, "healthy", false)
if err != nil {
return errors.Wrap(err, "failed to wait for cluster to be healthy")
}
Expand Down
38 changes: 38 additions & 0 deletions utils/capellacontrol/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,3 +1600,41 @@ func (c *Controller) RedeployCluster(

return err
}

type LoadColumnarSampleBucketRequest struct {
SampleName string `url:"sampleName"`
}

func (c *Controller) LoadColumnarSampleBucket(
ctx context.Context,
tenantID, projectID, clusterID string,
req *LoadColumnarSampleBucketRequest,
) error {

form, _ := query.Values(req)
path := fmt.Sprintf("/v2/organizations/%s/projects/%s/instance/%s/proxy/api/v1/samples?%s", tenantID, projectID, clusterID, form.Encode())
err := c.doBasicReq(ctx, false, "POST", path, req, nil)
if err != nil {
return err
}

return err
}

type LoadSampleBucketRequest struct {
Name string `json:"name"`
}

func (c *Controller) LoadClusterSampleBucket(
ctx context.Context,
tenantID, projectID, clusterID string,
req *LoadSampleBucketRequest,
) error {
path := fmt.Sprintf("/v2/organizations/%s/projects/%s/clusters/%s/buckets/samples", tenantID, projectID, clusterID)
err := c.doBasicReq(ctx, false, "POST", path, req, nil)
if err != nil {
return err
}

return err
}
79 changes: 29 additions & 50 deletions utils/capellacontrol/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (m *Manager) WaitForClusterState(
ctx context.Context,
tenantID, clusterID string,
desiredState string,
columnar bool,
) error {
MISSING_STATE := "*MISSING*"

Expand All @@ -27,59 +28,38 @@ func (m *Manager) WaitForClusterState(
}

for {
clusters, err := m.Client.ListAllClusters(ctx, tenantID, &PaginatedRequest{
Page: 1,
PerPage: 100,
SortBy: "name",
SortDirection: "asc",
})
if err != nil {
return errors.Wrap(err, "failed to list clusters")
}

clusterStatus := ""
for _, cluster := range clusters.Data {
if cluster.Data.Id == clusterID {
clusterStatus = cluster.Data.Status.State
if !columnar {
clusters, err := m.Client.ListAllClusters(ctx, tenantID, &PaginatedRequest{
Page: 1,
PerPage: 100,
SortBy: "name",
SortDirection: "asc",
})
if err != nil {
return errors.Wrap(err, "failed to list clusters")
}
}

if clusterStatus == "" {
clusterStatus = MISSING_STATE
}

if clusterStatus == MISSING_STATE && desiredState != MISSING_STATE {
break
//return fmt.Errorf("cluster disappeared during wait for '%s' state", desiredState)
}

m.Logger.Info("waiting for cluster status...",
zap.String("current", clusterStatus),
zap.String("desired", desiredState))

if clusterStatus != desiredState {
time.Sleep(10 * time.Second)
continue
}

break
}

for {
clusters, err := m.Client.ListAllColumnars(ctx, tenantID, &PaginatedRequest{
Page: 1,
PerPage: 100,
SortBy: "name",
SortDirection: "asc",
})
if err != nil {
return errors.Wrap(err, "failed to list clusters")
}
for _, cluster := range clusters.Data {
if cluster.Data.Id == clusterID {
clusterStatus = cluster.Data.Status.State
}
}
} else {
columnars, err := m.Client.ListAllColumnars(ctx, tenantID, &PaginatedRequest{
Page: 1,
PerPage: 100,
SortBy: "name",
SortDirection: "asc",
})
if err != nil {
return errors.Wrap(err, "failed to list columnars")
}

clusterStatus := ""
for _, cluster := range clusters.Data {
if cluster.Data.ID == clusterID {
clusterStatus = cluster.Data.State
for _, columnar := range columnars.Data {
if columnar.Data.ID == clusterID {
clusterStatus = columnar.Data.State
}
}
}

Expand All @@ -102,7 +82,6 @@ func (m *Manager) WaitForClusterState(

break
}

return nil
}

Expand Down
Loading