Skip to content

Commit

Permalink
Made thread pool optional and configurable (#898)
Browse files Browse the repository at this point in the history
Made thread pool optional and configurable

Co-Authored-By: lukasniemeier-zalando <[email protected]>
  • Loading branch information
Willi Schönborn and lukasniemeier-zalando authored Feb 12, 2020
1 parent 4fbb181 commit 6905531
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 39 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ In order to configure the thread pool correctly, please refer to

### Non-blocking IO

Riptide has the notion of an *executor* and a *request factory*. There are two different kinds of request factories:
Riptide supports two different kinds of request factories:

**[`ClientHttpRequestFactory`](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/client/ClientHttpRequestFactory.html)**

Expand All @@ -244,12 +244,12 @@ The following implementations offer non-blocking IO:
- [`Netty4ClientHttpRequestFactory`](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/client/Netty4ClientHttpRequestFactory.html), using [Netty](https://netty.io/)
- [`HttpComponentsAsyncClientHttpRequestFactory`](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.html), using [Apache HTTP Async Client](https://hc.apache.org/httpcomponents-asyncclient-4.1.x/index.html)

Non-blocking IO is asynchronous by nature. In order to provide asynchrony for blocking IO you need to register an executor. For synchronous operations it's possible to just pass `Runnable::run`.
Non-blocking IO is asynchronous by nature. In order to provide asynchrony for blocking IO you need to register an executor. Not passing an executor will make all network communication synchronous, i.e. all futures returned by Riptide will already be completed.

| | Synchronous | Asynchronous |
|-----------------|----------------------------------------------|---------------------------------------------------|
| Blocking IO | `Runnable::run` + `ClientHttpRequestFactory` | `Executor` + `ClientHttpRequestFactory` |
| Non-blocking IO | n/a | `AsyncClientHttpRequestFactory` |
| | Synchronous | Asynchronous |
|-----------------|----------------------------|-----------------------------------------|
| Blocking IO | `ClientHttpRequestFactory` | `Executor` + `ClientHttpRequestFactory` |
| Non-blocking IO | n/a | `AsyncClientHttpRequestFactory` |

## Usage

Expand Down Expand Up @@ -371,7 +371,7 @@ We basically use an intermediate `RestTemplate` as a holder of the special `Clie
`MockRestServiceServer` manages.
If you are using the [Spring Boot Starter](riptide-spring-boot-starter) the test setup is provided by a convenient annotation `@RiptideClientTest`,
see [here](riptide-spring-boot-starter/README.md#testing).
see [here](riptide-spring-boot-starter#testing).
## Getting help
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableList;
import lombok.AllArgsConstructor;
import lombok.With;
import org.organicdesign.fp.collections.ImList;
import org.organicdesign.fp.collections.PersistentVector;
import org.springframework.http.client.AsyncClientHttpRequestFactory;
Expand All @@ -24,9 +25,11 @@

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static lombok.AccessLevel.PRIVATE;
import static org.zalando.riptide.Plugin.composite;

@AllArgsConstructor
@With(PRIVATE)
@AllArgsConstructor(access = PRIVATE)
final class DefaultHttpBuilder implements ExecutorStage, RequestFactoryStage, ConfigurationStage, FinalStage {

private static class Converters {
Expand Down Expand Up @@ -67,16 +70,13 @@ public RequestFactoryStage executor(final Executor executor) {

@Override
public ConfigurationStage requestFactory(final ClientHttpRequestFactory factory) {
final IO io = new BlockingIO(factory);
return new DefaultHttpBuilder(executor, io, converters, baseUrl, resolution, plugins);
return withIo(new BlockingIO(factory));
}

@Override
@SuppressWarnings("deprecation")
public ConfigurationStage asyncRequestFactory(final AsyncClientHttpRequestFactory factory) {
final Executor executor = Runnable::run;
final IO io = new NonBlockingIO(factory);
return new DefaultHttpBuilder(executor, io, converters, baseUrl, resolution, plugins);
return withIo(new NonBlockingIO(factory));
}

@Override
Expand All @@ -86,12 +86,12 @@ public ConfigurationStage defaultConverters() {

@Override
public ConfigurationStage converters(@Nonnull final Iterable<HttpMessageConverter<?>> converters) {
return new DefaultHttpBuilder(executor, io, this.converters.concat(converters), baseUrl, resolution, plugins);
return withConverters(this.converters.concat(converters));
}

@Override
public ConfigurationStage converter(final HttpMessageConverter<?> converter) {
return new DefaultHttpBuilder(executor, io, converters.append(converter), baseUrl, resolution, plugins);
return withConverters(converters.append(converter));
}

@Override
Expand All @@ -107,8 +107,7 @@ public ConfigurationStage baseUrl(@Nullable final URI baseUrl) {

@Override
public ConfigurationStage baseUrl(final Supplier<URI> baseUrl) {
return new DefaultHttpBuilder(executor, io, converters,
() -> checkAbsoluteBaseUrl(baseUrl.get()), resolution, plugins);
return withBaseUrl(() -> checkAbsoluteBaseUrl(baseUrl.get()));
}

private URI checkAbsoluteBaseUrl(@Nullable final URI baseUrl) {
Expand All @@ -118,8 +117,7 @@ private URI checkAbsoluteBaseUrl(@Nullable final URI baseUrl) {

@Override
public ConfigurationStage urlResolution(@Nullable final UrlResolution resolution) {
return new DefaultHttpBuilder(executor, io, converters, baseUrl,
firstNonNull(resolution, DEFAULT_RESOLUTION), plugins);
return withResolution(firstNonNull(resolution, DEFAULT_RESOLUTION));
}

@Override
Expand All @@ -129,22 +127,23 @@ public ConfigurationStage defaultPlugins() {

@Override
public ConfigurationStage plugins(final Iterable<Plugin> plugins) {
return new DefaultHttpBuilder(executor, io, converters, baseUrl, resolution,
this.plugins.concat(plugins));
return withPlugins(this.plugins.concat(plugins));
}

@Override
public ConfigurationStage plugin(final Plugin plugin) {
return new DefaultHttpBuilder(executor, io, converters, baseUrl, resolution,
plugins.append(plugin));
return withPlugins(plugins.append(plugin));
}

@Override
public Http build() {
final List<HttpMessageConverter<?>> converters = converters();

final List<Plugin> plugins = new ArrayList<>();
plugins.add(new AsyncPlugin(executor));

if (executor != null) {
plugins.add(new AsyncPlugin(executor));
}

plugins.add(new DispatchPlugin(new DefaultMessageReader(converters)));
plugins.add(new SerializationPlugin(new DefaultMessageWriter(converters)));
plugins.addAll(plugins());
Expand Down
2 changes: 1 addition & 1 deletion riptide-core/src/main/java/org/zalando/riptide/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ static ExecutorStage builder() {
return new DefaultHttpBuilder();
}

interface ExecutorStage {
interface ExecutorStage extends RequestFactoryStage {
RequestFactoryStage executor(Executor executor);
ConfigurationStage asyncRequestFactory(AsyncClientHttpRequestFactory asyncRequestFactory);
}
Expand Down
6 changes: 3 additions & 3 deletions riptide-core/src/main/java/org/zalando/riptide/Requester.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public HeaderStage contentType(final MediaType contentType) {

@Override
public HeaderStage ifModifiedSince(final OffsetDateTime since) {
return header(IF_MODIFIED_SINCE, toHttpdate(since));
return header(IF_MODIFIED_SINCE, toHttpDate(since));
}

@Override
public HeaderStage ifUnmodifiedSince(final OffsetDateTime since) {
return header(IF_UNMODIFIED_SINCE, toHttpdate(since));
return header(IF_UNMODIFIED_SINCE, toHttpDate(since));
}

private String toHttpdate(final OffsetDateTime dateTime) {
private String toHttpDate(final OffsetDateTime dateTime) {
return RFC_1123_DATE_TIME.format(dateTime.atZoneSameInstant(UTC));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ void shouldFailOnNonAbsoluteBaseUri() {
@Test
void shouldFailOnProvisioningOfNonAbsoluteBaseUri() {
final Http unit = Http.builder()
.executor(Runnable::run)
.requestFactory(new SimpleClientHttpRequestFactory())
.baseUrl(() -> URI.create(""))
.build();
Expand Down
2 changes: 2 additions & 0 deletions riptide-spring-boot-autoconfigure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ For a complete overview of available properties, they type and default value ple
| `│   ├── stack-trace-preservation` | | |
| `│   │   └── enabled` | `boolean` | `true` |
| `│   ├── threads` | | |
| `│   │   ├── enabled` | `boolean` | `true` |
| `│   │   ├── min-size` | `int` | `1` |
| `│   │   ├── max-size` | `int` | same as `connections.max-total` |
| `│   │   ├── keep-alive` | `TimeSpan` | `1 minute` |
Expand Down Expand Up @@ -480,6 +481,7 @@ For a complete overview of available properties, they type and default value ple
| `        ├── stack-trace-preservation` | | |
| `        │   └── enabled` | `boolean` | see `defaults` |
| `        ├── threads` | | |
| `        │   ├── enabled` | `boolean` | see `defaults` |
| `        │   ├── min-size` | `int` | see `defaults` |
| `        │   ├── max-size` | `int` | see `defaults` |
| `        │   ├── keep-alive` | `TimeSpan` | see `defaults` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import static org.zalando.riptide.autoconfigure.RiptideProperties.Chaos.ErrorResponses;
import static org.zalando.riptide.autoconfigure.RiptideProperties.Chaos.Exceptions;
import static org.zalando.riptide.autoconfigure.RiptideProperties.Chaos.Latency;
import static org.zalando.riptide.autoconfigure.RiptideProperties.Threads;
import static org.zalando.riptide.autoconfigure.ValueConstants.LOGBOOK_REF;
import static org.zalando.riptide.autoconfigure.ValueConstants.METER_REGISTRY_REF;
import static org.zalando.riptide.autoconfigure.ValueConstants.TRACER_REF;
Expand All @@ -110,7 +109,7 @@ private String registerHttp(final String id, final Client client) {

return genericBeanDefinition(HttpFactory.class)
.setFactoryMethod("create")
.addConstructorArgReference(registerExecutor(id, client))
.addConstructorArgValue(createExecutor(id, client))
.addConstructorArgReference(registerClientHttpRequestFactory(id, client))
.addConstructorArgValue(client.getBaseUrl())
.addConstructorArgValue(client.getUrlResolution())
Expand Down Expand Up @@ -140,9 +139,15 @@ private String registerClientHttpRequestFactory(final String id, final Client cl
});
}

private String registerExecutor(final String id, final Client client) {
// TODO support maxSize = 0 => direct executor?
private Object createExecutor(final String id, final Client client) {
if (client.getThreads().getEnabled()) {
return ref(registerExecutor(id, client));
}

return null;
}

private String registerExecutor(final String id, final Client client) {
final String executorId = registry.registerIfAbsent(id, ExecutorService.class, () ->
genericBeanDefinition(ThreadPoolFactory.class)
.addConstructorArgValue(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private static Connections merge(final Connections base, final Connections defau

private static Threads merge(final Threads base, final Threads defaults) {
return new Threads(
either(base.getEnabled(), defaults.getEnabled()),
either(base.getMinSize(), defaults.getMinSize()),
either(base.getMaxSize(), defaults.getMaxSize()),
either(base.getKeepAlive(), defaults.getKeepAlive()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.zalando.riptide.Http;
import org.zalando.riptide.Http.RequestFactoryStage;
import org.zalando.riptide.Plugin;
import org.zalando.riptide.UrlResolution;

import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.Executor;

Expand All @@ -17,15 +19,15 @@ private HttpFactory() {
}

public static Http create(
final Executor executor,
@Nullable final Executor executor,
final ClientHttpRequestFactory requestFactory,
final String baseUrl,
final UrlResolution urlResolution,
final List<HttpMessageConverter<?>> converters,
final List<Plugin> plugins) {

return Http.builder()
.executor(executor)

return configure(executor)
.requestFactory(requestFactory)
.baseUrl(baseUrl)
.urlResolution(urlResolution)
Expand All @@ -34,4 +36,15 @@ public static Http create(
.build();
}

private static RequestFactoryStage configure(
@Nullable final Executor executor) {

if (executor == null) {
return Http.builder();
}

return Http.builder()
.executor(executor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static final class Defaults {
);

@NestedConfigurationProperty
private Threads threads = new Threads(1, null, TimeSpan.of(1, MINUTES), 0);
private Threads threads = new Threads(true, 1, null, TimeSpan.of(1, MINUTES), 0);

@NestedConfigurationProperty
private Auth auth = new Auth(false, Paths.get("/meta/credentials"));
Expand Down Expand Up @@ -214,6 +214,7 @@ public static final class Connections {
@NoArgsConstructor
@AllArgsConstructor
public static final class Threads {
private Boolean enabled;
private Integer minSize;
private Integer maxSize;
private TimeSpan keepAlive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void shouldOverrideUnprovidedDefaultThreadsMaxSizeToConnectionsMaxTotal() {
void shouldNotOverwriteProvidedDefaultThreadsMaxSizeWithConnectionsMaxTotal() {
final RiptideProperties properties = new RiptideProperties();
final Defaults defaults = new Defaults();
defaults.setThreads(new Threads(null, 10, null, null));
defaults.setThreads(new Threads(true, null, 10, null, null));
properties.setDefaults(defaults);
final RiptideProperties actual = Defaulting.withDefaults(properties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ riptide:
ecb:
base-url: http://www.ecb.europa.eu
request-compression.enabled: true
threads:
enabled: false
backup-request:
enabled: true
delay: 100 milliseconds
Expand Down

0 comments on commit 6905531

Please sign in to comment.