Skip to content

Commit

Permalink
added workflow and execution db entity
Browse files Browse the repository at this point in the history
  • Loading branch information
yindia committed Oct 16, 2024
1 parent 1e7fd66 commit 8efc245
Show file tree
Hide file tree
Showing 11 changed files with 412 additions and 53 deletions.
94 changes: 52 additions & 42 deletions idl/cloud/v1/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import "validate/validate.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";


// Enum for Task statuses
enum TaskStatusEnum {
QUEUED = 0; // Task is in the queue, waiting to be processed
Expand Down Expand Up @@ -101,49 +100,59 @@ message Task {
string description = 9 [(validate.rules).string = {
max_len: 5000
}];
repeated string dependencies = 10; // IDs of tasks that must complete before this task

// IDs of tasks that must complete before this task.
repeated string dependencies = 10;

// Base image for the task execution environment.
string base_image = 11;

// Entrypoint for the task execution.
string entrypoint = 12;

// Arguments for the task execution.
repeated string args = 13;

// Environment variables for the task execution.
map<string, string> env = 14;
}

// Workflow represents a collection of tasks organized in a DAG
// Workflow represents a collection of tasks organized in a Directed Acyclic Graph (DAG).
message Workflow {
string id = 1;
string name = 2;
string description = 3;
repeated Task tasks = 4;
map<string, string> metadata = 5;
}

// ExecutionStatus represents the current state of a task or workflow execution
string id = 1; // Unique identifier for the workflow.
string name = 2; // Name of the workflow.
string description = 3; // Description of the workflow.
repeated Task tasks = 4; // List of tasks in the workflow.
map<string, string> metadata = 5; // Additional metadata for the workflow.
}

// ExecutionStatus represents the current state of a task or workflow execution.
enum ExecutionStatus {
EXECUTION_STATUS_UNSPECIFIED = 0;
EXECUTION_STATUS_PENDING = 1;
EXECUTION_STATUS_RUNNING = 2;
EXECUTION_STATUS_COMPLETED = 3;
EXECUTION_STATUS_FAILED = 4;
}

// TaskExecution represents the execution of a task
EXECUTION_STATUS_UNSPECIFIED = 0; // Status is not specified.
EXECUTION_STATUS_PENDING = 1; // Task or workflow is pending execution.
EXECUTION_STATUS_RUNNING = 2; // Task or workflow is currently running.
EXECUTION_STATUS_COMPLETED = 3; // Task or workflow has completed execution.
EXECUTION_STATUS_FAILED = 4; // Task or workflow has failed.
}

// TaskExecution represents the execution of a task.
message TaskExecution {
string task_id = 1;
ExecutionStatus status = 2;
google.protobuf.Timestamp created_at = 3;
google.protobuf.Timestamp updated_at = 4;
map<string, string> execution_metadata = 5;
}
// WorkflowExecution represents the execution of a workflow
string task_id = 1; // Unique identifier for the task being executed.
ExecutionStatus status = 2; // Current execution status of the task.
google.protobuf.Timestamp created_at = 3; // Timestamp of when the task execution started.
google.protobuf.Timestamp updated_at = 4; // Timestamp of the last update to the task execution.
map<string, string> execution_metadata = 5; // Metadata related to the task execution.
}

// WorkflowExecution represents the execution of a workflow.
message WorkflowExecution {
string workflow_id = 1;
ExecutionStatus status = 2;
repeated TaskExecution task_executions = 3;
google.protobuf.Timestamp created_at = 4;
google.protobuf.Timestamp updated_at = 5;
map<string, string> execution_metadata = 6;
}
string workflow_id = 1; // Unique identifier for the workflow being executed.
ExecutionStatus status = 2; // Current execution status of the workflow.
repeated TaskExecution task_executions = 3; // List of task executions within the workflow.
google.protobuf.Timestamp created_at = 4; // Timestamp of when the workflow execution started.
google.protobuf.Timestamp updated_at = 5; // Timestamp of the last update to the workflow execution.
map<string, string> execution_metadata = 6; // Metadata related to the workflow execution.
}

