From 88b4ca8295da4fc31a9a84eb4996e4e116c55598 Mon Sep 17 00:00:00 2001 From: niean Date: Thu, 18 Jun 2015 04:02:13 +0000 Subject: [PATCH] add test/rpcclient.py for python client --- .gitignore | 3 ++- README.md | 2 +- receiver/rpc/rpc_transfer.go | 2 +- test/rpcclient.py | 49 ++++++++++++++++++++++++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 test/rpcclient.py diff --git a/.gitignore b/.gitignore index c2134f6..1b87594 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ _testmain.go /var /falcon-transfer* /cfg.json -/test +/test/build +/test/*.go gitversion diff --git a/README.md b/README.md index 544183d..d3d26d2 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ curl -s -X POST -d "[{\"metric\":\"$m\", \"endpoint\":\"$e\", \"timestamp\":$ts, - enable: true/false, 表示是否开启该jsonrpc数据接收端口, Agent发送数据使用的就是该端口 - listen: 表示监听的http端口 - socket + socket #即将被废弃,请避免使用 - enable: true/false, 表示是否开启该telnet方式的数据接收端口,这是为了方便用户一行行的发送数据给transfer - listen: 表示监听的http端口 diff --git a/receiver/rpc/rpc_transfer.go b/receiver/rpc/rpc_transfer.go index 0bce0e3..cc286b6 100644 --- a/receiver/rpc/rpc_transfer.go +++ b/receiver/rpc/rpc_transfer.go @@ -21,7 +21,7 @@ type TransferResp struct { } func (t *TransferResp) String() string { - s := fmt.Sprintf("TransferResp total=%d, err_invalid=%d, latency=%dus", + s := fmt.Sprintf("TransferResp total=%d, err_invalid=%d, latency=%dms", t.Total, t.ErrInvalid, t.Latency) if t.Msg != "" { s = fmt.Sprintf("%s, msg=%s", s, t.Msg) diff --git a/test/rpcclient.py b/test/rpcclient.py new file mode 100644 index 0000000..a2beade --- /dev/null +++ b/test/rpcclient.py @@ -0,0 +1,49 @@ +import json +import socket +import itertools +import time + +class RPCClient(object): + + def __init__(self, addr, codec=json): + self._socket = socket.create_connection(addr) + self._id_iter = itertools.count() + self._codec = codec + + def _message(self, name, *params): + return dict(id=self._id_iter.next(), + params=list(params), + method=name) + + def call(self, name, *params): + req = self._message(name, *params) + id = req.get('id') + + mesg = self._codec.dumps(req) + self._socket.sendall(mesg) + + # This will actually have to loop if resp is bigger + resp = self._socket.recv(4096) + resp = self._codec.loads(resp) + + if resp.get('id') != id: + raise Exception("expected id=%s, received id=%s: %s" + %(id, resp.get('id'), resp.get('error'))) + + if resp.get('error') is not None: + raise Exception(resp.get('error')) + + return resp.get('result') + + def close(self): + self._socket.close() + + +if __name__ == '__main__': + rpc = RPCClient(("127.0.0.1", 8433)) + for i in xrange(10000): + mv1 = dict(endpoint='host.niean', metric='metric.niean.1', value=i, step=60, + counterType='GAUGE', tags='tag=t'+str(i), timestamp=int(time.time())) + mv2 = dict(endpoint='host.niean', metric='metric.niean.2', value=i, step=60, + counterType='COUNTER', tags='tag=t'+str(i), timestamp=int(time.time())) + print rpc.call("Transfer.Update", [mv1, mv2])