Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #4837 from golemfactory/provider_requestor_switch
Browse files Browse the repository at this point in the history
Provider/requestor switch
  • Loading branch information
Wiezzel authored Oct 30, 2019
2 parents 0fd4e36 + 4d97349 commit f3f8b25
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 30 deletions.
5 changes: 5 additions & 0 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,11 @@ def connection_status(self) -> Dict[str, Any]:
= Client._make_connection_status_human_readable_message(status)
return status

def has_assigned_task(self) -> bool:
if self.task_server is None:
return False
return self.task_server.task_computer.has_assigned_task()

def get_provider_status(self) -> Dict[str, Any]:
# golem is starting
if self.task_server is None:
Expand Down
12 changes: 9 additions & 3 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TaskOp,
TaskStatus,
TASK_STATUS_ACTIVE,
TASK_STATUS_COMPLETED,
)
from golem.task.task_api import EnvironmentTaskApiService
from golem.task.timer import ProviderComputeTimers
Expand Down Expand Up @@ -235,6 +236,12 @@ def is_task_finished(task_id: TaskId) -> bool:
task = RequestedTask.get(RequestedTask.task_id == task_id)
return task.status.is_completed()

@staticmethod
def has_unfinished_tasks() -> bool:
""" Return True iff there are any tasks that need computation. """
return RequestedTask.select()\
.where(RequestedTask.status.not_in(TASK_STATUS_COMPLETED)).exists()