// Message for Task history
message TaskHistory {
Expand Down Expand Up @@ -197,7 +206,6 @@ message UpdateTaskStatusRequest {
string message = 3 [(validate.rules).string = {max_len: 2000}];
}


// Task Management service definition
service TaskManagementService {
// Creates a new task based on the provided request.
Expand All @@ -224,11 +232,14 @@ service TaskManagementService {
// Returns a GetStatusResponse containing a map of status counts.
rpc GetStatus(GetStatusRequest) returns (GetStatusResponse) {}

// Sends a heartbeat signal to indicate the service is alive.
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}

// Pulls events related to task execution.
rpc PullEvents(PullEventsRequest) returns (stream PullEventsResponse) {}
}

// Message for heartbeat request
message HeartbeatRequest {
// Timestamp of the heartbeat, in ISO 8601 format (UTC).
// This timestamp indicates when the heartbeat was sent.
Expand All @@ -243,6 +254,7 @@ message HeartbeatRequest {
}];
}

// Message for heartbeat response
message HeartbeatResponse {
// Response message for the heartbeat request.
// Currently, this message is empty, indicating successful receipt of the heartbeat.
Expand All @@ -262,16 +274,13 @@ message PullEventsResponse {

// Message for work assignments
message WorkAssignment {
// Unique identifier for the assignment
int64 assignment_id = 1 ;
// Unique identifier for the assignment.
int64 assignment_id = 1;

// The task to be executed
// The task to be executed.
Task task = 2 [(validate.rules).message.required = true];
}




// Message for GetStatus request (empty)
message GetStatusRequest {}

Expand All @@ -283,7 +292,8 @@ message GetStatusResponse {

// Message for Task List
message TaskList {
repeated Task tasks = 1; // List of tasks in the system.
// List of tasks in the system.
repeated Task tasks = 1;
}

// Message for Task List request
Expand Down
102 changes: 102 additions & 0 deletions server/repository/gormimpl/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package gormimpl

import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"gorm.io/gorm"

interfaces "task/server/repository/interface"
models "task/server/repository/model/task"
)

// executionOperations counts execution repository calls, partitioned by the
// "operation" and "status" labels.
var executionOperations = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "execution_repository_operations_total",
		Help: "The total number of execution repository operations",
	},
	[]string{"operation", "status"},
)

// executionLatency records the duration of execution repository calls in
// seconds, partitioned by the "operation" label, using the default Prometheus
// bucket layout.
var executionLatency = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "execution_repository_operation_duration_seconds",
		Help:    "Duration of execution repository operations in seconds",
		Buckets: prometheus.DefBuckets,
	},
	[]string{"operation"},
)

// ExecutionRepo is the GORM-backed implementation of the execution repository.
// Its only dependency is the database handle; it holds no queue client.
type ExecutionRepo struct {
// db is the GORM handle used for every query issued by this repository.
db *gorm.DB
}

// CreateExecution inserts the given execution into the database.
//
// The insert runs on a session bound to ctx, so cancellation and deadlines set
// by the caller are honoured by the database call. On success the returned copy
// carries the ID that was filled in during the insert. An error is returned
// when the insert fails, or when the record still has a zero ID afterwards; in
// both cases the zero-value Execution accompanies the error.
//
// Every call is timed under the "create" operation label and counted as either
// "success" or "error".
func (s *ExecutionRepo) CreateExecution(ctx context.Context, execution models.Execution) (models.Execution, error) {
	timer := prometheus.NewTimer(executionLatency.WithLabelValues("create"))
	defer timer.ObserveDuration()

	// Bind the statement to ctx; without this the request context is ignored.
	if err := s.db.WithContext(ctx).Create(&execution).Error; err != nil {
		executionOperations.WithLabelValues("create", "error").Inc()
		return models.Execution{}, fmt.Errorf("failed to create execution: %w", err)
	}

	// A zero ID after a reported success means no key was written back to the
	// struct, so the caller would have no way to reference the new row.
	if execution.ID == 0 {
		executionOperations.WithLabelValues("create", "error").Inc()
		return models.Execution{}, fmt.Errorf("failed to get execution ID after creation")
	}

	executionOperations.WithLabelValues("create", "success").Inc()
	return execution, nil
}

