Skip to content

Commit

Permalink
threaded book queue handler. all tests in python.
Browse files Browse the repository at this point in the history
  • Loading branch information
josteinaj committed Oct 23, 2017
1 parent 7f8c88a commit 15191d1
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 129 deletions.
104 changes: 61 additions & 43 deletions produksjonsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,49 @@
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from pathlib import Path
from threading import Thread, RLock

# Refuse to run on anything other than Python 3.
running_python3 = sys.version_info[0] == 3
if not running_python3:
    print("# This script requires Python version 3.x")
    sys.exit(1)

class Pipeline(PatternMatchingEventHandler):
observer = Observer()
lock = RLock()

# constants (set during instantiation)
inactivity_timeout = 10
observer = None
base = None
bookHandlerThread = None
bookHandlerThreadShouldRun = False

# dynamic (reset on stop(), changes over time)
queue = []
inactivity_timeout = 10

# other

def __init__(self, base):
super(Pipeline, self).__init__()
self.queue = [] # discards pre-existing files
self.base = base
super(Pipeline, self).__init__()

def start(self):
def start(self, inactivity_timeout=10):
self.inactivity_timeout = inactivity_timeout
self.observer = Observer()
self.observer.schedule(self, path=self.base, recursive=True)
self.observer.start()
self.bookHandlerThreadShouldRun = True
self.bookHandlerThread = Thread(target=self.handle_book_events_thread)
self.bookHandlerThread.setDaemon(True)
self.bookHandlerThread.start()

def stop(self):
self.observer.stop()
self.observer.join()
if self.bookHandlerThread:
self.bookHandlerThreadShouldRun = False
if self.observer:
self.observer.stop()
self.observer.join()
self.observer = None

def process(self, event):
source_path = Path(event.src_path).relative_to(self.base)
Expand Down Expand Up @@ -63,44 +84,30 @@ def process(self, event):
'event_type': str(event.event_type),
'is_directory': event.is_directory
}
# TODO: replace with functions (addBookEvent) to avoid code duplication
book_in_queue = False
for queue_item in self.queue:
if queue_item['book'] == book_event['book']:
book_in_queue = True
event_in_queue = False
for queue_event in queue_item['events']:
if queue_event == book_event:
event_in_queue = True
if not event_in_queue:
queue_item['events'].append(book_event)
queue_item['last_event'] = int(time.time())
break
if not book_in_queue:
self.queue.append({
'book': book_event['book'],
'events': [ book_event ],
'last_event': int(time.time())
})

self.addBookEvent(book_event)
if book_event['event_type'] == 'moved':
book_event['book'] = dest_path.parts[0]
self.addBookEvent(book_event)

def addBookEvent(self, event):
with self.lock:
book_in_queue = False
for queue_item in self.queue:
if queue_item['book'] == book_event['book']:
for item in self.queue:
if item['book'] == event['book']:
book_in_queue = True
event_in_queue = False
for queue_event in queue_item['events']:
if queue_event == book_event:
for queue_event in item['events']:
if queue_event == event:
event_in_queue = True
break
if not event_in_queue:
queue_item['events'].append(book_event)
queue_item['last_event'] = int(time.time())
item['events'].append(event)
item['last_event'] = int(time.time())
break
if not book_in_queue:
self.queue.append({
'book': book_event['book'],
'events': [ book_event ],
'book': event['book'],
'events': [ event ],
'last_event': int(time.time())
})

Expand All @@ -116,18 +123,30 @@ def on_moved(self, event):
def on_deleted(self, event):
    """Event-handler hook for deletions; forwards the event to process()."""
    handler = self.process
    handler(event)

def handle_book_events_thread(self):
    """Poll the book queue roughly once per second until asked to stop.

    This is the target of the background thread created in start(). It
    keeps looping while ``self.bookHandlerThreadShouldRun`` is true and
    calls ``self.handle_book_events()`` on every pass.
    """
    while self.bookHandlerThreadShouldRun:
        try:
            self.handle_book_events()
        except Exception:
            # Catch Exception instead of using a bare except so that
            # SystemExit / KeyboardInterrupt are not silently swallowed.
            print("Unexpected error:", sys.exc_info()[0])
        # Sleep outside the try block: a failing handler is then retried at
        # the same one-second pace instead of spinning in a tight loop.
        time.sleep(1)

