Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(circleci-plugin): incremental data collection #7986

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
38 changes: 34 additions & 4 deletions backend/helpers/pluginhelper/api/api_collector_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
createdAfter := manager.CollectorStateManager.GetSince()
isIncremental := manager.CollectorStateManager.IsIncremental()

var inputIterator Iterator
if args.CollectNewRecordsByList.BuildInputIterator != nil {
inputIterator, err = args.CollectNewRecordsByList.BuildInputIterator(isIncremental, createdAfter)
if err != nil {
return nil, err
}
}

// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
// common
Input: inputIterator,
Incremental: isIncremental,
UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
Query: func(reqData *RequestData) (url.Values, errors.Error) {
Expand All @@ -169,21 +178,41 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg

// time filter or diff sync
if createdAfter != nil && args.CollectNewRecordsByList.GetCreated != nil {
// if the first record of the page was created before createdAfter, return emtpy set and stop
// if the first record of the page was created before createdAfter and not a zero value, return empty set and stop
firstCreated, err := args.CollectNewRecordsByList.GetCreated(items[0])
if err != nil {
return nil, err
}
if firstCreated.Before(*createdAfter) {
if firstCreated.Before(*createdAfter) && !firstCreated.IsZero() {
return nil, ErrFinishCollect
}
// if the last record was created before createdAfter, return records and stop

// If the last record was created before createdAfter (or has a zero-value timestamp), check each record individually
lastCreated, err := args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
if err != nil {
return nil, err
}
if lastCreated.Before(*createdAfter) {
return items, ErrFinishCollect
var validItems []json.RawMessage
// Only collect items that were created after the last successful collection to prevent duplicates
for _, item := range items {
itemCreatedAt, err := args.CollectNewRecordsByList.GetCreated(item)
if err != nil {
return nil, err
}

if itemCreatedAt.IsZero() {
// If zero then timestamp is null on the response - accept as valid for downstream processing
validItems = append(validItems, item)
continue
}

if itemCreatedAt.Before(*createdAfter) {
// Once we reach an item that was created before the last successful collection, stop & return
return validItems, ErrFinishCollect
}
validItems = append(validItems, item)
}
}
}
return items, err
Expand Down Expand Up @@ -267,6 +296,7 @@ type FinalizableApiCollectorListArgs struct {
Concurrency int // required for Undetermined Strategy, number of concurrent requests
GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) // required for Sequential Strategy, to extract the next page cursor from the given response
GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error) // required for Determined Strategy, to extract the total number of pages from the given response
BuildInputIterator func(isIncremental bool, createdAfter *time.Time) (Iterator, errors.Error)
}

// FinalizableApiCollectorDetailArgs is the arguments for the detail collector
Expand Down
85 changes: 62 additions & 23 deletions backend/plugins/circleci/tasks/job_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ limitations under the License.
package tasks

import (
"encoding/json"
"reflect"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
Expand All @@ -44,31 +46,68 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect jobs")

clauses := []dal.Clause{
dal.Select("id, pipeline_id"),
dal.From(&models.CircleciWorkflow{}),
dal.Where("_tool_circleci_workflows.connection_id = ? and _tool_circleci_workflows.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
}
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("id, pipeline_id"), // pipeline_id not on individual job response but required for result
dal.From(&models.CircleciWorkflow{}),
dal.Where("connection_id = ? and project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
if err != nil {
return err
}
if isIncremental {
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
},
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
},
GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
var job struct { // Individual job response lacks created_at field, so have to use started_at
CreatedAt time.Time `json:"started_at"` // This will be null in some cases (e.g. queued, not_running, blocked)
}
if err := json.Unmarshal(item, &job); err != nil {
return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal job")
}
return job.CreatedAt, nil
},
},
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", // The individual job endpoint has different fields so need to recollect all jobs for a workflow
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds,
},
BuildInputIterator: func() (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once
dal.From(&models.CircleciJob{}),
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug),
}

collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
Input: iterator,
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a job has been deleted
db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciJob{}))
},
},
})
if err != nil {
logger.Error(err, "collect jobs error")
Expand Down
58 changes: 30 additions & 28 deletions backend/plugins/circleci/tasks/pipeline_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package tasks
import (
"encoding/json"
"net/http"
"time"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
Expand All @@ -44,35 +43,38 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
logger.Info("collect pipelines")
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
Query: BuildQueryParamsWithPageToken,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)

if err != nil {
return nil, err
}
filteredItems := []json.RawMessage{}
for _, item := range data.Items {
var pipeline struct {
CreatedAt time.Time `json:"created_at"`
}
if err := json.Unmarshal(item, &pipeline); err != nil {
return nil, errors.Default.Wrap(err, "failed to unmarshal pipeline item")
}
if pipeline.CreatedAt.Before(*timeAfter) {
return filteredItems, api.ErrFinishCollect
}
filteredItems = append(filteredItems, item)
if err != nil {
return nil, err
}
filteredItems := []json.RawMessage{}
for _, item := range data.Items {
pipelineCreatedAt, err := extractCreatedAt(item)

}
return filteredItems, nil
if err != nil {
return nil, err
}
if pipelineCreatedAt.Before(*timeAfter) {
return filteredItems, api.ErrFinishCollect
}
filteredItems = append(filteredItems, item)
}
return filteredItems, nil
},
},
GetCreated: extractCreatedAt,
},
})
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion backend/plugins/circleci/tasks/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"net/http"
"net/url"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
Expand Down Expand Up @@ -108,7 +109,7 @@ func ExtractNextPageToken(prevReqData *api.RequestData, prevPageResponse *http.R
return res.NextPageToken, nil
}

func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, errors.Error) {
func BuildQueryParamsWithPageToken(reqData *api.RequestData, _ *time.Time) (url.Values, errors.Error) {
query := url.Values{}
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
query.Set("page-token", pageToken)
Expand All @@ -130,3 +131,13 @@ func ignoreDeletedBuilds(res *http.Response) errors.Error {
}
return nil
}

// extractCreatedAt decodes the `created_at` timestamp from a raw JSON item.
//
// It returns the decoded time on success. If the item cannot be unmarshalled,
// it returns the zero time.Time together with a wrapped error.
// NOTE(review): a missing or null `created_at` is expected to decode to the
// zero time.Time without an error — callers should check IsZero() if that
// case matters to them.
func extractCreatedAt(item json.RawMessage) (time.Time, errors.Error) {
	payload := struct {
		CreatedAt time.Time `json:"created_at"`
	}{}
	unmarshalErr := json.Unmarshal(item, &payload)
	if unmarshalErr != nil {
		return time.Time{}, errors.Default.Wrap(unmarshalErr, "failed to unmarshal item")
	}
	return payload.CreatedAt, nil
}
82 changes: 59 additions & 23 deletions backend/plugins/circleci/tasks/workflow_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ limitations under the License.
package tasks

import (
"encoding/json"
"net/http"
"reflect"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
Expand All @@ -44,31 +47,64 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect workflows")

clauses := []dal.Clause{
dal.Select("id"),
dal.From(&models.CircleciPipeline{}),
dal.Where("_tool_circleci_pipelines.connection_id = ? and _tool_circleci_pipelines.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
}
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("id"),
dal.From(&models.CircleciPipeline{}),
dal.Where("connection_id = ? AND project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
if err != nil {
return err
}
if isIncremental {
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
},
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
},
GetCreated: extractCreatedAt,
},
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/workflow/{{ .Input.Id }}",
Query: nil,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
var data json.RawMessage
err := api.UnmarshalResponse(res, &data)
return []json.RawMessage{data}, err
},
AfterResponse: ignoreDeletedBuilds,
},
BuildInputIterator: func() (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("id"),
dal.From(&models.CircleciWorkflow{}),
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'on_hold', 'failing')", data.Options.ConnectionId, data.Options.ProjectSlug),
}

collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
Input: iterator,
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
},
},
})
if err != nil {
logger.Error(err, "collect workflows error")
Expand Down