Skip to content
This repository was archived by the owner on Jan 10, 2019. It is now read-only.

Redis v2.6 Lua Scripting #11

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
*.pyc
.*.swp
build/
.project
.pydevproject
.settings
28 changes: 16 additions & 12 deletions contract.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@ Delete Feed:
PUBLISH delfeed [feed]\x00[instance uuid]
EXEC // if nil: go back to WATCH

Set Config:
SET feed.config:[feed] //json(config)
Set Config Value:
HSET feed.config:[feed] name value
PUBLISH conffeed [feed]\x00[instance uuid]

Get Config Value:
HGET feed.config:[feed] name

Feed:

Publish:
//id may be provided or generated. An id that has already been published will update that id

max = feed.config:[feed] max_length
WATCH feed.ids:[feed]
delete_ids = ZRANGE feed.ids:[feed] 0 [-max] // eg ZRANGE feed.ids:test 0 -5 if the max is 4
MULTI
Expand Down Expand Up @@ -188,22 +192,22 @@ Job:
LPUSH feed.ids:[feed] [id]
INCR feed.publishes:[feed]
HSET feed.items:[feed] [id] [item]
ZADD feed.published:[feed] [utc epoch] [id]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC

High Priority Put:
// id = generated uuid
MULTI
RPUSH feed.ids:[feed] [id]
HSET feed.items:[feed] [id] [item]
ZADD feed.published:[feed] [utc epoch] [id]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC

Get:
id = BRPOP feed.ids:[feed] [timeout]
//if error/timeout, abort
MULTI
ZADD feed.claimed:[feed] [utc epoch] [id]
ZADD feed.claimed:[feed] [utc epoch milliseconds] [id]
item = HGET feed:items[feed] [id]
EXEC
//if the id fails to get from feed.ids to feed.claimed, the maintenance will notice eventually
Expand All @@ -214,15 +218,12 @@ Job:
MULTI
ZREM feed.claimed:[feed] [id]
HDEL feed.cancelled:[feed] [id] //just to make sure
INCR feed.finishes:[feed]
//optionally if publishing a result:
LPUSH feed.jobfinished:[feed]\x00[id] [result]
EXPIRE feed.jobfinished:[feed]\x00[id] [timeout]
PUBLISH job.finish:[feed] [id]\x00[result]
HDEL feed.items:[feed] [id]
EXEC // if nil: go back to WATCH and try again

Get Result:
BRPOP feed:jobfinished:[feed]\x00[id] [timeout]

Get Ids:
HKEYS feed.items:[feed]

Expand Down Expand Up @@ -252,7 +253,7 @@ Job:
SREM feed.stalled:[feed] [id]
//if error, abort
LPUSH feed.ids:[feed] [id]
ZADD feed.published:[feed] [utc epoch] [id]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC // if nil retry

Retract:
Expand All @@ -267,6 +268,9 @@ Job:
LREM feed.ids:[feed] 1 [id]
EXEC // if fail, retry

getNumOfFailures:
HGET feed.cancelled:[feed] [id]

Maintenance: //maintain job queue -- only ran by one process per jobqueue on occassion -- still a bit hand-wavey
MULTI
keys = HKEYS feed.items:[feed]
Expand All @@ -280,4 +284,4 @@ Job:
LPUSH feed.ids:[feed] [key]

check claimed jobs to see if any have been claimed too long and "Cancel" or "Stall" them
publish stats to a feed
publish stats to a feed
11 changes: 11 additions & 0 deletions scripts/config.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- ARGV: name, config(json), instance
if redis.call('sismember', 'feeds', name) then
return false
end
config = cjson.decode(ARGV[2])
feed = 'feed.config:'..ARGV[1]
table.foreach(config, function(k, v)
redis.call('hset', feed, k, v)
end)
redis.call('publish', 'conffeed', ARGV[1]..'\0'..ARGV[3])
return true
13 changes: 13 additions & 0 deletions scripts/create.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- ARGV: name, config, instance
if redis.call('sadd', 'feeds', ARGV[1]) == 0 then
-- feed already exists
return false
end
feed = 'feed.config:'..ARGV[1]
config = cjson.decode(ARGV[2])
-- TODO: check if config has a type key
for k, v in pairs(config) do
redis.call('hset', feed, k, v)
end
redis.call('publish', 'newfeed', ARGV[1]..'\0'..ARGV[3])
return true
40 changes: 40 additions & 0 deletions scripts/delete.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- ARGS: feed, instance
if redis.call('srem', 'feeds', ARGV[1]) == 0 then
return false
end
schema = {
feed = function(name) return {
'feed.config:'..name,
'feed.ids:'..name,
'feed.items:'..name,
'feed.publishes:'..name
} end,
sortedfeed = function(name) return {
'feed.config:'..name,
'feed.ids:'..name,
'feed.items:'..name,
'feed.publishes:'..name,
'feed.idincr:'..name
} end,
queue = function(name) return {
'feed.config:'..name,
'feed.ids:'..name,
'feed.items:'..name,
'feed.publishes:'..name
} end,
job = function(name) return {
'feed.config:'..name,
'feed.ids:'..name,
'feed.items:'..name,
'feed.publishes:'..name,
'feed.published:'..name,
'feed.claimed:'..name,
'feed.cancelled:'..name,
'feed.finishes:'..name,
'feed.stalled:'..name
} end
}
feedtype = redis.call('hget', 'feed.config:'..ARGV[1], 'type')
redis.call('del', unpack(schema[feedtype](ARGV[1])))
redis.call('publish', 'delfeed', ARGV[1]..'\0'..ARGV[2])
return true
19 changes: 19 additions & 0 deletions scripts/feed/publish.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- ARGV: feed, id, item, time
max = redis.call("hget", "feed.config:"..ARGV[1], "max_length")
if max and tonumber(max) > 0 then
ids = redis.call('zrange', 'feed.ids:'..ARGV[1], 0, -tonumber(max))
table.foreach(ids, function(i, id)
redis.call('zrem', 'feed.ids:'..ARGV[1], id)
redis.call('hdel', 'feed.items:'..ARGV[1], id)
redis.call('publish', 'feed.retract:'..ARGV[1], id)
end)
end

redis.call('incr', 'feed.publishes:'..ARGV[1])
redis.call('hset', 'feed.items:'..ARGV[1], ARGV[2], ARGV[3])
if redis.call('zadd', 'feed.ids:'..ARGV[1], ARGV[4], ARGV[2]) == 1 then
redis.call('publish', 'feed.edit:'..ARGV[1], ARGV[2]..'\0'..ARGV[3])
else
redis.call('publish', 'feed.publish:'..ARGV[1], ARGV[2]..'\0'..ARGV[3])
end
return zadd
8 changes: 8 additions & 0 deletions scripts/feed/retract.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- ARGV: feed, id
if redis.call('zrem', 'feed.ids:'..ARGV[1], ARGV[2]) == 0 then
return false
end
redis.call('hdel', 'feed.items:'..ARGV[1], ARGV[2])
redis.call('publish', 'feed.retract:'..ARGV[1], ARGV[2])
return true

7 changes: 7 additions & 0 deletions scripts/jobs/cancel.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- ARGV: name, id
if redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2]) == 0 then
return false
end
redis.call('hincrby', 'feed.cancelled:'..ARGV[1], ARGV[2], 1)
redis.call('lpush', 'feed.ids:'..ARGV[1], ARGV[2])
return true
12 changes: 12 additions & 0 deletions scripts/jobs/finish.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- ARGV: name, id, result(optional)
if redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2]) == 0 then
return false;
end
redis.call('hdel', 'feed.cancelled:'..ARGV[1], ARGV[2])
redis.call('zrem', 'feed.published:'..ARGV[1], ARGV[2])
redis.call('incr', 'feed.finishes:'..ARGV[1])
if table.getn(ARGV) == 3 then
redis.call('publish', 'job.finish:'..ARGV[1], ARGV[2].."\0"..ARGV[3])
end
redis.call('hdel', 'feed.items:'..ARGV[1], ARGV[2])
return true
4 changes: 4 additions & 0 deletions scripts/jobs/get.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- ARGV: name, id, time
r1 = redis.call('zadd', 'feed.claimed:'..ARGV[1], ARGV[3], ARGV[2]);
r2 = redis.call('hget', 'feed.items:'..ARGV[1], ARGV[2]);
return {r1, r2}
9 changes: 9 additions & 0 deletions scripts/jobs/publish.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- ARGV: name, id, item, time, priority
if ARGV[5] == nil then
redis.call('lpush', 'feed.ids:'..ARGV[1], ARGV[2]);
else
redis.call('rpush', 'feed.ids:'..ARGV[1], ARGV[2]);
end
redis.call('incr', 'feed.publishes:'..ARGV[1]);
redis.call('hset', 'feed.items:'..ARGV[1], ARGV[2], ARGV[3]);
return redis.call('zadd', 'feed.published:'..ARGV[1], ARGV[4], ARGV[2]);
11 changes: 11 additions & 0 deletions scripts/jobs/retract.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- ARGV: name, id
if redis.call('hdel', 'feed.items:'..ARGV[1], ARGV[2]) == 0 then
return false
end
redis.call('hdel', 'feed.cancelled:'..ARGV[1], ARGV[2])
redis.call('zrem', 'feed.published:'..ARGV[1], ARGV[2])
redis.call('srem', 'feed.stalled:'..ARGV[1], ARGV[2])
redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2])
redis.call('lrem', 'feed.ids:'..ARGV[1], 1, ARGV[2])
return true

