From 15191d16a90374c4b5dd9625652332b406d66336 Mon Sep 17 00:00:00 2001 From: Jostein Austvik Jacobsen Date: Mon, 23 Oct 2017 16:59:46 +0200 Subject: [PATCH] threaded book queue handler. all tests in python. --- produksjonsystem.py | 104 ++++++++++++++++++------------- test.py | 145 +++++++++++++++++++++++++++++++++----------- test.sh | 52 ---------------- 3 files changed, 172 insertions(+), 129 deletions(-) delete mode 100755 test.sh diff --git a/produksjonsystem.py b/produksjonsystem.py index 35a4a168..fe27f508 100755 --- a/produksjonsystem.py +++ b/produksjonsystem.py @@ -6,28 +6,49 @@ from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler from pathlib import Path +from threading import Thread, RLock if sys.version_info[0] != 3: print("# This script requires Python version 3.x") sys.exit(1) class Pipeline(PatternMatchingEventHandler): - observer = Observer() + lock = RLock() + + # constants (set during instantiation) + inactivity_timeout = 10 + observer = None base = None + bookHandlerThread = None + bookHandlerThreadShouldRun = False + + # dynamic (reset on stop(), changes over time) queue = [] - inactivity_timeout = 10 + + # other def __init__(self, base): - super(Pipeline, self).__init__() + self.queue = [] # discards pre-existing files self.base = base + super(Pipeline, self).__init__() - def start(self): + def start(self, inactivity_timeout=10): + self.inactivity_timeout = inactivity_timeout + self.observer = Observer() self.observer.schedule(self, path=self.base, recursive=True) self.observer.start() + self.bookHandlerThreadShouldRun = True + self.bookHandlerThread = Thread(target=self.handle_book_events_thread) + self.bookHandlerThread.setDaemon(True) + self.bookHandlerThread.start() def stop(self): - self.observer.stop() - self.observer.join() + if self.bookHandlerThread: + self.bookHandlerThreadShouldRun = False + if self.observer: + self.observer.stop() + self.observer.join() + self.observer = None def process(self, event): source_path = Path(event.src_path).relative_to(self.base) @@ -63,44 +84,30 @@ def process(self, event): 'event_type': str(event.event_type), 'is_directory': event.is_directory } - # TODO: replace with functions (addBookEvent) to avoid code duplication - book_in_queue = False - for queue_item in self.queue: - if queue_item['book'] == book_event['book']: - book_in_queue = True - event_in_queue = False - for queue_event in queue_item['events']: - if queue_event == book_event: - event_in_queue = True - if not event_in_queue: - queue_item['events'].append(book_event) - queue_item['last_event'] = int(time.time()) - break - if not book_in_queue: - self.queue.append({ - 'book': book_event['book'], - 'events': [ book_event ], - 'last_event': int(time.time()) - }) - + self.addBookEvent(book_event) if book_event['event_type'] == 'moved': book_event['book'] = dest_path.parts[0] + self.addBookEvent(book_event) + + def addBookEvent(self, event): + with self.lock: book_in_queue = False - for queue_item in self.queue: - if queue_item['book'] == book_event['book']: + for item in self.queue: + if item['book'] == event['book']: book_in_queue = True event_in_queue = False - for queue_event in queue_item['events']: - if queue_event == book_event: + for queue_event in item['events']: + if queue_event == event: event_in_queue = True + break if not event_in_queue: - queue_item['events'].append(book_event) - queue_item['last_event'] = int(time.time()) + item['events'].append(event) + item['last_event'] = int(time.time()) break if not book_in_queue: self.queue.append({ - 'book': book_event['book'], - 'events': [ book_event ], + 'book': event['book'], + 'events': [ event ], 'last_event': int(time.time()) }) @@ -116,18 +123,30 @@ def on_moved(self, event): def on_deleted(self, event): self.process(event) + def handle_book_events_thread(self): + while self.bookHandlerThreadShouldRun: + try: + self.handle_book_events() + time.sleep(1) + except: + print("Unexpected error:", sys.exc_info()[0]) + def handle_book_events(self): - x = [b['book'] + ": " + str(int(time.time()) - b['last_event']) for b in self.queue] + book = None - books = [b for b in self.queue if int(time.time()) - b['last_event'] > self.inactivity_timeout] - if not len(books): - return - book = books[0] + with self.lock: + x = [b['book'] + ": " + str(int(time.time()) - b['last_event']) for b in self.queue] + + books = [b for b in self.queue if int(time.time()) - b['last_event'] > self.inactivity_timeout] + if not len(books): + return + book = books[0] + + new_queue = [b for b in self.queue if b is not book] + self.queue = new_queue - new_queue = [b for b in self.queue if b is not book] - self.queue = new_queue print("processing book: "+book['book']) - + if __name__ == '__main__': args = sys.argv[1:] @@ -136,7 +155,6 @@ def handle_book_events(self): try: while True: time.sleep(1) - pipeline.handle_book_events() except KeyboardInterrupt: pass pipeline.stop() diff --git a/test.py b/test.py index f5017bb9..7dceb66f 100755 --- a/test.py +++ b/test.py @@ -13,54 +13,131 @@ def fun(x): return x + 1 class PipelineTest(unittest.TestCase): - def test(self): - target = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target') - dir_in = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/in') - dir_out = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/out') - if os.path.exists(target): - shutil.rmtree(target) - os.makedirs(dir_in) - os.makedirs(dir_out) - - pipeline = Pipeline(dir_in) - self.assertEqual(len(pipeline.queue), 0) - pipeline.start() + target = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target') + dir_in = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/in') + dir_out = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/out') + pipeline = None + + def setUp(self): + print("TEST: setUp") + if os.path.exists(self.target): + shutil.rmtree(self.target) + os.makedirs(self.dir_in) + os.makedirs(self.dir_out) + self.pipeline = Pipeline(self.dir_in) + + def tearDown(self): + print("TEST: tearDown") + time.sleep(2) + self.pipeline.stop() + time.sleep(1) + + def test_file(self): + print("TEST: test_file") + self.pipeline.start(inactivity_timeout=3600) time.sleep(1) + self.assertEqual(len(self.pipeline.queue), 0) - Path(os.path.join(dir_in, 'foo.epub')).touch() + Path(os.path.join(self.dir_in, 'foo.epub')).touch() time.sleep(1) - self.assertEqual(len(pipeline.queue), 1) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1) + self.assertEqual(len(self.pipeline.queue), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1) - with open(os.path.join(dir_in, 'foo.epub'), "a") as f: + with open(os.path.join(self.dir_in, 'foo.epub'), "a") as f: f.write("bar") time.sleep(1) - self.assertEqual(len(pipeline.queue), 1) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1) + self.assertEqual(len(self.pipeline.queue), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1) - shutil.move(os.path.join(dir_in, 'foo.epub'), os.path.join(dir_in, 'bar.epub')) + shutil.move(os.path.join(self.dir_in, 'foo.epub'), os.path.join(self.dir_in, 'bar.epub')) time.sleep(1) - self.assertEqual(len(pipeline.queue), 2) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'bar.epub']), 1) + self.assertEqual(len(self.pipeline.queue), 2) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'bar.epub']), 1) - with open(os.path.join(dir_in, 'baz.epub'), "a") as f: + with open(os.path.join(self.dir_in, 'baz.epub'), "a") as f: f.write("baz") time.sleep(1) - self.assertEqual(len(pipeline.queue), 3) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'bar.epub']), 1) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'baz.epub']), 1) + self.assertEqual(len(self.pipeline.queue), 3) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'bar.epub']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'baz.epub']), 1) - os.remove(os.path.join(dir_in, 'bar.epub')) + os.remove(os.path.join(self.dir_in, 'bar.epub')) + time.sleep(1) + self.assertEqual(len(self.pipeline.queue), 3) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'bar.epub']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'baz.epub']), 1) + + def test_folder(self): + print("TEST: test_folder") + # create three books before starting the pipeline + os.makedirs(os.path.join(self.dir_in, 'book1')) + Path(os.path.join(self.dir_in, 'book1/ncc.html')).touch() + os.makedirs(os.path.join(self.dir_in, 'book2')) + Path(os.path.join(self.dir_in, 'book2/ncc.html')).touch() + Path(os.path.join(self.dir_in, 'book2/image.png')).touch() + os.makedirs(os.path.join(self.dir_in, 'book3')) + Path(os.path.join(self.dir_in, 'book3/ncc.html')).touch() time.sleep(1) - self.assertEqual(len(pipeline.queue), 3) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'bar.epub']), 1) - self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'baz.epub']), 1) - time.sleep(2) - pipeline.stop() + # start the pipeline + self.pipeline.start(inactivity_timeout=3600) + time.sleep(1) + + # there should be no books in the pipeline, even though there is a folder in the input directory + self.assertEqual(len(self.pipeline.queue), 0) + + # modify the book + Path(os.path.join(self.dir_in, 'book1/audio1.mp3')).touch() + Path(os.path.join(self.dir_in, 'book1/audio2.mp3')).touch() + Path(os.path.join(self.dir_in, 'book1/content.html')).touch() + Path(os.path.join(self.dir_in, 'book1/image.png')).touch() + time.sleep(1) + + # now there should be a book in the pipeline + self.assertEqual(len(self.pipeline.queue), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book1']), 1) + + # move a file from book2 to book3 + shutil.move(os.path.join(self.dir_in, 'book2/image.png'), os.path.join(self.dir_in, 'book3/image.png')) + time.sleep(1) + + # now there should be 3 book in the pipeline + self.assertEqual(len(self.pipeline.queue), 3) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book1']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book2']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book3']), 1) + + def test_queue_handler(self): + print("TEST: test_queue_handler") + + # start the pipeline + self.pipeline.start(inactivity_timeout=10) + time.sleep(1) + + # Create a book + Path(os.path.join(self.dir_in, 'book1')).touch() + time.sleep(3) + self.assertEqual(len(self.pipeline.queue), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book1']), 1) + + # Create another book + Path(os.path.join(self.dir_in, 'book2')).touch() + time.sleep(3) + self.assertEqual(len(self.pipeline.queue), 2) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book1']), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book2']), 1) + + # wait until 12 seconds after book1 was created + time.sleep(6) + self.assertEqual(len(self.pipeline.queue), 1) + self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book2']), 1) + + # wait until 12 seconds after book2 was created + time.sleep(3) + self.assertEqual(len(self.pipeline.queue), 0) if __name__ == '__main__': unittest.main() diff --git a/test.sh b/test.sh deleted file mode 100755 index f97fdac5..00000000 --- a/test.sh +++ /dev/null @@ -1,52 +0,0 @@ -#!/bin/bash - -# fail and exit on first error -set -e - -# go to script dir -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -cd "$DIR" - -# make resource dir for testing and start produksjonsystem -mkdir -p target/resources -./produksjonsystem.py "$DIR/target/resources" & -sleep 1 - -# do some basic stuff -cd target/resources -touch foo.xml -sleep 0.5 -echo bar >> foo.xml -sleep 0.5 -mv foo.xml bar.xml -sleep 0.5 -rm bar.xml -sleep 2 - -mkdir -p book1 -sleep 0.5 -touch book1/ncc.html -sleep 0.5 -touch book1/audio1.mp3 -sleep 0.5 -touch book1/audio2.mp3 -sleep 0.5 -touch book1/content.html -sleep 0.5 -touch book1/image.png -sleep 2 - -mkdir -p book2 -sleep 0.5 -touch book2/ncc.html -sleep 0.5 -mv book1/image.png book2/image.png - -sleep 15 - -rm * -r - -# stop produksjonsystem -ps aux | grep produksjonsystem.py | grep -v grep | awk '{print $2}' | xargs kill -wait 2>/dev/null -sleep 1