diff --git a/examples/fetch.lua b/examples/fetch.lua
index 27dede8..a284e3f 100755
--- a/examples/fetch.lua
+++ b/examples/fetch.lua
@@ -7,7 +7,7 @@ local ip = arg[2]
 local port = arg[3]
 local cjson = require'cjson'

-local peer = require'jet.peer'.new{ip=ip,port=port}
+local peer = require'jet.peer'.new{ip=ip,port=port,persist=100}

 local is_json,exp_json = pcall(cjson.decode,exp)
 if is_json then
diff --git a/examples/persistant_ticker.lua b/examples/persistant_ticker.lua
new file mode 100755
index 0000000..3ad2f33
--- /dev/null
+++ b/examples/persistant_ticker.lua
@@ -0,0 +1,24 @@
+#!/usr/bin/env lua
+-- example program for manually testing persistent peers
+local jet = require'jet'
+local ev = require'ev'
+
+assert(arg[1],'ip expected')
+
+local peer = jet.peer.new({
+    ip = arg[1],
+    persist = 10,
+})
+
+local tick_tack = peer:state({
+    path = 'tick_tack',
+    value = 1
+})
+
+ev.Timer.new(function()
+    local new = tick_tack:value() + 1
+    print(new)
+    tick_tack:value(new)
+  end,1,1):start(ev.Loop.default)
+
+peer:loop()
diff --git a/rockspecs/lua-jet-scm-1.rockspec b/rockspecs/lua-jet-scm-1.rockspec
index a0c386d..c825ed8 100644
--- a/rockspecs/lua-jet-scm-1.rockspec
+++ b/rockspecs/lua-jet-scm-1.rockspec
@@ -17,7 +17,8 @@ dependencies = {
   'lua-websockets',
   'luasocket',
   'lua-ev',
-  'lpack'
+  'lpack',
+  'lua-step'
 }

 build = {
diff --git a/spec/daemon_spec.lua b/spec/daemon_spec.lua
index 6c97f29..ea6200e 100644
--- a/spec/daemon_spec.lua
+++ b/spec/daemon_spec.lua
@@ -83,12 +83,16 @@ for _,info in ipairs(addresses_to_test) do

     local sock

+    local new_sock = function()
+      if info.family == 'inet6' then
+        return socket.tcp6()
+      else
+        return socket.tcp()
+      end
+    end
+
     before_each(function()
-      if info.family == 'inet6' then
-        sock = socket.tcp6()
-      else
-        sock = socket.tcp()
-      end
+      sock = new_sock()
     end)

     after_each(function()
@@ -204,6 +208,379 @@ for _,info in ipairs(addresses_to_test) do
       message_socket:send('this is no json')
     end)

+    it('a peer can configure persist and resume',function(done)
+      sock:connect(info.addr,port)
+      local message_socket = jetsocket.wrap(sock)
+      message_socket:on_message(
+        async(
+          function(_,response)
+            response = cjson.decode(response)
+            assert.is_same(response.id,1)
+            assert.is_string(response.result)
+            local resume_id = response.result
+            sock:close()
+            sock = new_sock()
+            sock:connect(info.addr,port)
+            local message_socket = jetsocket.wrap(sock)
+            message_socket:on_message(
+              async(
+                function(_,response)
+                  response = cjson.decode(response)
+                  assert.is_same(response.id,2)
+                  -- the server has received two messages associated with this persist session
+                  -- (config.persist and config.resume)
+                  assert.is_same(response.result,2)
+                  done()
+                end))
+            message_socket:send(cjson.encode({
+                  method = 'config',
+                  params = {
+                    resume = {
+                      id = resume_id,
+                      receivedCount = 1
+                    }
+                  },
+                  id = 2,
+            }))
+
+          end))
+      message_socket:send(cjson.encode({
+            method = 'config',
+            params = {
+              persist = 0.001
+            },
+            id = 1,
+      }))
+    end)
+
+    it('a peer can configure persist and resume (twice)',function(done)
+      sock:connect(info.addr,port)
+      local message_socket = jetsocket.wrap(sock)
+      message_socket:on_message(
+        async(
+          function(_,response)
+            response = cjson.decode(response)
+            assert.is_same(response.id,1)
+            assert.is_string(response.result)
+            local resume_id = response.result
+            sock:close()
+            sock = new_sock()
+            sock:connect(info.addr,port)
+            local message_socket = jetsocket.wrap(sock)
+            message_socket:on_message(
+              async(
+                function(_,response)
+                  response = cjson.decode(response)
+                  assert.is_same(response.id,2)
+                  -- the server has received two messages associated with this persist session
+                  -- (config.persist and config.resume)
+                  assert.is_same(response.result,2)
+                  sock:close()
+                  sock = new_sock()
+                  sock:connect(info.addr,port)
+                  local message_socket = jetsocket.wrap(sock)
+                  message_socket:on_message(
+                    async(
+                      function(_,response)
+                        response = cjson.decode(response)
+                        assert.is_same(response.id,3)
+                        -- the server has received three messages associated with this persist session
+                        -- (config.persist, config.resume and config.resume)
+                        assert.is_same(response.result,3)
+                        done()
+                      end))
+                  message_socket:send(cjson.encode({
+                        method = 'config',
+                        params = {
+                          resume = {
+                            id = resume_id,
+                            receivedCount = 2
+                          }
+                        },
+                        id = 3,
+                  }))
+
+                end))
+            message_socket:send(cjson.encode({
+                  method = 'config',
+                  params = {
+                    resume = {
+                      id = resume_id,
+                      receivedCount = 1
+                    }
+                  },
+                  id = 2,
+            }))
+
+          end))
+      message_socket:send(cjson.encode({
+            method = 'config',
+            params = {
+              persist = 0.001
+            },
+            id = 1,
+      }))
+    end)
+
+
+    it('a peer can configure persist and resume and missed messages are resent by the daemon twice',function(done)
+      sock:connect(info.addr,port)
+      local message_socket = jetsocket.wrap(sock)
+      message_socket:on_message(
+        async(
+          function(_,response)
+            response = cjson.decode(response)
+            if #response > 0 then
+              response = response[1]
+            end
+            assert.is_string(response.result)
+            local resume_id = response.result
+            sock:close()
+            sock = new_sock()
+            sock:connect(info.addr,port)
+            local message_socket = jetsocket.wrap(sock)
+            message_socket:on_message(
+              async(
+                function(_,response)
+                  response = cjson.decode(response)
+                  assert.is_same(response,{
+                      {
+                        result = 5, -- five messages received by the daemon so far (assoc. to the persist id)
+                        id = 5, -- config.resume id
+                      },
+                      {
+                        result = true, -- fetch set up successfully
+                        id = 3,
+                      },
+                      {
+                        method = 'fetchy', -- resumed fetched data
+                        params = {
+                          path = 'dummy',
+                          value = 123,
+                          event = 'add',
+                        }
+                      },
+                      {
+                        method = 'fetchy', -- resumed fetched data
+                        params = {
+                          path = 'dummy',
+                          value = 234,
+                          event = 'change',
+                        }
+                      },
+                      {
+                        result = true, -- change notification was successful
+                        id = 4
+                      }
+                  })
+                  sock:close()
+                  sock = new_sock()
+                  sock:connect(info.addr,port)
+                  local message_socket = jetsocket.wrap(sock)
+                  message_socket:on_message(
+                    async(
+                      function(_,response)
+                        response = cjson.decode(response)
+                        assert.is_same(response,{
+                            {
+                              result = 6, -- six messages received by the daemon so far (assoc. to the persist id)
+                              id = 6, -- config.resume id
+                            },
+                            {
+                              method = 'fetchy', -- resumed fetched data
+                              params = {
+                                path = 'dummy',
+                                value = 123,
+                                event = 'add',
+                              }
+                            },
+                            {
+                              method = 'fetchy', -- resumed fetched data
+                              params = {
+                                path = 'dummy',
+                                value = 234,
+                                event = 'change',
+                              }
+                            },
+                            {
+                              result = true, -- change notification was successful
+                              id = 4
+                            }
+                        })
+                        -- close the socket and wait some time
+                        -- to let the daemon clean up
+                        -- the dummy states assoc with this
+                        -- sock. should be improved by using
+                        -- *some* callback
+                        sock:close()
+                        ev.Timer.new(function()
+                            done()
+                          end,0.01):start(ev.Loop.default)
+                      end))
+                  message_socket:send(cjson.encode({
+                        method = 'config',
+                        params = {
+                          resume = {
+                            id = resume_id,
+                            -- pretend only 2 more messages have been
+                            -- received (config and fetch setup)
+                            receivedCount = 2+2
+                          }
+                        },
+                        id = 6,
+                  }))
+
+                end))
+
+            message_socket:send(cjson.encode({
+                  method = 'config',
+                  params = {
+                    resume = {
+                      id = resume_id,
+                      -- pretend only config and add responses have been received
+                      receivedCount = 2
+                    }
+                  },
+                  id = 5,
+            }))
+
+          end))
+      message_socket:send(cjson.encode({
+            {
+              method = 'config',
+              params = {
+                persist = 0.001
+              },
+              id = 1,
+            },
+            {
+              method = 'add',
+              params = {
+                path = 'dummy',
+                value = 123
+              },
+              id = 2,
+            },
+            {
+              method = 'fetch',
+              params = {
+                id = 'fetchy',
+                matches = {'.*'}
+              },
+              id = 3,
+            },
+            {
+              method = 'change',
+              params = {
+                path = 'dummy',
+                value = 234
+              },
+              id = 4,
+            },
+      }))
+    end)
+
+    it('a peer can configure persist and resume and missed messages are resent by the daemon',function(done)
+      sock:connect(info.addr,port)
+      local message_socket = jetsocket.wrap(sock)
+      message_socket:on_message(
+        async(
+          function(_,response)
+            response = cjson.decode(response)
+            if #response > 0 then
+              response = response[1]
+            end
+            assert.is_string(response.result)
+            local resume_id = response.result
+            sock:close()
+            sock = new_sock()
+            sock:connect(info.addr,port)
+            local message_socket = jetsocket.wrap(sock)
+            message_socket:on_message(
+              async(
+                function(_,response)
+                  response = cjson.decode(response)
+                  assert.is_same(response,{
+                      {
+                        result = 5, -- five messages received by the daemon so far (assoc. to the persist id)
+                        id = 5, -- config.resume id
+                      },
+                      {
+                        result = true, -- fetch set up successfully
+                        id = 3,
+                      },
+                      {
+                        method = 'fetchy', -- resumed fetched data
+                        params = {
+                          path = 'dummy',
+                          value = 123,
+                          event = 'add',
+                        }
+                      },
+                      {
+                        method = 'fetchy', -- resumed fetched data
+                        params = {
+                          path = 'dummy',
+                          value = 234,
+                          event = 'change',
+                        }
+                      },
+                      {
+                        result = true, -- change notification was successful
+                        id = 4
+                      }
+                  })
+                  done()
+                end))
+            message_socket:send(cjson.encode({
+                  method = 'config',
+                  params = {
+                    resume = {
+                      id = resume_id,
+                      -- pretend only config and add responses have been received
+                      receivedCount = 2
+                    }
+                  },
+                  id = 5,
+            }))
+
+          end))
+      message_socket:send(cjson.encode({
+            {
+              method = 'config',
+              params = {
+                persist = 0.001
+              },
+              id = 1,
+            },
+            {
+              method = 'add',
+              params = {
+                path = 'dummy',
+                value = 123
+              },
+              id = 2,
+            },
+            {
+              method = 'fetch',
+              params = {
+                id = 'fetchy',
+                matches = {'.*'}
+              },
+              id = 3,
+            },
+            {
+              method = 'change',
+              params = {
+                path = 'dummy',
+                value = 234
+              },
+              id = 4,
+            },
+      }))
+    end)
+
+
     local req_resp_test = function(desc)
       local requests = desc.requests
       local responses = desc.responses
diff --git a/spec/peer_spec.lua b/spec/peer_spec.lua
index 8d069fe..027e1d9 100644
--- a/spec/peer_spec.lua
+++ b/spec/peer_spec.lua
@@ -9,15 +9,16 @@ local dt = 0.05

 setloop('ev')

+
+
 describe(
-  'A peer basic tests',
+  'When a daemon is running',
   function()
     local daemon
     local peer
     setup(function()
         daemon = jetdaemon.new{
           port = port,
-          print = function() end
         }
         daemon:start()
       end)
@@ -26,7 +27,7 @@ describe(
         daemon:stop()
       end)

-    it('provides the correct interface',function()
+    it('provides the correct interface',function(done)
       local peer = jetpeer.new{port = port}
       assert.is_true(type(peer) == 'table')
       assert.is_true(type(peer.state) == 'function')
@@ -36,6 +37,10 @@ describe(
       assert.is_true(type(peer.fetch) == 'function')
       assert.is_true(type(peer.batch) == 'function')
       assert.is_true(type(peer.loop) == 'function')
+      peer:on_error(async(function(err)
+            assert.is_truthy(err:match('closed'))
+            done()
+        end))
       peer:close()
     end)

@@ -48,9 +53,95 @@ describe(
             done()
           end)
       }
-      -- finally(function() peer:close() end)
     end)

+    describe(
+      'when using persist option',
+      function()
+        it('on_connect gets called when using persist option and close callback comes after > 2secs',function(done)
+            settimeout(3) -- closing takes > 2 secs
+            local ppeer
+            ppeer = jetpeer.new
+            {
+              port = port,
+              persist = 0.3,
+              on_connect = async(function(p)
+                  assert.is_equal(ppeer,p)
+                  ppeer:close(async(function()
+                        done()
+                    end))
+                end)
+            }
+          end)
+
+        it('daemon can start after peer',function(done)
+            settimeout(30) -- daemon starts delayed and closing takes > 2 secs
+            local d2 = jetdaemon.new{
+              port = port + 10
+            }
+            local ppeer
+            ppeer = jetpeer.new
+            {
+              port = port + 10,
+              persist = 0.3,
+              on_connect = async(function(p)
+                  assert.is_equal(ppeer,p)
+                  ppeer:close(async(function()
+                        done()
+                    end))
+                end)
+            }
+            ev.Timer.new(function()
+                d2:start()
+              end,0.2):start(loop)
+            finally(function()
+                d2:stop()
+              end)
+          end)
+
+
+        it('states can be added and removed after closing only the underlying socket',function(done)
+            settimeout(4) -- closing takes > 2 secs
+            local ppeer
+            ppeer = jetpeer.new
+            {
+              port = port,
+              persist = 1,
+              on_connect = async(function(p)
+                  assert.is_equal(ppeer,p)
+                  ppeer:close(nil,true) -- just the underlying socket, but don't quit the resume loop
+                  ev.Timer.new(async(function()
+                        local some_state
+                        some_state = ppeer:state({
+                            path = 'popopo',
+                            value = 873
+                          },{
+                            error = async(function(err)
+                                assert.is_nil(err or 'should not happen')
+                              end),
+                            success = async(function()
+                                some_state:remove({
+                                    error = async(function(err)
+                                        assert.is_nil(err or 'should not happen')
+                                      end),
+                                    success = async(function()
+                                        done()
+                                        ppeer:close()
+
+                                      end)
+                                })
+                              end)
+                        })
+
+                    end),1):start(loop)
+                end)
+            }
+          end)
+
+
+      end)
+
+
     it('can add a state',function(done)
       peer:state(
         {
@@ -149,7 +240,7 @@ describe(
     end)

     after_each(function(done)
-      peer:close()
+      peer:close(async(done))
     end)

     it(
@@ -276,7 +367,7 @@ describe(
         async(function(fpath,fevent,fdata,fetcher)
             timer:stop(loop)
             fetcher:unfetch()
-            assert.is_falsy('should not happen'..fpath)
+            assert.is_falsy('should not happen '..fpath)
             done()
           end))
       timer = ev.Timer.new(async(function()
@@ -488,8 +579,8 @@ describe(
       }
     end)

-    after_each(function()
-      peer:close()
+    after_each(function(done)
+      peer:close(async(function() done() end))
     end)

     it('set gets timeout error',function(done)
@@ -1110,7 +1201,6 @@ if ipv6_localhost_addr then
       daemon = jetdaemon.new{
         port = port,
         interface = ipv6_localhost_addr,
-        print = function() end
       }
       daemon:start()
     end)
diff --git a/spec/utils_spec.lua b/spec/utils_spec.lua
index 3403bc6..d94962b 100644
--- a/spec/utils_spec.lua
+++ b/spec/utils_spec.lua
@@ -48,5 +48,18 @@ describe(
         assert.is_same(map({person={age=32},hobby='guitar'}),{age=32,work='guitar'})
       end)

+    it('remove works',function()
+        local t = {2,5,6}
+        local found = utils.remove(t,5)
+        assert.is_true(found)
+        assert.is_same({2,6},t)
+
+        local t = {2,5,6}
+        local found = utils.remove(t,4)
+        assert.is_false(found)
+        assert.is_same({2,5,6},t)
+
+      end)
+
   end)
diff --git a/src/jet.lua b/src/jet.lua
index 930bb21..4121c89 100644
--- a/src/jet.lua
+++ b/src/jet.lua
@@ -4,7 +4,8 @@ local daemon = require'jet.daemon'
 local jet = {
   peer = peer,
   daemon = daemon,
-  new = peer.new
+  new = peer.new,
+  _VERSION = '0.10'
 }

 return jet
diff --git a/src/jet/daemon.lua b/src/jet/daemon.lua
index 6461ffc..3a42935 100644
--- a/src/jet/daemon.lua
+++ b/src/jet/daemon.lua
@@ -28,7 +28,6 @@ local response_timeout = jutils.response_timeout
 local internal_error = jutils.internal_error
 local parse_error = jutils.parse_error
 local method_not_found = jutils.method_not_found
-
 local is_empty_table = jutils.is_empty_table

 --- creates and returns a new daemon instance.
@@ -57,6 +56,7 @@ local create_daemon = function(options)
   -- with original id and receiver (peer) and request
   -- timeout timer.
   local routes = {}
+  local resumables = {}

   -- global for tracking the neccassity of lower casing
   -- paths on publish
@@ -87,7 +87,7 @@ local create_daemon = function(options)
   local publish = function(path,event,value,element)
     local lpath = has_case_insensitives and path:lower()
     for fetcher in pairs(element.fetchers) do
-      local ok,err = pcall(fetcher,path,lpath,event,value)
+      local ok,err = pcall(fetcher.op,path,lpath,event,value)
       if not ok then
         crit('publish failed',err,path,event)
       end
@@ -164,9 +164,9 @@ local create_daemon = function(options)
   local fetch = function(peer,message)
     local params = message.params
     local fetch_id = checked(params,'id','string')
-    local queue_notification
+    local params_ok,fetcher
     local notify = function(nparams)
-      queue_notification(nparams)
+      fetcher.queue(nparams)
     end
     local sorter_ok,sorter,flush = pcall(jsorter.new,params,notify)
     local initializing = true
@@ -177,14 +177,14 @@ local create_daemon = function(options)
         sorter(nparams,initializing)
       end
     end
-    local params_ok,fetcher,is_case_insensitive = pcall(jfetcher.new,params,notify)
+    params_ok,fetcher = pcall(jfetcher.new,params,notify)
     if not params_ok then
       error(invalid_params({fetchParams = params, reason = fetcher}))
     end

     peer.fetchers[fetch_id] = fetcher

-    if is_case_insensitive then
+    if fetcher.is_case_insensitive then
       case_insensitives[fetcher] = true
       has_case_insensitives = true
     end
@@ -198,16 +198,16 @@ local create_daemon = function(options)
       end
     end

-    local cq = peer.queue
-    queue_notification = function(nparams)
-      cq(peer,{
+    fetcher.queue = function(nparams)
+      peer:queue({
           method = fetch_id,
           params = nparams,
       })
     end

+    local fetchop = fetcher.op
     for path,element in pairs(elements) do
-      local may_have_interest = fetcher(path,has_case_insensitives and path:lower(),'add',element.value)
+      local may_have_interest = fetchop(path,has_case_insensitives and path:lower(),'add',element.value)
       if may_have_interest then
         element.fetchers[fetcher] = true
       end
@@ -257,12 +257,16 @@ local create_daemon = function(options)
     local timeout = optional(params,'timeout','number') or 5
     local element = elements[path]
     if element then
-      local id
       local mid = message.id
+      local req = {}
       if mid then
         rcount = (rcount + 1) % 2^31
         local timer = new_timer(function()
-            routes[id] = nil
+            local element = elements[path]
+            if element then
+              element.peer:cancel_request(req)
+            end
+            routes[req.id] = nil
             peer:queue({
                 id = mid,
                 error = response_timeout(params),
@@ -270,20 +274,16 @@ local create_daemon = function(options)
             peer:flush()
           end,timeout)
         timer:start(loop)
-        id = tostring(mid)..tostring(peer)..rcount
-        assert(not routes[id])
+        req.id = tostring(mid)..tostring(peer)..rcount
+        assert(not routes[req.id])
         -- save route to forward reply
-        routes[id] = {
+        routes[req.id] = {
           receiver = peer,
           id = mid,
           timer = timer,
         }
       end
-      local req = {
-        id = id,-- maybe nil
-        method = path,
-      }
-
+      req.method = path
       local value = params.value
       if value ~= nil then
         req.params = {value = value}
@@ -325,7 +325,7 @@ local create_daemon = function(options)
     -- don't depend on the value of the element).
     for peer in pairs(peers) do
       for _,fetcher in pairs(peer.fetchers) do
-        local ok,may_have_interest = pcall(fetcher,path,lpath,'add',value)
+        local ok,may_have_interest = pcall(fetcher.op,path,lpath,'add',value)
         if ok then
           if may_have_interest then
             element.fetchers[fetcher] = true
@@ -355,21 +355,96 @@ local create_daemon = function(options)

   local config = function(peer,message)
     local params = message.params
-    if params.peer then
-      peer = nil
-      for peer_ in pairs(peers) do
-        if peer_.name == params.peer then
-          peer = peer_
-          break
+
+    if params.debug ~= nil then
+      if params.peer then
+        peer = nil
+        for peer_ in pairs(peers) do
+          if peer_.name == params.peer then
+            peer = peer_
+            break
+          end
+        end
+        if not peer then
+          error('unknown peer')
         end
       end
-      if not peer then
-        error('unknown peer')
-      end
+      peer.debug = params.debug
+      return
     end
+
     if params.name then
       peer.name = params.name
+      return
     end
+
+    -- enables message history and makes this peer
+    -- resumable in case of a close/error event.
+    -- returns the unique persist id, which
+    -- must be used to resume the peer.
+    if params.persist ~= nil then
+      peer.message_history = {}
+      local persist_id = tostring(peer)
+      peer.persist_id = persist_id
+      peer.persist_time = tonumber(params.persist) or 120
+      resumables[persist_id] = peer
+      return persist_id
+    end
+
+    -- if valid resume parameters are passed in,
+    -- returns the number of messages received so far (not an id)
+    -- and resends all missed messages from the history.
+    -- the peer must have been configured as persistent before.
+    if params.resume then
+      local persist_id = checked(params.resume,'id','string')
+      local received_count = checked(params.resume,'receivedCount','number')
+      local resumer = resumables[persist_id]
+      if not resumer then
+        error(invalid_params({invalidPersistId=persist_id}))
+      end
+      resumer.mediated = true
+      -- check if the daemon has already noticed that the resumer died
+      if resumer.release_timer then
+        resumer.release_timer:stop(loop)
+        resumer.release_timer:clear_pending(loop)
+        resumer.release_timer = nil
+      else
+        resumer:close()
+      end
+      local missed_messages_count = resumer.message_count - received_count
+      local history = resumer.message_history
+      local start = #history-missed_messages_count + 1
+      if start < 0 then
+        error(internal_error({historyNotAvailable=missed_messages_count}))
+      end
+      resumer:transfer_fetchers(peer)
+      resumer:transfer_elements(peer)
+      peer.receive_count = resumer.receive_count + peer.receive_count
+      if message.id then
+        peer:queue({
+            id = message.id,
+            result = peer.receive_count,
+        })
+      end
+      -- this will add messages to peer.message_history
+      -- during flush.
+      for i=start,#history do
+        peer:queue(history[i])
+      end
+      peer.message_history = {}
+      resumables[persist_id] = peer
+      peer.persist_id = persist_id
+      peer.persist_time = resumer.persist_time
+      peer.flush()
+      -- the peer message_count must be set here
+      -- to mimic a continuously growing message_count with respect to receivedCount
+      peer.message_count = resumer.message_count
+      if message.id then
+        peer.message_count = peer.message_count + 1
+      end
+      return nil,true -- set dont_auto_reply true
+    end
+
     if params.encoding then
       if params.encoding == 'msgpack' then
         local ok,cmsgpack = pcall(require,'cmsgpack')
@@ -389,9 +464,11 @@ local create_daemon = function(options)
         peer.encode = cmsgpack.pack
         peer.decode = cmsgpack.unpack
         return nil,true -- set dont_auto_reply true
+      else
+        error(invalid_params({encodingNotSupported=params.encoding}))
       end
     end
-    peer.debug = params.debug
+
   end

   local sync = function(f)
@@ -528,10 +605,12 @@ local create_daemon = function(options)
           error = invalid_request(message)
         })
     elseif #message > 0 then
+      peer.receive_count = peer.receive_count + #message
       for i,message in ipairs(message) do
         dispatch_single_message(peer,message)
       end
     else
+      peer.receive_count = peer.receive_count + 1
       dispatch_single_message(peer,message)
     end
   else
@@ -547,56 +626,108 @@ local create_daemon = function(options)

   local create_peer = function(ops)
     local peer = {}
+    peer.receive_count = 0
+    local release = function()
+      for _,fetcher in pairs(peer.fetchers) do
+        case_insensitives[fetcher] = nil
+        for _,element in pairs(elements) do
+          element.fetchers[fetcher] = nil
+        end
+      end
+      has_case_insensitives = not is_empty_table(case_insensitives)
+      peer.fetchers = {}
+      peers[peer] = nil
+      for path,element in pairs(elements) do
+        if element.peer == peer then
+          publish(path,'remove',element.value,element)
+          elements[path] = nil
+        end
+      end
+      flush_peers()
+      ops.close()
+      peer = nil
+    end
+    peer.transfer_fetchers = function(_,new_peer)
+      for fetch_id,fetcher in pairs(peer.fetchers) do
+        fetcher.queue = function(nparams)
+          new_peer:queue({
+              method = fetch_id,
+              params = nparams
+          })
+        end
+      end
+    end
+    peer.transfer_elements = function(_,new_peer)
+      for _,element in pairs(elements) do
+        element.peer = new_peer
+      end
+    end
     peer.release = function(_)
       if peer then
-        for _,fetcher in pairs(peer.fetchers) do
-          case_insensitives[fetcher] = nil
-          for _,element in pairs(elements) do
-            element.fetchers[fetcher] = nil
-          end
-        end
-        has_case_insensitives = not is_empty_table(case_insensitives)
-        peer.fetchers = {}
-        peers[peer] = nil
-        for path,element in pairs(elements) do
-          if element.peer == peer then
-            publish(path,'remove',element.value,element)
-            elements[path] = nil
-          end
+        if peer.message_history then
+          resumables[peer.persist_id] = peer
+          peer.release_timer = ev.Timer.new(function()
+              peer.release_timer = nil
+              if not peer.mediated then
+                resumables[peer.persist_id] = nil
+                release()
+              end
+            end,peer.persist_time or 1)
+          peer.release_timer:start(loop)
+        else
+          release()
         end
-        flush_peers()
-        ops.close()
-        peer = nil
       end
     end
     peer.close = function(_)
       peer:flush()
       ops.close()
     end
+    peer.messages = {}
     peer.queue = function(_,message)
-      if not peer.messages then
-        peer.messages = {}
-      end
       tinsert(peer.messages,message)
     end
     local send = ops.send
+    peer.message_count = 0
+    peer.cancel_request = function(_,req)
+      -- maybe messages have not been flushed yet
+      jutils.remove(peer.messages,req)
+      -- messages are flushed, but maybe the peer
+      -- is persistent, so remove from history
+      local history = peer.message_history
+      if history then
+        local found = jutils.remove(history,req)
+        if found then
+          peer.message_count = peer.message_count - 1
+        end
+      end
+    end
     peer.flush = function(_)
-      if peer.messages then
-        local num = #peer.messages
-        local message
+      local messages = peer.messages
+      local num = #messages
+      peer.message_count = peer.message_count + num
+      local history = peer.message_history
+      if history then
+        for _,message in ipairs(messages) do
+          tinsert(history,message)
+        end
+        local history_num = #history
+        -- limit the history to 100 entries
+        for i=1,(history_num-100) do
+          tremove(history,1)
+        end
+        assert(#history <= 100)
+      end
+      if num > 0 and not peer.release_timer then
         if num == 1 then
-          message = peer.messages[1]
-        elseif num > 1 then
-          message = peer.messages
-        else
-          assert(false,'messages must contain at least one element if not nil')
+          messages = messages[1]
         end
         if peer.debug then
-          debug(peer.name or 'unnamed peer','<-',jencode(message))
+          debug(peer.name or 'unnamed peer','<-',jencode(messages))
         end
-        send(peer.encode(message))
-        peer.messages = nil
+        send(peer.encode(messages))
       end
+      peer.messages = {}
     end
     peer.fetchers = {}
     peer.encode = cjson.encode
diff --git a/src/jet/daemon/fetcher.lua b/src/jet/daemon/fetcher.lua
index 12e7d32..781a6b4 100644
--- a/src/jet/daemon/fetcher.lua
+++ b/src/jet/daemon/fetcher.lua
@@ -104,9 +104,10 @@ local create_fetcher = function(options,notify)
     end
   end

-  local ci = options.path and options.path.caseInsensitive
-
-  return fetchop,ci
+  return {
+    op = fetchop,
+    is_case_insensitive = options.path and options.path.caseInsensitive,
+  }
 end

 return {
diff --git a/src/jet/peer.lua b/src/jet/peer.lua
index d4c408e..afbd6cf 100644
--- a/src/jet/peer.lua
+++ b/src/jet/peer.lua
@@ -3,6 +3,7 @@ local socket = require'socket'
 local ev = require'ev'
 local cjson = require'cjson'
 local jutils = require'jet.utils'
+local step = require'step'

 local tinsert = table.insert
 local tremove = table.remove
@@ -23,13 +24,32 @@ local error_object = function(err)
   return error
 end

-local new = function(config)
+local eps = 2^-40
+
+local detach = function(f,loop)
+  if ev.Idle then
+    ev.Idle.new(function(loop,io)
+        io:stop(loop)
+        f()
+      end):start(loop)
+  else
+    ev.Timer.new(function(loop,io)
+        io:stop(loop)
+        f()
+      end,eps):start(loop)
+  end
+end
+
+local noop = function() end
+
+new = function(config)
   config = config or {}
   local log = config.log or noop
   local ip = config.ip or '127.0.0.1' -- localhost'
   local port = config.port or 11122
   local encode = cjson.encode
   local decode = cjson.decode
+  local log = config.log or noop
   if config.sync then
     local sock = socket.connect(ip,port)
     if not sock then
@@ -83,22 +103,60 @@ new = function(config)
   local queue = function(message)
     tinsert(messages,message)
   end
+  local message_count = 0
+  local message_history = {}
+  local pending
   local will_flush = true
-  local flush = function(reason)
-    local n = #messages
-    if n == 1 then
-      wsock:send(encode(messages[1]))
-    elseif n > 1 then
-      wsock:send(encode(messages))
+  local flush
+  local is_persistant
+
+  if not config.persist then
+    flush = function(reason)
+      local n = #messages
+      if n == 1 then
+        wsock:send(encode(messages[1]))
+      elseif n > 1 then
+        wsock:send(encode(messages))
+      end
+      messages = {}
+      will_flush = false
+    end
+  else
+    flush = function(reason)
+      local num = #messages
+      if not is_persistant then
+        message_count = message_count + num
+      end
+      local history = message_history
+      if history then
+        for _,message in ipairs(messages) do
+          tinsert(history,message)
+        end
+        local history_num = #history
+        -- limit the history to 100 entries
+        for i=1,(history_num-100) do
+          tremove(history,1)
+        end
+        assert(#history <= 100)
+      end
+      if not pending then
+        if num == 1 then
+          wsock:send(encode(messages[1]))
+        elseif num > 1 then
+          wsock:send(encode(messages))
+        end
+      end
+      messages = {}
+      will_flush = false
     end
-    messages = {}
-    will_flush = false
   end
+
   local request_dispatchers = {}
   local response_dispatchers = {}
   local dispatch_response = function(self,message)
     local mid = message.id
     local callbacks = response_dispatchers[mid]
+    assert(mid,cjson.encode(message))
     response_dispatchers[mid] = nil
     if callbacks then
       if message.result then
@@ -144,6 +202,7 @@ new = function(config)
       }
     end
   end
+  local received_count = 0
   local dispatch_single_message = function(self,message)
     if message.method and message.params then
       dispatch_request(self,message)
@@ -161,11 +220,16 @@ new = function(config)
     end
     will_flush = true
     if message then
-      if #message > 0 then
+      local num = #message
+      if num > 0 then
+        -- The received count MUST be incremented here for arrays!
+        -- This is relevant for resuming...
+        received_count = received_count + num
         for i,message in ipairs(message) do
           dispatch_single_message(self,message)
         end
       else
+        received_count = received_count + 1
         dispatch_single_message(self,message)
       end
     else
@@ -180,9 +244,58 @@ new = function(config)
       flush('dispatch_message')
     end
   end
   wsock:on_message(dispatch_message)
-  wsock:on_error(log)
-  wsock:on_close(config.on_close or function() end)
+  wsock:on_error(config.on_error or noop)
+  local persist_id
+  local closing
+  local connect_sequence
+  local on_close
+  local try = {}
   local j = {}
+  on_close = function()
+    if not closing and config.persist and not pending then
+      messages = {}
+      pending = true
+      encode = cjson.encode
+      decode = cjson.decode
+      wsock = jsocket.new({ip = ip, port = port, loop = loop})
+      wsock:on_message(dispatch_message)
+      wsock:on_error(config.on_error or noop)
+      wsock:on_close(on_close)
+      wsock:on_connect(function()
+          is_persistant = false
+          connect_sequence = step.new({
+              try = try,
+              catch = function(err)
+                j:close()
+              end,
+              finally = function()
+                if config.on_connect then
+                  config.on_connect(j)
+                  config.on_connect = nil
+                end
+                flush('on_connect')
+              end
+          })
+
+          connect_sequence()
+          flush('resume')
+        end)
+
+      ev.Timer.new(function(loop,io)
+          if pending and not closing then
+            wsock:connect()
+          else
+            io:stop(loop)
+          end
+        end,0.5,0.5):start(loop)
+    end
+
+    if config.on_close then
+      config.on_close()
+    end
+  end
+
+  wsock:on_close(on_close)

   j.loop = function()
     loop:loop()
@@ -192,9 +305,30 @@ new = function(config)
     on_no_dispatcher = f
   end

-  j.close = function(self,options)
+  j.on_error = function(_,f)
+    wsock:on_error(f)
+  end
+
+  j.close = function(self,done,debug_resume)
     flush('close')
     wsock:close()
+    if debug_resume then
+      return
+    end
+    closing = true
+    if done then
+      if config.persist then
+        -- the daemon keeps states for config.persist seconds.
+        -- during this time, the states / paths are still blocked
+        -- by this peer. wait some seconds more and assume
+        -- all peer related resources are freed by the daemon.
+        ev.Timer.new(function()
+            done()
+          end,config.persist + 2):start(loop)
+      else
+        detach(done,loop)
+      end
+    end
   end

   local id = 0
@@ -579,30 +713,100 @@ new = function(config)

   if config.encoding then
     cmsgpack = require'cmsgpack'
   end

-  wsock:on_connect(function()
-      if config.name or config.encoding then
-        j:config({
-            name = config.name,
-            encoding = config.encoding
-          },{
+  if config.persist then
+    table.insert(try,function(step)
+        if not persist_id then
+          j:config({persist=config.persist},{
+              success = function(pid)
+                persist_id = pid
+                is_persistant = true
+                step.success()
+              end,
+              error = function(err)
+                step.error(err)
+              end
+          })
+        else
+          j:config({resume={
+                id = persist_id,
+                receivedCount = received_count
+            }},{
+              success = function(received_by_daemon_count)
+                flush('resume')
+                pending = false
+                is_persistant = true
+                local missed_messages_count = message_count - received_by_daemon_count
+                local history = message_history
+                -- the last message in history is "config.resume",
+                -- skip that!
+                local start = #history-missed_messages_count-1
+                local stop = #history-1
+                if start < 0 then
+                  step.error(jutils.internal_error({historyNotAvailable=missed_messages_count}))
+                end
+                local missed = {}
+                for i=start,stop do
+                  tinsert(missed,history[i])
+                end
+                if #missed > 0 then
+                  wsock:send(encode(missed))
+                end
+                step.success()
+              end,
+              error = function(err)
+                step.error(err)
+              end
+          })
+        end
+      end)
+
+  end
+
+  if config.name then
+    table.insert(try,function(step)
+        j:config({name=config.name},step)
+        flush('name')
+      end)
+  end
+
+  if config.encoding then
+    table.insert(try,function(step)
+        j:config({encoding=config.encoding},{
            success = function()
-              flush('config')
+              flush('encoding')
              if config.encoding then
                encode = cmsgpack.pack
                decode = cmsgpack.unpack
              end
-              if config.on_connect then
-                config.on_connect(j)
-              end
+              step.success()
            end,
            error = function(err)
-              j:close()
+              step.error(err)
            end
        })
-    elseif config.on_connect then
-      config.on_connect(j)
+      end)
+  end
+
+  connect_sequence = step.new({
+      try = try,
+      catch = function(err)
+        if not config.persist then
+          j:close()
+        end
+      end,
+      finally = function()
+        if config.on_connect then
+          config.on_connect(j)
+          config.on_connect = nil
+        end
+        flush('on_connect')
      end
-    flush('on_connect')
+  })
+
+
+  wsock:on_connect(function()
+      connect_sequence()
+      flush('config')
   end)

   wsock:connect()
diff --git a/src/jet/socket.lua b/src/jet/socket.lua
index c6e3c8f..9acbdc6 100644
--- a/src/jet/socket.lua
+++ b/src/jet/socket.lua
@@ -27,6 +27,7 @@ end
 local wrap = function(sock,args)
   assert(sock)
   args = args or {}
+  -- set non blocking
   sock:settimeout(0)

   -- send message asap
diff --git a/src/jet/utils.lua b/src/jet/utils.lua
index 5fe75f6..d8d03e0 100644
--- a/src/jet/utils.lua
+++ b/src/jet/utils.lua
@@ -132,6 +132,19 @@
   end
 end

+-- searches for an element in an array and removes
+-- it if found.
+-- returns true if the element has been removed.
+local remove = function(array,value)
+  for index,val in ipairs(array) do
+    if val == value then
+      table.remove(array,index)
+      return true
+    end
+  end
+  return false
+end
+
 return {
   noop = noop,
   is_empty_table = is_empty_table,
@@ -144,5 +157,6 @@ return {
   access_field = access_field,
   equals_deep = equals_deep,
   mapper = mapper,
+  remove = remove,
 }

diff --git a/test_persist.md b/test_persist.md
new file mode 100644
index 0000000..4218642
--- /dev/null
+++ b/test_persist.md
@@ -0,0 +1,17 @@
+# Testing the persistence feature
+
+For manual testing, play with these commands (preferably in different terminals):
+
+```shell
+$ jetd.lua
+$ lua examples/fetch.lua '{}' 172.19.1.41
+$ lua examples/persistant_ticker.lua localhost
+$ sudo ifconfig lo down
+$ sudo ifconfig lo up
+```
+
+The idea is to force connections to be closed by shutting down network interfaces.
+Both fetch.lua and persistant_ticker.lua run with the persist option and should
+automatically reconnect to jetd.
+
+Better, automated tests are welcome!
\ No newline at end of file
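As a starting point for automating the manual procedure described in test_persist.md, here is a minimal sketch of a busted spec, modeled on the resume tests this diff adds to `spec/peer_spec.lua`. It only uses APIs introduced or exercised in this changeset (the `persist` option, the socket-only `close(nil,true)`, and the busted/lua-ev helpers `setloop`, `async`, `settimeout`, `done`); the port number and timings are arbitrary assumptions.

```lua
-- sketch: automated persist/resume test (assumed free port 11123)
local ev = require'ev'
local jetdaemon = require'jet.daemon'
local jetpeer = require'jet.peer'

setloop('ev')

describe('a persistent peer',function()
    local daemon
    setup(function()
        daemon = jetdaemon.new{port = 11123}
        daemon:start()
      end)
    teardown(function()
        daemon:stop()
      end)

    it('can still add states after a socket-only close and resume',function(done)
        settimeout(6) -- close callback fires after persist + 2 secs
        local peer
        peer = jetpeer.new{
          port = 11123,
          persist = 1, -- keep the session resumable on the daemon
          on_connect = async(function(p)
              -- drop only the underlying socket; the resume loop keeps running
              p:close(nil,true)
              ev.Timer.new(async(function()
                    -- by now the peer should have reconnected and resumed
                    p:state({path = 'ticker',value = 1},{
                        success = async(function()
                            peer:close(async(function() done() end))
                          end),
                        error = async(function(err)
                            assert.is_nil(err or 'should not happen')
                          end)
                    })
                end),1):start(ev.Loop.default)
            end)
        }
      end)
  end)
```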