Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown committed Sep 22, 2013
0 parents commit 2ddbfd0
Show file tree
Hide file tree
Showing 26 changed files with 1,146 additions and 0 deletions.
Empty file added README.md
Empty file.
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taobao.danchen</groupId>
<artifactId>rpc-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rpc-demo</name>

<dependencies>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.7</version>
</dependency>

<dependency>
<groupId>hessian</groupId>
<artifactId>hessian</artifactId>
<version>3.2.1</version>
</dependency>

</dependencies>
</project>
82 changes: 82 additions & 0 deletions src/main/java/com/taobao/danchen/rpc/RPCInvocationHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.taobao.danchen.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.LinkedList;

import com.taobao.danchen.rpc.command.ServiceRequestCommand;
import com.taobao.danchen.rpc.mina.TMinaClient;
import com.taobao.danchen.rpc.mina.handler.ClientHandler;
import com.taobao.danchen.rpc.object.CachedTMinaClient;
import com.taobao.danchen.rpc.object.LocalRPCWaitCache;
import com.taobao.danchen.rpc.object.PackageData;
import com.taobao.danchen.rpc.object.RPCConsumerMetaData;
import com.taobao.danchen.rpc.object.RPCResult;
import com.taobao.danchen.rpc.serialize.HessianSerialUtil;

/**
*
* @author danchen
*
*/
public class RPCInvocationHandler implements InvocationHandler{

private RPCConsumerMetaData rpcMetaData;

public RPCInvocationHandler(RPCConsumerMetaData rpcMetaData) {
this.rpcMetaData = rpcMetaData;
}

public Object invoke(Object proxy, Method method, Object[] params)
throws Throwable {
//封装数据content
PackageData packageData = new PackageData();
packageData.setRpcMetaData(rpcMetaData);
packageData.setMethodName(method.getName());
LinkedList<String> listParameters = new LinkedList<String>();
Class<?>[] parameterTypes = method.getParameterTypes();
for(int i=0;i<parameterTypes.length;i++){
listParameters.add(parameterTypes[i].getName());
}
packageData.setListParameters(listParameters);
packageData.setListValues(params);

//结果
long id = LocalRPCWaitCache.getUuid();
RPCResult rpcResult = new RPCResult();
LocalRPCWaitCache.hashMap.put(id, rpcResult);

//封装数据传送命令
ServiceRequestCommand serviceRequestCommand = new ServiceRequestCommand();
serviceRequestCommand.setSerialType((short)1);
serviceRequestCommand.setMessageId(id);
serviceRequestCommand.setContent(HessianSerialUtil.serialize(packageData));

//数据传送
if(CachedTMinaClient.tMinaClientHashMap.get(rpcMetaData.getTargetHost())==null){
TMinaClient tMinaClient = new TMinaClient(rpcMetaData.getTargetHost());
tMinaClient.setHandler(new ClientHandler());
tMinaClient.init();
CachedTMinaClient.tMinaClientHashMap.put(rpcMetaData.getTargetHost(), tMinaClient);
//这里不等太快,强制等待1000ms,要不然后面写数据会报错
Thread.sleep(1000);
tMinaClient.writeData(serviceRequestCommand.encoder());
}else{
CachedTMinaClient.tMinaClientHashMap.get(rpcMetaData.getTargetHost()).writeData(serviceRequestCommand.encoder());
}


//同步等待结果
if(rpcResult.getValid()==0){
LocalRPCWaitCache.hashMap.remove(id);
if(rpcResult.isSuccess()){
return rpcResult.getData();
}else{
return new Exception(rpcResult.getException());
}
}

//无法等待到远程调用的结果返回
return null;
}
}
54 changes: 54 additions & 0 deletions src/main/java/com/taobao/danchen/rpc/TRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.taobao.danchen.rpc;

import java.lang.reflect.Proxy;

import com.taobao.danchen.rpc.mina.TMinaServer;
import com.taobao.danchen.rpc.mina.handler.ServerHandler;
import com.taobao.danchen.rpc.object.LocalServiceRepository;
import com.taobao.danchen.rpc.object.RPCConsumerMetaData;
import com.taobao.danchen.rpc.object.RPCProviderMetaData;

public class TRegistry {
/**
* 声明要远程调用的服务
* @param serviceInterface
* @return
*/
@SuppressWarnings("unchecked")
public <T> T lookup(Class<T> serviceInterface) {
String serviceName = serviceInterface.getName();
RPCConsumerMetaData rpcMetaData = new RPCConsumerMetaData();
rpcMetaData.setServiceName(serviceName);
rpcMetaData.setServiceVersion("1.0.0");
rpcMetaData.setTargetHost("127.0.0.1");
/**
* 后续服务方法调用处理的handle
*/
RPCInvocationHandler handler = new RPCInvocationHandler(rpcMetaData);
/**
* 返回一个服务代理
*/
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
new Class[] { serviceInterface }, handler);
}

