This repository has been archived by the owner on Nov 5, 2019. It is now read-only.

Implement cursor.get_more() and tailable cursors, add tailing sample app #39

Open · wants to merge 2 commits into base: master
5 changes: 2 additions & 3 deletions asyncmongo/connection.py
@@ -159,8 +159,6 @@ def _parse_header(self, header):
def _parse_response(self, response):
# logging.info('got data %r' % response)
callback = self.__callback
request_id = self.__request_id
self.__request_id = None
self.__callback = None
if not self.__deferred_message:
# skip adding to the cache because there is something else
@@ -169,7 +167,8 @@ def _parse_response(self, response):
self.__pool.cache(self)

try:
response = helpers._unpack_response(response, request_id) # TODO: pass tz_awar
# TODO: pass tz_aware and as_class
response = helpers._unpack_response(response)
except Exception, e:
logging.error('error %s' % e)
callback(None, e)
154 changes: 132 additions & 22 deletions asyncmongo/cursor.py
@@ -26,7 +26,10 @@
"tailable_cursor": 2,
"slave_okay": 4,
"oplog_replay": 8,
"no_timeout": 16}
"no_timeout": 16,
"await_data": 32,
"exhaust": 64,
"partial": 128}

class Cursor(object):
""" Cursor is a class used to call oeprations on a given db/collection using a specific connection pool.
@@ -36,12 +39,15 @@ def __init__(self, dbname, collection, pool):
assert isinstance(dbname, (str, unicode))
assert isinstance(collection, (str, unicode))
assert isinstance(pool, object)


self.__id = None
self.__dbname = dbname
self.__collection = collection
self.__pool = pool
self.__slave_okay = False

self.__retrieved = 0
self.__killed = False

@property
def full_collection_name(self):
return u'%s.%s' % (self.__dbname, self.__collection)
@@ -269,8 +275,9 @@ def find_one(self, spec_or_id, **kwargs):
def find(self, spec=None, fields=None, skip=0, limit=0,
timeout=True, snapshot=False, tailable=False, sort=None,
max_scan=None, slave_okay=False,
await_data=False,
_must_use_master=False, _is_command=False,
callback=None):
callback=None, batch_size=0):
"""Query the database.

The `spec` argument is a prototype document that all results
@@ -287,6 +294,25 @@ def find(self, spec=None, fields=None, skip=0, limit=0,

Raises :class:`TypeError` if any of the arguments are of
improper type.

Returns a cursor. find() calls your callback either with an error,
or with the first batch of documents. You must call get_more()
repeatedly on the cursor until it is exhausted:

class Handler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
self.cursor = self.db.collection.find({}, batch_size=300,
callback=self._on_response
)

def _on_response(self, response, error):
assert not error
self.write(str(response))
if self.cursor.alive:
self.cursor.get_more(self._on_response)
else:
self.finish()

:Parameters:
- `spec` (optional): a SON object specifying elements which
@@ -318,14 +344,23 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
continue from the last document received. For details, see
the `tailable cursor documentation
<http://www.mongodb.org/display/DOCS/Tailable+Cursors>`_.
.. versionadded:: 1.2
- `sort` (optional): a list of (key, direction) pairs
specifying the sort order for this query. See
:meth:`~pymongo.cursor.Cursor.sort` for details.
- `max_scan` (optional): limit the number of documents
examined when performing the query
- `slave_okay` (optional): is it okay to connect directly
to and perform queries on a slave instance

- `await_data` (optional): if True, the server will block for
some extra time before returning, waiting for more data to
return. Ignored if `tailable` is False.
.. versionadded:: 1.2
- `callback` (optional): a function that takes arguments (result,
error): a list of result documents, or an Exception
- `batch_size`: The size of each batch of results requested.
.. versionadded:: 1.2

.. mongodoc:: find
"""

@@ -344,6 +379,8 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
raise TypeError("snapshot must be an instance of bool")
if not isinstance(tailable, bool):
raise TypeError("tailable must be an instance of bool")
if not isinstance(await_data, bool):
raise TypeError("await_data must be an instance of bool")
if not callable(callback):
raise TypeError("callback must be callable")

@@ -357,10 +394,11 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
self.__fields = fields
self.__skip = skip
self.__limit = limit
self.__batch_size = 0
self.__batch_size = batch_size

self.__timeout = timeout
self.__tailable = tailable
self.__await_data = tailable and await_data
self.__snapshot = snapshot
self.__ordering = sort and helpers._index_document(sort) or None
self.__max_scan = max_scan
@@ -371,34 +409,77 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
self.__tz_aware = False #collection.database.connection.tz_aware
self.__must_use_master = _must_use_master
self.__is_command = _is_command


ntoreturn = self.__batch_size
if self.__limit:
if self.__batch_size:
ntoreturn = min(self.__limit, self.__batch_size)
else:
ntoreturn = self.__limit

connection = self.__pool.connection()
try:
connection.send_message(
message.query(self.__query_options(),
self.full_collection_name,
self.__skip,
self.__limit,
ntoreturn,
self.__query_spec(),
self.__fields),
callback=functools.partial(self._handle_response, orig_callback=callback))
except Exception, e:
logging.error('Error sending query %s' % e)
connection.close()
raise

return self

def get_more(self, callback, batch_size=None):
"""
Calls the given callback when more data is available from a find()
command.

:Parameters:
- `callback` (optional): a function that takes arguments (result,
error): a list of result documents, or an Exception
- `batch_size`: The size of each batch of results requested.
"""
if batch_size is None:
batch_size = self.__batch_size
if self.__limit:
limit = self.__limit - self.__retrieved
if batch_size:
limit = min(limit, batch_size)
else:
limit = batch_size

connection = self.__pool.connection()
try:
connection.send_message(
message.get_more(
self.full_collection_name,
limit,
self.__id),
callback=functools.partial(self._handle_response, orig_callback=callback))
except Exception, e:
logging.error('Error in get_more %s' % e)
connection.close()
raise

def _handle_response(self, result, error=None, orig_callback=None):
if result and result.get('cursor_id'):
connection = self.__pool.connection()
try:
connection.send_message(
message.kill_cursors([result['cursor_id']]),
callback=None)
except Exception, e:
logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e))
connection.close()
raise

if result:
self.__retrieved += result.get('number_returned', 0)
if self.__id and result.get('cursor_id'):
assert self.__id == result.get('cursor_id')
else:
self.__id = result.get('cursor_id')

if self.__retrieved >= self.__limit > 0:
self.kill()

if result and 0 == result.get('cursor_id'):
self.__killed = True

if error:
logging.error('%s %s' % (self.full_collection_name , error))
orig_callback(None, error=error)
@@ -409,12 +490,13 @@ def _handle_response(self, result, error=None, orig_callback=None):
else:
orig_callback(result['data'], error=None)


def __query_options(self):
"""Get the query options string to use for this query."""
options = 0
if self.__tailable:
options |= _QUERY_OPTIONS["tailable_cursor"]
if self.__await_data:
options |= _QUERY_OPTIONS["await_data"]
if self.__slave_okay or self.__pool._slave_okay:
options |= _QUERY_OPTIONS["slave_okay"]
if not self.__timeout:
Expand All @@ -437,5 +519,33 @@ def __query_spec(self):
if self.__max_scan:
spec["$maxScan"] = self.__max_scan
return spec



@property
def alive(self):
"""Does this cursor have the potential to return more data?

This is mostly useful with `tailable cursors
<http://www.mongodb.org/display/DOCS/Tailable+Cursors>`_
since they will stop iterating even though they *may* return more
results in the future.

.. versionadded:: 1.2
"""
return not self.__killed

def kill(self):
if self.alive and self.__id:
self.__killed = True
logging.debug('killing cursor %s', self.__id)
connection = self.__pool.connection()
try:
connection.send_message(
message.kill_cursors([self.__id]),
callback=None)
except Exception, e:
logging.error('Error killing cursor %s: %s' % (self.__id, e))
connection.close()
raise

def __del__(self):
self.kill()
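
The request sizing in find() and get_more() follows one rule: ask the server for at most batch_size documents per round trip, and never for more than what remains of limit. A standalone sketch of that rule (the helper name is hypothetical, not part of this patch):

def n_to_return(limit, batch_size, retrieved=0):
    # Mirrors the sizing logic in find()/get_more(): request at most
    # batch_size documents, never exceeding what remains of the limit.
    if limit:
        remaining = limit - retrieved
        return min(remaining, batch_size) if batch_size else remaining
    return batch_size

assert n_to_return(limit=0, batch_size=300) == 300                    # no limit: full batch
assert n_to_return(limit=1000, batch_size=300) == 300                 # limit caps later batches
assert n_to_return(limit=1000, batch_size=300, retrieved=900) == 100  # final partial batch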
6 changes: 1 addition & 5 deletions asyncmongo/helpers.py
@@ -22,11 +22,7 @@ def _unpack_response(response, cursor_id=None, as_class=dict, tz_aware=False):
"""
response_flag = struct.unpack("<i", response[:4])[0]
if response_flag & 1:
# Shouldn't get this response if we aren't doing a getMore
assert cursor_id is not None

raise InterfaceError("cursor id '%s' not valid at server" %
cursor_id)
raise InterfaceError("cursor not valid at server")
elif response_flag & 2:
error_object = bson.BSON(response[20:]).decode()
if error_object["$err"] == "not master":
44 changes: 44 additions & 0 deletions test/eventually.py
@@ -0,0 +1,44 @@
import os
import time
import unittest

from tornado import ioloop

class AssertEventuallyTest(unittest.TestCase):
def setUp(self):
super(AssertEventuallyTest, self).setUp()

# Callbacks registered with assertEventuallyEqual()
self.assert_callbacks = set()

def assertEventuallyEqual(
self, expected, fn, msg=None, timeout_sec=None
):
if timeout_sec is None:
timeout_sec = 5
timeout_sec = max(timeout_sec, int(os.environ.get('TIMEOUT_SEC', 0)))
start = time.time()
loop = ioloop.IOLoop.instance()

def callback():
try:
self.assertEqual(expected, fn(), msg)
# Passed
self.assert_callbacks.remove(callback)
if not self.assert_callbacks:
# All asserts have passed
loop.stop()
except AssertionError:
# Failed -- keep waiting?
if time.time() - start < timeout_sec:
# Try again in about 0.1 seconds
loop.add_timeout(time.time() + 0.1, callback)
else:
# Timeout expired without passing test
loop.stop()
raise

self.assert_callbacks.add(callback)

# Run this callback on the next I/O loop iteration
loop.add_callback(callback)
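
A sketch of how a test might drive this helper with asyncmongo's callback API (the pool name, database, and collection below are assumptions for illustration only):

import asyncmongo
from tornado import ioloop
from eventually import AssertEventuallyTest

class FindTest(AssertEventuallyTest):
    def test_find_returns_documents(self):
        db = asyncmongo.Client(pool_id='test_pool', host='127.0.0.1',
                               port=27017, dbname='test')
        results = []
        # Callbacks receive (result, error); collect whatever comes back.
        db.things.find({}, callback=lambda docs, error: results.extend(docs or []))
        # Passes once the callback has delivered at least one document,
        # or stops the loop and fails after the timeout expires.
        self.assertEventuallyEqual(True, lambda: len(results) > 0)
        ioloop.IOLoop.instance().start()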
8 changes: 8 additions & 0 deletions test/sample_tailing_app/README
@@ -0,0 +1,8 @@
This Tornado test application demonstrates tailable cursors. It creates a
capped collection in the 'test' database and displays a web page at
http://localhost:8888 that streams new documents in the capped collection to the
page.

Try opening several copies of the page and using the app to add documents to the
collection, or clear the collection. You can also use the mongo shell to insert
a document and see it appear in the page.
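
A minimal sketch of the tailing pattern the app is built around, using the new tailable/await_data options and get_more() (the handler, pool, and collection names are illustrative, not necessarily the ones the sample app uses):

import asyncmongo
import tornado.web

db = asyncmongo.Client(pool_id='tail', host='127.0.0.1', port=27017, dbname='test')

class TailHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        # tailable + await_data: the server keeps the cursor open on the capped
        # collection and blocks briefly for new documents instead of returning
        # an empty batch immediately.
        self.cursor = db.capped.find({}, tailable=True, await_data=True,
                                     callback=self._on_response)

    def _on_response(self, response, error):
        if error:
            raise tornado.web.HTTPError(500, str(error))
        for document in response:
            self.write(str(document) + '\n')
        self.flush()
        if self.cursor.alive:
            # Keep tailing: ask the server for the next batch when it arrives.
            self.cursor.get_more(self._on_response)
        else:
            # The cursor died (e.g. the capped collection was dropped); stop.
            self.finish()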