def handle_book_events(self):
x = [b['book'] + ": " + str(int(time.time()) - b['last_event']) for b in self.queue]
book = None

books = [b for b in self.queue if int(time.time()) - b['last_event'] > self.inactivity_timeout]
if not len(books):
return
book = books[0]
with self.lock:
x = [b['book'] + ": " + str(int(time.time()) - b['last_event']) for b in self.queue]

books = [b for b in self.queue if int(time.time()) - b['last_event'] > self.inactivity_timeout]
if not len(books):
return
book = books[0]

new_queue = [b for b in self.queue if b is not book]
self.queue = new_queue

new_queue = [b for b in self.queue if b is not book]
self.queue = new_queue
print("processing book: "+book['book'])


if __name__ == '__main__':
args = sys.argv[1:]
Expand All @@ -136,7 +155,6 @@ def handle_book_events(self):
try:
while True:
time.sleep(1)
pipeline.handle_book_events()
except KeyboardInterrupt:
pass
pipeline.stop()
145 changes: 111 additions & 34 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,131 @@ def fun(x):
return x + 1

class PipelineTest(unittest.TestCase):
def test(self):
target = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target')
dir_in = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/in')
dir_out = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/out')
if os.path.exists(target):
shutil.rmtree(target)
os.makedirs(dir_in)
os.makedirs(dir_out)

pipeline = Pipeline(dir_in)
self.assertEqual(len(pipeline.queue), 0)
pipeline.start()
target = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target')
dir_in = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/in')
dir_out = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'target/out')
pipeline = None

def setUp(self):
    """Recreate empty input/output directories and build a fresh Pipeline."""
    print("TEST: setUp")
    # Start from a clean slate: drop whatever a previous run left behind.
    if os.path.exists(self.target):
        shutil.rmtree(self.target)
    for directory in (self.dir_in, self.dir_out):
        os.makedirs(directory)
    self.pipeline = Pipeline(self.dir_in)

def tearDown(self):
    """Stop the pipeline built in setUp, pausing before and after."""
    print("TEST: tearDown")
    # NOTE(review): the pauses presumably give the watcher threads time to
    # drain and shut down -- confirm before shortening them.
    time.sleep(2)
    pipeline = self.pipeline
    pipeline.stop()
    time.sleep(1)

def test_file(self):
print("TEST: test_file")
self.pipeline.start(inactivity_timeout=3600)
time.sleep(1)
self.assertEqual(len(self.pipeline.queue), 0)

Path(os.path.join(dir_in, 'foo.epub')).touch()
Path(os.path.join(self.dir_in, 'foo.epub')).touch()
time.sleep(1)
self.assertEqual(len(pipeline.queue), 1)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len(self.pipeline.queue), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1)

with open(os.path.join(dir_in, 'foo.epub'), "a") as f:
with open(os.path.join(self.dir_in, 'foo.epub'), "a") as f:
f.write("bar")
time.sleep(1)
self.assertEqual(len(pipeline.queue), 1)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len(self.pipeline.queue), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1)

shutil.move(os.path.join(dir_in, 'foo.epub'), os.path.join(dir_in, 'bar.epub'))
shutil.move(os.path.join(self.dir_in, 'foo.epub'), os.path.join(self.dir_in, 'bar.epub'))
time.sleep(1)
self.assertEqual(len(pipeline.queue), 2)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'bar.epub']), 1)
self.assertEqual(len(self.pipeline.queue), 2)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'bar.epub']), 1)

