diff --git a/README.md b/README.md index 3d101e5..0c068c9 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ com.github.tohodog qsrpc-starter - 1.0.1 + 1.0.2 ``` @@ -85,9 +85,9 @@ public String hello() { ``` ## Future - * 消息发送支持异步 - * 服务统计 + * 消息发送支持异步(WebFlux) * 断路器策略 + * 服务统计治理 * ... ## Test @@ -98,8 +98,48 @@ public String hello() { | i3-8100(4-core/4-thread)| 10w(8-thread) | 4331ms | 23089 | | i7-8700(6-core/12-thread) | 30w(24-thread) | 6878ms | 43617 | + + +## QSRPC项目技术选型及简介 +### 1.TCP通信 +#### 1.1 连接模式: + 本项目tcp通信使用长连接+全双工通信(两边可以同时收/发消息),可以保证更大的吞吐量/更少的连接数资源占用,理论上使用一个tcp连接即可满足通信(详见pool),如果使用http/1.1协议的请求-响应模式,同一个连接在同一个时刻只有有一个消息进行传输,如果有大量请求将会阻塞或者需要开更多tcp连接来解决 +#### 1.2 协议: +|TCP|长度|消息ID|协议号|加密/压缩|内容|包尾| +|:----:|:----:|:----:|:----:|:----:|:----:|:----:| +| Byte | 4 | 4 | 1 | 1(4bit+4bit) | n | 2 | + + 首先,使用长连接那就需要解决tcp粘包问题,常见的两种方式: + * 包头长度:优点最简单,也是最高效的,缺点是无法感知丢包,会导致后续所有包错乱 + * 特定包尾:优点能感知丢包,不影响后续包,缺点需要遍历所有字节,切不能与包内容冲突 +
+ 综上,本框架使用的是包头长度+特定包尾,结合了两者优点,避免了缺点,高效实用,检测到包错误会自动断开. +没有使用校检码转码等,因为需要考虑实际情况,内网里出错概率非常低,出错了也能重连,对于RPC框架追求性能来说是合适的,即使是外网,后续有需求可以增加校验加密协议 +
+ 其次,因为支持全双工那就需要解决消息回调问题,本协议使用了一个消息ID,由客户端生成,服务端返回消息带上;由于发送和接收是非线性的,所以客户端需要维护一个回调池,以ID为key,value为此次请求的context(callback),因为是异步的,请求有可能没有响应,所以池需要有超时机制 + +#### 1.3 压缩/加密: + 当出现带宽不足而CPU性能有余时,压缩就派上用场了,用时间换空间。目前支持了snappy/gzip两种压缩,snappy应用于google的rpc上,具有高速压缩速度和合理的压缩率,gzip速度次于snappy,但压缩率较高,根据实际情况配置,前提必须是带宽出现瓶颈/要求,否则不需要开启压缩 +
 加密功能计划中(加盐位算法) +#### 1.4 IO框架: +网络IO目前是基于netty搭建的,支持nio,zero-copy等特性,由于本框架连接模式使用长连接,连接数固定且较少,所以本框架性能对于IO框架(BIO/NIO/AIO)并不是很敏感,netty对于http,iot服务这种有大量连接数的优势就很大了 + + +### 2. Tcp pool + 前面说了一个tcp连接即可支撑通信,为啥又用pool了呢,原因有两个:1. netty工作线程对于同一个连接使用同一个线程来处理的,所以如果客户端发送大量请求时,服务端只有一个线程在处理导致性能问题,起初是想服务端再把消息分发到线程池,但后续测试发现此操作在高并发下会导致延迟增大,因为又把消息放回线程池排队了。2. 相对于一条tcp链接,使用pool会更加灵活,且连接数也很少,并没有性能影响; 本框架还基于pool实现了一个[请求-响应]的通信模式* +
