Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Refine StageLogPersister handling in SDK #5588

Merged
merged 2 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/app/pipedv1/plugin/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/piped/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/sdk"
)

Expand All @@ -35,7 +34,7 @@
func (p *plugin) executeWait(ctx context.Context, in *sdk.ExecuteStageInput) sdk.StageStatus {
opts, err := decode(in.Request.StageConfig)
if err != nil {
in.Client.LogPersister.Errorf("failed to decode the stage config: %v", err)
in.Client.LogPersister().Errorf("failed to decode the stage config: %v", err)

Check warning on line 37 in pkg/app/pipedv1/plugin/wait/wait.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/wait.go#L37

Added line #L37 was not covered by tests
return sdk.StageStatusFailure
}

Expand All @@ -49,10 +48,10 @@
}
p.saveStartTime(ctx, in.Client, initialStart, in.Logger)

return wait(ctx, duration, initialStart, in.Client.LogPersister)
return wait(ctx, duration, initialStart, in.Client.LogPersister())

Check warning on line 51 in pkg/app/pipedv1/plugin/wait/wait.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/wait.go#L51

Added line #L51 was not covered by tests
}

func wait(ctx context.Context, duration time.Duration, initialStart time.Time, slp logpersister.StageLogPersister) sdk.StageStatus {
func wait(ctx context.Context, duration time.Duration, initialStart time.Time, slp sdk.StageLogPersister) sdk.StageStatus {
remaining := duration - time.Since(initialStart)
if remaining <= 0 {
// When this stage restarted and the duration has already passed.
Expand Down
27 changes: 24 additions & 3 deletions pkg/plugin/sdk/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import (
"context"

"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedapi"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedservice"
)
Expand All @@ -40,8 +39,21 @@
// This field exists only when the client is working with a specific stage; for example, when this client is passed as the ExecuteStage method's argument.
stageID string

// TODO: Define another interface (e.g. Remove Complete() method)
LogPersister logpersister.StageLogPersister
// logPersister is used to persist the stage logs.
// This field exists only when the client is working with a specific stage; for example, when this client is passed as the ExecuteStage method's argument.
logPersister StageLogPersister
}

// StageLogPersister is a interface for persisting the stage logs.
// Use this to persist the stage logs and make it viewable on the UI.
type StageLogPersister interface {
Write(log []byte) (int, error)
Info(log string)
Infof(format string, a ...interface{})
Success(log string)
Successf(format string, a ...interface{})
Error(log string)
Errorf(format string, a ...interface{})
}

// GetStageMetadata gets the metadata of the current stage.
Expand Down Expand Up @@ -118,3 +130,12 @@
})
return resp.Value, err
}

// LogPersister returns the stage log persister.
// Use this to persist the stage logs and make it viewable on the UI.
// This method should be called only when the client is working with a specific stage, for example, when this client is passed as the ExecuteStage method's argument.
// Otherwise, it will return nil.
// TODO: we should consider returning an error instead of nil, or return logger which prints to stdout.
func (c *Client) LogPersister() StageLogPersister {
return c.logPersister

Check warning on line 140 in pkg/plugin/sdk/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/client.go#L139-L140

Added lines #L139 - L140 were not covered by tests
}
30 changes: 26 additions & 4 deletions pkg/plugin/sdk/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedapi"
"github.com/pipe-cd/pipecd/pkg/plugin/signalhandler"
)

var (
Expand Down Expand Up @@ -170,14 +171,24 @@
func (s *DeploymentPluginServiceServer[Config, DeployTargetConfig]) BuildQuickSyncStages(context.Context, *deployment.BuildQuickSyncStagesRequest) (*deployment.BuildQuickSyncStagesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method BuildQuickSyncStages not implemented")
}
func (s *DeploymentPluginServiceServer[Config, DeployTargetConfig]) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (*deployment.ExecuteStageResponse, error) {
func (s *DeploymentPluginServiceServer[Config, DeployTargetConfig]) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (response *deployment.ExecuteStageResponse, _ error) {
lp := s.logPersister.StageLogPersister(request.GetInput().GetDeployment().GetId(), request.GetInput().GetStage().GetId())
defer func() {
// When termination signal received and the stage is not completed yet, we should not mark the log persister as completed.
// This can occur when the piped is shutting down while the stage is still running.
if !response.GetStatus().IsCompleted() && signalhandler.Terminated() {
return
}
lp.Complete(time.Minute)

Check warning on line 182 in pkg/plugin/sdk/deployment.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/deployment.go#L174-L182

Added lines #L174 - L182 were not covered by tests
}()

client := &Client{
base: s.client,
pluginName: s.Name(),
applicationID: request.GetInput().GetDeployment().GetApplicationId(),
deploymentID: request.GetInput().GetDeployment().GetId(),
stageID: request.GetInput().GetStage().GetId(),
LogPersister: s.logPersister.StageLogPersister(request.GetInput().GetDeployment().GetId(), request.GetInput().GetStage().GetId()),
logPersister: lp,

Check warning on line 191 in pkg/plugin/sdk/deployment.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/deployment.go#L191

Added line #L191 was not covered by tests
}
return executeStage(ctx, s.base, &s.config, nil, client, request, s.logger) // TODO: pass the deployTargets
}
Expand Down Expand Up @@ -240,15 +251,26 @@
func (s *PipelineSyncPluginServiceServer[Config, DeployTargetConfig]) BuildQuickSyncStages(context.Context, *deployment.BuildQuickSyncStagesRequest) (*deployment.BuildQuickSyncStagesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method BuildQuickSyncStages not implemented")
}
func (s *PipelineSyncPluginServiceServer[Config, DeployTargetConfig]) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (*deployment.ExecuteStageResponse, error) {
func (s *PipelineSyncPluginServiceServer[Config, DeployTargetConfig]) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (response *deployment.ExecuteStageResponse, _ error) {
lp := s.logPersister.StageLogPersister(request.GetInput().GetDeployment().GetId(), request.GetInput().GetStage().GetId())
defer func() {
// When termination signal received and the stage is not completed yet, we should not mark the log persister as completed.
// This can occur when the piped is shutting down while the stage is still running.
if !response.GetStatus().IsCompleted() && signalhandler.Terminated() {
return
}
lp.Complete(time.Minute)

Check warning on line 262 in pkg/plugin/sdk/deployment.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/deployment.go#L254-L262

Added lines #L254 - L262 were not covered by tests
}()

client := &Client{
base: s.client,
pluginName: s.Name(),
applicationID: request.GetInput().GetDeployment().GetApplicationId(),
deploymentID: request.GetInput().GetDeployment().GetId(),
stageID: request.GetInput().GetStage().GetId(),
LogPersister: s.logPersister.StageLogPersister(request.GetInput().GetDeployment().GetId(), request.GetInput().GetStage().GetId()),
logPersister: lp,

Check warning on line 271 in pkg/plugin/sdk/deployment.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/deployment.go#L271

Added line #L271 was not covered by tests
}

Check warning on line 273 in pkg/plugin/sdk/deployment.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/deployment.go#L273

Added line #L273 was not covered by tests
return executeStage(ctx, s.base, &s.config, nil, client, request, s.logger) // TODO: pass the deployTargets
}

Expand Down