Task Service - Case Study

Task Service

Table of Contents

Quick Start

To quickly get started with the Task Service, use the following command:

make bootstrap

docker-compose -f docker-compose.demo.yaml up

This will start all the necessary services, including the server, database, and worker.

Manual Setup

For a more detailed setup process, follow these steps:

1. Development Environment Setup

Install Pixi and activate the shell:

make bootstrap

# Run Database 
docker-compose up -d

2. Server (Control Plane)

Start the server:

make run-server

Access at

3. CLI Tool

Build and test:

make build-cli
./bin/task-cli --help

4. Dashboard Setup

Install dependencies and build:

npm install
npm run dev

Access at

5. Worker (Data Plane)

Start worker instances:

./bin/task-cli serve  --log-level debug

Project Structure

├── cmd/
│   ├── cli/            # CLI for task management
│   └── server/         # Server entry point
├── pkg/
│   ├── config/         # Configuration management
│   ├── gen/            # GRPC generated code
│   ├── plugins/        # Plugin model
│   ├── worker/         # Worker code
│   └── x/              # Utility functions         
├── idl/
│   └── proto/          # Protocol buffer definitions
├── clients/
│   └── dashboard/      # NextJS Dashboard
├── charts/
│   └── task/         # Helm charts for deployment
├── server/
│   ├── repository/            # Database ORM
│   └── root/         # Server Root 
│   └── route/         # All Server Routes
└── docs/               # Documentation files


The Task Service follows a distributed architecture with separate components for the control plane and data plane. Here's a high-level overview of the system:

graph TD
    %% Clients
    A[Dashboard Client] -->|Sends Request| B(Server)
    C[CLI Client] -->|Sends Request| B(Server)

    %% Control Plane
    subgraph Control Plane
        B(Server) -->|Reads/Writes| D[(PostgreSQL Database)]

    %% Data Plane
    subgraph Data Plane
        E[Agent] -->|Initiates Connection| B[Server]
        B[Server] -->|Publish W| E[Agent]
        E -->|Creates CRD| H[CRD]
        F[Controller] -->|Watches CRD| H
        F -->|Executes Task| J[Task Execution]
        F -->|Sends Status Update| B

This architecture allows for:

  • Separation of concerns between the control plane (server) and data plane (workers)
  • Scalability of worker nodes to handle increased workloads
  • Asynchronous task execution through message queuing
  • Real-time status updates from workers to the server

Database Operations

The server interacts with the database for persistent storage of tasks and their history. Here's a summary of the database operations:

  1. Read Operations

    • Get Task by ID
      • Purpose: Retrieve details of a specific task
      • Frequency: On-demand, triggered by API requests
    • List All Tasks
      • Purpose: Retrieve a list of all tasks
      • Frequency: On-demand, typically for dashboard or reporting
    • List Task History
      • Purpose: Retrieve the status change history of a specific task
      • Frequency: On-demand, for detailed task analysis
  2. Write Operations

    • Create New Task
      • Purpose: Store a newly created task
      • Frequency: Each time a new task is submitted
    • Update Task Status
      • Purpose: Modify the status of an existing task
      • Frequency: As task states change (e.g., from queued to running to completed)
    • Create Task History Entry
      • Purpose: Log task status changes and creation events
      • Frequency: On task creation and each status change

Database Schema