+ 客户端Pool的maxIdle(maxActive)=服务节点配置的CPU线程数*2=服务节点netty的工作线程数,pool采用FIFO先行先出的策略,可以保证在高并发下均匀的使用tcp连接,服务端就不用再次分发消息了 +### 3. 服务注册发现 + 分布式系统中都需要一个配置/服务中心,才能进行统一管理.本框架目前使用zookeeper(后面会支持nacos)进行服务注册,zookeeper是使用类似文件目录的结构,每个目录都可以存一个data +
 节点注册是使用[IP:PROT_TIME]作为目录名,data存了节点的json数据,创建模式为EPHEMERAL_SEQUENTIAL(断开后会删除该目录),这样就达到了自动监听节点上下线的效果,加入时间戳是为了解决当节点快速重启时,注册了两个目录,便于进行区分处理 +
 客户端通过watch目录变化信息,从而获取到所有服务节点信息,同步一个副本到本地Map里(需加上读写锁),客户端就可以实现调用对应的服务了 + + ## Log +### v1.0.2(2020-11-26) + * 客户端支持选择调用指定节点 + * 异常处理优化 ### v1.0.1(2020-11-23) * Upgrade dependencies ### v0.1.0(2020-11-16) @@ -118,7 +158,7 @@ public String hello() { [starsvg]: https://img.shields.io/github/stars/tohodog/QSRPC-starter.svg?style=social&label=Stars [star]: https://github.com/tohodog/QSRPC-starter -[QSRPCsvg]: https://img.shields.io/badge/QSRPC-1.1.1-blue.svg +[QSRPCsvg]: https://img.shields.io/badge/QSRPC-1.1.2-blue.svg [QSRPC]: https://github.com/tohodog/QSRPC -[QSRPCstarter-svg]: https://img.shields.io/badge/QSRPC%20starter-1.0.1-origen.svg +[QSRPCstarter-svg]: https://img.shields.io/badge/QSRPC%20starter-1.0.2-origen.svg diff --git a/pom.xml b/pom.xml index c121967..677e2e8 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.tohodog qsrpc-starter - 1.0.1 + 1.0.2 qsrpc-starter qsrpc-starter @@ -14,7 +14,7 @@ UTF-8 - 1.1.1 + 1.1.2 1.7.2 2.1 3.1 diff --git a/src/main/java/com/qinsong/rpc/client/QSRpcPorxy.java b/src/main/java/com/qinsong/rpc/client/QSRpcPorxy.java index c35ea06..20ebc26 100644 --- a/src/main/java/com/qinsong/rpc/client/QSRpcPorxy.java +++ b/src/main/java/com/qinsong/rpc/client/QSRpcPorxy.java @@ -21,8 +21,9 @@ public class QSRpcPorxy implements MethodInterceptor { private Class target; // 代理对象接口 private QSRpcReference qsRpcReference; - private String interfaceName; + private String interfaceName, version; private int timeout; + private String action;//选择action private ISerialize iSerialize; @@ -31,9 +32,20 @@ public QSRpcPorxy(Class target, QSRpcReference qsRpcReference, ISerialize iSeria this.target = target; this.qsRpcReference = qsRpcReference; this.iSerialize = iSerialize; + //初始化数据 + interfaceName = target.getName(); + if (qsRpcReference.value().isEmpty()) { + version = qsRpcReference.version(); + } else { + version = qsRpcReference.value(); + } + if (qsRpcReference.ip_port().isEmpty()) { + action = interfaceName + qsRpcReference.value(); + } else { + action = qsRpcReference.ip_port(); + } timeout = qsRpcReference.timeout(); - interfaceName = target.getName(); if (timeout <= 0) timeout = RPCClientManager.RpcTimeout; } @@ -49,16 +61,11 @@ public Object intercept(Object o, Method method, Object[] objects, MethodProxy m Request request = new Request(); request.setInterfaceName(interfaceName); - if (qsRpcReference.value().isEmpty()) - request.setVersion(qsRpcReference.version()); - else - request.setVersion(qsRpcReference.value()); + request.setVersion(version); request.setMethodName(method.toString()); request.setParameters(objects); - - byte[] bytes = RPCClientManager.getInstance().sendSync(request.getInterfaceName() + request.getVersion(), - iSerialize.serialize(request), timeout); + byte[] bytes = RPCClientManager.getInstance().sendSync(action, iSerialize.serialize(request), timeout); Response response = iSerialize.deserialize(bytes, Response.class); if (response.getException() != null) { throw response.getException(); diff --git a/src/main/java/com/qinsong/rpc/client/QSRpcReference.java b/src/main/java/com/qinsong/rpc/client/QSRpcReference.java index a9cecb9..cf1ec32 100644 --- a/src/main/java/com/qinsong/rpc/client/QSRpcReference.java +++ b/src/main/java/com/qinsong/rpc/client/QSRpcReference.java @@ -25,4 +25,6 @@ int timeout() default -1;//默认60s + String ip_port() default "";//指定要请求的服务端127.0.0.1:8080 + } diff --git a/src/main/java/com/qinsong/rpc/common/exp/NotFoundException.java b/src/main/java/com/qinsong/rpc/common/exp/NotFoundException.java new file mode 100644 index 0000000..52c65ce --- /dev/null +++ b/src/main/java/com/qinsong/rpc/common/exp/NotFoundException.java @@ -0,0 +1,14 @@ +package com.qinsong.rpc.common.exp; + +/** + * Created by QSong + * Contact github.com/tohodog + * Date 2020/11/25 + * 未发现服务接口 + */ +public class NotFoundException extends Exception { + + public NotFoundException() { + super("NotFoundException"); + } +} diff --git a/src/main/java/com/qinsong/rpc/common/exp/UnavailableException.java b/src/main/java/com/qinsong/rpc/common/exp/UnavailableException.java new file mode 100644 index 0000000..f6e862f --- /dev/null +++ b/src/main/java/com/qinsong/rpc/common/exp/UnavailableException.java @@ -0,0 +1,13 @@ +package com.qinsong.rpc.common.exp; + +/** + * Created by QSong + * Contact github.com/tohodog + * Date 2020/11/25 + * 拒绝处理 + */ +public class UnavailableException extends Exception { + public UnavailableException() { + super("UnavailableException"); + } +} diff --git a/src/main/java/com/qinsong/rpc/server/CacheResponse.java b/src/main/java/com/qinsong/rpc/server/CacheResponse.java index 7281501..f0672d4 100644 --- a/src/main/java/com/qinsong/rpc/server/CacheResponse.java +++ b/src/main/java/com/qinsong/rpc/server/CacheResponse.java @@ -1,5 +1,7 @@ package com.qinsong.rpc.server; +import com.qinsong.rpc.common.exp.NotFoundException; +import com.qinsong.rpc.common.exp.UnavailableException; import com.qinsong.rpc.common.serialize.Response; import com.qinsong.rpc.common.serialize.ISerialize; @@ -24,11 +26,11 @@ public CacheResponse(ISerialize iSerialize) { private void init() { Response nofound = new Response(); - nofound.setException(new Exception("nofound")); + nofound.setException(new NotFoundException()); map.put("nofound", iSerialize.serialize(nofound)); Response unavailable = new Response(); - unavailable.setException(new Exception("unavailable")); + unavailable.setException(new UnavailableException()); map.put("unavailable", iSerialize.serialize(unavailable)); Response empty = new Response(); @@ -47,13 +49,4 @@ public byte[] empty() { return map.get("empty"); } - public byte[] error(Exception e) { - byte[] bytes = map.get(e.toString()); - if (bytes == null) { - Response err = new Response(); - err.setException(e); - map.put(e.toString(), bytes = iSerialize.serialize(err)); - } - return bytes; - } } diff --git a/src/main/java/com/qinsong/rpc/server/RpcServiceLauncher.java b/src/main/java/com/qinsong/rpc/server/RpcServiceLauncher.java index bf3c48b..99435ba 100644 --- a/src/main/java/com/qinsong/rpc/server/RpcServiceLauncher.java +++ b/src/main/java/com/qinsong/rpc/server/RpcServiceLauncher.java @@ -69,9 +69,17 @@ public byte[] onMessage(Async async, byte[] message) { } try { return onHandle(message); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); - return cacheResponse.error(e); + //获取业务抛出的异常 + if (e instanceof InvocationTargetException && e.getCause() != null) e = e.getCause(); + + String msg = e.getMessage(); + if (msg == null || msg.isEmpty()) msg = e.toString(); + Response err = new Response(); + //统一返回Exception,防止客户端没有这个错误类,序列化失败 + err.setException(new Exception(msg)); + return iSerialize.serialize(err); } } });