Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #12687 - Buffer reusal in the BufferingResponseListener. #12782

Open
wants to merge 7 commits into
base: jetty-12.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.eclipse.jetty.client.Authentication;
import org.eclipse.jetty.client.AuthenticationStore;
import org.eclipse.jetty.client.BasicAuthentication;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.Connection;
Expand All @@ -55,6 +54,7 @@
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.RetainingResponseListener;
import org.eclipse.jetty.client.RoundRobinConnectionPool;
import org.eclipse.jetty.client.Socks5;
import org.eclipse.jetty.client.Socks5Proxy;
Expand Down Expand Up @@ -460,15 +460,15 @@ public void futureResponseListener() throws Exception
// end::completableResponseListener[]
}

public void bufferingResponseListener() throws Exception
public void retainingResponseListener() throws Exception
{
HttpClient httpClient = new HttpClient();
httpClient.start();

// tag::bufferingResponseListener[]
// tag::retainingResponseListener[]
httpClient.newRequest("http://domain.com/path")
// Buffer response content up to 8 MiB
.send(new BufferingResponseListener(8 * 1024 * 1024)
// Accumulate response content up to 8 MiB.
.send(new RetainingResponseListener(8 * 1024 * 1024)
{
@Override
public void onComplete(Result result)
Expand All @@ -480,7 +480,7 @@ public void onComplete(Result result)
}
}
});
// end::bufferingResponseListener[]
// end::retainingResponseListener[]
}

public void inputStreamResponseListener() throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,14 +497,14 @@ include::code:example$src/main/java/org/eclipse/jetty/docs/programming/client/ht

If the response content length is exceeded, the response will be aborted, and an exception will be thrown by method `get(\...)`.

You can buffer the response content in memory also using the <<non-blocking,non-blocking APIs>>, via the `BufferingResponseListener` utility class:
You can accumulate the response content in memory also using the <<non-blocking,non-blocking APIs>>, via the `RetainingResponseListener` utility class:

[,java,indent=0]
----
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/client/http/HTTPClientDocs.java[tags=bufferingResponseListener]
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/client/http/HTTPClientDocs.java[tags=retainingResponseListener]
----

If you want to avoid buffering, you can wait for the response and then stream the content using the `InputStreamResponseListener` utility class:
If you want to avoid accumulation, you can wait for the response and then stream the content using the `InputStreamResponseListener` utility class:

[,java,indent=0]
----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.client;

import java.util.Locale;

import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;

