From bbed6826ba846288cef90b193d59b59ef6623897 Mon Sep 17 00:00:00 2001 From: evilkost Date: Thu, 14 Apr 2011 18:34:17 +0400 Subject: [PATCH 1/9] +ReconnectTestCase --- tests/server_commands.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/server_commands.py b/tests/server_commands.py index 6370952..4a5a057 100644 --- a/tests/server_commands.py +++ b/tests/server_commands.py @@ -78,6 +78,9 @@ def callback(result): msg=source_line+' Got:'+repr(result)) return callback + def delayed(self, timeout, cb): + self.loop.add_timeout(time.time()+timeout, cb) + def finish(self, *args): self.loop.stop() @@ -713,5 +716,16 @@ def simulate(client, callbacks): self.loop.add_callback(lambda: simulate(self.client, self.finish)) self.start() +class ReconnectTestCase(TornadoTestCase): + def test_run_stop_redis(self): + self.client.set('foo', 'bar', self.expect(True)) + self.delayed(10, lambda: + self.client.get('foo', [ + self.expect('bar'), + self.finish + ]) + ) + self.start() + if __name__ == '__main__': unittest.main() From 78378633845d4bc737b05991f7acb7b201e28fec Mon Sep 17 00:00:00 2001 From: evilkost Date: Thu, 14 Apr 2011 21:08:12 +0400 Subject: [PATCH 2/9] remove unused method Connection.consume --- brukva/client.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/brukva/client.py b/brukva/client.py index fc3315d..3b77c9b 100644 --- a/brukva/client.py +++ b/brukva/client.py @@ -110,12 +110,6 @@ def write(self, data): else: self._stream.write(data) - def consume(self, length): - if not self._stream: - self.on_reconnect() - if not self._stream: - raise ConnectionError('Tried to consume from non-existent connection') - self._stream.read_bytes(length, NOOP_CB) def read(self, length, callback): try: From 63ba42b9edfa263f9f276574fd0504fb19bcaac7 Mon Sep 17 00:00:00 2001 From: evilkost Date: Thu, 14 Apr 2011 21:15:55 +0400 Subject: [PATCH 3/9] +ReconnectTestCase.test_redis_timeout_with_pipe --- tests/server_commands.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/server_commands.py b/tests/server_commands.py index 4a5a057..5a40122 100644 --- a/tests/server_commands.py +++ b/tests/server_commands.py @@ -13,6 +13,7 @@ from brukva.adisp import process, async from brukva.exceptions import ResponseError +import logging; logging.basicConfig() def callable(obj): return hasattr(obj, '__call__') @@ -716,8 +717,9 @@ def simulate(client, callbacks): self.loop.add_callback(lambda: simulate(self.client, self.finish)) self.start() + class ReconnectTestCase(TornadoTestCase): - def test_run_stop_redis(self): + def test_redis_timeout(self): self.client.set('foo', 'bar', self.expect(True)) self.delayed(10, lambda: self.client.get('foo', [ @@ -727,5 +729,21 @@ def test_run_stop_redis(self): ) self.start() + def test_redis_timeout_with_pipe(self): + self.client.set('foo', 'bar', self.expect(True)) + pipe = self.client.pipeline(transactional=True) + pipe.get('foo') + + self.delayed(10, lambda: + pipe.execute([ + self.pexpect([ + 'bar', + ]), + self.finish, + ]) + ) + self.start() + + if __name__ == '__main__': unittest.main() From dac8d516464a74051f453a9692abd27d7306871e Mon Sep 17 00:00:00 2001 From: evilkost Date: Thu, 14 Apr 2011 21:19:52 +0400 Subject: [PATCH 4/9] redis-conf for new test case --- redis.conf | 368 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 368 insertions(+) create mode 100644 redis.conf diff --git a/redis.conf b/redis.conf new file mode 100644 index 0000000..3a33987 --- /dev/null +++ b/redis.conf @@ -0,0 +1,368 @@ +# Redis configuration file example + +# Note on units: when memory size is needed, it is possible to specifiy +# it in the usual form of 1k 5GB 4M and so forth: +# +# 1k => 1000 bytes +# 1kb => 1024 bytes +# 1m => 1000000 bytes +# 1mb => 1024*1024 bytes +# 1g => 1000000000 bytes +# 1gb => 1024*1024*1024 bytes +# +# units are case insensitive so 1GB 1Gb 1gB are all the same. + +# By default Redis does not run as a daemon. Use 'yes' if you need it. +# Note that Redis will write a pid file in /var/run/redis.pid when daemonized. +daemonize yes + +# When running daemonized, Redis writes a pid file in /var/run/redis.pid by +# default. You can specify a custom pid file location here. +#pidfile /var/run/redis.pid + +# Accept connections on the specified port, default is 6379. +# If port 0 is specified Redis will not listen on a TCP socket. +#port 6379 + +# If you want you can bind a single interface, if the bind option is not +# specified all the interfaces will listen for incoming connections. +# +# bind 127.0.0.1 + +# Specify the path for the unix socket that will be used to listen for +# incoming connections. There is no default, so Redis will not listen +# on a unix socket when not specified. +# +# unixsocket /tmp/redis.sock + +# Close the connection after a client is idle for N seconds (0 to disable) +timeout 1 + +# Set server verbosity to 'debug' +# it can be one of: +# debug (a lot of information, useful for development/testing) +# verbose (many rarely useful info, but not a mess like the debug level) +# notice (moderately verbose, what you want in production probably) +# warning (only very important / critical messages are logged) +#loglevel verbose +loglevel notice + +# Specify the log file name. Also 'stdout' can be used to force +# Redis to log on the standard output. Note that if you use standard +# output for logging but daemonize, logs will be sent to /dev/null +#logfile stdout + +# To enable logging to the system logger, just set 'syslog-enabled' to yes, +# and optionally update the other syslog parameters to suit your needs. +# syslog-enabled no + +# Specify the syslog identity. +# syslog-ident redis + +# Specify the syslog facility. Must be USER or between LOCAL0-LOCAL7. +# syslog-facility local0 + +# Set the number of databases. The default database is DB 0, you can select +# a different one on a per-connection basis using SELECT where +# dbid is a number between 0 and 'databases'-1 +databases 16 + +################################ SNAPSHOTTING ################################# +# +# Save the DB on disk: +# +# save +# +# Will save the DB if both the given number of seconds and the given +# number of write operations against the DB occurred. +# +# In the example below the behaviour will be to save: +# after 900 sec (15 min) if at least 1 key changed +# after 300 sec (5 min) if at least 10 keys changed +# after 60 sec if at least 10000 keys changed +# +# Note: you can disable saving at all commenting all the "save" lines. + +#save 900 1 +#save 300 10 +#save 60 10000 + +# Compress string objects using LZF when dump .rdb databases? +# For default that's set to 'yes' as it's almost always a win. +# If you want to save some CPU in the saving child set it to 'no' but +# the dataset will likely be bigger if you have compressible values or keys. +#rdbcompression yes + +# The filename where to dump the DB +#dbfilename dump.rdb + +# The working directory. +# +# The DB will be written inside this directory, with the filename specified +# above using the 'dbfilename' configuration directive. +# +# Also the Append Only File will be created inside this directory. +# +# Note that you must specify a directory here, not a file name. +#dir ./ + +################################# REPLICATION ################################# + +# Master-Slave replication. Use slaveof to make a Redis instance a copy of +# another Redis server. Note that the configuration is local to the slave +# so for example it is possible to configure the slave to save the DB with a +# different interval, or to listen to another port, and so on. +# +# slaveof + +# If the master is password protected (using the "requirepass" configuration +# directive below) it is possible to tell the slave to authenticate before +# starting the replication synchronization process, otherwise the master will +# refuse the slave request. +# +# masterauth + +# When a slave lost the connection with the master, or when the replication +# is still in progress, the slave can act in two different ways: +# +# 1) if slave-serve-stale-data is set to 'yes' (the default) the slave will +# still reply to client requests, possibly with out of data data, or the +# data set may just be empty if this is the first synchronization. +# +# 2) if slave-serve-stale data is set to 'no' the slave will reply with +# an error "SYNC with master in progress" to all the kind of commands +# but to INFO and SLAVEOF. +# +# slave-serve-stale-data yes + +################################## SECURITY ################################### + +# Require clients to issue AUTH before processing any other +# commands. This might be useful in environments in which you do not trust +# others with access to the host running redis-server. +# +# This should stay commented out for backward compatibility and because most +# people do not need auth (e.g. they run their own servers). +# +# Warning: since Redis is pretty fast an outside user can try up to +# 150k passwords per second against a good box. This means that you should +# use a very strong password otherwise it will be very easy to break. +# +# requirepass foobared + +# Command renaming. +# +# It is possilbe to change the name of dangerous commands in a shared +# environment. For instance the CONFIG command may be renamed into something +# of hard to guess so that it will be still available for internal-use +# tools but not available for general clients. +# +# Example: +# +# rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52 +# +# It is also possilbe to completely kill a command renaming it into +# an empty string: +# +# rename-command CONFIG "" + +################################### LIMITS #################################### + +# Set the max number of connected clients at the same time. By default there +# is no limit, and it's up to the number of file descriptors the Redis process +# is able to open. The special value '0' means no limits. +# Once the limit is reached Redis will close all the new connections sending +# an error 'max number of clients reached'. +# +# maxclients 128 + +# Don't use more memory than the specified amount of bytes. +# When the memory limit is reached Redis will try to remove keys with an +# EXPIRE set. It will try to start freeing keys that are going to expire +# in little time and preserve keys with a longer time to live. +# Redis will also try to remove objects from free lists if possible. +# +# If all this fails, Redis will start to reply with errors to commands +# that will use more memory, like SET, LPUSH, and so on, and will continue +# to reply to most read-only commands like GET. +# +# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a +# 'state' server or cache, not as a real DB. When Redis is used as a real +# database the memory usage will grow over the weeks, it will be obvious if +# it is going to use too much memory in the long run, and you'll have the time +# to upgrade. With maxmemory after the limit is reached you'll start to get +# errors for write operations, and this may even lead to DB inconsistency. +# +# maxmemory + +# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory +# is reached? You can select among five behavior: +# +# volatile-lru -> remove the key with an expire set using an LRU algorithm +# allkeys-lru -> remove any key accordingly to the LRU algorithm +# volatile-random -> remove a random key with an expire set +# allkeys->random -> remove a random key, any key +# volatile-ttl -> remove the key with the nearest expire time (minor TTL) +# noeviction -> don't expire at all, just return an error on write operations +# +# Note: with all the kind of policies, Redis will return an error on write +# operations, when there are not suitable keys for eviction. +# +# At the date of writing this commands are: set setnx setex append +# incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd +# sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby +# zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby +# getset mset msetnx exec sort +# +# The default is: +# +# maxmemory-policy volatile-lru + +# LRU and minimal TTL algorithms are not precise algorithms but approximated +# algorithms (in order to save memory), so you can select as well the sample +# size to check. For instance for default Redis will check three keys and +# pick the one that was used less recently, you can change the sample size +# using the following configuration directive. +# +# maxmemory-samples 3 + +############################## APPEND ONLY MODE ############################### + +# By default Redis asynchronously dumps the dataset on disk. If you can live +# with the idea that the latest records will be lost if something like a crash +# happens this is the preferred way to run Redis. If instead you care a lot +# about your data and don't want to that a single record can get lost you should +# enable the append only mode: when this mode is enabled Redis will append +# every write operation received in the file appendonly.aof. This file will +# be read on startup in order to rebuild the full dataset in memory. +# +# Note that you can have both the async dumps and the append only file if you +# like (you have to comment the "save" statements above to disable the dumps). +# Still if append only mode is enabled Redis will load the data from the +# log file at startup ignoring the dump.rdb file. +# +# IMPORTANT: Check the BGREWRITEAOF to check how to rewrite the append +# log file in background when it gets too big. + +appendonly no + +# The name of the append only file (default: "appendonly.aof") +# appendfilename appendonly.aof + +# The fsync() call tells the Operating System to actually write data on disk +# instead to wait for more data in the output buffer. Some OS will really flush +# data on disk, some other OS will just try to do it ASAP. +# +# Redis supports three different modes: +# +# no: don't fsync, just let the OS flush the data when it wants. Faster. +# always: fsync after every write to the append only log . Slow, Safest. +# everysec: fsync only if one second passed since the last fsync. Compromise. +# +# The default is "everysec" that's usually the right compromise between +# speed and data safety. It's up to you to understand if you can relax this to +# "no" that will will let the operating system flush the output buffer when +# it wants, for better performances (but if you can live with the idea of +# some data loss consider the default persistence mode that's snapshotting), +# or on the contrary, use "always" that's very slow but a bit safer than +# everysec. +# +# If unsure, use "everysec". + +# appendfsync always +# appendfsync everysec +# appendfsync no + +# When the AOF fsync policy is set to always or everysec, and a background +# saving process (a background save or AOF log background rewriting) is +# performing a lot of I/O against the disk, in some Linux configurations +# Redis may block too long on the fsync() call. Note that there is no fix for +# this currently, as even performing fsync in a different thread will block +# our synchronous write(2) call. +# +# In order to mitigate this problem it's possible to use the following option +# that will prevent fsync() from being called in the main process while a +# BGSAVE or BGREWRITEAOF is in progress. +# +# This means that while another child is saving the durability of Redis is +# the same as "appendfsync none", that in pratical terms means that it is +# possible to lost up to 30 seconds of log in the worst scenario (with the +# default Linux settings). +# +# If you have latency problems turn this to "yes". Otherwise leave it as +# "no" that is the safest pick from the point of view of durability. +# no-appendfsync-on-rewrite no + +#################################### DISK STORE ############################### + +# When disk store is active Redis works as an on-disk database, where memory +# is only used as a object cache. +# +# This mode is good for datasets that are bigger than memory, and in general +# when you want to trade speed for: +# +# - less memory used +# - immediate server restart +# - per key durability, without need for backgrond savig +# +# On the other hand, with disk store enabled MULTI/EXEC are no longer +# transactional from the point of view of the persistence on disk, that is, +# Redis transactions will still guarantee that commands are either processed +# all or nothing, but there is no guarantee that all the keys are flushed +# on disk in an atomic way. +# +# Of course with disk store enabled Redis is not as fast as it is when +# working with just the memory back end. + +# diskstore-enabled no +# diskstore-path redis.ds +# cache-max-memory 0 +# cache-flush-delay 0 + +############################### ADVANCED CONFIG ############################### + +# Hashes are encoded in a special way (much more memory efficient) when they +# have at max a given numer of elements, and the biggest element does not +# exceed a given threshold. You can configure this limits with the following +# configuration directives. +hash-max-zipmap-entries 512 +hash-max-zipmap-value 64 + +# Similarly to hashes, small lists are also encoded in a special way in order +# to save a lot of space. The special representation is only used when +# you are under the following limits: +list-max-ziplist-entries 512 +list-max-ziplist-value 64 + +# Sets have a special encoding in just one case: when a set is composed +# of just strings that happens to be integers in radix 10 in the range +# of 64 bit signed integers. +# The following configuration setting sets the limit in the size of the +# set in order to use this special memory saving encoding. +set-max-intset-entries 512 + +# Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in +# order to help rehashing the main Redis hash table (the one mapping top-level +# keys to values). The hash table implementation redis uses (see dict.c) +# performs a lazy rehashing: the more operation you run into an hash table +# that is rhashing, the more rehashing "steps" are performed, so if the +# server is idle the rehashing is never complete and some more memory is used +# by the hash table. +# +# The default is to use this millisecond 10 times every second in order to +# active rehashing the main dictionaries, freeing memory when possible. +# +# If unsure: +# use "activerehashing no" if you have hard latency requirements and it is +# not a good thing in your environment that Redis can reply form time to time +# to queries with 2 milliseconds delay. +# +# use "activerehashing yes" if you don't have such hard requirements but +# want to free memory asap when possible. +activerehashing yes + +loglevel verbose +port 6379 +pidfile /home/vgol/dev/github/brukva/redis.6379.pid +logfile /home/vgol/dev/github/brukva/tests/log/redis-server.6379.log +include /home/vgol/dev/github/brukva/common.conf From f9b252fadadb46560ed2147565dfe3d3eca743fb Mon Sep 17 00:00:00 2001 From: evilkost Date: Thu, 14 Apr 2011 21:48:50 +0400 Subject: [PATCH 5/9] fix import in redis.conf --- redis.conf | 1 - 1 file changed, 1 deletion(-) diff --git a/redis.conf b/redis.conf index 3a33987..430fdbc 100644 --- a/redis.conf +++ b/redis.conf @@ -365,4 +365,3 @@ loglevel verbose port 6379 pidfile /home/vgol/dev/github/brukva/redis.6379.pid logfile /home/vgol/dev/github/brukva/tests/log/redis-server.6379.log -include /home/vgol/dev/github/brukva/common.conf From 3d15814e8d0030f1a7520a1ee106c70b7a70e6cd Mon Sep 17 00:00:00 2001 From: evilkost Date: Fri, 15 Apr 2011 04:48:22 +0400 Subject: [PATCH 6/9] imporoved forward_error context manager --- brukva/client.py | 42 +++++++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/brukva/client.py b/brukva/client.py index 3b77c9b..352c2f6 100644 --- a/brukva/client.py +++ b/brukva/client.py @@ -16,20 +16,36 @@ log = logging.getLogger('brukva.client') -@contextlib.contextmanager -def forward_error(callbacks, cleanup=None): - try: - yield callbacks - except Exception, e: - log.error(e) - if isinstance(callbacks, Iterable): - for cb in callbacks: - cb(e) +class ForwardErrorManager(object): + def __init__(self, callbacks): + self.callbacks = callbacks + self.is_active = True + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + if type is None: + return True + + if self.is_active: + if isinstance(self.callbacks, Iterable): + for cb in self.callbacks: + cb(value) + else: + self.callbacks(value) + return True else: - callbacks(e) - finally: - if cleanup: - cleanup() + return False + + def disable(self): + self.is_active = False + + def enable(self): + self.is_active = True + +def forward_error(callbacks): + return ForwardErrorManager(callbacks) class Message(object): def __init__(self, kind, channel, body): From b505740564dcdb654a024d777e0e89202433dc1e Mon Sep 17 00:00:00 2001 From: evilkost Date: Fri, 15 Apr 2011 05:04:47 +0400 Subject: [PATCH 7/9] possible reconnect fix --- brukva/client.py | 137 +++++++++++++++++++++++---------------- tests/server_commands.py | 3 + 2 files changed, 83 insertions(+), 57 deletions(-) diff --git a/brukva/client.py b/brukva/client.py index 352c2f6..dd73e99 100644 --- a/brukva/client.py +++ b/brukva/client.py @@ -89,13 +89,15 @@ def format_pipeline_request(command_stack): return ''.join(format(c.cmd, *c.args, **c.kwargs) for c in command_stack) class Connection(object): - def __init__(self, host, port, on_reconnect, timeout=None, io_loop=None): + def __init__(self, host, port, on_connect, on_disconnect, timeout=None, io_loop=None): self.host = host self.port = port - self.on_reconnect = on_reconnect + self.on_connect = on_connect + self.on_disconnect = on_disconnect self.timeout = timeout self._stream = None self._io_loop = io_loop + self.try_left = 2 self.in_progress = False self.read_queue = [] @@ -107,8 +109,10 @@ def connect(self): sock.settimeout(self.timeout) sock.connect((self.host, self.port)) self._stream = IOStream(sock, io_loop=self._io_loop) + self.connected() except socket.error, e: raise ConnectionError(str(e)) + self.on_connect() def disconnect(self): if self._stream: @@ -118,38 +122,41 @@ def disconnect(self): pass self._stream = None - def write(self, data): + def write(self, data, try_left=None): + if try_left is None: + try_left = self.try_left if not self._stream: - self.on_reconnect() + self.connect() if not self._stream: raise ConnectionError('Tried to write to non-existent connection') - else: - self._stream.write(data) + if try_left > 0: + try: + #print('try to write: %s'% data) + self._stream.write(data) + except IOError, e: + self.disconnect() + self.write(data, try_left - 1) + else: + raise ConnectionError('Tried to write to non-existent connection') def read(self, length, callback): try: if not self._stream: - self.client._sudden_disconnect([callback]) - self.on_reconnect() - if not self._stream: - raise ConnectionError('Tried to read from non-existent connection') + self.disconnect() + raise ConnectionError('Tried to read from non-existent connection') self._stream.read_bytes(length, callback) except IOError: - self.client._sudden_disconnect([callback]) - self.on_reconnect() + self.on_disconnect() def readline(self, callback): try: if not self._stream: - self.client._sudden_disconnect([callback]) - self.on_reconnect() - if not self._stream: - raise ConnectionError('Tried to read from non-existent connection') + self.disconnect() + raise ConnectionError('Tried to read from non-existent connection') self._stream.read_until('\r\n', callback) except IOError: - self.client._sudden_disconnect([callback]) - self.on_reconnect() + self.on_disconnect() def try_to_perform_read(self): if not self.in_progress and self.read_queue: @@ -244,16 +251,18 @@ def __getattr__(self, item): class Client(object): - def __init__(self, host='localhost', port=6379, password=None, reconnect=False, io_loop=None): + def __init__(self, host='localhost', port=6379, password=None, + selected_db=None, io_loop=None): self._io_loop = io_loop or IOLoop.instance() - - self.connection = Connection(host, port, self.on_reconnect, io_loop=self._io_loop) + self.connection = Connection(host, port, + self.on_connect, self.on_disconnect, io_loop=self._io_loop) self.async = _AsyncWrapper(weakref.proxy(self)) self.queue = [] self.current_cmd_line = None self.subscribed = False self.password = password - self.reconnect = reconnect + self.selected_db = selected_db + self.write_try_num = 2 self.REPLY_MAP = dict_merge( string_keys_to_dict('AUTH BGREWRITEAOF BGSAVE DEL EXISTS EXPIRE HDEL HEXISTS ' 'HMSET MOVE MSET MSETNX SAVE SETNX', @@ -290,22 +299,31 @@ def __repr__(self): def pipeline(self, transactional=False): if not self._pipeline: - self._pipeline = Pipeline(io_loop = self._io_loop, transactional=transactional) + self._pipeline = Pipeline( + selected_db=self.selected_db, + io_loop = self._io_loop, + transactional=transactional + ) self._pipeline.connection = self.connection return self._pipeline #### connection + def connect(self): self.connection.connect() - if self.password: - self.auth(self.password) def disconnect(self): self.connection.disconnect() - def on_reconnect(self): - if self.reconnect: - self.connect() + def on_connect(self): + if self.password: + self.auth(self.password) + if self.selected_db: + self.select(self.selected_db) + + def on_disconnect(self, callbacks): + self.pipeline().discard() + raise ConnectionError("Socket closed on remote end") #### #### formatting @@ -342,22 +360,18 @@ def call_callbacks(self, callbacks, *args, **kwargs): for cb in callbacks: cb(*args, **kwargs) - def _sudden_disconnect(self, callbacks): - self.connection.disconnect() - raise ConnectionError("Socket closed on remote end") - @process def execute_command(self, cmd, callbacks, *args, **kwargs): - with forward_error(callbacks): + result = None + with forward_error(callbacks) as forward: if callbacks is None: callbacks = [] elif not hasattr(callbacks, '__iter__'): callbacks = [callbacks] + try: - if self.reconnect and not self.connection.connected(): - self.connect() self.connection.write(self.format(cmd, *args, **kwargs)) - except IOError: + except IOError, e: self._sudden_disconnect(callbacks) except Exception, e: self.connection.disconnect() @@ -369,13 +383,15 @@ def execute_command(self, cmd, callbacks, *args, **kwargs): data = yield async(self.connection.readline)() if not data: result = None + self.connection.read_done() raise Exception('TODO: [no data from connection->readline') else: response = yield self.process_data(data, cmd_line) result = self.format_reply(cmd_line, response) - self.connection.read_done() - self.call_callbacks(callbacks, result) + self.connection.read_done() + + self.call_callbacks(callbacks, result) @async @process @@ -389,7 +405,6 @@ def process_data(self, data, cmd_line, callback): response = [] else: if len(data) == 0: - self.on_reconnect() raise IOError('Disconnected') head, tail = data[0], data[1:] @@ -408,7 +423,7 @@ def process_data(self, data, cmd_line, callback): else: raise ResponseError('Unknown response type %s' % head, cmd_line) - callback(response) + callback(response) @async @process @@ -424,7 +439,7 @@ def consume_multibulk(self, length, cmd_line, callback): ) token = yield self.process_data(data, cmd_line) #FIXME error tokens.append( token ) - callback(tokens) + callback(tokens) @async @process @@ -437,7 +452,7 @@ def consume_bulk(self, length, callback): raise ResponseError('EmptyResponse') else: data = data[:-2] - callback(data) + callback(data) #### ### MAINTENANCE @@ -460,6 +475,7 @@ def info(self, callbacks=None): self.execute_command('INFO', callbacks) def select(self, db, callbacks=None): + self.selected_db = db self.execute_command('SELECT', callbacks, db) def shutdown(self, callbacks=None): @@ -823,7 +839,7 @@ def publish(self, channel, message, callbacks=None): @process def listen(self, callbacks=None): # 'LISTEN' is just for receiving information, it is not actually sent anywhere - with forward_error(callbacks): + with forward_error(callbacks) as forward: callbacks = callbacks or [] if not hasattr(callbacks, '__iter__'): callbacks = [callbacks] @@ -839,8 +855,10 @@ def listen(self, callbacks=None): if isinstance(response, Exception): raise response result = self.format_reply(cmd_listen, response) - self.call_callbacks(callbacks, result) + forward.disable() + self.call_callbacks(callbacks, result) + forward.enable() ### CAS def watch(self, key, callbacks=None): self.execute_command('WATCH', callbacks, key) @@ -855,19 +873,26 @@ def __init__(self, transactional, *args, **kwargs): self.command_stack = [] def execute_command(self, cmd, callbacks, *args, **kwargs): - if cmd in ('AUTH'): - raise Exception('403') + if cmd in ('AUTH', 'SELECT'): + raise RuntimeError('cmd %s must not be in pipe ' % cmd) self.command_stack.append(CmdLine(cmd, *args, **kwargs)) def discard(self): # actually do nothing with redis-server, just flush command_stack self.command_stack = [] - def _sudden_disconnect(self, callbacks, error=None): - self.connection.disconnect() - raise error or ConnectionError("Socket closed on remote end") + ### + def select(self, db, callbacks=None): + self.selected_db = db + super(Pipeline, self).execute_command('SELECT', callbacks, db) + + def auth(self, password, callbacks=None): + super(Pipeline, self).execute_command('AUTH', callbacks, password) + ### + @process def execute(self, callbacks): + results = None with forward_error(callbacks): command_stack = self.command_stack self.command_stack = [] @@ -881,16 +906,17 @@ def execute(self, callbacks): command_stack = [CmdLine('MULTI')] + command_stack + [CmdLine('EXEC')] request = format_pipeline_request(command_stack) + try: - if self.reconnect and not self.connection.connected(): - self.connect() self.connection.write(request) except IOError: self.command_stack = [] - self._sudden_disconnect(callbacks) + self.connection.disconnect() + raise ConnectionError("Socket closed on remote end") except Exception, e: self.command_stack = [] - self._sudden_disconnect(callbacks, e) + self.connection.disconnect() + raise e yield self.connection.queue_wait() responses = [] @@ -901,7 +927,6 @@ def execute(self, callbacks): data = yield async(self.connection.readline)() if not data: raise ResponseError('Not enough data after EXEC') - try: cmd_line = cmds.next() if self.transactional and cmd_line.cmd != 'EXEC': @@ -932,6 +957,4 @@ def format_replies(cmd_lines, responses): else: results = format_replies(command_stack, responses) - self.call_callbacks(callbacks, results) - - + self.call_callbacks(callbacks, results) diff --git a/tests/server_commands.py b/tests/server_commands.py index 5a40122..e600c56 100644 --- a/tests/server_commands.py +++ b/tests/server_commands.py @@ -485,6 +485,9 @@ def make_list(key, items, expect_value=True): self.finish()]) self.start() + + +class PipelineTestCase(TornadoTestCase): ### Pipeline ### def test_pipe_simple(self): pipe = self.client.pipeline() From eb5a05501eb3d9ab8098ce32ea451c16919b2c9e Mon Sep 17 00:00:00 2001 From: evilkost Date: Fri, 15 Apr 2011 05:10:54 +0400 Subject: [PATCH 8/9] some fixes and cleanup in test cases --- tests/server_commands.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/server_commands.py b/tests/server_commands.py index e600c56..3a6ae32 100644 --- a/tests/server_commands.py +++ b/tests/server_commands.py @@ -66,6 +66,9 @@ def pexpect(self, expected_list, list_without_errors=True): source_line = '\n' + tb.format_stack()[-2] def callback(result): + if isinstance(result, Exception): + self.fail('got exception %s' % result) + self.assertEqual(len(result), len(expected_list) ) for result, (exp_e, exp_d) in zip(result, expected_list): if exp_e: @@ -82,7 +85,7 @@ def callback(result): def delayed(self, timeout, cb): self.loop.add_timeout(time.time()+timeout, cb) - def finish(self, *args): + def finish(self, *args, **kwargs): self.loop.stop() def start(self): @@ -546,6 +549,7 @@ def test_mix_with_pipe(self): pipe.execute([self.pexpect(['123', {'zar': 'gza'}]), self.finish]) self.start() + def test_mix_with_pipe_multi(self): pipe = self.client.pipeline(transactional=True) @@ -610,6 +614,7 @@ def test_pipe_zsets(self): ]), self.finish, ]) + self.start() def test_pipe_zsets2(self): pipe = self.client.pipeline(transactional=False) @@ -690,14 +695,10 @@ def on_subscription(msg): self.assertEqual(msg.kind, 'subscribe') self.assertEqual(msg.channel, 'foo') self.assertEqual(msg.body, 1) - self.client2.listen(on_recv) self.client2.subscribe('foo', on_subscription) self.loop.add_timeout(time.time()+0.1, lambda: - self.client.publish('foo', 'bar', None) - ) - self.loop.add_timeout(time.time()+0.2, lambda: self.client.publish('foo', 'bar', lambda *args: self.loop.add_timeout(time.time()+0.1, self.finish) @@ -747,6 +748,5 @@ def test_redis_timeout_with_pipe(self): ) self.start() - if __name__ == '__main__': unittest.main() From aeabd47a79b50a74c57f1cb9e8c5b361408ea483 Mon Sep 17 00:00:00 2001 From: evilkost Date: Fri, 15 Apr 2011 16:00:27 +0400 Subject: [PATCH 9/9] cleanup --- brukva/client.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/brukva/client.py b/brukva/client.py index dd73e99..c494702 100644 --- a/brukva/client.py +++ b/brukva/client.py @@ -45,6 +45,17 @@ def enable(self): self.is_active = True def forward_error(callbacks): + """ + Syntax sugar. + If some error occurred inside with block, + it will be suppressed and forwarded to callbacks. + + Error handling can be disabled using context.disable(), + and re enabled again using context.enable(). + + @type callbacks: callable or iterator over callables + @rtype: context + """ return ForwardErrorManager(callbacks) class Message(object): @@ -134,7 +145,7 @@ def write(self, data, try_left=None): try: #print('try to write: %s'% data) self._stream.write(data) - except IOError, e: + except IOError: self.disconnect() self.write(data, try_left - 1) else: @@ -322,7 +333,6 @@ def on_connect(self): self.select(self.selected_db) def on_disconnect(self, callbacks): - self.pipeline().discard() raise ConnectionError("Socket closed on remote end") #### @@ -363,7 +373,7 @@ def call_callbacks(self, callbacks, *args, **kwargs): @process def execute_command(self, cmd, callbacks, *args, **kwargs): result = None - with forward_error(callbacks) as forward: + with forward_error(callbacks): if callbacks is None: callbacks = [] elif not hasattr(callbacks, '__iter__'):