async def has_pending_subtasks(self, task_id: TaskId) -> bool:
""" Return True is there are pending subtasks waiting for
computation at the given moment. If there are the next call to
Expand Down Expand Up @@ -399,8 +406,8 @@ async def _verify(
if not self._get_pending_subtasks(task_id):
task.status = TaskStatus.finished
task.save()
self._notice_task_updated(task, op=TaskOp.FINISHED)
await self._shutdown_app_client(task.app_id)
self._notice_task_updated(task, op=TaskOp.FINISHED)

return result

Expand All @@ -418,9 +425,8 @@ async def abort_task(self, task_id: TaskId):
subtask.save()
self._finish_subtask(subtask, SubtaskOp.ABORTED)

self._notice_task_updated(task, op=TaskOp.ABORTED)

await self._abort_task_and_shutdown(task)
self._notice_task_updated(task, op=TaskOp.ABORTED)

@staticmethod
def get_requested_task(task_id: TaskId) -> Optional[RequestedTask]:
Expand Down
6 changes: 6 additions & 0 deletions golem/task/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,9 @@ def create_task_dry_run(self, task_dict) \
def create_task_api_task(self, task_params: dict, golem_params: dict):
logger.info('Creating Task API task. golem_params=%r', golem_params)

if self.client.has_assigned_task():
raise RuntimeError('Cannot create task while computing')

create_task_params = requestedtaskmanager.CreateTaskParams(
app_id=golem_params['app_id'],
name=golem_params['name'],
Expand Down Expand Up @@ -577,13 +580,16 @@ def create_task_api_task(self, task_params: dict, golem_params: dict):
create_task_params.max_subtasks,
)

self.client.update_setting('accept_tasks', False)

@defer.inlineCallbacks
def init_task():
try:
yield deferred_from_future(
self.requested_task_manager.init_task(task_id))
except Exception:
self.client.funds_locker.remove_task(task_id)
self.client.update_setting('accept_tasks', True)
raise
else:
self.requested_task_manager.start_task(task_id)
Expand Down
4 changes: 3 additions & 1 deletion golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,10 +903,12 @@ def finished_task_listener(self, event='default', task_id=None, op=None,
if not (event == 'task_status_updated'
and self.client.p2pservice):
return
if not (op in [TaskOp.FINISHED, TaskOp.TIMEOUT]):
if not (op in [TaskOp.FINISHED, TaskOp.TIMEOUT, TaskOp.ABORTED]):
return
self.client.p2pservice.remove_task(task_id)
self.client.funds_locker.remove_task(task_id)
if not self.requested_task_manager.has_unfinished_tasks():
self.client.update_setting('accept_tasks', True)

def _increase_trust_payment(self, node_id: str, amount: int):
Trust.PAYMENT.increase(node_id, self.max_trust)
Expand Down
11 changes: 9 additions & 2 deletions golem/task/taskstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ def is_creating(self) -> bool:
return self in [self.creating, self.errorCreating]

def is_completed(self) -> bool:
return self in [self.finished, self.aborted,
self.timeout, self.restarted]
return self in TASK_STATUS_COMPLETED

def is_preparing(self) -> bool:
return self in (
Expand All @@ -191,6 +190,14 @@ def is_active(self) -> bool:
return self in TASK_STATUS_ACTIVE


TASK_STATUS_COMPLETED = [
TaskStatus.finished,
TaskStatus.aborted,
TaskStatus.timeout,
TaskStatus.restarted
]


TASK_STATUS_ACTIVE = [
TaskStatus.sending,
TaskStatus.waiting,
Expand Down
8 changes: 8 additions & 0 deletions tests/golem/task/test_requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ async def test_init_task(self, mock_client):
env_id=env_id,
prerequisites=prerequisites
)

# when
await self.rtm.init_task(task_id)
row = RequestedTask.get(RequestedTask.task_id == task_id)

# then
assert row.status == TaskStatus.creating
assert row.env_id == env_id
Expand All @@ -97,6 +99,7 @@ async def test_init_task(self, mock_client):
row.app_params
)
self.app_manager.enabled.assert_called_once_with(row.app_id)
assert self.rtm.has_unfinished_tasks()

@pytest.mark.asyncio
async def test_init_task_wrong_status(self, mock_client):
Expand Down Expand Up @@ -196,6 +199,7 @@ async def test_verify(self, freezer, mock_client):
mock_client.shutdown.assert_called_once_with()
assert task_row.status.is_completed() is True
assert subtask_row.status.is_finished() is True
assert not self.rtm.has_unfinished_tasks()

@pytest.mark.asyncio
@pytest.mark.freeze_time("1000")
Expand Down Expand Up @@ -224,6 +228,7 @@ async def test_verify_failed(self, freezer, mock_client):
mock_client.shutdown.assert_not_called()
assert task_row.status.is_active() is True
assert subtask_row.status == SubtaskStatus.failure
assert self.rtm.has_unfinished_tasks()

@pytest.mark.asyncio
@pytest.mark.freeze_time("1000")
Expand All @@ -250,6 +255,7 @@ async def test_abort(self, freezer, mock_client):
mock_client.shutdown.assert_called_once_with()
assert task_row.status == TaskStatus.aborted
assert subtask_row.status == SubtaskStatus.cancelled
assert not self.rtm.has_unfinished_tasks()

@pytest.mark.asyncio
async def test_task_timeout(self, mock_client):
Expand All @@ -266,6 +272,7 @@ async def test_task_timeout(self, mock_client):
assert self.rtm.is_task_finished(task_id)
mock_client.abort_task.assert_called_once_with(task_id)
mock_client.shutdown.assert_called_once_with()
assert not self.rtm.has_unfinished_tasks()

@pytest.mark.asyncio
async def test_task_timeout_with_subtask(self, mock_client):
Expand All @@ -286,6 +293,7 @@ async def test_task_timeout_with_subtask(self, mock_client):
RequestedSubtask.subtask_id == subtask_id)
assert task.status == TaskStatus.timeout
assert subtask.status == SubtaskStatus.timeout
assert not self.rtm.has_unfinished_tasks()

@pytest.mark.asyncio
async def test_get_started_tasks(self, mock_client):
Expand Down
23 changes: 22 additions & 1 deletion tests/golem/task/test_rpc_task_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import os
from pathlib import Path
import tempfile
import unittest
from unittest import mock
from mock import Mock
from mock import Mock, call

from golem.client import Client
from golem.ethereum import fundslocker, transactionsystem
Expand Down Expand Up @@ -60,7 +61,10 @@ def test_success(self):
}
golem_params = self.get_golem_params()
task_id = 'test_task_id'
self.client.has_assigned_task.return_value = False
self.requested_task_manager.create_task.return_value = task_id
self.requested_task_manager.init_task.return_value = asyncio.Future()
self.requested_task_manager.init_task.return_value.set_result(None)

new_task_id = self.rpc.create_task_api_task(task_params, golem_params)
self.assertEqual(task_id, new_task_id)
Expand Down Expand Up @@ -113,11 +117,28 @@ def test_success(self):
)

self.requested_task_manager.init_task.assert_called_once_with(task_id)
self.client.update_setting.assert_called_once_with(
'accept_tasks', False)

def test_has_assigned_task(self):
self.client.has_assigned_task.return_value = True

with self.assertRaises(RuntimeError):
self.rpc.create_task_api_task({}, self.get_golem_params())

self.requested_task_manager.create_task.assert_not_called()
self.requested_task_manager.init_task.assert_not_called()
self.client.funds_locker.lock_funds.assert_not_called()

def test_failed_init(self):
self.client.has_assigned_task.return_value = False
self.requested_task_manager.init_task.side_effect = Exception

task_id = self.rpc.create_task_api_task({}, self.get_golem_params())

self.client.funds_locker.remove_task.assert_called_once_with(task_id)
self.requested_task_manager.start_task.assert_not_called()
self.client.update_setting.assert_has_calls((
call('accept_tasks', False),
call('accept_tasks', True)
))
56 changes: 33 additions & 23 deletions tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1203,29 +1203,39 @@ def test_finished_task_listener(self, *_):
self.ts.client = Mock()
remove_task = self.ts.client.p2pservice.remove_task
remove_task_funds_lock = self.ts.client.funds_locker.remove_task

values = dict(TaskOp.__members__)
values.pop('FINISHED')
values.pop('TIMEOUT')

for value in values:
self.ts.finished_task_listener(op=value)
assert not remove_task.called

for value in values:
self.ts.finished_task_listener(event='task_status_updated',
op=value)
assert not remove_task.called

self.ts.finished_task_listener(event='task_status_updated',
op=TaskOp.FINISHED)
assert remove_task.called
assert remove_task_funds_lock.called

self.ts.finished_task_listener(event='task_status_updated',
op=TaskOp.TIMEOUT)
assert remove_task.call_count == 2
assert remove_task_funds_lock.call_count == 2
update_setting = self.ts.client.update_setting

self.ts.requested_task_manager = Mock()
self.ts.requested_task_manager.has_unfinished_tasks.return_value = False

# Listener should ignore events other than 'task_status_updated'
for op in TaskOp:
self.ts.finished_task_listener(op=op)

remove_task.assert_not_called()
remove_task_funds_lock.assert_not_called()
update_setting.assert_not_called()

relevant_ops = {TaskOp.FINISHED, TaskOp.TIMEOUT, TaskOp.ABORTED}
irrelevant_ops = set(TaskOp) - relevant_ops

# Listener should ignore irrelevant operations
for op in irrelevant_ops:
self.ts.finished_task_listener(event='task_status_updated', op=op)

remove_task.assert_not_called()
remove_task_funds_lock.assert_not_called()
update_setting.assert_not_called()

# Listener should fire for relevant operations
task_id = 'test_task'
for op in relevant_ops:
self.ts.finished_task_listener(
event='task_status_updated', task_id=task_id, op=op)
remove_task.assert_called_once_with(task_id)
remove_task_funds_lock.assert_called_once_with(task_id)
update_setting.assert_called_once_with('accept_tasks', True)
self.ts.client.reset_mock()


class TestSendResults(TaskServerTestBase):
Expand Down
12 changes: 12 additions & 0 deletions tests/golem/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,18 @@ def test_restore_locks(self, *_):
)


class TestHasAssignedTask(TestClientBase):

def test_no_task_server(self):
self.assertFalse(self.client.has_assigned_task())

def test_true(self):
task_computer = Mock()
task_computer.has_assigned_task.return_value = True
self.client.task_server = Mock(task_computer=task_computer)
self.assertTrue(self.client.has_assigned_task())


class TestGetTasks(TestClientBase):

def setUp(self):
Expand Down

0 comments on commit f3f8b25

Please sign in to comment.