Skip to content

Commit

Permalink
Merge branch 'master' into master_github
Browse files Browse the repository at this point in the history
# Conflicts:
#	README.md
  • Loading branch information
tohodog committed Nov 26, 2020
2 parents 372f762 + f5f704b commit 6e95294
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 29 deletions.
50 changes: 45 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<dependency>
<groupId>com.github.tohodog</groupId>
<artifactId>qsrpc-starter</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
</dependency>
```

Expand Down Expand Up @@ -85,9 +85,9 @@ public String hello() {
```

## Future
* 消息发送支持异步
* 服务统计
* 消息发送支持异步(WebFlux)
* 断路器策略
* 服务统计治理
* ...

## Test
Expand All @@ -98,8 +98,48 @@ public String hello() {
| i3-8100(4-core/4-thread)| 10w(8-thread) | 4331ms | 23089 |
| i7-8700(6-core/12-thread) | 30w(24-thread) | 6878ms | 43617 |



## QSRPC项目技术选型及简介
### 1.TCP通信
#### 1.1 连接模式:
 本项目tcp通信使用长连接+全双工通信(两边可以同时收/发消息),可以保证更大的吞吐量/更少的连接数资源占用,理论上使用一个tcp连接即可满足通信(详见pool),如果使用http/1.1协议的请求-响应模式,同一个连接在同一个时刻只有有一个消息进行传输,如果有大量请求将会阻塞或者需要开更多tcp连接来解决
#### 1.2 协议:
|TCP|长度|消息ID|协议号|加密/压缩|内容|包尾|
|:----:|:----:|:----:|:----:|:----:|:----:|:----:|
| Byte | 4 | 4 | 1 | 1(4bit+4bit) | n | 2 |

 首先,使用长连接那就需要解决tcp粘包问题,常见的两种方式:
* 包头长度:优点最简单,也是最高效的,缺点是无法感知丢包,会导致后续所有包错乱
* 特定包尾:优点能感知丢包,不影响后续包,缺点需要遍历所有字节,切不能与包内容冲突
<br/>
 综上,本框架使用的是包头长度+特定包尾,结合了两者优点,避免了缺点,高效实用,检测到包错误会自动断开.
没有使用校检码转码等,因为需要考虑实际情况,内网里出错概率非常低,出错了也能重连,对于RPC框架追求性能来说是合适的,即使是外网,后续有需求可以增加校验加密协议
<br/>
 其次,因为支持全双工那就需要解决消息回调问题,本协议使用了一个消息ID,由客户端生成,服务端返回消息带上;由于发送和接收是非线性的,所以客户端需要维护一个回调池,以ID为key,value为此次请求的context(callback),因为是异步的,请求有可能没有响应,所以池需要有超时机制

#### 1.3 压缩/加密:
 当出现带宽不足而CPU性能有余时,压缩就派上用场了,用时间换空间。目前支持了snappy/gzip两种压缩,snappy应用于google的rpc上,具有高速压缩速度和合理的压缩率,gzip速度次于snappy,但压缩率较高,根据实际情况配置,前提必须是带宽出现瓶颈/要求,否则不需要开启压缩
<br/> 加密功能计划中(加盐位算法)
#### 1.4 IO框架:
网络IO目前是基于netty搭建的,支持nio,zero-copy等特性,由于本框架连接模式使用长连接,连接数固定且较少,所以本框架性能对于IO框架(BIO/NIO/AIO)并不是很敏感,netty对于http,iot服务这种有大量连接数的优势就很大了


### 2. Tcp pool
 前面说了一个tcp连接即可支撑通信,为啥又用pool了呢,原因有两个:1. netty工作线程对于同一个连接使用同一个线程来处理的,所以如果客户端发送大量请求时,服务端只有一个线程在处理导致性能问题,起初是想服务端再把消息分发到线程池,但后续测试发现此操作在高并发下会导致延迟增大,因为又把消息放回线程池排队了。2. 相对于一条tcp链接,使用pool会更加灵活,且连接数也很少,并没有性能影响; 本框架还基于pool实现了一个[请求-响应]的通信模式*
<br>
 客户端Pool的maxIdle(maxActive)=服务节点配置的CPU线程数*2=服务节点netty的工作线程数,pool采用FIFO先行先出的策略,可以保证在高并发下均匀的使用tcp连接,服务端就不用再次分发消息了
### 3. 服务注册发现
 分布式系统中都需要一个配置/服务中心,才能进行统一管理.本框架目前使用zookeeper(后面会支持nacos)进行服务注册,zookeeper是使用类似文件目录的结构,每个目录都可以存一个data
<br> 节点注册是使用[IP:PROT_TIME]作为目录名,data存了节点的json数据,创建模式为EPHEMERAL_SEQUENTIAL(断开后会删除该目录),这样就达到了自动监听节点上下线的效果,加入时间戳是为了解决当节点快速重启时,注册了两个目录,便于进行区分处理
<br> 客户端通过watch目录变化信息,从而获取到所有服务节点信息,同步一个副本到本地Map里(需加上读写锁),客户端就可以实现调用对应的服务了



## Log
### v1.0.2(2020-11-26)
* 客户端支持选择调用指定节点
* 异常处理优化
### v1.0.1(2020-11-23)
* Upgrade dependencies
### v0.1.0(2020-11-16)
Expand All @@ -118,7 +158,7 @@ public String hello() {
[starsvg]: https://img.shields.io/github/stars/tohodog/QSRPC-starter.svg?style=social&label=Stars
[star]: https://github.com/tohodog/QSRPC-starter

[QSRPCsvg]: https://img.shields.io/badge/QSRPC-1.1.1-blue.svg
[QSRPCsvg]: https://img.shields.io/badge/QSRPC-1.1.2-blue.svg
[QSRPC]: https://github.com/tohodog/QSRPC

[QSRPCstarter-svg]: https://img.shields.io/badge/QSRPC%20starter-1.0.1-origen.svg
[QSRPCstarter-svg]: https://img.shields.io/badge/QSRPC%20starter-1.0.2-origen.svg
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

<groupId>com.github.tohodog</groupId>
<artifactId>qsrpc-starter</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>

<name>qsrpc-starter</name>
<description>qsrpc-starter</description>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.qsrpc>1.1.1</version.qsrpc>
<version.qsrpc>1.1.2</version.qsrpc>
<version.protostuff>1.7.2</version.protostuff>
<version.objenesis>2.1</version.objenesis>
<version.cglib>3.1</version.cglib>
Expand Down
25 changes: 16 additions & 9 deletions src/main/java/com/qinsong/rpc/client/QSRpcPorxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public class QSRpcPorxy implements MethodInterceptor {
private Class target; // 代理对象接口

private QSRpcReference qsRpcReference;
private String interfaceName;
private String interfaceName, version;
private int timeout;
private String action;//选择action

private ISerialize iSerialize;

Expand All @@ -31,9 +32,20 @@ public QSRpcPorxy(Class target, QSRpcReference qsRpcReference, ISerialize iSeria
this.target = target;
this.qsRpcReference = qsRpcReference;
this.iSerialize = iSerialize;
//初始化数据
interfaceName = target.getName();
if (qsRpcReference.value().isEmpty()) {
version = qsRpcReference.version();
} else {
version = qsRpcReference.value();
}
if (qsRpcReference.ip_port().isEmpty()) {
action = interfaceName + qsRpcReference.value();
} else {
action = qsRpcReference.ip_port();
}

timeout = qsRpcReference.timeout();
interfaceName = target.getName();
if (timeout <= 0) timeout = RPCClientManager.RpcTimeout;
}

Expand All @@ -49,16 +61,11 @@ public Object intercept(Object o, Method method, Object[] objects, MethodProxy m

Request request = new Request();
request.setInterfaceName(interfaceName);
if (qsRpcReference.value().isEmpty())
request.setVersion(qsRpcReference.version());
else
request.setVersion(qsRpcReference.value());
request.setVersion(version);
request.setMethodName(method.toString());
request.setParameters(objects);


byte[] bytes = RPCClientManager.getInstance().sendSync(request.getInterfaceName() + request.getVersion(),
iSerialize.serialize(request), timeout);
byte[] bytes = RPCClientManager.getInstance().sendSync(action, iSerialize.serialize(request), timeout);
Response response = iSerialize.deserialize(bytes, Response.class);
if (response.getException() != null) {
throw response.getException();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/qinsong/rpc/client/QSRpcReference.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@

int timeout() default -1;//默认60s

String ip_port() default "";//指定要请求的服务端127.0.0.1:8080

}
14 changes: 14 additions & 0 deletions src/main/java/com/qinsong/rpc/common/exp/NotFoundException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.qinsong.rpc.common.exp;

/**
* Created by QSong
* Contact github.com/tohodog
* Date 2020/11/25
* 未发现服务接口
*/
public class NotFoundException extends Exception {

public NotFoundException() {
super("NotFoundException");
}
}
13 changes: 13 additions & 0 deletions src/main/java/com/qinsong/rpc/common/exp/UnavailableException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.qinsong.rpc.common.exp;

/**
* Created by QSong
* Contact github.com/tohodog
* Date 2020/11/25
* 拒绝处理
*/
public class UnavailableException extends Exception {
public UnavailableException() {
super("UnavailableException");
}
}
15 changes: 4 additions & 11 deletions src/main/java/com/qinsong/rpc/server/CacheResponse.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.qinsong.rpc.server;

import com.qinsong.rpc.common.exp.NotFoundException;
import com.qinsong.rpc.common.exp.UnavailableException;
import com.qinsong.rpc.common.serialize.Response;
import com.qinsong.rpc.common.serialize.ISerialize;

Expand All @@ -24,11 +26,11 @@ public CacheResponse(ISerialize iSerialize) {

private void init() {
Response nofound = new Response();
nofound.setException(new Exception("nofound"));
nofound.setException(new NotFoundException());
map.put("nofound", iSerialize.serialize(nofound));

Response unavailable = new Response();
unavailable.setException(new Exception("unavailable"));
unavailable.setException(new UnavailableException());
map.put("unavailable", iSerialize.serialize(unavailable));

Response empty = new Response();
Expand All @@ -47,13 +49,4 @@ public byte[] empty() {
return map.get("empty");
}

public byte[] error(Exception e) {
byte[] bytes = map.get(e.toString());
if (bytes == null) {
Response err = new Response();
err.setException(e);
map.put(e.toString(), bytes = iSerialize.serialize(err));
}
return bytes;
}
}
12 changes: 10 additions & 2 deletions src/main/java/com/qinsong/rpc/server/RpcServiceLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,17 @@ public byte[] onMessage(Async async, byte[] message) {
}
try {
return onHandle(message);
} catch (Exception e) {
} catch (Throwable e) {
e.printStackTrace();
return cacheResponse.error(e);
//获取业务抛出的异常
if (e instanceof InvocationTargetException && e.getCause() != null) e = e.getCause();

String msg = e.getMessage();
if (msg == null || msg.isEmpty()) msg = e.toString();
Response err = new Response();
//统一返回Exception,防止客户端没有这个错误类,序列化失败
err.setException(new Exception(msg));
return iSerialize.serialize(err);
}
}
});
Expand Down

0 comments on commit 6e95294

Please sign in to comment.