diff --git a/backend/helpers/pluginhelper/api/api_collector_stateful.go b/backend/helpers/pluginhelper/api/api_collector_stateful.go index f4e2c67feec..985392f7b90 100644 --- a/backend/helpers/pluginhelper/api/api_collector_stateful.go +++ b/backend/helpers/pluginhelper/api/api_collector_stateful.go @@ -139,10 +139,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg createdAfter := manager.CollectorStateManager.GetSince() isIncremental := manager.CollectorStateManager.IsIncremental() + var inputIterator Iterator + if args.CollectNewRecordsByList.BuildInputIterator != nil { + inputIterator, err = args.CollectNewRecordsByList.BuildInputIterator(isIncremental, createdAfter) + if err != nil { + return nil, err + } + } + // step 1: create a collector to collect newly added records err = manager.InitCollector(ApiCollectorArgs{ ApiClient: args.ApiClient, // common + Input: inputIterator, Incremental: isIncremental, UrlTemplate: args.CollectNewRecordsByList.UrlTemplate, Query: func(reqData *RequestData) (url.Values, errors.Error) { @@ -169,21 +178,41 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg // time filter or diff sync if createdAfter != nil && args.CollectNewRecordsByList.GetCreated != nil { - // if the first record of the page was created before createdAfter, return emtpy set and stop + // if the first record of the page was created before createdAfter and not a zero value, return empty set and stop firstCreated, err := args.CollectNewRecordsByList.GetCreated(items[0]) if err != nil { return nil, err } - if firstCreated.Before(*createdAfter) { + if firstCreated.Before(*createdAfter) && !firstCreated.IsZero() { return nil, ErrFinishCollect } - // if the last record was created before createdAfter, return records and stop + + // If last record was created before CreatedAfter, including a zero value, check each record individually lastCreated, err := args.CollectNewRecordsByList.GetCreated(items[len(items)-1]) if err != nil { return nil, err } if lastCreated.Before(*createdAfter) { - return items, ErrFinishCollect + var validItems []json.RawMessage + // Only collect items that were created after the last successful collection to prevent duplicates + for _, item := range items { + itemCreatedAt, err := args.CollectNewRecordsByList.GetCreated(item) + if err != nil { + return nil, err + } + + if itemCreatedAt.IsZero() { + // If zero then timestamp is null on the response - accept as valid for downstream processing + validItems = append(validItems, item) + continue + } + + if itemCreatedAt.Before(*createdAfter) { + // Once we reach an item that was created before the last successful collection, stop & return + return validItems, ErrFinishCollect + } + validItems = append(validItems, item) + } } } return items, err @@ -267,6 +296,7 @@ type FinalizableApiCollectorListArgs struct { Concurrency int // required for Undetermined Strategy, number of concurrent requests GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) // required for Sequential Strategy, to extract the next page cursor from the given response GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error) // required for Determined Strategy, to extract the total number of pages from the given response + BuildInputIterator func(isIncremental bool, createdAfter *time.Time) (Iterator, errors.Error) } // FinalizableApiCollectorDetailArgs is the arguments for the detail collector diff --git a/backend/plugins/circleci/tasks/job_collector.go b/backend/plugins/circleci/tasks/job_collector.go index 60fae1b9bc3..fd1d78286bf 100644 --- a/backend/plugins/circleci/tasks/job_collector.go +++ b/backend/plugins/circleci/tasks/job_collector.go @@ -18,7 +18,9 @@ limitations under the License. package tasks import ( + "encoding/json" "reflect" + "time" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -44,31 +46,68 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() logger.Info("collect jobs") - clauses := []dal.Clause{ - dal.Select("id, pipeline_id"), - dal.From(&models.CircleciWorkflow{}), - dal.Where("_tool_circleci_workflows.connection_id = ? and _tool_circleci_workflows.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug), - } + collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{ + RawDataSubTaskArgs: *rawDataSubTaskArgs, + ApiClient: data.ApiClient, + CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{ + PageSize: int(data.Options.PageSize), + GetNextPageCustomData: ExtractNextPageToken, + BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) { + clauses := []dal.Clause{ + dal.Select("id, pipeline_id"), // pipeline_id not on individual job response but required for result + dal.From(&models.CircleciWorkflow{}), + dal.Where("connection_id = ? and project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug), + } - db := taskCtx.GetDal() - cursor, err := db.Cursor(clauses...) - if err != nil { - return err - } - iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{})) - if err != nil { - return err - } + if isIncremental { + clauses = append(clauses, dal.Where("created_date > ?", createdAfter)) + } + + db := taskCtx.GetDal() + cursor, err := db.Cursor(clauses...) + if err != nil { + return nil, err + } + return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{})) + }, + FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{ + UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", + Query: BuildQueryParamsWithPageToken, + ResponseParser: ParseCircleciPageTokenResp, + AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted + }, + GetCreated: func(item json.RawMessage) (time.Time, errors.Error) { + var job struct { // Individual job response lacks created_at field, so have to use started_at + CreatedAt time.Time `json:"started_at"` // This will be null in some cases (e.g. queued, not_running, blocked) + } + if err := json.Unmarshal(item, &job); err != nil { + return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal job") + } + return job.CreatedAt, nil + }, + }, + CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{ + FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{ + UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", // The individual job endpoint has different fields so need to recollect all jobs for a workflow + Query: BuildQueryParamsWithPageToken, + ResponseParser: ParseCircleciPageTokenResp, + AfterResponse: ignoreDeletedBuilds, + }, + BuildInputIterator: func() (api.Iterator, errors.Error) { + clauses := []dal.Clause{ + dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once + dal.From(&models.CircleciJob{}), + dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug), + } - collector, err := api.NewApiCollector(api.ApiCollectorArgs{ - RawDataSubTaskArgs: *rawDataSubTaskArgs, - ApiClient: data.ApiClient, - UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", - Input: iterator, - GetNextPageCustomData: ExtractNextPageToken, - Query: BuildQueryParamsWithPageToken, - ResponseParser: ParseCircleciPageTokenResp, - AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a job has been deleted + db := taskCtx.GetDal() + cursor, err := db.Cursor(clauses...) + if err != nil { + return nil, err + } + return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciJob{})) + }, + }, }) if err != nil { logger.Error(err, "collect jobs error") diff --git a/backend/plugins/circleci/tasks/pipeline_collector.go b/backend/plugins/circleci/tasks/pipeline_collector.go index 20055f89405..2dc8da4f3a2 100644 --- a/backend/plugins/circleci/tasks/pipeline_collector.go +++ b/backend/plugins/circleci/tasks/pipeline_collector.go @@ -20,7 +20,6 @@ package tasks import ( "encoding/json" "net/http" - "time" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" @@ -44,35 +43,38 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter logger.Info("collect pipelines") - collector, err := api.NewApiCollector(api.ApiCollectorArgs{ - RawDataSubTaskArgs: *rawDataSubTaskArgs, - ApiClient: data.ApiClient, - UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline", - PageSize: int(data.Options.PageSize), - GetNextPageCustomData: ExtractNextPageToken, - Query: BuildQueryParamsWithPageToken, - ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - data := CircleciPageTokenResp[[]json.RawMessage]{} - err := api.UnmarshalResponse(res, &data) + collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{ + RawDataSubTaskArgs: *rawDataSubTaskArgs, + ApiClient: data.ApiClient, + CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{ + PageSize: int(data.Options.PageSize), + GetNextPageCustomData: ExtractNextPageToken, + FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{ + UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline", + Query: BuildQueryParamsWithPageToken, + ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { + data := CircleciPageTokenResp[[]json.RawMessage]{} + err := api.UnmarshalResponse(res, &data) - if err != nil { - return nil, err - } - filteredItems := []json.RawMessage{} - for _, item := range data.Items { - var pipeline struct { - CreatedAt time.Time `json:"created_at"` - } - if err := json.Unmarshal(item, &pipeline); err != nil { - return nil, errors.Default.Wrap(err, "failed to unmarshal pipeline item") - } - if pipeline.CreatedAt.Before(*timeAfter) { - return filteredItems, api.ErrFinishCollect - } - filteredItems = append(filteredItems, item) + if err != nil { + return nil, err + } + filteredItems := []json.RawMessage{} + for _, item := range data.Items { + pipelineCreatedAt, err := extractCreatedAt(item) - } - return filteredItems, nil + if err != nil { + return nil, err + } + if pipelineCreatedAt.Before(*timeAfter) { + return filteredItems, api.ErrFinishCollect + } + filteredItems = append(filteredItems, item) + } + return filteredItems, nil + }, + }, + GetCreated: extractCreatedAt, }, }) if err != nil { diff --git a/backend/plugins/circleci/tasks/shared.go b/backend/plugins/circleci/tasks/shared.go index c42a3f9c9d1..6e235ecd6b0 100644 --- a/backend/plugins/circleci/tasks/shared.go +++ b/backend/plugins/circleci/tasks/shared.go @@ -21,6 +21,7 @@ import ( "encoding/json" "net/http" "net/url" + "time" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -108,7 +109,7 @@ func ExtractNextPageToken(prevReqData *api.RequestData, prevPageResponse *http.R return res.NextPageToken, nil } -func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, errors.Error) { +func BuildQueryParamsWithPageToken(reqData *api.RequestData, _ *time.Time) (url.Values, errors.Error) { query := url.Values{} if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" { query.Set("page-token", pageToken) @@ -130,3 +131,13 @@ func ignoreDeletedBuilds(res *http.Response) errors.Error { } return nil } + +func extractCreatedAt(item json.RawMessage) (time.Time, errors.Error) { + var entity struct { + CreatedAt time.Time `json:"created_at"` + } + if err := json.Unmarshal(item, &entity); err != nil { + return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal item") + } + return entity.CreatedAt, nil +} diff --git a/backend/plugins/circleci/tasks/workflow_collector.go b/backend/plugins/circleci/tasks/workflow_collector.go index 342efc82378..f0f3aebe5c9 100644 --- a/backend/plugins/circleci/tasks/workflow_collector.go +++ b/backend/plugins/circleci/tasks/workflow_collector.go @@ -18,7 +18,10 @@ limitations under the License. package tasks import ( + "encoding/json" + "net/http" "reflect" + "time" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -44,31 +47,64 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() logger.Info("collect workflows") - clauses := []dal.Clause{ - dal.Select("id"), - dal.From(&models.CircleciPipeline{}), - dal.Where("_tool_circleci_pipelines.connection_id = ? and _tool_circleci_pipelines.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug), - } + collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{ + RawDataSubTaskArgs: *rawDataSubTaskArgs, + ApiClient: data.ApiClient, + CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{ + PageSize: int(data.Options.PageSize), + GetNextPageCustomData: ExtractNextPageToken, + BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) { + clauses := []dal.Clause{ + dal.Select("id"), + dal.From(&models.CircleciPipeline{}), + dal.Where("connection_id = ? AND project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug), + } - db := taskCtx.GetDal() - cursor, err := db.Cursor(clauses...) - if err != nil { - return err - } - iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{})) - if err != nil { - return err - } + if isIncremental { + clauses = append(clauses, dal.Where("created_date > ?", createdAfter)) + } + + db := taskCtx.GetDal() + cursor, err := db.Cursor(clauses...) + if err != nil { + return nil, err + } + return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{})) + }, + FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{ + UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow", + Query: BuildQueryParamsWithPageToken, + ResponseParser: ParseCircleciPageTokenResp, + AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted + }, + GetCreated: extractCreatedAt, + }, + CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{ + FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{ + UrlTemplate: "/v2/workflow/{{ .Input.Id }}", + Query: nil, + ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { + var data json.RawMessage + err := api.UnmarshalResponse(res, &data) + return []json.RawMessage{data}, err + }, + AfterResponse: ignoreDeletedBuilds, + }, + BuildInputIterator: func() (api.Iterator, errors.Error) { + clauses := []dal.Clause{ + dal.Select("id"), + dal.From(&models.CircleciWorkflow{}), + dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'on_hold', 'failing')", data.Options.ConnectionId, data.Options.ProjectSlug), + } - collector, err := api.NewApiCollector(api.ApiCollectorArgs{ - RawDataSubTaskArgs: *rawDataSubTaskArgs, - ApiClient: data.ApiClient, - UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow", - Input: iterator, - GetNextPageCustomData: ExtractNextPageToken, - Query: BuildQueryParamsWithPageToken, - ResponseParser: ParseCircleciPageTokenResp, - AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted + db := taskCtx.GetDal() + cursor, err := db.Cursor(clauses...) + if err != nil { + return nil, err + } + return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{})) + }, + }, }) if err != nil { logger.Error(err, "collect workflows error")