diff --git a/src/controller/jobmonitor/monitor.go b/src/controller/jobmonitor/monitor.go index ed2b004f62b..3149634bbc6 100644 --- a/src/controller/jobmonitor/monitor.go +++ b/src/controller/jobmonitor/monitor.go @@ -17,7 +17,11 @@ package jobmonitor import ( "context" "fmt" + "github.com/gocraft/work" + "github.com/goharbor/harbor/src/common/job" + libRedis "github.com/goharbor/harbor/src/lib/redis" "strings" + "time" jobSvc "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/orm" @@ -87,12 +91,29 @@ func NewMonitorController() MonitorController { taskManager: task.NewManager(), queueManager: jm.NewQueueClient(), queueStatusManager: queuestatus.Mgr, - monitorClient: jm.GetJobServiceMonitorClient, + monitorClient: jobServiceMonitorClient, jobServiceRedisClient: jm.JobServiceRedisClient, executionDAO: taskDao.NewExecutionDAO(), } } +func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) { + cfg, err := job.GlobalClient.GetJobServiceConfig() + if err != nil { + return nil, err + } + config := cfg.RedisPoolConfig + pool, err := libRedis.GetRedisPool(jm.JobServicePool, config.RedisURL, &libRedis.PoolParam{ + PoolMaxIdle: 0, + PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second, + }) + if err != nil { + log.Errorf("failed to get redis pool: %v", err) + return nil, err + } + return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil +} + func (w *monitorController) ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) { mClient, err := w.monitorClient() if err != nil { diff --git a/src/controller/replication/execution.go b/src/controller/replication/execution.go index 9b090b5de7b..6e841639a73 100644 --- a/src/controller/replication/execution.go +++ b/src/controller/replication/execution.go @@ -19,8 +19,6 @@ import ( "fmt" "time" - "github.com/goharbor/harbor/src/pkg/jobmonitor" - "github.com/goharbor/harbor/src/controller/event/operator" "github.com/goharbor/harbor/src/controller/replication/flow" replicationmodel "github.com/goharbor/harbor/src/controller/replication/model" @@ -78,28 +76,26 @@ type Controller interface { // NewController creates a new instance of the replication controller func NewController() Controller { return &controller{ - repMgr: replication.Mgr, - execMgr: task.ExecMgr, - taskMgr: task.Mgr, - regMgr: reg.Mgr, - scheduler: scheduler.Sched, - flowCtl: flow.NewController(), - ormCreator: orm.Crt, - wp: lib.NewWorkerPool(10), - observationMgr: jobmonitor.NewObservationManagerImpl(), + repMgr: replication.Mgr, + execMgr: task.ExecMgr, + taskMgr: task.Mgr, + regMgr: reg.Mgr, + scheduler: scheduler.Sched, + flowCtl: flow.NewController(), + ormCreator: orm.Crt, + wp: lib.NewWorkerPool(10), } } type controller struct { - repMgr replication.Manager - execMgr task.ExecutionManager - taskMgr task.Manager - regMgr reg.Manager - scheduler scheduler.Scheduler - flowCtl flow.Controller - ormCreator orm.Creator - wp *lib.WorkerPool - observationMgr jobmonitor.ObservationManager + repMgr replication.Manager + execMgr task.ExecutionManager + taskMgr task.Manager + regMgr reg.Manager + scheduler scheduler.Scheduler + flowCtl flow.Controller + ormCreator orm.Creator + wp *lib.WorkerPool } func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy, resource *model.Resource, trigger string) (int64, error) { @@ -118,16 +114,23 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy, if err != nil { return 0, err } + if policy.SingleActiveReplication { - o, err := c.observationMgr.ObservationByJobNameAndPolicyID(ctx, job.ReplicationVendorType, policy.ID) + count, err := c.execMgr.Count(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": job.ReplicationVendorType, + "VendorID": policy.ID, + "Status": job.RunningStatus.String(), + }, + }) if err != nil { return 0, err } - if o != nil { + + if count > 1 { if err = c.execMgr.MarkSkipped(ctx, id, "Execution skipped: active replication still in progress."); err != nil { return 0, err } - return id, nil } } diff --git a/src/controller/replication/flow/copy.go b/src/controller/replication/flow/copy.go index a44a2ad6982..5c845663bc9 100644 --- a/src/controller/replication/flow/copy.go +++ b/src/controller/replication/flow/copy.go @@ -17,8 +17,6 @@ package flow import ( "context" "encoding/json" - "github.com/goharbor/harbor/src/pkg/jobmonitor" - repctlmodel "github.com/goharbor/harbor/src/controller/replication/model" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger" @@ -28,12 +26,11 @@ import ( ) type copyFlow struct { - executionID int64 - resources []*model.Resource - policy *repctlmodel.Policy - executionMgr task.ExecutionManager - taskMgr task.Manager - observationMgr jobmonitor.ObservationManager + executionID int64 + resources []*model.Resource + policy *repctlmodel.Policy + executionMgr task.ExecutionManager + taskMgr task.Manager } // NewCopyFlow returns an instance of the copy flow which replicates the resources from @@ -41,12 +38,11 @@ type copyFlow struct { // will fetch the resources first func NewCopyFlow(executionID int64, policy *repctlmodel.Policy, resources ...*model.Resource) Flow { return ©Flow{ - executionMgr: task.ExecMgr, - taskMgr: task.Mgr, - executionID: executionID, - policy: policy, - resources: resources, - observationMgr: jobmonitor.NewObservationManagerImpl(), + executionMgr: task.ExecMgr, + taskMgr: task.Mgr, + executionID: executionID, + policy: policy, + resources: resources, } } @@ -144,7 +140,6 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [ "dst_resource": string(dest), "speed": c.policy.Speed, "copy_by_chunk": c.policy.CopyByChunk, - "policy_id": c.policy.ID, }, } diff --git a/src/jobservice/common/rds/scripts.go b/src/jobservice/common/rds/scripts.go index ca43ec489f7..fd176312988 100644 --- a/src/jobservice/common/rds/scripts.go +++ b/src/jobservice/common/rds/scripts.go @@ -42,7 +42,7 @@ local function compare(status, revision) local aCode = stCode(ARGV[1]) local aRev = tonumber(ARGV[2]) or 0 local aCheckInT = tonumber(ARGV[3]) or 0 - if revision < aRev or + if revision < aRev or ( revision == aRev and sCode <= aCode ) or ( revision == aRev and aCheckInT ~= 0 ) then @@ -96,7 +96,7 @@ if res then redis.call('persist', KEYS[1]) end end - + return 'ok' end end diff --git a/src/pkg/jobmonitor/observations.go b/src/pkg/jobmonitor/observations.go deleted file mode 100644 index f52c5190c0c..00000000000 --- a/src/pkg/jobmonitor/observations.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package jobmonitor - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/gocraft/work" - - "github.com/goharbor/harbor/src/common/job" - "github.com/goharbor/harbor/src/lib/errors" - "github.com/goharbor/harbor/src/lib/log" - libRedis "github.com/goharbor/harbor/src/lib/redis" -) - -type ObservationManager interface { - // ObservationByJobNameAndPolicyID scans and filters active observations - ObservationByJobNameAndPolicyID(ctx context.Context, jobName string, policyID int64) (observation *work.WorkerObservation, err error) -} - -type ObservationManagerImpl struct { -} - -func NewObservationManagerImpl() *ObservationManagerImpl { - return &ObservationManagerImpl{} -} - -func (m *ObservationManagerImpl) ObservationByJobNameAndPolicyID(_ context.Context, jobName string, policyID int64) (observation *work.WorkerObservation, err error) { - monitorClient, err := GetJobServiceMonitorClient() - if err != nil { - return nil, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get job monitor's client: %v", err) - } - observations, err := monitorClient.WorkerObservations() - if err != nil { - return nil, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get jobs observations: %v", err) - } - for _, o := range observations { - if observationMatch(o, jobName, policyID) { - return o, nil - } - } - return nil, nil -} - -func observationMatch(o *work.WorkerObservation, jobName string, policyID int64) bool { - if o.JobName != jobName { - return false - } - args := map[string]interface{}{} - if err := json.Unmarshal([]byte(o.ArgsJSON), &args); err != nil { - return false - } - policyIDFromArgs, ok := args["policy_id"].(float64) - return ok && int64(policyIDFromArgs) == policyID -} - -func GetJobServiceMonitorClient() (JobServiceMonitorClient, error) { - cfg, err := job.GlobalClient.GetJobServiceConfig() - if err != nil { - return nil, err - } - config := cfg.RedisPoolConfig - pool, err := libRedis.GetRedisPool(JobServicePool, config.RedisURL, &libRedis.PoolParam{ - PoolMaxIdle: 0, - PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second, - }) - if err != nil { - log.Errorf("failed to get redis pool: %v", err) - return nil, err - } - return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil -}