From 8fb1012e8f3b53017322700fa96496ce011ec55e Mon Sep 17 00:00:00 2001 From: pattonw Date: Wed, 15 Feb 2023 16:44:27 -0500 Subject: [PATCH] remove parked tests --- parked_tests/test_blockwise_basics.py | 203 --------- parked_tests/test_multi_tasks.py | 597 -------------------------- parked_tests/test_worker_pool.py | 209 --------- 3 files changed, 1009 deletions(-) delete mode 100644 parked_tests/test_blockwise_basics.py delete mode 100644 parked_tests/test_multi_tasks.py delete mode 100644 parked_tests/test_worker_pool.py diff --git a/parked_tests/test_blockwise_basics.py b/parked_tests/test_blockwise_basics.py deleted file mode 100644 index 2da7d41d..00000000 --- a/parked_tests/test_blockwise_basics.py +++ /dev/null @@ -1,203 +0,0 @@ -from __future__ import absolute_import - -from .tmpdir_test import TmpDirTestCase -import daisy -import glob -import os -import logging - -logger = logging.getLogger(__name__) -daisy.scheduler._NO_SPAWN_STATUS_THREAD = True - - -class TestBlockwiseBasics(TmpDirTestCase): - - def test_callback(self): - - total_roi = daisy.Roi((0,), (100,)) - read_roi = daisy.Roi((0,), (5,)) - write_roi = daisy.Roi((0,), (3,)) - - outdir = self.path_to() - - ret = daisy.run_blockwise( - total_roi=total_roi, - read_roi=read_roi, - write_roi=write_roi, - process_function=lambda b: self.process_block(outdir, b), - num_workers=10) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, list(range(32))) - - def test_callback_failure(self): - - total_roi = daisy.Roi((0,), (100,)) - read_roi = daisy.Roi((0,), (5,)) - write_roi = daisy.Roi((0,), (3,)) - - outdir = self.path_to() - - ret = daisy.run_blockwise( - total_roi=total_roi, - read_roi=read_roi, - write_roi=write_roi, - process_function=lambda b: self.process_block(outdir, b, fail=16), - num_workers=10) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertFalse(ret) - expected_block_ids = list(range(32)) - expected_block_ids.remove(16) - self.assertEqual(block_ids, expected_block_ids) - - def test_worker(self): - - total_roi = daisy.Roi((0,), (100,)) - read_roi = daisy.Roi((0,), (5,)) - write_roi = daisy.Roi((0,), (3,)) - - outdir = self.path_to() - - ret = daisy.run_blockwise( - total_roi=total_roi, - read_roi=read_roi, - write_roi=write_roi, - process_function=lambda: self.worker(outdir), - num_workers=10) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, list(range(32))) - - def test_worker_failure(self): - - total_roi = daisy.Roi((0,), (100,)) - read_roi = daisy.Roi((0,), (5,)) - write_roi = daisy.Roi((0,), (3,)) - - outdir = self.path_to() - - ret = daisy.run_blockwise( - total_roi=total_roi, - read_roi=read_roi, - write_roi=write_roi, - process_function=lambda: self.worker(outdir, fail=16), - num_workers=10) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertFalse(ret) - expected_block_ids = list(range(32)) - expected_block_ids.remove(16) - self.assertEqual(block_ids, expected_block_ids) - - def test_negative_offset(self): - - logger.warning("A warning") - - total_roi = daisy.Roi( - (-100,), - (2369,)) - block_write_roi = daisy.Roi( - (0,), - (500,)) - block_read_roi = block_write_roi.grow( - (100,), - (100,)) - - outdir = self.path_to() - - ret = daisy.run_blockwise( - total_roi, - block_read_roi, - block_write_roi, - process_function=lambda b: self.process_block(outdir, b), - num_workers=1, - fit='shrink') - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(len(block_ids), 5) - - def test_multidim(self): - - total_roi = daisy.Roi( - (199, -100, -100, -100), - (12, 5140, 2248, 2369)) - block_write_roi = daisy.Roi( - (0, 0, 0, 0), - (5, 500, 500, 500)) - block_read_roi = block_write_roi.grow( - (1, 100, 100, 100), - (1, 100, 100, 100)) - - outdir = self.path_to() - - ret = daisy.run_blockwise( - total_roi, - block_read_roi, - block_write_roi, - process_function=lambda b: self.process_block(outdir, b), - num_workers=8, - processes=False, - fit='shrink') - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(len(block_ids), 500) - - def process_block(self, outdir, block, fail=None): - - logger.debug("Processing block %s", block) - - if block.block_id == fail: - raise RuntimeError("intended failure") - - path = os.path.join(outdir, '%d.block' % block.block_id) - with open(path, 'w') as f: - f.write(str(block.block_id)) - - def worker(self, outdir, fail=None): - - client = daisy.Client() - - while True: - - block = client.acquire_block() - if block is None: - break - - self.process_block(outdir, block, fail) - - client.release_block(block, 0) diff --git a/parked_tests/test_multi_tasks.py b/parked_tests/test_multi_tasks.py deleted file mode 100644 index 108ebefb..00000000 --- a/parked_tests/test_multi_tasks.py +++ /dev/null @@ -1,597 +0,0 @@ -from __future__ import absolute_import - -from .tmpdir_test import TmpDirTestCase -import daisy -import glob -import os -import logging - -logger = logging.getLogger(__name__) -daisy.scheduler._NO_SPAWN_STATUS_THREAD = True - - -class TestMultipleTasks(TmpDirTestCase): - - def test_single(self): - '''Tests a vanilla task''' - outdir = self.path_to('') - - # this task generates 0-10 - task = self.LeafTask(outdir=outdir) - task_spec = {'task': task} - - expected_block_ids = list(range(10)) - - ret = daisy.distribute([task_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_single_with_request(self): - '''Tests a task with request for a subset ROI''' - outdir = self.path_to('') - - task = self.LeafTask(outdir=outdir) - task_spec = {'task': task, 'request': [daisy.Roi((3,), (2,))]} - - ret = daisy.distribute([task_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - expected_block_ids = list(range(3, 5)) - self.assertEqual(block_ids, expected_block_ids) - - def test_null_request(self): - '''Tests a task with request for null ROI''' - outdir = self.path_to('') - - task = self.LeafTask(outdir=outdir) - task_spec = {'task': task, 'request': [daisy.Roi((3,), (0,))]} - - ret = daisy.distribute([task_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - expected_block_ids = [] - self.assertEqual(block_ids, expected_block_ids) - - def test_multi(self): - '''Tests multiple different task targets''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.LeafTask(outdir=outdir) - expected_block_ids = list(range(0, 10)) - # this task generates 20-30 - task1 = self.LeafTaskAnother(outdir=outdir) - expected_block_ids += list(range(20, 30)) - - task0_spec = {'task': task0} - task1_spec = {'task': task1} - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_multi_with_request(self): - '''Tests multiple different task targets with requests''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.LeafTask(outdir=outdir) - # this task generates 20-30 - task1 = self.LeafTaskAnother(outdir=outdir) - - task0_spec = {'task': task0, 'request': [daisy.Roi((3,), (2,))]} - expected_block_ids = list(range(3, 5)) - task1_spec = {'task': task1, 'request': [daisy.Roi((27,), (2,))]} - expected_block_ids += list(range(27, 29)) - - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_multi_with_request_same(self): - '''Tests multiple same task targets with requests''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.LeafTask(outdir=outdir) - - task0_spec = {'task': task0, 'request': [daisy.Roi((3,), (2,))]} - expected_block_ids = list(range(3, 5)) - task1_spec = {'task': task0, 'request': [daisy.Roi((7,), (1,))]} - expected_block_ids += list(range(7, 8)) - - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_multi_with_request_same_overlapping(self): - '''Tests multiple same task targets with overlapping requests''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.LeafTask(outdir=outdir) - - task0_spec = {'task': task0, 'request': [daisy.Roi((3,), (7,))]} - task1_spec = {'task': task0, 'request': [daisy.Roi((5,), (5,))]} - expected_block_ids = list(range(3, 10)) - - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_request_outside_error(self): - '''Tests request that lies outside of total_roi''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.LeafTask(outdir=outdir) - - task0_spec = {'task': task0, 'request': [daisy.Roi((3,), (17,))]} - - try: - daisy.distribute([task0_spec]) - except RuntimeError: - pass - except Exception as e: - print(e) - self.assertFalse(True) # fail for any other Exceptions - - def test_task_chain(self): - '''Tests vanilla task chain''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.ParentTask(outdir=outdir) - task0_spec = {'task': task0} - expected_block_ids = list(range(0, 10)) - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_chain_with_request(self): - '''Tests task chain with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.ParentTask(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((3,), (2,))]} - expected_block_ids = list(range(3, 5)) - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_chain_multi(self): - '''Tests multiple tasks with the same dependency''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.ParentTask(outdir=outdir) - task0_spec = {'task': task0} - # this task also generates 0-10 - task1 = self.ParentTaskAnother(outdir=outdir) - task1_spec = {'task': task1} - # their deps are merged - expected_block_ids = list(range(0, 10)) - - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_chain_multi_with_request(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.ParentTask(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((1,), (2,))]} - # this task also generates 0-10 - task1 = self.ParentTaskAnother(outdir=outdir) - task1_spec = {'task': task1, 'request': [daisy.Roi((7,), (2,))]} - # their deps are merged - expected_block_ids = list(range(1, 3)) - expected_block_ids += list(range(7, 9)) - - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_chain_multi_with_overlapping_request(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.ParentTask(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((1,), (5,))]} - # this task also generates 0-10 - task1 = self.ParentTaskAnother(outdir=outdir) - task1_spec = {'task': task1, 'request': [daisy.Roi((2,), (5,))]} - # their deps are merged - expected_block_ids = list(range(1, 7)) - - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_chain_multi_with_mixed_request(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.ParentTask(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((1,), (2,))]} - # this task also generates 0-10 - task1 = self.ParentTaskAnother(outdir=outdir) - task1_spec = {'task': task1} - # their deps are merged - expected_block_ids = list(range(0, 10)) - - ret = daisy.distribute([task0_spec, task1_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_request_alignment(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-5 - task0 = self.TaskWriteRoi2(outdir=outdir) - task0_spec = {'task': task0} - expected_block_ids = list(range(0, 5)) - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_request_alignment_with_req(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.TaskWriteRoi2(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((2,), (2,))]} - # request lies in block 1 - expected_block_ids = list(range(1, 2)) - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_request_alignment_with_req2(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.TaskWriteRoi2(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((1,), (2,))]} - # request lies between block 0 and block 1 - expected_block_ids = list(range(0, 2)) - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.block')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_block_ids) - - def test_task_request_alignment_with_req3(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.TaskWriteRoi22(outdir=outdir) - task0_spec = {'task': task0} - # request lies between block 0 and block 1 - expected_write_begins = list(range(1, 11, 2)) - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.write_roi')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_write_begins) - - def test_task_request_alignment_with_req4(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.TaskWriteRoi22(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((1,), (2,))]} - expected_write_begins = [1] - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.write_roi')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_write_begins) - - def test_task_request_alignment_with_req5(self): - '''Tests multiple tasks with the same dependency and with request''' - outdir = self.path_to('') - - # this task generates 0-10 - task0 = self.TaskWriteRoi22(outdir=outdir) - task0_spec = {'task': task0, 'request': [daisy.Roi((1,), (6,))]} - # requesting 1, 2, 3, 4, 5, 6 - # is satisfied with 3 write blocks - expected_write_begins = [1, 3, 5] - - ret = daisy.distribute([task0_spec]) - - outfiles = glob.glob(os.path.join(outdir, '*.write_roi')) - block_ids = sorted([ - int(path.split('/')[-1].split('.')[0]) - for path in outfiles - ]) - - self.assertTrue(ret) - self.assertEqual(block_ids, expected_write_begins) - - class TaskWriteRoi22(daisy.Task): - - outdir = daisy.Parameter() - - def prepare(self): - - total_roi = daisy.Roi((1,), (10,)) - read_roi = daisy.Roi((0,), (2,)) - write_roi = daisy.Roi((0,), (2,)) - - self.schedule( - total_roi, - read_roi, - write_roi, - # process_function=TestMultipleTasks.process_block, - process_function=lambda: TestMultipleTasks.worker(self.outdir), - max_retries=0, - fit='shrink') - - class TaskWriteRoi2(daisy.Task): - - outdir = daisy.Parameter() - - def prepare(self): - - total_roi = daisy.Roi((0,), (10,)) - read_roi = daisy.Roi((0,), (2,)) - write_roi = daisy.Roi((0,), (2,)) - - self.schedule( - total_roi, - read_roi, - write_roi, - # process_function=TestMultipleTasks.process_block, - process_function=lambda: TestMultipleTasks.worker(self.outdir), - max_retries=0, - fit='shrink') - - class LeafTask(daisy.Task): - - outdir = daisy.Parameter() - - def prepare(self): - - total_roi = daisy.Roi((0,), (10,)) - read_roi = daisy.Roi((0,), (1,)) - write_roi = daisy.Roi((0,), (1,)) - - self.schedule( - total_roi, - read_roi, - write_roi, - # process_function=TestMultipleTasks.process_block, - process_function=lambda: TestMultipleTasks.worker(self.outdir), - max_retries=0, - fit='shrink') - - class LeafTaskAnother(daisy.Task): - - outdir = daisy.Parameter() - - def prepare(self): - - total_roi = daisy.Roi((20,), (10,)) - read_roi = daisy.Roi((0,), (1,)) - write_roi = daisy.Roi((0,), (1,)) - - self.schedule( - total_roi, - read_roi, - write_roi, - process_function=lambda: TestMultipleTasks.worker(self.outdir), - max_retries=0, - fit='shrink') - - class ParentTask(daisy.Task): - - outdir = daisy.Parameter() - - def prepare(self): - - total_roi = daisy.Roi((0,), (10,)) - read_roi = daisy.Roi((0,), (1,)) - write_roi = daisy.Roi((0,), (1,)) - - self.schedule( - total_roi, - read_roi, - write_roi, - process_function=TestMultipleTasks.process_block_null, - max_retries=0, - fit='shrink') - - def requires(self): - return [TestMultipleTasks.LeafTask(outdir=self.outdir)] - - class ParentTaskAnother(daisy.Task): - - outdir = daisy.Parameter() - - def prepare(self): - - total_roi = daisy.Roi((0,), (10,)) - read_roi = daisy.Roi((0,), (1,)) - write_roi = daisy.Roi((0,), (1,)) - - self.schedule( - total_roi, - read_roi, - write_roi, - process_function=TestMultipleTasks.process_block_null, - max_retries=0, - fit='shrink') - - def requires(self): - return [TestMultipleTasks.LeafTask(outdir=self.outdir)] - - def process_block(outdir, block, fail=None): - - logger.debug("Processing block %s", block) - - if block.block_id == fail: - raise RuntimeError("intended failure") - - path = os.path.join(outdir, '%d.block' % block.block_id) - with open(path, 'w') as f: - f.write(str(block.block_id)) - - # print(block.read_roi.get_begin()) - # print(block.write_roi.get_begin()[0]) - path = os.path.join( - outdir, '%d.write_roi' % block.write_roi.get_begin()[0]) - with open(path, 'w') as f: - f.write(str(block.write_roi)) - - def process_block_null(block): - return 0 - - def worker(outdir, fail=None): - - client = daisy.Client() - - while True: - - block = client.acquire_block() - if block is None: - break - - TestMultipleTasks.process_block(outdir, block, fail) - - client.release_block(block, 0) diff --git a/parked_tests/test_worker_pool.py b/parked_tests/test_worker_pool.py deleted file mode 100644 index fa9ce7a2..00000000 --- a/parked_tests/test_worker_pool.py +++ /dev/null @@ -1,209 +0,0 @@ -from time import sleep -import daisy -import daisy.tcp -import unittest -import logging -import subprocess - -logging.basicConfig(level=logging.DEBUG) - - -class Request(daisy.Message): - def __init__(self, request_id): - self.request_id = request_id - - -class Answer(daisy.Message): - def __init__(self, answer_id): - self.answer_id = answer_id - - -class UnluckyNumberException(Exception): - pass - - -faulty_worker_id = None -early_exit_worker_id = None - - -def basic_worker(): - while True: - sleep(1) - - -def context_worker(): - - context = daisy.Context.from_env() - - logging.debug("started worker %s", context['worker_id']) - - while True: - sleep(1) - - -def error_worker(): - - context = daisy.Context.from_env() - - logging.debug("started worker %s", context['worker_id']) - - if context['worker_id'] == str(faulty_worker_id): - raise UnluckyNumberException( - "%d is an unlucky number!" % - faulty_worker_id) - - while True: - sleep(1) - - -def early_exit_worker(): - - context = daisy.Context.from_env() - - logging.debug("started worker %s", context['worker_id']) - - if context['worker_id'] == str(early_exit_worker_id): - return - - while True: - sleep(1) - - -def command_worker(): - subprocess.check_call(['python', __file__]) - - -def message_worker(): - - context = daisy.Context.from_env() - worker_id = int(context['worker_id']) - server_name = context['server_name'] - server_port = int(context['server_port']) - - logging.debug("started worker %d", worker_id) - logging.debug("connecting to %s:%d", server_name, server_port) - - client = daisy.tcp.TCPClient(server_name, server_port) - - logging.debug("sending request...") - client.send_message(Request(worker_id)) - logging.debug("waiting for reply...") - reply = client.get_message() - logging.debug("got reply %s", reply) - assert isinstance(reply, Answer) - assert reply.answer_id == worker_id + 1 - - -class TestWorkerPool(unittest.TestCase): - - def test_basic(self): - - pool = daisy.WorkerPool(basic_worker) - - pool.set_num_workers(10) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(5) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(20) - sleep(1) - pool.check_for_errors() - pool.stop() - - def test_context(self): - - context = daisy.Context(task_id=0) - - pool = daisy.WorkerPool(context_worker, context) - - pool.set_num_workers(10) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(5) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(20) - sleep(1) - pool.check_for_errors() - pool.stop() - - def test_error(self): - - global faulty_worker_id - faulty_worker_id = daisy.Worker.get_next_id() + 13 - - context = daisy.Context(task_id=0) - - pool = daisy.WorkerPool(error_worker, context) - - pool.set_num_workers(10) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(5) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(20) - sleep(1) - with self.assertRaises(UnluckyNumberException) as cm: - pool.check_for_errors() - self.assertTrue('unlucky' in str(cm.exception)) - pool.stop() - - def test_early_exit(self): - - global early_exit_worker_id - early_exit_worker_id = daisy.Worker.get_next_id() + 13 - - context = daisy.Context(task_id=0) - - pool = daisy.WorkerPool(early_exit_worker, context) - - pool.set_num_workers(10) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(5) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(20) - sleep(1) - pool.check_for_errors() - pool.stop() - - def test_command(self): - - context = daisy.Context(task_id=0) - pool = daisy.WorkerPool(command_worker, context) - - pool.set_num_workers(10) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(5) - sleep(1) - pool.check_for_errors() - pool.set_num_workers(20) - sleep(1) - pool.check_for_errors() - pool.stop() - - def test_tcp_server(self): - - server = daisy.tcp.TCPServer() - context = daisy.Context( - task_id=0, - server_name=server.address[0], - server_port=server.address[1]) - pool = daisy.WorkerPool(message_worker, context) - - pool.set_num_workers(10) - - for _ in range(10): - message = server.get_message() - if isinstance(message, Request): - message.stream.send_message(Answer(message.request_id + 1)) - pool.check_for_errors() - - -if __name__ == "__main__": - - sleep(100)