MY-RPC 是一个RPC框架,支持接入多种服务管理平台(目前接入Nacos)、多种序列化算法、多种负载均衡算法,使用 Java 原生 Socket 于 Natty 实现了两套网络传输模块。
- 接口设计合理,模块之间低耦合,可以灵活配置诸如序列化算法、负载均衡算法等。
- 实现通过配置文件设置参数,包括日志级别、序列化算法、负载均衡算法、IP地址等。
- 实现基于 Java 原生 Socket 和 Netty 两套网络传输方式。
- 使用 Netty 的 帧解码器(ProtocolFrameDecoder)配合协议长度字段,解决粘包半包问题。
- Netty 自定义编解码器和请求响应 handler 使用 Sharable 设计进行 handler 复用,避免不必要的实例化。
- Netty 客户端采用 Channel 池进行连接复用,避免重复连接同一服务器。
- Netty 采用心跳机制避免客户端假死占用服务端资源。
- 客户端接收 response 使用生产者消费者模型,配合 CompletableFuture 实现客户端同时多次 rpc 请求间不会相互阻塞。
- 解决 json 类序列化 Object 集合中类型信息丢失问题(通过接口内部类实现公共方法)。
- 可以方便的接入不同序列化算法,目前接入:
- Gson
- Hessian
- Jackson
- Kryo
- Java原生
- Protostuff (Protobuf)
- 可以方便的接入不同的负载均衡算法,目前实现:
- 随机算法
- 轮询算法
- 使用 Nacos 作为服务注册和发现平台。
- 服务端下线通过回调钩子自动向 Nacos 注销对应服务。
- 可通过注解自动扫描服务实现并注册。
- 实现单例工厂(用于),支持无参构造,有参构造和构造工厂三种方式。
- 项目注释完整,逻辑清晰。
- rpc-api: 通用接口包
- rpc-common: 消息实体对象、工具类等共用类
- rpc-core: rpc核心
- test-client: 测试用服务端
- test-server: 测试用客户端
+---------------+---------------+-----------------+-------------+
| Magic Number | Package Type | Serializer Type | Data Length |
| 4 bytes | 4 bytes | 4 bytes | 4 bytes |
+---------------+---------------+-----------------+-------------+
| Data Bytes |
| Length: ${Data Length} |
+---------------------------------------------------------------+
字段 | 解释 |
---|---|
Magic Number | 标识一个数据包是否为MFF协议 |
Package Type | 标识数据是一个请求还是一个响应 |
Serializer Type | 标识数据包内容的序列化方式 |
Data Length | 标识数据字段的字节数 |
Data Bytes | 传输对象,RpcRequestMessage或RpcResponseMessage对象 |
接口定义:
package top.mffseal.rpc.api;
/**
* 测试用某服务调用接口。
*
* @author mffseal
*/
public interface HelloService {
/**
* 测试服务
*
* @param object 测试用的调用参数
* @return 测试用的调用结果
*/
String hello(HelloObject object);
String bye(HelloObject object);
}
接口参数定义:
package top.mffseal.rpc.api;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
/**
* 测试调用接口中,客户端向服务端传递的参数对应的类。
*
* @author mffseal
*/
@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
private Integer id;
private String message;
public HelloObject() {
}
}
package top.mffseal.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.mffseal.rpc.api.HelloObject;
import top.mffseal.rpc.api.HelloService;
/**
* 服务端测试用服务实现。
*
* @author mffseal
*/
public class HelloServiceImpl implements HelloService {
private final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
@Override
public String hello(HelloObject object) {
logger.info("接收到: {}", object.getMessage());
return "hello 这是rpc调用的返回值, id=" + object.getId();
}
@Override
public String bye(HelloObject object) {
logger.info("接收到: {}", object.getMessage());
return "bye 这是rpc调用的返回值, id=" + object.getId();
}
}
Netty为例:
package top.mffseal.test;
import top.mffseal.rpc.api.HelloService;
import top.mffseal.rpc.transport.RpcServer;
import top.mffseal.rpc.transport.netty.server.NettyServer;
/**
* 测试用Netty服务端
*
* @author mffseal
*/
public class NettyTestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
RpcServer rpcServer = new NettyServer();
rpcServer.publishService(helloService, HelloService.class);
rpcServer.start();
}
}
Netty为例:
package top.mffseal.test;
import top.mffseal.rpc.api.HelloObject;
import top.mffseal.rpc.api.HelloService;
import top.mffseal.rpc.transport.RpcClient;
import top.mffseal.rpc.transport.RpcClientProxy;
import top.mffseal.rpc.transport.netty.client.NettyClient;
/**
* 测试用Netty客户端。
*
* @author mffseal
*/
public class NettyTestClient {
public static void main(String[] args) {
RpcClient client = new NettyClient();
RpcClientProxy rpcClientProxy = new RpcClientProxy(client);
HelloService helloServer = rpcClientProxy.getProxy(HelloService.class);
HelloObject helloObject = new HelloObject(12, "this is a message");
// 轮询算法的效果要同一个客户端多次调用同一个接口才能看出
String res = helloServer.hello(helloObject);
String res2 = helloServer.hello(helloObject);
String res3 = helloServer.hello(helloObject);
System.out.println(res);
}
}
- 序列化算法选用Gson
- 负载均衡算法选择轮询算法
serializer.library=Gson
loadBalancer=RoundRobin
serializer.kryo.compress=true
netty.loglevel=INFO
netty.retry=5
netty.timeout=5000
namingServer.platform=Nacos
namingServer.host=Nacos服务器ip地址
namingServer.port=Nacos服务器端口
- 序列化算法选择Protostuff (Protobuf)
serializer.library=Protostuff
serializer.kryo.compress=true
host=localhost
port=8080
netty.loglevel=INFO
namingServer.platform=Nacos
namingServer.host=Nacos服务器ip地址
namingServer.port=Nacos服务器端口
- 启动前,请确保服务端和客户端均能连接到Nacos服务器,并且在服务端和客户端的配置文件中配置了Nacos服务器地址和端口。
- 分别启动服务端和客户端,观察控制台输出内容。
- 客户端通过动态代理,实际调用到RpcClientProxy的invoke方法。
- 该方法会调用NettyClient的sendRequest方法,并通过completableFuture.get()方法阻塞等待结果。
- NettyClient发送rpc请求,并向ResponseLocker存入一个CompletableFuture。
- 这样NettyClient就可以接着执行其它的invoke调用,不用等待response阻塞。
- 服务器收到请求后会调用方法,并返回一个RpcResponse。
- 客户端的ResponseHandler收到RpcResponse后会向ResponseLocker对应位置的CompletableFuture调用complete(填充response)。
- complete会唤醒RpcClientProxy调用的get处,并将结果传递过去。
通过注解扫描的方式自动注册服务:
- 扫描当前包下所有Class类。
- 遍历查找含有@Service注解的类对象。
- 检查@Service的name属性是否为空。
- 不为空则使用指定的name值作为所实现接口进行发布。
- 为空则通过反射获取该类所实现的接口,进行发布。
通过双重检查锁实现单例模式的创建,单例工厂类通过Map维护所有已创建的单例对象。
支持三种方式的创建:
- 无参构造
- 通过
clazz.newInstance()
实现。
- 通过
- 有参构造
- 通过
clazz.getDeclaredConstructor(paramTypes)
获取有参构造器。
- 通过
- 工厂方法
- 通过
Factory.getMethod(methodName, paramTypes)
获取工厂方法再 invoke。
- 通过
My-RPC-Framework is under the MIT license. See the LICENSE file for details.