Skip to content

Commit

Permalink
Store TaskDefinitions in the Graph by Task ID (vercel#3428)
Browse files Browse the repository at this point in the history
Moves TaskDefinitions into the `CompleteGraph` struct, so we can look them
up during execution. We want to do this here because, _during_ execution,
the TaskGraph has already been constructed, so it is too late to change the definition
at that point. Although this commit does not try to change any TaskDefinitions, we will
implement that for composable configs in the future.
  • Loading branch information
mehulkar authored Jan 24, 2023
1 parent c6f706f commit a44b461
Show file tree
Hide file tree
Showing 15 changed files with 475 additions and 281 deletions.
3 changes: 3 additions & 0 deletions cli/integration_tests/topological_deps/monorepo/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules/
.turbo
.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "my-app",
"scripts": {
"build": "echo 'building'"
},
"dependencies": {
"util": "*"
}
}
7 changes: 7 additions & 0 deletions cli/integration_tests/topological_deps/monorepo/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "monorepo",
"workspaces": [
"apps/**",
"packages/**"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "util",
"scripts": {
"build": "echo 'building'"
}
}
11 changes: 11 additions & 0 deletions cli/integration_tests/topological_deps/monorepo/turbo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"$schema": "https://turbo.build/schema.json",
"pipeline": {
"build": {
"dependsOn": ["^build"]
},
"//#build": {
"dependsOn": []
}
}
}
26 changes: 26 additions & 0 deletions cli/integration_tests/topological_deps/run.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Setup
$ . ${TESTDIR}/../setup.sh
$ . ${TESTDIR}/setup.sh $(pwd)

Check my-app#build output
$ ${TURBO} run build
\xe2\x80\xa2 Packages in scope: //, my-app, util (esc)
\xe2\x80\xa2 Running build in 3 packages (esc)
\xe2\x80\xa2 Remote caching disabled (esc)
util:build: cache miss, executing 04c404a8edf3d3cb
util:build:
util:build: > build
util:build: > echo 'building'
util:build:
util:build: building
my-app:build: cache miss, executing 4f4f453dc15cbe8c
my-app:build:
my-app:build: > build
my-app:build: > echo 'building'
my-app:build:
my-app:build: building

Tasks: 2 successful, 2 total
Cached: 0 cached, 2 total
Time:\s*[\.0-9]+m?s (re)

6 changes: 6 additions & 0 deletions cli/integration_tests/topological_deps/setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash

# Copies the fixture monorepo that sits next to this script into the target
# directory passed as $1, then invokes setup_git.sh (one directory up from this
# script) on that target directory.
#
# Every expansion is quoted so that a script or target path containing
# whitespace or glob characters is passed through as a single argument
# instead of being word-split.
SCRIPT_DIR=$(dirname "${BASH_SOURCE[0]}")
TARGET_DIR=$1
# The trailing "/." copies the directory's contents (dotfiles included)
# rather than the directory itself.
cp -a "${SCRIPT_DIR}/monorepo/." "${TARGET_DIR}/"
"${SCRIPT_DIR}/../setup_git.sh" "${TARGET_DIR}"
41 changes: 31 additions & 10 deletions cli/internal/core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,22 @@ type Visitor = func(taskID string) error

// Engine contains both the DAG for the packages and the tasks and implements the methods to execute tasks in them
type Engine struct {
// WorkspaceGraph is a graph of workspaces
WorkspaceGraph *dag.AcyclicGraph
// TaskGraph is a graph of package-tasks
TaskGraph *dag.AcyclicGraph
// Tasks maps a task's Name to its Task; AddTask inserts entries here.
Tasks map[string]*Task
// PackageTaskDeps maps a string key to a list of strings.
// NOTE(review): presumably task ID -> dependency task IDs registered via AddDep — confirm.
PackageTaskDeps map[string][]string
// rootEnabledTasks is the set of task names that may run in the root package.
// generateTaskGraph does not queue a root-package task whose name is missing
// from this set, and returns an error if such a task is depended on.
rootEnabledTasks util.Set

// completeGraph is the CompleteGraph. We need this to look up the Pipeline
// and the WorkspaceGraph, and to record each task's TaskDefinition by task ID
// while the task graph is generated.
completeGraph *graph.CompleteGraph
}