// GetExecution loads a single execution by its primary key.
//
// The query runs on a session bound to ctx, so cancellation and deadlines set
// by the caller are honoured by the database call. It returns a pointer to the
// loaded execution, or nil and an error when the lookup fails. The underlying
// GORM error is wrapped with %w, so callers can inspect it with errors.Is.
//
// Every call is timed under the "get" operation label and counted as either
// "success" or "error".
func (s *ExecutionRepo) GetExecution(ctx context.Context, executionID uint) (*models.Execution, error) {
	timer := prometheus.NewTimer(executionLatency.WithLabelValues("get"))
	defer timer.ObserveDuration()

	var execution models.Execution
	// Bind the statement to ctx; without this the request context is ignored.
	if err := s.db.WithContext(ctx).First(&execution, executionID).Error; err != nil {
		executionOperations.WithLabelValues("get", "error").Inc()
		return nil, fmt.Errorf("failed to retrieve execution by ID: %w", err)
	}

	executionOperations.WithLabelValues("get", "success").Inc()
	return &execution, nil
}

// ListExecution returns every execution stored in the database.
//
// No filtering or pagination is applied: the query has no conditions, limit or
// offset, so the whole table is loaded into memory. The query runs on a session
// bound to ctx, so cancellation and deadlines set by the caller are honoured by
// the database call. It returns the loaded executions, or nil and a wrapped
// error when the query fails.
//
// Every call is timed under the "list" operation label and counted as either
// "success" or "error".
func (s *ExecutionRepo) ListExecution(ctx context.Context) ([]models.Execution, error) {
	timer := prometheus.NewTimer(executionLatency.WithLabelValues("list"))
	defer timer.ObserveDuration()

	var executions []models.Execution
	// Bind the statement to ctx; without this the request context is ignored.
	if err := s.db.WithContext(ctx).Find(&executions).Error; err != nil {
		executionOperations.WithLabelValues("list", "error").Inc()
		return nil, fmt.Errorf("failed to retrieve executions: %w", err)
	}

	executionOperations.WithLabelValues("list", "success").Inc()
	return executions, nil
}

// NewExecutionRepo builds an ExecutionRepo backed by the supplied GORM database
// handle and returns it as an interfaces.ExecutionRepo.
func NewExecutionRepo(db *gorm.DB) interfaces.ExecutionRepo {
	repo := new(ExecutionRepo)
	repo.db = db
	return repo
}
102 changes: 102 additions & 0 deletions server/repository/gormimpl/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package gormimpl

import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"gorm.io/gorm"

interfaces "task/server/repository/interface"
models "task/server/repository/model/task"
)

// workflowOperations counts workflow repository calls, partitioned by the
// "operation" and "status" labels.
var workflowOperations = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "workflow_repository_operations_total",
		Help: "The total number of workflow repository operations",
	},
	[]string{"operation", "status"},
)

// workflowLatency records the duration of workflow repository calls in seconds,
// partitioned by the "operation" label, using the default Prometheus bucket
// layout.
var workflowLatency = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "workflow_repository_operation_duration_seconds",
		Help:    "Duration of workflow repository operations in seconds",
		Buckets: prometheus.DefBuckets,
	},
	[]string{"operation"},
)

// WorkflowRepo is the GORM-backed implementation of the workflow repository.
// Its only dependency is the database handle; it holds no queue client.
type WorkflowRepo struct {
// db is the GORM handle used for every query issued by this repository.
db *gorm.DB
}

// CreateWorkflow inserts the given workflow into the database.
//
// The insert runs on a session bound to ctx, so cancellation and deadlines set
// by the caller are honoured by the database call. On success the returned copy
// carries the ID that was filled in during the insert. An error is returned
// when the insert fails, or when the record still has a zero ID afterwards; in
// both cases the zero-value Workflow accompanies the error.
//
// Every call is timed under the "create" operation label and counted as either
// "success" or "error".
func (s *WorkflowRepo) CreateWorkflow(ctx context.Context, workflow models.Workflow) (models.Workflow, error) {
	timer := prometheus.NewTimer(workflowLatency.WithLabelValues("create"))
	defer timer.ObserveDuration()

	// Bind the statement to ctx; without this the request context is ignored.
	if err := s.db.WithContext(ctx).Create(&workflow).Error; err != nil {
		workflowOperations.WithLabelValues("create", "error").Inc()
		return models.Workflow{}, fmt.Errorf("failed to create workflow: %w", err)
	}

	// A zero ID after a reported success means no key was written back to the
	// struct, so the caller would have no way to reference the new row.
	if workflow.ID == 0 {
		workflowOperations.WithLabelValues("create", "error").Inc()
		return models.Workflow{}, fmt.Errorf("failed to get workflow ID after creation")
	}

	workflowOperations.WithLabelValues("create", "success").Inc()
	return workflow, nil
}

