Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Experimental] Dora plugin support parallel tasks. #8003

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion backend/core/plugin/plugin_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package plugin

import (
"context"
"strings"

corecontext "github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
Expand All @@ -35,6 +36,13 @@ const (
SetCurrentSubTask
)

const (
TaskTypeCollector = "collector"
TaskTypeExtractor = "extractor"
TaskTypeConvertor = "convertor"
TaskTypeEnricher = "enricher"
)

type RunningProgress struct {
Type ProgressType
Current int
Expand Down Expand Up @@ -106,7 +114,13 @@ type SubTaskMeta struct {
Dependencies []*SubTaskMeta
DependencyTables []string
ProductTables []string
ForceRunOnResume bool // Should a subtask be ran dispite it was finished before
ForceRunOnResume bool // Should a subtask be run despite it was finished before

TaskType string
}

func (subtaskMeta SubTaskMeta) IsCollector() bool {
return strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") || subtaskMeta.TaskType == TaskTypeCollector
}

// PluginTask Implement this interface to let framework run tasks for you
Expand All @@ -122,3 +136,14 @@ type CloseablePluginTask interface {
PluginTask
Close(taskCtx TaskContext) errors.Error
}

type ParallelTask interface {
PluginTask
GetOrchestratedTask() (OrchestratedTask, errors.Error)
}

type SequentialTasks []SubTaskMeta

type ParallelSequentialTaskGroup []SequentialTasks

type OrchestratedTask []ParallelSequentialTaskGroup
260 changes: 245 additions & 15 deletions backend/core/runner/run_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package runner

import (
gocontext "context"
goerror "errors"
"fmt"
"github.com/apache/incubator-devlake/core/models/common"
"github.com/spf13/cast"
"golang.org/x/sync/errgroup"
"strings"
"time"

Expand Down Expand Up @@ -145,30 +148,257 @@ func RunTask(
return err
}

// RunPluginTask FIXME ...
func RunPluginTask(
func RunPluginTask(ctx gocontext.Context, basicRes context.BasicRes, task *models.Task, progress chan plugin.RunningProgress, syncPolicy *models.SyncPolicy) errors.Error {
pluginMeta, err := plugin.GetPlugin(task.Plugin)
if err != nil {
return errors.Default.WrapRaw(err)
}

if pluginTask, ok := pluginMeta.(plugin.ParallelTask); ok {
return RunPluginSubTasksParallel(ctx, basicRes, task, pluginTask, progress, syncPolicy)
}

if pluginTask, ok := pluginMeta.(plugin.PluginTask); !ok {
return errors.Default.New(fmt.Sprintf("plugin %s doesn't support PluginTask interface", task.Plugin))
} else {
return RunPluginSubTasks(ctx, basicRes, task, pluginTask, progress, syncPolicy)
}
}

func recordSubTaskSequence(basicRes context.BasicRes, taskID uint64, taskCtx plugin.TaskContext, subtaskMetas []plugin.SubTaskMeta) errors.Error {
// record subtasks sequence to DB
collectSubtaskNumber := 0
otherSubtaskNumber := 0
isCollector := false
var subtasks []models.Subtask
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
}
if subtaskCtx == nil {
continue
}
if subtaskMeta.IsCollector() || strings.Contains(strings.ToLower(subtaskMeta.Name), "clone git repo") {
collectSubtaskNumber++
isCollector = true
} else {
otherSubtaskNumber++
isCollector = false
}
s := models.Subtask{
Name: subtaskCtx.GetName(),
TaskID: taskID,
IsCollector: isCollector,
}
if isCollector {
s.Sequence = collectSubtaskNumber
} else {
s.Sequence = otherSubtaskNumber
}
subtasks = append(subtasks, s)
}
if err := basicRes.GetDal().CreateOrUpdate(subtasks); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask list to DB")
}
return nil
}

func getRunnableSubtaskMetas(taskCtx plugin.TaskContext, subtaskMetas plugin.SequentialTasks) ([]plugin.SubTaskMeta, []plugin.SubTaskContext, errors.Error) {
var ret []plugin.SubTaskMeta
var subtaskCtxs []plugin.SubTaskContext
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
return nil, nil, errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
}
if subtaskCtx == nil { // subtask was disabled
continue
}
ret = append(ret, subtaskMeta)
subtaskCtxs = append(subtaskCtxs, subtaskCtx)
}
return ret, subtaskCtxs, nil
}

