Skip to content

Commit

Permalink
Reject if response do not match any request (#11882)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed Mar 25, 2023
1 parent 98b530f commit d1f0f0a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
* ExchangeCodec.
Expand Down Expand Up @@ -169,7 +171,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
}
} else {
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));
}
res.setResult(data);
} else {
Expand Down Expand Up @@ -211,16 +213,21 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

protected Object getRequestData(long id) {
protected Object getRequestData(Channel channel, Response response, long id) {
DefaultFuture future = DefaultFuture.getFuture(id);
if (future == null) {
return null;
}
Request req = future.getRequest();
if (req == null) {
return null;
if (future != null) {
Request req = future.getRequest();
if (req != null) {
return req.getData();
}
}
return req.getData();

logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response status is " + response.getStatus() + ", response id is " + response.getId()
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
throw new IllegalArgumentException("Failed to find any request match the response, response id: " + id);
}

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Expand Down Expand Up @@ -429,7 +436,7 @@ protected Object decodeEventData(Channel channel, ObjectInput in, byte[] eventBy
try {
if (eventBytes != null) {
int dataLen = eventBytes.length;
int threshold = ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size", 50);
int threshold = ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size", 15);
if (dataLen > threshold) {
throw new IllegalArgumentException("Event data too long, actual size " + dataLen + ", threshold " + threshold + " rejected for security consideration.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -137,6 +139,8 @@ public void test_Decode_Error_MagicNum() throws IOException {

@Test
public void test_Decode_Error_Length() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);

byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
byte[] request = getRequestBytes(person, header);
Expand All @@ -148,6 +152,8 @@ public void test_Decode_Error_Length() throws IOException {
Assertions.assertEquals(person, obj.getResult());
// only decode necessary bytes
Assertions.assertEquals(request.length, buffer.readerIndex());

future.cancel();
}

@Test
Expand Down Expand Up @@ -226,6 +232,7 @@ public void test_Decode_MigicCodec_Contain_ExchangeHeader() throws IOException {

@Test
public void test_Decode_Return_Response_Person() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);
// 00000010-response/oneway/hearbeat=false/hessian |20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 2, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
Expand All @@ -235,6 +242,8 @@ public void test_Decode_Return_Response_Person() throws IOException {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);

future.cancel();
}

@Test //The status input has a problem, and the read information is wrong when the serialization is serialized.
Expand Down Expand Up @@ -324,6 +333,7 @@ public void test_Decode_Error_Request_Object() throws IOException {

@Test
public void test_Header_Response_NoSerializationFlag() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);
// 00000010-response/oneway/hearbeat=false/noset |20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0x02, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
Expand All @@ -333,10 +343,13 @@ public void test_Header_Response_NoSerializationFlag() throws IOException {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);

future.cancel();
}

@Test
public void test_Header_Response_Heartbeat() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);
// 00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
Expand All @@ -346,6 +359,8 @@ public void test_Header_Response_Heartbeat() throws IOException {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);

future.cancel();
}

@Test
Expand All @@ -371,6 +386,7 @@ public void test_Encode_Request() throws IOException {

@Test
public void test_Encode_Response() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(1001), 100000, null);
ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024);
Channel channel = getCliendSideChannel(url);
Response response = new Response();
Expand All @@ -396,6 +412,7 @@ public void test_Encode_Response() throws IOException {
// encode response verson ??
// Assertions.assertEquals(response.getProtocolVersion(), obj.getVersion());

future.cancel();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
(Invocation) getRequestData(channel, res, id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
(Invocation) getRequestData(channel, res, id), proto);
}
data = result;
}
Expand Down

0 comments on commit d1f0f0a

Please sign in to comment.