// NewEngine creates a new engine given a topologic graph of workspace package names
func NewEngine(workspaceGraph *dag.AcyclicGraph) *Engine {
func NewEngine(completeGraph *graph.CompleteGraph) *Engine {
return &Engine{
completeGraph: completeGraph,
Tasks: make(map[string]*Task),
WorkspaceGraph: workspaceGraph,
TaskGraph: &dag.AcyclicGraph{},
PackageTaskDeps: map[string][]string{},
rootEnabledTasks: make(util.Set),
Expand Down Expand Up @@ -117,8 +118,10 @@ func (e *Engine) getTaskDefinition(taskName string, taskID string) (*Task, error

func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly bool) error {
traversalQueue := []string{}

for _, pkg := range pkgs {
isRootPkg := pkg == util.RootPkgName

for _, taskName := range taskNames {
if !isRootPkg || e.rootEnabledTasks.Includes(taskName) {
taskID := util.GetTaskId(pkg, taskName)
Expand All @@ -142,10 +145,17 @@ func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly
traversalQueue = traversalQueue[1:]

pkg, taskName := util.GetPackageTaskFromId(taskID)

if pkg == util.RootPkgName && !e.rootEnabledTasks.Includes(taskName) {
return fmt.Errorf("%v needs an entry in turbo.json before it can be depended on because it is a task run from the root package", taskID)
}
task, err := e.getTaskDefinition(taskName, taskID)

taskDefinition, err := e.GetResolvedTaskDefinition(
&e.completeGraph.Pipeline,
taskName,
taskID,
)

if err != nil {
return err
}
Expand All @@ -157,12 +167,14 @@ func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly

visited.Add(taskID)

topoDeps := util.SetFromStrings(task.TaskDefinition.TopologicalDependencies)
// Put this taskDefinition into the Graph so we can look it up later during execution.
e.completeGraph.TaskDefinitions[taskID] = taskDefinition

topoDeps := util.SetFromStrings(taskDefinition.TopologicalDependencies)
deps := make(util.Set)
isPackageTask := util.IsPackageTask(taskName)

for _, dependency := range task.TaskDefinition.TaskDependencies {
for _, dependency := range taskDefinition.TaskDependencies {
// If the current task is a workspace-specific task (including root Task)
// and its dependency is _also_ a workspace-specific task, we need to add
// a reference to this dependency directly into the engine.
Expand Down Expand Up @@ -199,7 +211,7 @@ func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly

// hasTopoDeps will be true if the task depends on any tasks from dependency packages
// E.g. `dev: { dependsOn: [^dev] }`
hasTopoDeps := topoDeps.Len() > 0 && e.WorkspaceGraph.DownEdges(pkg).Len() > 0
hasTopoDeps := topoDeps.Len() > 0 && e.completeGraph.WorkspaceGraph.DownEdges(pkg).Len() > 0

// hasDeps will be true if the task depends on any tasks from its own package
// E.g. `build: { dependsOn: [dev] }`
Expand All @@ -214,7 +226,7 @@ func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly
}

if hasTopoDeps {
depPkgs := e.WorkspaceGraph.DownEdges(pkg)
depPkgs := e.completeGraph.WorkspaceGraph.DownEdges(pkg)
for _, from := range topoDeps.UnsafeListOfStrings() {
// add task dep from all the package deps within repo
for depPkg := range depPkgs {
Expand Down Expand Up @@ -269,14 +281,15 @@ func (e *Engine) AddTask(task *Task) *Engine {
e.rootEnabledTasks.Add(taskName)
}
}

e.Tasks[task.Name] = task
return e
}

// AddDep adds tuples from+to task ID combos in tuple format so they can be looked up later.
func (e *Engine) AddDep(fromTaskID string, toTaskID string) error {
fromPkg, _ := util.GetPackageTaskFromId(fromTaskID)
if fromPkg != ROOT_NODE_NAME && fromPkg != util.RootPkgName && !e.WorkspaceGraph.HasVertex(fromPkg) {
if fromPkg != ROOT_NODE_NAME && fromPkg != util.RootPkgName && !e.completeGraph.WorkspaceGraph.HasVertex(fromPkg) {
return fmt.Errorf("found reference to unknown package: %v in task %v", fromPkg, fromTaskID)
}

Expand Down Expand Up @@ -328,6 +341,7 @@ func (e *Engine) ValidatePersistentDependencies(graph *graph.CompleteGraph) erro
packageName, taskName := util.GetPackageTaskFromId(depTaskID)

// Get the Task Definition so we can check if it is Persistent
// TODO(mehulkar): Do we need to get a resolved taskDefinition here?
depTaskDefinition, taskExists := e.getTaskDefinition(taskName, depTaskID)
if taskExists != nil {
return fmt.Errorf("Cannot find task definition for %v in package %v", depTaskID, packageName)
Expand Down Expand Up @@ -362,3 +376,10 @@ func (e *Engine) ValidatePersistentDependencies(graph *graph.CompleteGraph) erro
// May or may not be set (could be nil)
return validationError
}

// GetResolvedTaskDefinition produces the "resolved" TaskDefinition for a task.
// For now, resolution is a plain lookup in the root Pipeline; composing the
// TaskDefinition from workspaces through the `extends` key is planned for the
// future.
func (e *Engine) GetResolvedTaskDefinition(rootPipeline *fs.Pipeline, taskName string, taskID string) (*fs.TaskDefinition, error) {
	// Pipeline.GetTask takes the task ID first and the task name second,
	// the reverse of this method's own parameter order.
	resolved, lookupErr := rootPipeline.GetTask(taskID, taskName)
	return resolved, lookupErr
}
Loading

0 comments on commit a44b461

Please sign in to comment.