Queue Manager #286

Closed
timlinux opened this issue Sep 18, 2024 · 1 comment
timlinux commented Sep 18, 2024

To iterate over the tree, the plan is to implement a queue that runs QgsTask objects.

We will then iterate over all the layer objects in the tree and queue them up to be processed.

Each layer will have a workflow instance created for it based on the layer properties:

                        {
                            "Layer": "WBL 2024 Workplace Index Score",
                            "ID": "Workplace_Index",
                            "Text": "Workplace DiscriminationThis tab represents data at the national level and must be standardized on a scale ranging from 0 to 5. This indicator is composed by the Workplace Index score of the WBL 2024. The data is already formatted on scale from 1 to 100.Input: The input field takes in the WBL index score.Output:  A raster file containing values ranging from 0 to 5 is saved to the output directory. A value of 5 indicates areas with a high level of workplace discrimination afforded to women, whereas a value of 0 indicates areas where no workplace discrimination are afforded to women. ",
                            "Default Index Score": 83.8,
                            "Index Score": 100,
                            "Use Default Index Score": 1,
                            "Default Multi Buffer Distances": "0,0,0",
                            "Use Multi Buffer Point": 0,
                            "Default Single Buffer Distance": 0,
                            "Use Single Buffer Point": 0,
                            "Default pixel": 0,
                            "Use Create Grid": 0,
                            "Use OSM Downloader": 0,
                            "Use Bbox for AOI": 0,
                            "Use Rasterize Layer": 1,
                            "Use WBL Downloader": 1,
                            "Use Humdata Downloader": 0,
                            "Use Mapillary Downloader": 0,
                            "Use Other Downloader": 0,
                            "Use Add Layers Manually": 0,
                            "Use Classify Poly into Classes": 0,
                            "Use CSV to Point Layer": 0,
                            "Use Poly per Cell": 0,
                            "Use Polyline per Cell": 0,
                            "Use Point per Cell": 0,
                            "Analysis Mode": "Don\u2019t Use",
                            "Layer Required": 1
                        }

We need different workflows based on which 'Use' option the user has chosen for each layer. For example, if the user has clicked the radio button for 'Use Point per Cell' in the UI, there should be a point_per_cell workflow that performs the steps needed to generate a Likert scale raster from the point layer, grid layer, bbox and the parameters set in the dictionary (e.g. the layer source for points).
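As a rough sketch of the selection step, the enabled 'Use' options can be read straight from the layer dictionary. The helper name `enabled_use_options` and the snake_case naming rule are assumptions made for illustration (chosen so that 'Use Point per Cell' maps to `point_per_cell`); note that the sample layer above has more than one 'Use' key set to 1, so the helper returns a list rather than a single name.

```python
def enabled_use_options(layer: dict) -> list:
    """Return a workflow-style name for every 'Use ...' key with a truthy value."""
    names = []
    for key, value in layer.items():
        if key.startswith("Use ") and value:
            # e.g. "Use Point per Cell" -> "point_per_cell"
            names.append(key[len("Use "):].lower().replace(" ", "_"))
    return names


# A trimmed copy of the layer dictionary shown earlier in this issue.
layer = {
    "ID": "Workplace_Index",
    "Use Default Index Score": 1,
    "Use Point per Cell": 0,
    "Use Rasterize Layer": 1,
    "Use WBL Downloader": 1,
}
print(enabled_use_options(layer))
# -> ['default_index_score', 'rasterize_layer', 'wbl_downloader']
```

How the plugin should prioritise several enabled options is not settled here; the list simply makes that decision explicit.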

The approach will be as per the following diagram:

classDiagram
    class WorkflowFactory {
      +create_workflow(params: dict): WorkflowJob
    }
    
    class WorkflowJob {
      +execute()
    }

    class TaskA {
      +execute()
    }

    class TaskB {
      +execute()
    }

    class WorkflowTask {
      -workflow: WorkflowJob
      +run(): bool
      +finished(result: bool)
      +cancel(): bool
    }

    class QueueManager {
      -task_queue: Queue
      -running_tasks: list
      +add_task(workflow: WorkflowJob)
      +_process_next_task()
      +_task_finished(task: WorkflowTask, result: bool)
    }

    WorkflowFactory --> WorkflowJob : "creates"
    WorkflowJob <|-- TaskA
    WorkflowJob <|-- TaskB
    QueueManager --> WorkflowTask : "manages"
    WorkflowTask --> WorkflowJob : "executes"
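The diagram can be approximated in plain Python as follows. This is only a sketch under stated assumptions: the two job classes, the `registry` mapping, the `process_all` method and the `Some_Points` ID are hypothetical, and the queue is drained synchronously as a stand-in for wrapping each job in a QgsTask, so it runs outside QGIS.

```python
from collections import deque


class WorkflowJob:
    """Base class for a unit of work; subclasses override execute()."""

    def __init__(self, params: dict):
        self.params = params

    def execute(self) -> bool:
        raise NotImplementedError


class PointPerCellJob(WorkflowJob):
    """Hypothetical job; a real one would build the raster from the point layer."""

    def execute(self) -> bool:
        return True


class RasterizeLayerJob(WorkflowJob):
    """Hypothetical job; a real one would rasterize the layer."""

    def execute(self) -> bool:
        return True


class WorkflowFactory:
    # Assumed mapping from a 'Use ...' key to the job class that handles it.
    registry = {
        "Use Point per Cell": PointPerCellJob,
        "Use Rasterize Layer": RasterizeLayerJob,
    }

    def create_workflow(self, params: dict) -> WorkflowJob:
        for key, job_class in self.registry.items():
            if params.get(key):
                return job_class(params)
        raise ValueError("No supported 'Use' option is enabled")


class QueueManager:
    def __init__(self):
        self.task_queue = deque()
        self.finished = []  # (layer ID, result) pairs, in completion order

    def add_task(self, workflow: WorkflowJob) -> None:
        self.task_queue.append(workflow)

    def process_all(self) -> None:
        # Synchronous stand-in for handing each job to a QgsTask.
        while self.task_queue:
            job = self.task_queue.popleft()
            self.finished.append((job.params.get("ID"), job.execute()))


factory = WorkflowFactory()
manager = QueueManager()
manager.add_task(factory.create_workflow({"ID": "Workplace_Index", "Use Rasterize Layer": 1}))
manager.add_task(factory.create_workflow({"ID": "Some_Points", "Use Point per Cell": 1}))
manager.process_all()
print(manager.finished)
```

Keeping the factory separate from the queue means new 'Use' options only need a new job class and a registry entry.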



timlinux commented Sep 20, 2024

Components Overview

WorkflowQueueManager:

The main interface to manage the task queue.
Uses a thread pool to run multiple workflows concurrently.
It receives task attributes, uses WorkflowFactory to create tasks, and manages the queue of jobs.

WorkflowQueue:

Manages a queue of WorkflowJob tasks.
Handles concurrent execution by utilizing QGIS's task management system (QgsTaskManager).
Ensures that a certain number of tasks run concurrently, respecting the thread pool size.

WorkflowJob:

Represents individual workflow tasks that execute based on attributes.
Uses QgsFeedback to provide progress reporting and handle cancellation requests.
Created using WorkflowFactory, which generates the appropriate workflow (e.g., SpatialAnalysisWorkflow or TemporalAnalysisWorkflow).

WorkflowFactory:

Creates specific workflows based on the Analysis Mode provided in the task's attributes.
Each workflow accepts a QgsFeedback object for progress reporting and cancellation handling.

Concrete Workflow Implementations:

A Rasterize Layer workflow would be one of the concrete workflows created by the factory.
These workflows use the QgsFeedback object to report progress, check for cancellation, and perform the actual task logic.
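To make the progress and cancellation contract concrete, here is a minimal sketch of a workflow that polls its feedback object between steps. `StubFeedback` is a stand-in written for this example so that it runs outside QGIS; it mirrors only the `setProgress`, `progress`, `cancel` and `isCanceled` methods of QgsFeedback. The `RasterizeLayerWorkflow` class and its `steps` parameter are assumptions, not the plugin's actual code.

```python
class StubFeedback:
    """Stand-in exposing the few QgsFeedback-style methods used below."""

    def __init__(self):
        self._progress = 0.0
        self._canceled = False

    def setProgress(self, value: float) -> None:
        self._progress = value

    def progress(self) -> float:
        return self._progress

    def cancel(self) -> None:
        self._canceled = True

    def isCanceled(self) -> bool:
        return self._canceled


class RasterizeLayerWorkflow:
    """Hypothetical workflow that does its work in a fixed number of steps."""

    def __init__(self, attributes: dict, feedback, steps: int = 4):
        self.attributes = attributes
        self.feedback = feedback
        self.steps = steps

    def execute(self) -> bool:
        for step in range(self.steps):
            # Check for cancellation before starting each step.
            if self.feedback.isCanceled():
                return False
            # ... the real processing for this step would go here ...
            self.feedback.setProgress(100.0 * (step + 1) / self.steps)
        return True


feedback = StubFeedback()
workflow = RasterizeLayerWorkflow({"ID": "Workplace_Index"}, feedback)
print(workflow.execute(), feedback.progress())
```

Checking `isCanceled()` at step boundaries keeps cancellation cooperative: the workflow stops at a safe point and reports failure instead of being interrupted mid-write.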

Example usage:

# Create the WorkflowFactory
workflow_factory = WorkflowFactory()

# Create the WorkflowQueueManager with a thread pool size of 4
workflow_queue_manager = WorkflowQueueManager(pool_size=4, workflow_factory=workflow_factory)

# Example attributes for tasks
attributes_a = {'Analysis Mode': 'Spatial Analysis', 'some_key': 'value1'}
attributes_b = {'Analysis Mode': 'Temporal Analysis', 'some_key': 'value2'}

# Add tasks to the WorkflowQueueManager
workflow_queue_manager.add_task(attributes_a)
workflow_queue_manager.add_task(attributes_b)

# Start processing the queue
workflow_queue_manager.start_processing()

classDiagram
    class WorkflowQueueManager {
        +add_task(attributes: dict): void
        +start_processing(): void
        +cancel_processing(): void
    }

    class WorkflowQueue {
        +add_job(job: WorkflowJob): void
        +start_processing(): void
        +process_queue(): void
    }

    class WorkflowJob {
        +run(): bool
        +feedback(): QgsFeedback
    }

    class WorkflowFactory {
        +create_workflow(attributes: dict, feedback: QgsFeedback): Workflow
    }

    class SpatialAnalysisWorkflow {
        +execute(): bool
    }

    class TemporalAnalysisWorkflow {
        +execute(): bool
    }

    WorkflowQueueManager --> WorkflowQueue : "Manages"
    WorkflowQueue --> WorkflowJob : "Processes"
    WorkflowJob --> WorkflowFactory : "Uses"
    WorkflowFactory --> SpatialAnalysisWorkflow : "Creates"
    WorkflowFactory --> TemporalAnalysisWorkflow : "Creates"
    WorkflowJob --> QgsFeedback : "Uses for Progress and Cancellation"
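The pool-size behaviour can be sketched without QGIS by letting `concurrent.futures.ThreadPoolExecutor` stand in for QgsTaskManager. Everything here is an assumption for illustration: `EchoWorkflow` is a placeholder, the factory ignores the feedback argument shown in the diagram, and `start_processing` returns the results (the diagram declares it void) so the outcome is easy to inspect.

```python
from concurrent.futures import ThreadPoolExecutor


class EchoWorkflow:
    """Placeholder workflow: succeeds when an 'Analysis Mode' is present."""

    def __init__(self, attributes: dict):
        self.attributes = attributes

    def execute(self) -> bool:
        return bool(self.attributes.get("Analysis Mode"))


class WorkflowFactory:
    def create_workflow(self, attributes: dict) -> EchoWorkflow:
        return EchoWorkflow(attributes)


class WorkflowQueueManager:
    def __init__(self, pool_size: int, workflow_factory: WorkflowFactory):
        self.pool_size = pool_size
        self.workflow_factory = workflow_factory
        self.pending = []

    def add_task(self, attributes: dict) -> None:
        self.pending.append(self.workflow_factory.create_workflow(attributes))

    def start_processing(self) -> list:
        """Run pending workflows, at most pool_size at a time.

        Results come back in the order the tasks were added.
        """
        with ThreadPoolExecutor(max_workers=self.pool_size) as pool:
            results = list(pool.map(lambda wf: wf.execute(), self.pending))
        self.pending.clear()
        return results


manager = WorkflowQueueManager(pool_size=4, workflow_factory=WorkflowFactory())
manager.add_task({"Analysis Mode": "Spatial Analysis", "some_key": "value1"})
manager.add_task({"some_key": "value2"})  # no 'Analysis Mode', so this one fails
results = manager.start_processing()
print(results)
```

Inside QGIS the same shape would hand each job to the task manager rather than a thread pool, with the feedback object providing cancellation.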

timlinux self-assigned this Sep 20, 2024
timlinux added a commit that referenced this issue Sep 23, 2024
timlinux added a commit that referenced this issue Sep 26, 2024
timlinux added a commit that referenced this issue Sep 27, 2024
Improve gpkg creation for project
Fixes #286
timlinux added a commit that referenced this issue Sep 27, 2024
Writing attributes to layers for study area
Fixes #286