7 changes: 7 additions & 0 deletions scripts/jobs/retry.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- ARGV: name, id, time
if redis.call('srem', 'feed.stalled:'..ARGV[1], ARGV[2]) == 0 then
return false;
end
redis.call('lpush', 'feed.ids:'..ARGV[1], ARGV[2]);
redis.call('zadd', 'feed.published:'..ARGV[1], ARGV[3], ARGV[2]);
return true
8 changes: 8 additions & 0 deletions scripts/jobs/stall.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- ARGV: name, id
if redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2]) == 0 then
return false;
end
redis.call('hdel', 'feed.cancelled:'..ARGV[1], ARGV[2]);
redis.call('sadd', 'feed.stalled:'..ARGV[1], ARGV[2]);
redis.call('zrem', 'feed.published'..ARGV[1], ARGV[2]);
return true
8 changes: 4 additions & 4 deletions test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# You were warned.
# =====================================================================

#[Test]
#host=localhost
#port=6379
#db=10
[Test]
host=localhost
port=6379
db=10
24 changes: 18 additions & 6 deletions tests/test_feed.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
import thoonk
from thoonk.feeds import Feed
import unittest
from ConfigParser import ConfigParser


class TestLeaf(unittest.TestCase):

def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)

def setUp(self, *args, **kwargs):
conf = ConfigParser()
conf.read('test.cfg')
if conf.sections() == ['Test']:
self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'),
port=conf.getint('Test', 'port'),
db=conf.getint('Test', 'db'))
db=conf.getint('Test', 'db'),
listen=False)
self.ps.redis.flushdb()
else:
print 'No test configuration found in test.cfg'
exit()


def tearDown(self):
self.ps.close()

def test_05_basic_retract(self):
"""Test adding and retracting an item."""
l = self.ps.feed("testfeed")
self.assertEqual(type(l), Feed)
l.publish('foo', id='1')
r = l.get_ids()
v = l.get_all()
Expand All @@ -46,6 +49,10 @@ def test_10_basic_feed(self):
def test_20_basic_feed_items(self):
"""Test items match completely."""
l = self.ps.feed("testfeed")
l.publish("hi", id='1')
l.publish("bye", id='2')
l.publish("thanks", id='3')
l.publish("you're welcome", id='4')
r = l.get_ids()
self.assertEqual(r, ['1', '2', '3', '4'], "Queue results did not match publish: %s" % r)
c = {}
Expand All @@ -56,6 +63,10 @@ def test_20_basic_feed_items(self):
def test_30_basic_feed_retract(self):
"""Testing item retract items match."""
l = self.ps.feed("testfeed")
l.publish("hi", id='1')
l.publish("bye", id='2')
l.publish("thanks", id='3')
l.publish("you're welcome", id='4')
l.retract('3')
r = l.get_ids()
self.assertEqual(r, ['1', '2','4'], "Queue results did not match publish: %s" % r)
Expand All @@ -68,6 +79,7 @@ def test_40_create_delete(self):
"""Testing feed delete"""
l = self.ps.feed("test2")
l.delete_feed()


def test_50_max_length(self):
"""Test feeds with a max length"""
Expand Down
Loading