public abstract class AbstractResponseListener implements Response.Listener
{
private final int maxLength;
private String encoding;
private String mediaType;

public AbstractResponseListener()
{
this(2 * 1024 * 1024);
}

public AbstractResponseListener(int maxLength)
{
if (maxLength < 0)
throw new IllegalArgumentException("Invalid max length " + maxLength);
this.maxLength = maxLength;
}

public String getEncoding()
{
return encoding;
}

public int getMaxLength()
{
return maxLength;
}

public String getMediaType()
{
return mediaType;
}

@Override
public void onHeaders(Response response)
{
Request request = response.getRequest();
HttpFields headers = response.getHeaders();
long length = headers.getLongField(HttpHeader.CONTENT_LENGTH);
if (HttpMethod.HEAD.is(request.getMethod()))
length = 0;
if (length > maxLength)
{
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
return;
}

String contentType = headers.get(HttpHeader.CONTENT_TYPE);
if (contentType != null)
{
String media = contentType;

String charset = "charset=";
int index = contentType.toLowerCase(Locale.ENGLISH).indexOf(charset);
if (index > 0)
{
media = contentType.substring(0, index);
String encoding = contentType.substring(index + charset.length());
// Sometimes charsets arrive with an ending semicolon.
int semicolon = encoding.indexOf(';');
if (semicolon > 0)
encoding = encoding.substring(0, semicolon).trim();
// Sometimes charsets are quoted.
int lastIndex = encoding.length() - 1;
if (encoding.charAt(0) == '"' && encoding.charAt(lastIndex) == '"')
encoding = encoding.substring(1, lastIndex).trim();
this.encoding = encoding;
}

int semicolon = media.indexOf(';');
if (semicolon > 0)
media = media.substring(0, semicolon).trim();
this.mediaType = media;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ else if (m.group("token68") != null)
return headerInfos;
}

private class AuthenticationListener extends BufferingResponseListener
private class AuthenticationListener extends RetainingResponseListener
{
private AuthenticationListener()
{
Expand All @@ -125,6 +125,7 @@ private AuthenticationListener()
@Override
public void onSuccess(Response response)
{
super.onSuccess(response);
// The request may still be sending content, stop it.
Request request = response.getRequest();
if (request.getBody() != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

import org.eclipse.jetty.client.Response.Listener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.util.BufferUtil;

/**
Expand All @@ -32,13 +28,16 @@
* <p>The content may be retrieved from {@link #onSuccess(Response)} or {@link #onComplete(Result)}
* via {@link #getContent()} or {@link #getContentAsString()}.</p>
* <p>Instances of this class are not reusable, so one must be allocated for each request.</p>
* <p>The implementation is not very efficient, as it copies (possibly multiple times)
* the response content into a buffer.
* Use {@link RetainingResponseListener} for a more efficient implementation.</p>
*
* @deprecated use {@link RetainingResponseListener} instead
*/
public abstract class BufferingResponseListener implements Listener
@Deprecated(since = "12.1.0", forRemoval = true)
public abstract class BufferingResponseListener extends AbstractResponseListener
{
private final int maxLength;
private ByteBuffer buffer;
private String mediaType;
private String encoding;

/**
* Creates an instance with a default maximum length of 2 MiB.
Expand All @@ -55,52 +54,7 @@ public BufferingResponseListener()
*/
public BufferingResponseListener(int maxLength)
{
if (maxLength < 0)
throw new IllegalArgumentException("Invalid max length " + maxLength);
this.maxLength = maxLength;
}

@Override
public void onHeaders(Response response)
{
Request request = response.getRequest();
HttpFields headers = response.getHeaders();
long length = headers.getLongField(HttpHeader.CONTENT_LENGTH);
if (HttpMethod.HEAD.is(request.getMethod()))
length = 0;
if (length > maxLength)
{
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
return;
}

String contentType = headers.get(HttpHeader.CONTENT_TYPE);
if (contentType != null)
{
String media = contentType;

String charset = "charset=";
int index = contentType.toLowerCase(Locale.ENGLISH).indexOf(charset);
if (index > 0)
{
media = contentType.substring(0, index);
String encoding = contentType.substring(index + charset.length());
// Sometimes charsets arrive with an ending semicolon.
int semicolon = encoding.indexOf(';');
if (semicolon > 0)
encoding = encoding.substring(0, semicolon).trim();
// Sometimes charsets are quoted.
int lastIndex = encoding.length() - 1;
if (encoding.charAt(0) == '"' && encoding.charAt(lastIndex) == '"')
encoding = encoding.substring(1, lastIndex).trim();
this.encoding = encoding;
}

int semicolon = media.indexOf(';');
if (semicolon > 0)
media = media.substring(0, semicolon).trim();
this.mediaType = media;
}
super(maxLength);
}

@Override
Expand All @@ -112,6 +66,7 @@ public void onContent(Response response, ByteBuffer content)
if (length > BufferUtil.space(buffer))
{
int remaining = buffer == null ? 0 : buffer.remaining();
int maxLength = getMaxLength();
if (remaining + length > maxLength)
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
Expand All @@ -124,24 +79,14 @@ public void onContent(Response response, ByteBuffer content)
@Override
public abstract void onComplete(Result result);

public String getMediaType()
{
return mediaType;
}

public String getEncoding()
{
return encoding;
}

/**
* @return the content as bytes
* @see #getContentAsString()
*/
public byte[] getContent()
{
if (buffer == null)
return new byte[0];
return BufferUtil.EMPTY_BYTES;
return BufferUtil.toArray(buffer);
}

Expand All @@ -152,7 +97,7 @@ public byte[] getContent()
*/
public String getContentAsString()
{
String encoding = this.encoding;
String encoding = getEncoding();
if (encoding == null)
return getContentAsString(StandardCharsets.UTF_8);
return getContentAsString(encoding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.jetty.client.internal.HttpContentResponse;

/**
* <p>A {@link BufferingResponseListener} that sends a {@link Request}
* <p>A {@link RetainingResponseListener} that sends a {@link Request}
* and returns a {@link CompletableFuture} that is completed when
* {@link #onComplete(Result)} is called.</p>
* <p>Typical usage:</p>
Expand All @@ -34,7 +34,7 @@
* ContentResponse response = completable.get(5, TimeUnit.SECONDS);
* }</pre>
*/
public class CompletableResponseListener extends BufferingResponseListener
public class CompletableResponseListener extends RetainingResponseListener
{
private final CompletableFuture<ContentResponse> completable = new CompletableFuture<>();
private final Request request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ protected Runnable onContinue(Request request)
return null;
}

protected class ContinueListener extends BufferingResponseListener
protected class ContinueListener extends RetainingResponseListener
{
@Override
public void onSuccess(Response response)
{
super.onSuccess(response);

// Handling of success must be done here and not from onComplete(),
// since the onComplete() is not invoked because the request is not completed yet.

Expand Down Expand Up @@ -104,6 +106,8 @@ public void onSuccess(Response response)
@Override
public void onFailure(Response response, Throwable failure)
{
super.onFailure(response, failure);

HttpConversation conversation = ((HttpRequest)response.getRequest()).getConversation();
// Mark the 100 Continue response as handled
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ protected void onEarlyHints(Request request, HttpFields responseHeaders)
{
}

private class EarlyHintsListener extends BufferingResponseListener
private class EarlyHintsListener extends RetainingResponseListener
{
@Override
public void onSuccess(Response response)
{
super.onSuccess(response);

Request request = response.getRequest();
HttpConversation conversation = ((HttpRequest)request).getConversation();

Expand All @@ -72,6 +74,8 @@ public void onSuccess(Response response)
@Override
public void onFailure(Response response, Throwable failure)
{
super.onFailure(response, failure);

HttpConversation conversation = ((HttpRequest)response.getRequest()).getConversation();
// Reset the conversation listeners to allow the conversation to be completed.
conversation.updateResponseListeners(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ protected void doStart() throws Exception
if (decoderFactories.isEmpty())
{
TypeUtil.serviceStream(ServiceLoader.load(Compression.class))
.peek(c -> c.setByteBufferPool(getByteBufferPool()))
.forEach(c -> decoderFactories.put(new CompressionContentDecoderFactory(c)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Result redirect(Request request, Response response) throws InterruptedExc
{
AtomicReference<Result> resultRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Request redirect = redirect(request, response, new BufferingResponseListener()
Request redirect = redirect(request, response, new RetainingResponseListener()
{
@Override
public void onComplete(Result result)
Expand Down
Loading