func RunPluginSubTasksParallel(
ctx gocontext.Context,
basicRes context.BasicRes,
task *models.Task,
pluginTask plugin.ParallelTask,
progress chan plugin.RunningProgress,
syncPolicy *models.SyncPolicy,
) errors.Error {
pluginMeta, err := plugin.GetPlugin(task.Plugin)
taskID := task.ID
subTaskFlag, err := getSubtaskFlagMap(pluginTask, syncPolicy, task.Subtasks)
if err != nil {
return errors.Default.WrapRaw(err)
return err
}
pluginTask, ok := pluginMeta.(plugin.PluginTask)
if !ok {
return errors.Default.New(fmt.Sprintf("plugin %s doesn't support PluginTask interface", task.Plugin))
orchestratedParallelTasks, err := pluginTask.GetOrchestratedTask()
if err != nil {
return err
}
return RunPluginSubTasks(
ctx,
basicRes,
task,
pluginTask,
progress,
syncPolicy,
)

taskCtx := contextimpl.NewDefaultTaskContext(ctx, basicRes, task.Plugin, subTaskFlag, progress)
if closeablePlugin, ok := pluginTask.(plugin.CloseablePluginTask); ok {
defer closeablePlugin.Close(taskCtx)
}
taskCtx.SetSyncPolicy(syncPolicy)

taskData, err := pluginTask.PrepareTaskData(taskCtx, task.Options)
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error preparing task data for %s", task.Plugin))
}
taskCtx.SetData(taskData)

if err := recordSubTaskSequence(basicRes, task.ID, taskCtx, pluginTask.SubTaskMetas()); err != nil {
return err
}

steps := getStepsFromSubtasksFlag(subTaskFlag)
taskCtx.SetProgress(0, steps)

subtaskNumber := 0

for _, parallelTasks := range orchestratedParallelTasks {
g := new(errgroup.Group)
for _, commonSubTasks := range parallelTasks {
subTasks, subTaskCtxs, err := getRunnableSubtaskMetas(taskCtx, commonSubTasks)
basicRes.GetLogger().Info("run tasks: %+v", subTasks)
g.Go(func() error {
defer func() {
if panicErr := recover(); err != nil {
basicRes.GetLogger().Error(goerror.New(cast.ToString(panicErr)), "panic in run common task")
}
}()
if err := runSequenceSubTasks(basicRes, taskCtx, taskID, subtaskNumber, progress, subTasks, subTaskCtxs); err != nil {
basicRes.GetLogger().Error(err, "runSequenceSubTasks: %s", subTasks)
return err
}
return nil
})

subtaskNumber += len(subTasks)
}
if err := g.Wait(); err != nil {
return errors.Convert(err)
}
}

return nil
}

func runSequenceSubTasks(basicRes context.BasicRes, taskCtx plugin.TaskContext, taskID uint64, subtaskNumber int, progress chan plugin.RunningProgress, sequenceSubTasks []plugin.SubTaskMeta, sequenceSubTaskCtxs []plugin.SubTaskContext) errors.Error {
for idx, subtaskMeta := range sequenceSubTasks {
subtaskNumber++
setProgress(progress, subtaskMeta.Name, subtaskNumber)
if err := runTaskWithPreCheck(basicRes, sequenceSubTaskCtxs[idx], taskID, subtaskNumber, subtaskMeta); err != nil {
basicRes.GetLogger().Error(err, fmt.Sprintf("error runTaskWithPreCheck: %s", subtaskMeta.Name))
return err
}
taskCtx.IncProgress(1)
}
return nil
}

func setProgress(progress chan plugin.RunningProgress, name string, number int) {
if progress != nil {
progress <- plugin.RunningProgress{
Type: plugin.SetCurrentSubTask,
SubTaskName: name,
SubTaskNumber: number,
}
}
}

