Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Option to Prevent Overlapping Replications of the Same Artifact in Harbor #21347

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,7 @@ paths:
items:
$ref: '#/definitions/AuditLogEventType'
'401':
$ref: '#/responses/401'
$ref: '#/responses/401'
/projects/{project_name}/logs:
get:
summary: Get recent logs of the projects (deprecated)
Expand Down Expand Up @@ -1875,7 +1875,7 @@ paths:
'401':
$ref: '#/responses/401'
'500':
$ref: '#/responses/500'
$ref: '#/responses/500'
/p2p/preheat/providers:
get:
summary: List P2P providers
Expand Down Expand Up @@ -6960,8 +6960,8 @@ definitions:
description: The time when this operation is triggered.
AuditLogEventType:
type: object
properties:
event_type:
properties:
event_type:
type: string
description: the event type, such as create_user.
example: create_user
Expand Down Expand Up @@ -7457,6 +7457,12 @@ definitions:
type: boolean
description: Whether to enable copy by chunk.
x-isnullable: true
single_active_replication:
type: boolean
description: |-
Whether to defer a new execution until the previous active execution finishes,
preventing the same replication rule from running multiple times in parallel.
x-isnullable: true # make this field optional to keep backward compatibility
ReplicationTrigger:
type: object
properties:
Expand Down
2 changes: 2 additions & 0 deletions make/migrations/postgresql/0160_2.13.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ CREATE INDEX IF NOT EXISTS idx_audit_log_ext_op_time ON audit_log_ext (op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_optime ON audit_log_ext (project_id, op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_resource_type ON audit_log_ext (project_id, resource_type);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_operation ON audit_log_ext (project_id, operation);

ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS single_active_replication boolean;
24 changes: 1 addition & 23 deletions src/controller/jobmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@ import (
"context"
"fmt"
"strings"
"time"

jobSvc "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/queuestatus"

"github.com/goharbor/harbor/src/lib/log"

"github.com/gocraft/work"

"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/lib/q"
libRedis "github.com/goharbor/harbor/src/lib/redis"
jm "github.com/goharbor/harbor/src/pkg/jobmonitor"
"github.com/goharbor/harbor/src/pkg/task"
taskDao "github.com/goharbor/harbor/src/pkg/task/dao"
Expand Down Expand Up @@ -92,29 +87,12 @@ func NewMonitorController() MonitorController {
taskManager: task.NewManager(),
queueManager: jm.NewQueueClient(),
queueStatusManager: queuestatus.Mgr,
monitorClient: jobServiceMonitorClient,
monitorClient: jm.GetJobServiceMonitorClient,
jobServiceRedisClient: jm.JobServiceRedisClient,
executionDAO: taskDao.NewExecutionDAO(),
}
}

func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
cfg, err := job.GlobalClient.GetJobServiceConfig()
if err != nil {
return nil, err
}
config := cfg.RedisPoolConfig
pool, err := libRedis.GetRedisPool(jm.JobServicePool, config.RedisURL, &libRedis.PoolParam{
PoolMaxIdle: 0,
PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second,
})
if err != nil {
log.Errorf("failed to get redis pool: %v", err)
return nil, err
}
return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil
}

func (w *monitorController) ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) {
mClient, err := w.monitorClient()
if err != nil {
Expand Down
50 changes: 34 additions & 16 deletions src/controller/replication/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"time"

"github.com/goharbor/harbor/src/pkg/jobmonitor"

"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/replication/flow"
replicationmodel "github.com/goharbor/harbor/src/controller/replication/model"
Expand Down Expand Up @@ -76,26 +78,28 @@ type Controller interface {
// NewController creates a new instance of the replication controller
func NewController() Controller {
return &controller{
repMgr: replication.Mgr,
execMgr: task.ExecMgr,
taskMgr: task.Mgr,
regMgr: reg.Mgr,
scheduler: scheduler.Sched,
flowCtl: flow.NewController(),
ormCreator: orm.Crt,
wp: lib.NewWorkerPool(10),
repMgr: replication.Mgr,
execMgr: task.ExecMgr,
taskMgr: task.Mgr,
regMgr: reg.Mgr,
scheduler: scheduler.Sched,
flowCtl: flow.NewController(),
ormCreator: orm.Crt,
wp: lib.NewWorkerPool(10),
observationMgr: jobmonitor.NewObservationManagerImpl(),
}
}

type controller struct {
repMgr replication.Manager
execMgr task.ExecutionManager
taskMgr task.Manager
regMgr reg.Manager
scheduler scheduler.Scheduler
flowCtl flow.Controller
ormCreator orm.Creator
wp *lib.WorkerPool
repMgr replication.Manager
execMgr task.ExecutionManager
taskMgr task.Manager
regMgr reg.Manager
scheduler scheduler.Scheduler
flowCtl flow.Controller
ormCreator orm.Creator
wp *lib.WorkerPool
observationMgr jobmonitor.ObservationManager
}

func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy, resource *model.Resource, trigger string) (int64, error) {
Expand All @@ -109,10 +113,24 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
if op := operator.FromContext(ctx); op != "" {
extra["operator"] = op
}

id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger, extra)
if err != nil {
return 0, err
}

if policy.SingleActiveReplication {
o, err := c.observationMgr.ObservationByJobNameAndPolicyID(ctx, job.ReplicationVendorType, policy.ID)
if err != nil {
return 0, err
}
if o != nil {
if err = c.execMgr.MarkSkipped(ctx, policy.ID, "Execution deferred: active replication still in progress."); err != nil {
return 0, err
}
}
}

// start the replication flow in background
// as the process runs inside a goroutine, the transaction in the outer ctx
// may be submitted already when the process starts, so create an new context
Expand Down
55 changes: 37 additions & 18 deletions src/controller/replication/flow/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"encoding/json"

"github.com/goharbor/harbor/src/pkg/jobmonitor"

repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
Expand All @@ -27,23 +29,25 @@ import (
)

type copyFlow struct {
executionID int64
resources []*model.Resource
policy *repctlmodel.Policy
executionMgr task.ExecutionManager
taskMgr task.Manager
executionID int64
resources []*model.Resource
policy *repctlmodel.Policy
executionMgr task.ExecutionManager
taskMgr task.Manager
observationMgr jobmonitor.ObservationManager
}

// NewCopyFlow returns an instance of the copy flow which replicates the resources from
// the source registry to the destination registry. If the parameter "resources" isn't provided,
// will fetch the resources first
func NewCopyFlow(executionID int64, policy *repctlmodel.Policy, resources ...*model.Resource) Flow {
return &copyFlow{
executionMgr: task.ExecMgr,
taskMgr: task.Mgr,
executionID: executionID,
policy: policy,
resources: resources,
executionMgr: task.ExecMgr,
taskMgr: task.Mgr,
executionID: executionID,
policy: policy,
resources: resources,
observationMgr: jobmonitor.NewObservationManagerImpl(),
}
}

Expand Down Expand Up @@ -92,7 +96,7 @@ func (c *copyFlow) Run(ctx context.Context) error {
return err
}

return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed, c.policy.CopyByChunk)
return c.createTasks(ctx, srcResources, dstResources, c.policy)
}