The Task Service uses a PostgreSQL database to store task and task history information. Below is an Entity-Relationship Diagram (ERD) representing the database schema:

    %% Task Model
    TASK {
        int id PK
        string name
        int type
        int status
        jsonb payload
        int retries
        int priority
        timestamp created_at
    %% TaskHistory Model
        int id PK
        int task_id FK
        int status
        string details
        timestamp created_at

    %% Relationships
    TASK ||--o{ TASK_HISTORY : has

    %% Indexes (described as comments)
    %% Indexes for TASK
    %% - idx_type_status (type, status)
    %% - idx_created_at (created_at)
    %% - idx_status_created_at (status, created_at)

    %% Indexes for TASK_HISTORY
    %% - idx_task_id_created_at (task_id, created_at)

Note: Ideally, we should create separate tables for tasks 📝 and task executions ⚙️. When a task is created, it should be added to the task table. Upon triggering an execution, a corresponding entry should be created in the execution table, and the execution data should be published to the PostgreSQL queue for processing 📬. This way, the task status remains unchanged, and only the execution status is updated in the execution table ✅.

Table Descriptions

  1. TASK

    • Stores information about individual tasks
    • id: Unique identifier for the task (Primary Key)
    • name: Name of the task
    • type: Type of the task (e.g., send_email, run_query)
    • status: Current status of the task (e.g., pending, running, completed)
    • payload: JSON object containing task-specific parameters
    • retries: Number of retry attempts for the task
    • priority: Priority level of the task
    • created_at: Timestamp of task creation

    • Tracks the history of status changes for tasks
    • id: Unique identifier for the history entry (Primary Key)
    • task_id: Foreign Key referencing the TASK table
    • status: Status of the task at the time of the history entry
    • details: Additional details about the status change
    • created_at: Timestamp of the history entry creation


  • One TASK can have many TASK_HISTORY entries (one-to-many relationship)


To optimize query performance, the following indexes are implemented:

  1. TASK table

    • idx_type_status: Composite index on type and status columns
    • idx_created_at: Index on created_at column
    • idx_status_created_at: Composite index on status and created_at columns
  2. TASK_HISTORY table

    • idx_task_id_created_at: Composite index on task_id and created_at columns

These indexes improve the efficiency of common queries such as filtering tasks by type and status, sorting by creation time, and retrieving task history.

Worker/Data Plane Process

The worker process follows a specific flow for task execution and error handling. Here's a detailed view of the worker's operation:

graph TD
    A[Receive Message] --> B{Update Status: RUNNING}
    B -->|Success| C[Run Task]
    B -->|Failure| D[Log Error]
    D --> K[Move to Next Message]

    C --> E{Task Execution}
    E -->|Success| F[Update Status: SUCCEEDED]
    E -->|Failure| G[Retry Logic]

    G --> H{Retry Attempt <= 3?}
    H -->|Yes| I[Backoff]
    I --> J[Update Status: RETRYING]
    J --> C
    H -->|No| K[Update Status: FAILED]

    F --> L[Move to Next Message]
    K --> L

API Documentation

CLI Usage

Task Management

The Task Service CLI provides several commands to manage tasks. Here's a detailed overview of each command and its available flags:

Create a Task

Create a new task with the specified name, type, and parameters.

task-cli task create [task name] --type [task type] --parameter [key=value]


  • --type, -t: Type of the task (e.g., send_email, run_query)
  • --parameter, -p: Additional parameters for the task as key=value pairs (can be used multiple times)


task-cli task create "Send Newsletter" --type send_email --parameter [email protected] --parameter subject="Weekly Update"

Get Task Details

Retrieve and display the details of a specific task by its ID.

task-cli  task get --id [task ID] [flags]


  • --id, -i: ID of the task (required)
  • --output, -o: Output format (table, json, yaml) (default: "table")


task-cli  task get --id 123 --output json

Get Task History

Retrieve and display the history of a specific task by its ID.

task-cli history --id [task ID] [flags]


  • --id, -i: ID of the task (required)
  • --output, -o: Output format (table, json, yaml) (default: "table")


task-cli  history --id 123 --output yaml

List All Tasks

Retrieve and display a list of all tasks.

task-cli task list [flags]


  • --output, -o: Output format (table, json, yaml) (default: "table")
  • --pageNumber, -n: Page number for pagination (default: 1)
  • --pageCount, -c: Number of items per page (default: 30)


task-cli task list
task-cli task list --output json
task-cli task list --pageNumber 2 --pageCount 20

Task Status

Retrieve the status counts of all tasks in the system.

task-cli task status

Aliases: s, stat


task-cli task status
task-cli task s

This command will display the count of tasks for each status (e.g., PENDING, RUNNING, SUCCEEDED, FAILED).

End-to-End Testing

Run end-to-end tests against the system to verify its functionality.

task-cli end2end [flags]


  • --num-tasks, -n: Number of tasks to create for the test (default: 100, max: 100)


task-cli end2end
task-cli end2end -n 50

This command will:

  1. Create the specified number of tasks (default 100)
  2. Monitor the tasks' completion status for up to 3 minutes
  3. Display progress every 5 seconds
  4. Report the final result (success or partial completion)

The test creates a mix of "run_query" and "send_email" task types to simulate a realistic workload.

Global Flags

The following flag is available for all task commands:

  • --log-level: Set the logging level (default: "error")
  • --address: Control Plane Address (default: "")


task-cli task list --log-level debug

Output Formats

All commands that display task information support three output formats:

  • table: Displays the information in a formatted table (default)
  • json: Outputs the data in JSON format
  • yaml: Outputs the data in YAML format

Use the --output or -o flag to specify the desired format.

Additional Information

  • Control plane (server) manages task creation, scheduling, and status updates
  • Data plane (workers) executes tasks (Currently part of same binary)
  • RiverQueue used for communication between control and data planes using postgres as queue backend
  • Explore the UI or CLI to create and manage tasks

Plugin Model

The Task Service uses a plugin-based architecture to allow for extensibility and customization of task execution. This model enables users to create their own task types and implement custom logic for task execution.

How It Works

  1. Plugin Interface: All plugins must implement the Plugin interface defined in @pkg/plugins/plugins.go. This interface requires a Run method:

    type Plugin interface {
        Run(parameters map[string]string) error
  2. Plugin Registration: Plugins are registered in the NewPlugin function in @pkg/plugins/plugins.go. This function acts as a factory, creating the appropriate plugin based on the task type:

    func NewPlugin(pluginType string) (Plugin, error) {
        switch pluginType {
        case email.PLUGIN_NAME:
            return &email.Email{}, nil
        case query.PLUGIN_NAME:
            return &query.Query{}, nil
        // Add more plugin types here
            return nil, fmt.Errorf("unknown plugin type: %s", pluginType)
  3. Custom Plugin Implementation: Users can create their own plugins by implementing the Plugin interface. For example, the Email plugin in @pkg/email/email.go:

    var PLUGIN_NAME = "send_email"
    type Email struct {}
    func (e *Email) Run(parameters map[string]string) error {
        // Implementation of email sending logic
        return nil
  4. Task Execution: When a task is executed, the system uses the NewPlugin function to create the appropriate plugin based on the task type. It then calls the Run method of the plugin, passing any necessary parameters.

Creating a New Plugin

To create a new plugin:

  1. Create a new package in the @pkg/plugins directory for your plugin.
  2. Implement the Plugin interface in your new package.
  3. Add your plugin to the NewPlugin function in @pkg/plugins/plugins.go.

This modular approach allows for easy extension of the Task Service with new task types and functionalities.

Testing in Kubernetes with Kind

This section guides you through setting up and testing the Task Service in a local Kubernetes cluster using Kind (Kubernetes in Docker) and Helm charts.



  1. Create a Kind cluster:
kind create cluster --name task-service
  1. Set kubectl context to the new cluster:
kubectl cluster-info --context kind-task-service
  1. Add the necessary Helm repositories:
make helm

Deploy Task Service

Install the Task Service Helm chart:

helm install task-service ./charts/task  -n task

Verify Deployment

Check that all pods are running:

kubectl get pods

Port Forward

kubectl port-forward service/task 80 -n task

Access the Service

  1. Port-forward the Task Service:
kubectl port-forward -n task svc/task 8080:80
  1. Access the service at

  2. Use CLI to verify the connection:

./bin/task-cli task l --address

Clean Up

To delete the Kind cluster and all resources:

kind delete cluster --name task-service

This setup allows you to test the entire Task Service stack, including the server, workers, and dependencies, in a local Kubernetes environment. It's an excellent way to validate the Helm charts and ensure everything works together as expected in a Kubernetes setting.