with open(os.path.join(dir_in, 'baz.epub'), "a") as f:
with open(os.path.join(self.dir_in, 'baz.epub'), "a") as f:
f.write("baz")
time.sleep(1)
self.assertEqual(len(pipeline.queue), 3)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'bar.epub']), 1)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'baz.epub']), 1)
self.assertEqual(len(self.pipeline.queue), 3)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'bar.epub']), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'baz.epub']), 1)

os.remove(os.path.join(dir_in, 'bar.epub'))
os.remove(os.path.join(self.dir_in, 'bar.epub'))
time.sleep(1)
self.assertEqual(len(self.pipeline.queue), 3)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'bar.epub']), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'baz.epub']), 1)

def test_folder(self):
print("TEST: test_folder")
# create three books before starting the pipeline
os.makedirs(os.path.join(self.dir_in, 'book1'))
Path(os.path.join(self.dir_in, 'book1/ncc.html')).touch()
os.makedirs(os.path.join(self.dir_in, 'book2'))
Path(os.path.join(self.dir_in, 'book2/ncc.html')).touch()
Path(os.path.join(self.dir_in, 'book2/image.png')).touch()
os.makedirs(os.path.join(self.dir_in, 'book3'))
Path(os.path.join(self.dir_in, 'book3/ncc.html')).touch()
time.sleep(1)
self.assertEqual(len(pipeline.queue), 3)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'foo.epub']), 1)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'bar.epub']), 1)
self.assertEqual(len([b['book'] for b in pipeline.queue if b['book'] == 'baz.epub']), 1)

time.sleep(2)
pipeline.stop()
# start the pipeline
self.pipeline.start(inactivity_timeout=3600)
time.sleep(1)

# there should be no books in the pipeline, even though there are book folders in the input directory
self.assertEqual(len(self.pipeline.queue), 0)

# modify the book
Path(os.path.join(self.dir_in, 'book1/audio1.mp3')).touch()
Path(os.path.join(self.dir_in, 'book1/audio2.mp3')).touch()
Path(os.path.join(self.dir_in, 'book1/content.html')).touch()
Path(os.path.join(self.dir_in, 'book1/image.png')).touch()
time.sleep(1)

# now there should be a book in the pipeline
self.assertEqual(len(self.pipeline.queue), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book1']), 1)

# move a file from book2 to book3
shutil.move(os.path.join(self.dir_in, 'book2/image.png'), os.path.join(self.dir_in, 'book3/image.png'))
time.sleep(1)

# now there should be 3 books in the pipeline
self.assertEqual(len(self.pipeline.queue), 3)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book1']), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book2']), 1)
self.assertEqual(len([b['book'] for b in self.pipeline.queue if b['book'] == 'book3']), 1)

def test_queue_handler(self):
    """Check that books leave the queue once inactive longer than the timeout."""
    print("TEST: test_queue_handler")

    def queued(name):
        # Number of queue entries whose book name equals `name`.
        return len([entry['book'] for entry in self.pipeline.queue if entry['book'] == name])

    def touch(name):
        # Create (or update) a file directly under the input directory.
        Path(os.path.join(self.dir_in, name)).touch()

    # start the pipeline
    self.pipeline.start(inactivity_timeout=10)
    time.sleep(1)

    # Create a book
    touch('book1')
    time.sleep(3)
    self.assertEqual(len(self.pipeline.queue), 1)
    self.assertEqual(queued('book1'), 1)

    # Create another book
    touch('book2')
    time.sleep(3)
    self.assertEqual(len(self.pipeline.queue), 2)
    self.assertEqual(queued('book1'), 1)
    self.assertEqual(queued('book2'), 1)

    # wait until 12 seconds after book1 was created
    time.sleep(6)
    self.assertEqual(len(self.pipeline.queue), 1)
    self.assertEqual(queued('book2'), 1)

    # wait until 12 seconds after book2 was created
    time.sleep(3)
    self.assertEqual(len(self.pipeline.queue), 0)

if __name__ == '__main__':
unittest.main()
52 changes: 0 additions & 52 deletions test.sh

This file was deleted.

0 comments on commit 15191d1

Please sign in to comment.