// GetWorkflow loads a single workflow by its primary key.
//
// The query runs on a session bound to ctx, so cancellation and deadlines set
// by the caller are honoured by the database call. It returns a pointer to the
// loaded workflow, or nil and an error when the lookup fails. The underlying
// GORM error is wrapped with %w, so callers can inspect it with errors.Is.
//
// Every call is timed under the "get" operation label and counted as either
// "success" or "error".
func (s *WorkflowRepo) GetWorkflow(ctx context.Context, workflowID uint) (*models.Workflow, error) {
	timer := prometheus.NewTimer(workflowLatency.WithLabelValues("get"))
	defer timer.ObserveDuration()

	var workflow models.Workflow
	// Bind the statement to ctx; without this the request context is ignored.
	if err := s.db.WithContext(ctx).First(&workflow, workflowID).Error; err != nil {
		workflowOperations.WithLabelValues("get", "error").Inc()
		return nil, fmt.Errorf("failed to retrieve workflow by ID: %w", err)
	}

	workflowOperations.WithLabelValues("get", "success").Inc()
	return &workflow, nil
}

// ListWorkflow returns every workflow stored in the database.
//
// No filtering or pagination is applied: the query has no conditions, limit or
// offset, so the whole table is loaded into memory. The query runs on a session
// bound to ctx, so cancellation and deadlines set by the caller are honoured by
// the database call. It returns the loaded workflows, or nil and a wrapped
// error when the query fails.
//
// Every call is timed under the "list" operation label and counted as either
// "success" or "error".
func (s *WorkflowRepo) ListWorkflow(ctx context.Context) ([]models.Workflow, error) {
	timer := prometheus.NewTimer(workflowLatency.WithLabelValues("list"))
	defer timer.ObserveDuration()

	var workflows []models.Workflow
	// Bind the statement to ctx; without this the request context is ignored.
	if err := s.db.WithContext(ctx).Find(&workflows).Error; err != nil {
		workflowOperations.WithLabelValues("list", "error").Inc()
		return nil, fmt.Errorf("failed to retrieve workflows: %w", err)
	}

	workflowOperations.WithLabelValues("list", "success").Inc()
	return workflows, nil
}

// NewWorkflowRepo builds a WorkflowRepo backed by the supplied GORM database
// handle and returns it as an interfaces.WorkflowRepo.
func NewWorkflowRepo(db *gorm.DB) interfaces.WorkflowRepo {
	repo := new(WorkflowRepo)
	repo.db = db
	return repo
}
23 changes: 23 additions & 0 deletions server/repository/interface/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package interfaces

import (
"context"

model "task/server/repository/model/task"
)

// ExecutionRepo defines the persistence operations available for execution
// records.
//
//go:generate mockery --output=../mocks --case=underscore --all --with-expecter
type ExecutionRepo interface {
	// CreateExecution persists the given execution and returns the resulting
	// record, or an error if it could not be created.
	// ctx carries request-scoped values, cancellation and deadlines.
	CreateExecution(ctx context.Context, execution model.Execution) (model.Execution, error)

	// GetExecution looks up a single execution by its ID.
	// It returns a pointer to the execution, or an error if the lookup fails.
	GetExecution(ctx context.Context, executionID uint) (*model.Execution, error)

	// ListExecution returns a slice of executions, or an error if they could
	// not be retrieved. The signature takes no filter or pagination arguments.
	ListExecution(ctx context.Context) ([]model.Execution, error)
}
2 changes: 2 additions & 0 deletions server/repository/interface/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ package interfaces
type TaskManagmentInterface interface {
// TaskRepo returns the repository typed as TaskRepo.
TaskRepo() TaskRepo
// TaskHistoryRepo returns the repository typed as TaskHistoryRepo.
TaskHistoryRepo() TaskHistoryRepo
// WorkflowRepo returns the repository typed as WorkflowRepo.
WorkflowRepo() WorkflowRepo
// ExecutionRepo returns the repository typed as ExecutionRepo.
ExecutionRepo() ExecutionRepo
}
Loading

0 comments on commit 8efc245

Please sign in to comment.