Skip to content

Commit

Permalink
Merge pull request scrapy#10 from scrapy/sqlite-queue
Browse files Browse the repository at this point in the history
SQLite persistent queue
  • Loading branch information
dangra committed Sep 9, 2015
2 parents 89f6df0 + 227a425 commit 175de8f
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
51 changes: 50 additions & 1 deletion queuelib/queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import struct
import glob
import json
import struct
import sqlite3
from collections import deque


Expand Down Expand Up @@ -171,3 +172,51 @@ def close(self):

def __len__(self):
return self.size


class FifoSQLiteQueue(object):

_sql_create = (
'CREATE TABLE IF NOT EXISTS queue '
'(id INTEGER PRIMARY KEY AUTOINCREMENT, item BLOB)'
)
_sql_size = 'SELECT COUNT(*) FROM queue'
_sql_push = 'INSERT INTO queue (item) VALUES (?)'
_sql_pop = 'SELECT id, item FROM queue ORDER BY id LIMIT 1'
_sql_del = 'DELETE FROM queue WHERE id = ?'

def __init__(self, path):
self._path = os.path.abspath(path)
self._db = sqlite3.Connection(self._path, timeout=60)
self._db.text_factory = bytes
with self._db as conn:
conn.execute(self._sql_create)

def push(self, item):
with self._db as conn:
conn.execute(self._sql_push, (item,))

def pop(self):
with self._db as conn:
for id_, item in conn.execute(self._sql_pop):
conn.execute(self._sql_del, (id_,))
return item

def close(self):
size = len(self)
self._db.close()
if not size:
os.remove(self._path)

def __len__(self):
with self._db as conn:
return next(conn.execute(self._sql_size))[0]


class LifoSQLiteQueue(FifoSQLiteQueue):

_sql_pop = 'SELECT id, item FROM queue ORDER BY id DESC LIMIT 1'


#FifoDiskQueue = FifoSQLiteQueue # noqa
#LifoDiskQueue = LifoSQLiteQueue # noqa
13 changes: 12 additions & 1 deletion queuelib/tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import glob

from queuelib.queue import FifoMemoryQueue, LifoMemoryQueue, FifoDiskQueue, LifoDiskQueue
from queuelib.queue import (
FifoMemoryQueue, LifoMemoryQueue, FifoDiskQueue, LifoDiskQueue,
FifoSQLiteQueue, LifoSQLiteQueue,
)
from queuelib.tests import QueuelibTestCase


Expand Down Expand Up @@ -220,5 +223,13 @@ def test_file_size_shrinks(self):
assert os.path.getsize(self.qpath), size


class FifoSQLiteQueueTest(FifoTestMixin, PersistentTestMixin, QueuelibTestCase):

def queue(self):
return FifoSQLiteQueue(self.qpath)


class LifoSQLiteQueueTest(LifoTestMixin, PersistentTestMixin, QueuelibTestCase):

def queue(self):
return LifoSQLiteQueue(self.qpath)

0 comments on commit 175de8f

Please sign in to comment.