func runTaskWithPreCheck(basicRes context.BasicRes, subtaskCtx plugin.SubTaskContext, taskID uint64, subtaskNumber int, subtaskMeta plugin.SubTaskMeta) errors.Error {
logger := basicRes.GetLogger()
subtaskFinished := false
if !subtaskMeta.ForceRunOnResume {
if taskID > 0 {
sfc := errors.Must1(basicRes.GetDal().Count(
dal.From(&models.Subtask{}), dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", taskID, subtaskMeta.Name),
),
)
subtaskFinished = sfc > 0
}
}
if subtaskFinished {
logger.Info("subtask %s already finished previously", subtaskMeta.Name)
} else {
logger.Info("executing subtask %s", subtaskMeta.Name)
start := time.Now()
err := runSubtask(basicRes, subtaskCtx, taskID, subtaskNumber, subtaskMeta.EntryPoint)
logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds())
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
logger.Error(err, "")
where := dal.Where("task_id = ? and name = ?", taskID, subtaskCtx.GetName())
if err := basicRes.GetDal().UpdateColumns(models.Subtask{}, []dal.DalSet{
{ColumnName: "is_failed", Value: 1},
{ColumnName: "message", Value: err.Error()},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName())
}
return err
}
}
return nil
}

func getSubtaskFlagMap(pluginTask plugin.PluginTask, syncPolicy *models.SyncPolicy, specifiedTasks []string) (map[string]bool, errors.Error) {
// find out all possible subtasks this plugin can offer
subtaskMetas := pluginTask.SubTaskMetas()
subtasksFlag := make(map[string]bool)
for _, subtaskMeta := range subtaskMetas {
subtasksFlag[subtaskMeta.Name] = subtaskMeta.EnabledByDefault
}
/* subtasksFlag example
subtasksFlag := map[string]bool{
"collectProject": true,
"convertCommits": true,
...
}
*/

// user specifies what subtasks to run
if len(specifiedTasks) > 0 {
// first, disable all subtasks
for task := range subtasksFlag {
subtasksFlag[task] = false
}
}
// second, check specified subtasks is valid and enable them if so
for _, task := range specifiedTasks {
if _, ok := subtasksFlag[task]; ok {
subtasksFlag[task] = true
} else {
return nil, errors.Default.New(fmt.Sprintf("subtask %s does not exist", task))
}
}

// 1. make sure `Collect` subtasks skip if `SkipCollectors` is true
// 2. make sure `Required` subtasks are always enabled
for _, subtaskMeta := range subtaskMetas {
if syncPolicy != nil && syncPolicy.SkipCollectors && subtaskMeta.IsCollector() {
subtasksFlag[subtaskMeta.Name] = false
}
if subtaskMeta.Required {
subtasksFlag[subtaskMeta.Name] = true
}
}
return subtasksFlag, nil
}

func getStepsFromSubtasksFlag(subtasksFlag map[string]bool) int {
// calculate total step(number of task to run)
steps := 0
for _, enabled := range subtasksFlag {
if enabled {
steps++
}
}
return steps
}

// RunPluginSubTasks FIXME ...
Expand Down
2 changes: 1 addition & 1 deletion backend/impls/context/default_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *defaultExecContext) IncProgress(progressType plugin.ProgressType, quant
Current: int(current),
Total: c.total,
}
// subtask progress may go too fast, remove old messages because they don't matter any more
// subtask progress may go too fast, remove old messages because they don't matter anymore
if progressType == plugin.SubTaskSetProgress {
for len(c.progress) > 1 {
<-c.progress
Expand Down
27 changes: 26 additions & 1 deletion backend/plugins/dora/impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package impl

import (
"encoding/json"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
Expand Down Expand Up @@ -168,3 +167,29 @@ func (p Dora) MakeMetricPluginPipelinePlanV200(projectName string, options json.
}
return plan, nil
}

func (p Dora) GetOrchestratedTask() (plugin.OrchestratedTask, errors.Error) {
ret := []plugin.ParallelSequentialTaskGroup{
{
plugin.SequentialTasks{
tasks.DeploymentGeneratorMeta,
tasks.DeploymentCommitsGeneratorMeta,
tasks.EnrichPrevSuccessDeploymentCommitMeta,
},
plugin.SequentialTasks{
tasks.CalculateChangeLeadTimeMeta,
},
},
{
plugin.SequentialTasks{
tasks.IssuesToIncidentsMeta,
},
},
{
plugin.SequentialTasks{
tasks.ConnectIncidentToDeploymentMeta,
},
},
}
return ret, nil
}
Loading
Loading