/**
* 发布服务
* @param serviceInstance
*/
public void register(RPCProviderMetaData rpcMetaData) {
// 登记服务
String key = rpcMetaData.getServiceName()+rpcMetaData.getServiceVersion();
LocalServiceRepository.providerHashMap.put(key, rpcMetaData);

//这里服务可以发布到一个全局的配置服务器

TMinaServer tMinaServer = new TMinaServer();
tMinaServer.setHandler(new ServerHandler());
try {
tMinaServer.init();
} catch (Exception e) {
e.printStackTrace();
}
}
}
14 changes: 14 additions & 0 deletions src/main/java/com/taobao/danchen/rpc/command/IMessageCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.taobao.danchen.rpc.command;

public interface IMessageCommand {
/**
* ±àÂë
* @return
*/
public byte[] encoder();
/**
* ½âÂë
* @throws Exception
*/
public void decoder(byte[] data) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.taobao.danchen.rpc.command;

import java.nio.ByteBuffer;

public class ServiceRequestCommand implements IMessageCommand{

private short serialType;

private long messageId;

private int contentLength;

private byte[] content;


public short getSerialType() {
return serialType;
}
public void setSerialType(short serialType) {
this.serialType = serialType;
}
public int getContentLength() {
return contentLength;
}
public void setContentLength(int contentLength) {
this.contentLength = contentLength;
}

public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
this.contentLength = content.length;
}
public long getMessageId() {
return messageId;
}
public void setMessageId(long messageId) {
this.messageId = messageId;
}
public byte[] encoder() {
ByteBuffer bytebuffer = ByteBuffer.allocate(14+content.length);
bytebuffer.putShort(serialType);
bytebuffer.putLong(messageId);
bytebuffer.putInt(contentLength);
bytebuffer.put(content);
bytebuffer.flip();
return bytebuffer.array();
}
public void decoder(byte[] data) throws Exception {
ByteBuffer bytebuffer = ByteBuffer.allocate(data.length);
bytebuffer.put(data);
bytebuffer.flip();
this.serialType = bytebuffer.getShort();
this.messageId = bytebuffer.getLong();
this.contentLength = bytebuffer.getInt();
this.content = new byte[contentLength];
bytebuffer.get(content);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.taobao.danchen.rpc.command;

import java.nio.ByteBuffer;

/**
* 服务端返回的命令
* @author danchen
*
*/
public class ServiceResponseCommand implements IMessageCommand{
/**
* -1表示失败,0表示成功
*/
private short success;
/**
* 序列化类型
*/
private short serialType;
/**
* 请求的ID
*/
private long messageId;
/**
* 返回的数据长度
*/
private int contentLength;
/**
* 返回的数据内容
*/
private byte[] content;
/**
* 返回的对象类型
*/
private Class<?> returnType;

public short getSuccess() {
return success;
}

public void setSuccess(short success) {
this.success = success;
}

public short getSerialType() {
return serialType;
}

public void setSerialType(short serialType) {
this.serialType = serialType;
}

public long getMessageId() {
return messageId;
}

public void setMessageId(long messageId) {
this.messageId = messageId;
}

public int getContentLength() {
return contentLength;
}

public void setContentLength(int contentLength) {
this.contentLength = contentLength;
}

public byte[] getContent() {
return content;
}

public void setContent(byte[] content) {
this.content = content;
}



public Class<?> getReturnType() {
return returnType;
}

public void setReturnType(Class<?> returnType) {
this.returnType = returnType;
}

public byte[] encoder() {
ByteBuffer bytebuffer = ByteBuffer.allocate(16+content.length);
bytebuffer.putShort(success);
bytebuffer.putShort(serialType);
bytebuffer.putLong(messageId);
bytebuffer.putInt(contentLength);
bytebuffer.put(content);
bytebuffer.flip();
return bytebuffer.array();
}

public void decoder(byte[] data) throws Exception {
ByteBuffer bytebuffer = ByteBuffer.allocate(data.length);
bytebuffer.put(data);
bytebuffer.flip();
this.success = bytebuffer.getShort();
this.serialType = bytebuffer.getShort();
this.messageId = bytebuffer.getLong();
this.contentLength = bytebuffer.getInt();
this.content = new byte[contentLength];
bytebuffer.get(content);
}

}
Loading

0 comments on commit 2ddbfd0

Please sign in to comment.