Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task-services): Add task related queries and mutations #41

Merged
merged 11 commits into from
Oct 18, 2024
68 changes: 67 additions & 1 deletion pyaqueduct/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from __future__ import annotations

from datetime import datetime
from uuid import UUID
from typing import List, Optional
from uuid import UUID

from pydantic import (
BaseModel,
Expand All @@ -20,6 +20,7 @@
from pyaqueduct.experiment import Experiment
from pyaqueduct.extensions import Extension
from pyaqueduct.settings import Settings
from pyaqueduct.task import Task


class API(BaseModel):
Expand Down Expand Up @@ -179,3 +180,68 @@ def get_extensions(self) -> List[Extension]:
)
for extension_data in self._client.get_extensions()
]

@validate_call
def get_task(
self,
task_id: UUID,
) -> Task:
"""Get task by passing task_id.

Returns:
Task object
"""
task = self._client.get_task(
task_id=task_id
)
return Task(
client=self._client,
uuid=task_id,
created_by=task.created_by,
received_at=task.received_at,
experiment=task.experiment,
extension_name=task.extension_name,
action_name=task.action_name,
parameters=task.parameters,
)

@validate_call
def get_tasks( # pylint: disable=too-many-arguments, duplicate-code
self,
limit: PositiveInt = 10,
jatinriverlane marked this conversation as resolved.
Show resolved Hide resolved
offset: NonNegativeInt = 0,
extension_name: Optional[str] = None,
experiment_uuid: Optional[str] = None,
action_name: Optional[str] = None,
username: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> List[Task]:
"""Get task by passing task_id.

Returns:
Task object
"""
tasks = self._client.get_tasks(
limit=limit,
offset=offset,
extension_name=extension_name,
experiment_uuid=experiment_uuid,
action_name=action_name,
username=username,
start_date=start_date,
end_date=end_date,
)
return [
Task(
client=self._client,
uuid=task.task_id,
created_by=task.created_by,
received_at=task.received_at,
experiment=task.experiment,
extension_name=task.extension_name,
action_name=task.action_name,
parameters=task.parameters
)
for task in tasks
jatinriverlane marked this conversation as resolved.
Show resolved Hide resolved
]
89 changes: 82 additions & 7 deletions pyaqueduct/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

from pyaqueduct.client.experiment_types import ExperimentData, ExperimentsInfo, TagsData
from pyaqueduct.client.extension_types import (
ExtensionCancelResultData,
ExtensionData,
ExtensionExecutionResultData,
)
from pyaqueduct.client.task_types import TaskData
from pyaqueduct.exceptions import (
FileDownloadError,
FileRemovalError,
Expand All @@ -30,6 +32,7 @@
)
from pyaqueduct.schemas.mutations import (
add_tags_to_experiment_mutation,
cancel_task_mutation,
create_experiment_mutation,
execute_extension_action_mutation,
remove_experiment_mutation,
Expand All @@ -41,6 +44,8 @@
get_all_tags_query,
get_experiment_query,
get_experiments_query,
get_task_query,
get_tasks_query,
)


Expand All @@ -55,7 +60,7 @@ def process_response_common(code: codes) -> None:
if code is codes.UNAUTHORIZED:
raise UnAuthorizedError("API token couldn't be verified or is missing.") from None

raise RemoteOperationError("Remove operation failed.")
raise RemoteOperationError("Remote operation failed.")


class AqueductClient(BaseModel):
Expand Down Expand Up @@ -296,11 +301,11 @@ def remove_tag_from_experiment(self, experiment_uuid: UUID, tag: str) -> Experim
Remove a tag from an experiment

Args:
- experiment_uuid (UUID): UUID of experiment frin which tag has to be removed
- tag (str): Tag to be removed from experiment
experiment_uuid (UUID): UUID of experiment frin which tag has to be removed
tag (str): Tag to be removed from experiment

Returns:
Experiment: Experiment having tag removed
Experiment: Experiment having tag removed
"""
data = self.fetch_response(
remove_tag_from_experiment_mutation,
Expand Down Expand Up @@ -338,9 +343,9 @@ def get_tags(self, limit: int, offset: int, dangling: bool = True) -> TagsData:
Get a list of existing tags

Args:
- limit (int): Number of tags to be fetched
- offset (offset): Number of tags to skip
- dangling (bool): If tags not linked to any experiment should be included or not
limit (int): Number of tags to be fetched
offset (offset): Number of tags to skip
dangling (bool): If tags not linked to any experiment should be included or not

Returns:
List[str]: A list of all existing tags
Expand Down Expand Up @@ -490,3 +495,73 @@ def execute_extension_action(
result.returnCode,
)
return result

def get_task(self, task_id: UUID) -> TaskData:
"""Get details for a submitted taks

Args:
task_id: Task identifier
"""
task_result = self.fetch_response(get_task_query, variable_values={"taskId": str(task_id)})

result = TaskData.from_dict(task_result["task"]) # pylint: disable=unsubscriptable-object
return result

def get_tasks( # pylint: disable=too-many-arguments
self,
limit: int,
offset: int,
extension_name: Optional[str] = None,
experiment_uuid: Optional[str] = None,
action_name: Optional[str] = None,
username: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> TaskData:
"""Get details for a submitted taks

Args:
limit: Pagination field, number of tasks to be fetched.
offset: Pagination field, number of tasks to skip.
extensionName: Name of extension for which task was ran.
actionName: Name of action for which task was ran.
experimentUuid: Uuid of experiment for which task was ran.
username: Username of user who ran the task.
startDate: Start datetime to filter experiments (timezone aware).
endDate: End datetime to filter experiments to (timezone aware).
"""
task_result = self.fetch_response(
get_tasks_query,
variable_values={
"limit": limit,
"offset": offset,
"extensionName": extension_name,
"experimentUuid": experiment_uuid,
"actionName": action_name,
"username": username,
"startDate": start_date,
"endDate": end_date,
},
)

result = [
TaskData.from_dict(task)
for task in task_result["tasks"]["tasksData"] # pylint: disable=unsubscriptable-object
]
return result

def cancel_task(self, task_id: str) -> ExtensionCancelResultData:
"""Stops and cancels task running in Celery

Args:
task_id: Task identifier
"""
revoke_result = self.fetch_response( # pylint: disable=unsubscriptable-object
cancel_task_mutation,
variable_values={"taskId": task_id},
)["cancelTask"]

result = ExtensionCancelResultData.from_dict(
revoke_result
) # pylint: disable=unsubscriptable-object
return result
37 changes: 37 additions & 0 deletions pyaqueduct/client/extension_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,25 @@ class ExtensionParameterData(BaseModel):
defaultValue: Optional[str]
options: Optional[List[str]]

@classmethod
def from_dict(cls, data: dict) -> ExtensionParameterData:
"""Composes an object from server response

Args:
data: server response

Returns:
Object populated with server response data
"""
return cls(
name=data["name"],
displayName=data["displayName"],
description=data["description"],
dataType=data["dataType"],
defaultValue=data["defaultValue"],
options=data["options"],
)
jatinriverlane marked this conversation as resolved.
Show resolved Hide resolved


class ExtensionActionData(BaseModel):
"""Executable extension action."""
Expand Down Expand Up @@ -68,3 +87,21 @@ def from_dict(cls, data: dict) -> ExtensionExecutionResultData:
Object populated with server response data.
"""
return ExtensionExecutionResultData(**data)


class ExtensionCancelResultData(BaseModel):
"""Results for task cancellation"""

taskStatus: Optional[str] = None

@classmethod
def from_dict(cls, data: dict) -> ExtensionCancelResultData:
"""Compose an object from a server response.

Args:
data: server response

Returns:
Object populated with server response data
"""
return ExtensionCancelResultData(**data)
101 changes: 101 additions & 0 deletions pyaqueduct/client/task_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
""" The module contains classes to represent task-related responses from the server. """

from __future__ import annotations

from datetime import datetime
from typing import List, Optional
from uuid import UUID

from pydantic import BaseModel

from pyaqueduct.client.experiment_types import ExperimentData
from pyaqueduct.client.extension_types import ExtensionParameterData


class ParameterData(BaseModel):
"""Definition for task parameters"""

key: ExtensionParameterData
value: str

@classmethod
def from_dict(cls, data: dict) -> ParameterData:
"""Composes an object from server response

Args:
data: server response

Returns:
Object populated with server response data
"""
return cls(
key=ExtensionParameterData.from_dict(data["key"]),
value=data["value"],
)
jatinriverlane marked this conversation as resolved.
Show resolved Hide resolved

class TaskData(BaseModel):
"""Parameter definition for a task."""

task_id: UUID
task_status: str
extension_name: str
action_name: str
created_by: str
received_at: datetime
experiment: ExperimentData
parameters: List[ParameterData]
result_code: Optional[int] = None
ended_at: Optional[datetime] = None
std_out: Optional[str] = None
std_err: Optional[str] = None

@classmethod
def from_dict(cls, data: dict) -> TaskData:
"""Composes an object from a server response.

Args:
data: server response.

Returns:
Object populated with server response data.
"""
return cls(
task_id=UUID(data["uuid"]),
task_status=data["taskStatus"],
result_code=data["resultCode"],
extension_name=data["extensionName"],
action_name=data["actionName"],
created_by=data["createdBy"],
received_at=data["receivedAt"],
ended_at=data["endedAt"],
std_out=data["stdOut"],
std_err=data["stdErr"],
experiment=ExperimentData.from_dict(data["experiment"]),
parameters=[
ParameterData.from_dict(parameter) for parameter in data["parameters"]
]
)


class TasksData(BaseModel):
"""Parameter definition for tasks"""

tasks: List[TaskData]
total_count: int

@classmethod
def from_dict(cls, data):
"""Composes an object from a server response

Args:
data: server response

Returns:
Object populated with server response data
"""
return cls(
tasks=[
TaskData.from_dict(task_data) for task_data in data["tasksData"]
],
total_count=data["totalTasksCount"],
)
3 changes: 3 additions & 0 deletions pyaqueduct/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ class Experiment(BaseModel):

_client: AqueductClient
"Client object reference."

uuid: UUID
"UUID of the experiment. This is an internal experiment identifier in the database"

eid: str
"EID of the experiment. User-readable identifier, it is unique within one Aqueduct installation"

created_at: datetime
"Creation datetime of the experiment."

Expand Down
Loading
Loading