func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
Expand All @@ -103,7 +107,19 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
return execution.Status == job.StoppedStatus.String(), nil
}

func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32, copyByChunk bool) error {
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, policy *repctlmodel.Policy) error {
if policy.SingleActiveReplication {
o, err := c.observationMgr.ObservationByJobNameAndPolicyID(ctx, job.ReplicationVendorType, policy.ID)
if err != nil {
return err
}
if o != nil {
if err = c.executionMgr.MarkSkipped(ctx, policy.ID, "Execution deferred: active replication still in progress."); err != nil {
return err
}
}
}

var taskCnt int
defer func() {
// if no task be created, mark execution done.
Expand Down Expand Up @@ -137,19 +153,22 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
JobKind: job.KindGeneric,
},
Parameters: map[string]interface{}{
"src_resource": string(src),
"dst_resource": string(dest),
"speed": speed,
"copy_by_chunk": copyByChunk,
"src_resource": string(src),
"dst_resource": string(dest),
"speed": policy.Speed,
"copy_by_chunk": policy.CopyByChunk,
"single_active_replication": policy.SingleActiveReplication,
"policy_id": policy.ID,
},
}

if _, err = c.taskMgr.Create(ctx, c.executionID, job, map[string]interface{}{
"operation": "copy",
"resource_type": string(srcResource.Type),
"resource_type": srcResource.Type,
"source_resource": getResourceName(srcResource),
"destination_resource": getResourceName(dstResource),
"references": getResourceReferences(dstResource)}); err != nil {
"references": getResourceReferences(dstResource),
}); err != nil {
return err
}

Expand Down
3 changes: 3 additions & 0 deletions src/controller/replication/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Policy struct {
UpdateTime time.Time `json:"update_time"`
Speed int32 `json:"speed"`
CopyByChunk bool `json:"copy_by_chunk"`
SingleActiveReplication bool `json:"single_active_replication"`
}

// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
Expand Down Expand Up @@ -141,6 +142,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.UpdateTime = policy.UpdateTime
p.Speed = policy.Speed
p.CopyByChunk = policy.CopyByChunk
p.SingleActiveReplication = policy.SingleActiveReplication

if policy.SrcRegistryID > 0 {
p.SrcRegistry = &model.Registry{
Expand Down Expand Up @@ -186,6 +188,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) {
UpdateTime: p.UpdateTime,
Speed: p.Speed,
CopyByChunk: p.CopyByChunk,
SingleActiveReplication: p.SingleActiveReplication,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID
Expand Down
4 changes: 4 additions & 0 deletions src/jobservice/job/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
SuccessStatus Status = "Success"
// ScheduledStatus : job status scheduled
ScheduledStatus Status = "Scheduled"
// SkippedStatus : job status skipped
SkippedStatus Status = "Skipped"
)

// Status of job
Expand Down Expand Up @@ -62,6 +64,8 @@ func (s Status) Code() int {
return 3
case "Success":
return 3
case "Skipped":
return 3
default:
}

Expand Down
9 changes: 7 additions & 2 deletions src/jobservice/worker/cworker/c_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ type basicWorker struct {

// workerContext ...
// Carries the worker-pool client so middleware methods (e.g. logJob) can reach it.
type workerContext struct{}
type workerContext struct {
client *work.Client
}

// log the job
func (rpc *workerContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
Expand Down Expand Up @@ -146,9 +148,12 @@ func (w *basicWorker) Start() error {
w.pool.Stop()
}()

workCtx := workerContext{
client: w.client,
}
// Start the backend worker pool
// Add middleware
w.pool.Middleware((*workerContext).logJob)
w.pool.Middleware(workCtx.logJob)
// Non blocking call
w.pool.Start()
logger.Infof("Basic worker is started")
Expand Down
Loading
Loading