From 13dbb2a62f87e2649637362550a1e091f64645c2 Mon Sep 17 00:00:00 2001 From: chrisgresty Date: Thu, 14 May 2020 16:36:10 +0100 Subject: [PATCH] Store target origin in request context (#659) Origin ID stored under key "styx.originid" as type com.hotels.styx.api.Id --- .../hotels/styx/client/OriginsInventory.java | 2 +- .../styx/client/StyxHostHttpClient.java | 10 +++- .../client/StyxBackendServiceClientTest.java | 33 +++++++------ .../styx/client/StyxHostHttpClientTest.java | 49 ++++++++++++++++--- .../styx/routing/handlers/HostProxy.java | 2 +- .../StyxBackendServiceClientFactoryTest.java | 3 +- .../styx/routing/handlers/HostProxyTest.kt | 11 +++-- .../styx/plugins/OnCompleteErrorPlugin.java | 2 +- 8 files changed, 78 insertions(+), 34 deletions(-) diff --git a/components/client/src/main/java/com/hotels/styx/client/OriginsInventory.java b/components/client/src/main/java/com/hotels/styx/client/OriginsInventory.java index 6a73ae84ec..31e0632828 100644 --- a/components/client/src/main/java/com/hotels/styx/client/OriginsInventory.java +++ b/components/client/src/main/java/com/hotels/styx/client/OriginsInventory.java @@ -397,7 +397,7 @@ private Collection pools(OriginState state) { return origins.values().stream() .filter(origin -> origin.state().equals(state)) .map(origin -> { - HttpHandler hostClient = (request, context) -> new Eventual<>(origin.hostClient.sendRequest(request)); + HttpHandler hostClient = (request, context) -> new Eventual<>(origin.hostClient.sendRequest(request, context)); return remoteHost(origin.origin, hostClient, origin.hostClient); }) .collect(toList()); diff --git a/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java b/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java index 857909879e..22176d1ade 100644 --- a/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java +++ b/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2019 Expedia Inc. + Copyright (C) 2013-2020 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package com.hotels.styx.client; +import com.hotels.styx.api.HttpInterceptor.Context; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.ResponseEventListener; @@ -30,6 +31,8 @@ * A Styx HTTP Client for proxying to an individual origin host. */ public class StyxHostHttpClient implements LoadBalancingMetricSupplier { + public static final String ORIGINID_CONTEXT_KEY = "styx.originid"; + private final ConnectionPool pool; StyxHostHttpClient(ConnectionPool pool) { @@ -40,7 +43,10 @@ public static StyxHostHttpClient create(ConnectionPool pool) { return new StyxHostHttpClient(pool); } - public Publisher sendRequest(LiveHttpRequest request) { + public Publisher sendRequest(LiveHttpRequest request, Context context) { + if (context != null) { + context.add(ORIGINID_CONTEXT_KEY, pool.getOrigin().id()); + } return Flux.from(pool.borrowConnection()) .flatMap(connection -> { diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/StyxBackendServiceClientTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/StyxBackendServiceClientTest.java index 3a469abe84..9fa521406b 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/StyxBackendServiceClientTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/StyxBackendServiceClientTest.java @@ -18,6 +18,7 @@ import com.google.common.net.HostAndPort; import com.hotels.styx.api.Eventual; import com.hotels.styx.api.HttpHandler; +import com.hotels.styx.api.HttpInterceptor.Context; import com.hotels.styx.api.Id; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; @@ -112,7 +113,7 @@ public void sendsRequestToHostChosenByLoadBalancer() { LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block(); assertThat(response.status(), is(OK)); - verify(hostClient).sendRequest(eq(SOME_REQ)); + verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class)); } @Test @@ -188,8 +189,8 @@ public void retriesWhenRetryPolicyTellsToRetry() { assertThat(response.status(), is(OK)); InOrder ordered = inOrder(firstClient, secondClient); - ordered.verify(firstClient).sendRequest(eq(SOME_REQ)); - ordered.verify(secondClient).sendRequest(eq(SOME_REQ)); + ordered.verify(firstClient).sendRequest(eq(SOME_REQ), any(Context.class)); + ordered.verify(secondClient).sendRequest(eq(SOME_REQ), any(Context.class)); } @Test @@ -213,9 +214,9 @@ public void stopsRetriesWhenRetryPolicyTellsToStop() { .verifyError(OriginUnreachableException.class); InOrder ordered = inOrder(firstClient, secondClient, thirdClient); - ordered.verify(firstClient).sendRequest(eq(SOME_REQ)); - ordered.verify(secondClient).sendRequest(eq(SOME_REQ)); - ordered.verify(thirdClient, never()).sendRequest(eq(SOME_REQ)); + ordered.verify(firstClient).sendRequest(eq(SOME_REQ), any(Context.class)); + ordered.verify(secondClient).sendRequest(eq(SOME_REQ), any(Context.class)); + ordered.verify(thirdClient, never()).sendRequest(eq(SOME_REQ), any(Context.class)); } @Test @@ -242,10 +243,10 @@ public void retriesAtMost3Times() { .verifyError(NoAvailableHostsException.class); InOrder ordered = inOrder(firstClient, secondClient, thirdClient, fourthClient); - ordered.verify(firstClient).sendRequest(eq(SOME_REQ)); - ordered.verify(secondClient).sendRequest(eq(SOME_REQ)); - ordered.verify(thirdClient).sendRequest(eq(SOME_REQ)); - ordered.verify(fourthClient, never()).sendRequest(any(LiveHttpRequest.class)); + ordered.verify(firstClient).sendRequest(eq(SOME_REQ), any(Context.class)); + ordered.verify(secondClient).sendRequest(eq(SOME_REQ), any(Context.class)); + ordered.verify(thirdClient).sendRequest(eq(SOME_REQ), any(Context.class)); + ordered.verify(fourthClient, never()).sendRequest(any(LiveHttpRequest.class), any(Context.class)); } @@ -262,7 +263,7 @@ public void incrementsResponseStatusMetricsForBadResponse() { LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block(); assertThat(response.status(), is(BAD_REQUEST)); - verify(hostClient).sendRequest(eq(SOME_REQ)); + verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class)); assertThat(metricRegistry.counter("origins.response.status.400").getCount(), is(1L)); } @@ -280,7 +281,7 @@ public void incrementsResponseStatusMetricsFor401() { LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block(); assertThat(response.status(), is(UNAUTHORIZED)); - verify(hostClient).sendRequest(eq(SOME_REQ)); + verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class)); assertThat(metricRegistry.counter("origins.response.status.401").getCount(), is(1L)); } @@ -298,7 +299,7 @@ public void incrementsResponseStatusMetricsFor500() { LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block(); assertThat(response.status(), is(INTERNAL_SERVER_ERROR)); - verify(hostClient).sendRequest(eq(SOME_REQ)); + verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class)); assertThat(metricRegistry.counter("origins.response.status.500").getCount(), is(1L)); } @@ -315,7 +316,7 @@ public void incrementsResponseStatusMetricsFor501() { LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block(); assertThat(response.status(), is(NOT_IMPLEMENTED)); - verify(hostClient).sendRequest(SOME_REQ); + verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class)); assertThat(metricRegistry.counter("origins.response.status.501").getCount(), is(1L)); } @@ -452,7 +453,7 @@ public void prefersRestrictedOriginsOverStickyOriginsWhenBothAreConfigured() { } private HttpHandler toHandler(StyxHostHttpClient hostClient) { - return (request, ctx) -> new Eventual<>(hostClient.sendRequest(request)); + return (request, ctx) -> new Eventual<>(hostClient.sendRequest(request, ctx)); } private RetryPolicy mockRetryPolicy(Boolean first, Boolean... outcomes) { @@ -485,7 +486,7 @@ private LoadBalancer mockLoadBalancer(Optional first, Optional responsePublisher) { StyxHostHttpClient secondClient = mock(StyxHostHttpClient.class); - when(secondClient.sendRequest(any(LiveHttpRequest.class))).thenReturn(responsePublisher); + when(secondClient.sendRequest(any(LiveHttpRequest.class), any(Context.class))).thenReturn(responsePublisher); return secondClient; } diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java index 21e9c9fc83..d7d72ebcff 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java @@ -17,11 +17,17 @@ import com.hotels.styx.api.Buffer; import com.hotels.styx.api.ByteStream; +import com.hotels.styx.api.HttpInterceptor; +import com.hotels.styx.api.HttpInterceptor.Context; import com.hotels.styx.api.HttpRequest; import com.hotels.styx.api.HttpResponse; +import com.hotels.styx.api.Id; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; +import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.connectionpool.ConnectionPool; +import com.hotels.styx.server.HttpInterceptorContext; +import com.hotels.styx.support.Support; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; @@ -34,6 +40,8 @@ import java.util.concurrent.atomic.AtomicReference; import static com.hotels.styx.api.HttpResponseStatus.OK; +import static com.hotels.styx.client.StyxHostHttpClient.ORIGINID_CONTEXT_KEY; +import static com.hotels.styx.support.Support.requestContext; import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -62,10 +70,11 @@ public void setUp() { public void returnsConnectionBackToPool() { Connection connection = mockConnection(just(response)); ConnectionPool pool = mockPool(connection); + Context context = mockContext(); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); - StepVerifier.create(hostClient.sendRequest(request)) + StepVerifier.create(hostClient.sendRequest(request, context)) .consumeNextWith(response -> response.consume()) .expectComplete() .verify(); @@ -73,6 +82,7 @@ public void returnsConnectionBackToPool() { verify(pool).borrowConnection(); verify(connection).write(any(LiveHttpRequest.class)); verify(pool).returnConnection(any(Connection.class)); + verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin")); } @Test @@ -82,13 +92,14 @@ public void ignoresCancelledHeaders() { // been published. Connection connection = mockConnection(just(response)); ConnectionPool pool = mockPool(connection); + Context context = mockContext(); AtomicReference transformedResponse = new AtomicReference<>(); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); // The StepVerifier consumes the response event and then unsubscribes // from the response observable. - StepVerifier.create(hostClient.sendRequest(request)) + StepVerifier.create(hostClient.sendRequest(request, context)) .consumeNextWith(transformedResponse::set) .verifyComplete(); @@ -101,17 +112,20 @@ public void ignoresCancelledHeaders() { // Finally, the connection is returned after the response body is fully consumed: verify(pool).returnConnection(any(Connection.class)); + + verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin")); } @Test public void releasesIfRequestIsCancelledBeforeHeaders() { Connection connection = mockConnection(EmitterProcessor.create()); ConnectionPool pool = mockPool(connection); + Context context = mockContext(); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); AtomicReference subscription = new AtomicReference<>(); - Flux.from(hostClient.sendRequest(request)) + Flux.from(hostClient.sendRequest(request, context)) .subscribe(new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription s) { @@ -123,17 +137,19 @@ protected void hookOnSubscribe(Subscription s) { subscription.get().cancel(); verify(pool).closeConnection(any(Connection.class)); + verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin")); } @Test public void ignoresResponseObservableErrorsAfterHeaders() { Connection connection = mockConnection(responseProvider); ConnectionPool pool = mockPool(connection); + Context context = mockContext(); AtomicReference newResponse = new AtomicReference<>(); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); - StepVerifier.create(hostClient.sendRequest(request)) + StepVerifier.create(hostClient.sendRequest(request, context)) .then(() -> { responseProvider.onNext(response); responseProvider.onError(new RuntimeException("oh dear ...")); @@ -145,6 +161,7 @@ public void ignoresResponseObservableErrorsAfterHeaders() { newResponse.get().consume(); verify(pool).returnConnection(any(Connection.class)); + verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin")); } @Test @@ -152,31 +169,35 @@ public void terminatesConnectionWhenResponseObservableCompletesWithoutHeaders() // A connection that yields no response: Connection connection = mockConnection(Flux.empty()); ConnectionPool pool = mockPool(connection); + Context context = mockContext(); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); - StepVerifier.create(hostClient.sendRequest(request)) + StepVerifier.create(hostClient.sendRequest(request, context)) .expectNextCount(0) .expectComplete() .log() .verify(); verify(pool).closeConnection(any(Connection.class)); + verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin")); } @Test public void releasesConnectionWhenResponseFailsBeforeHeaders() { Connection connection = mockConnection(Flux.error(new RuntimeException())); ConnectionPool pool = mockPool(connection); + Context context = mockContext(); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); - StepVerifier.create(hostClient.sendRequest(request)) + StepVerifier.create(hostClient.sendRequest(request, context)) .expectNextCount(0) .expectError() .verify(); verify(pool).closeConnection(any(Connection.class)); + verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin")); } @Test @@ -184,11 +205,12 @@ public void terminatesConnectionDueToUnsubscribedBody() { TestPublisher testPublisher = TestPublisher.create(); Connection connection = mockConnection(just(LiveHttpResponse.response(OK).body(new ByteStream(testPublisher)).build())); ConnectionPool pool = mockPool(connection); + Context context = mockContext(); AtomicReference receivedResponse = new AtomicReference<>(); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); - StepVerifier.create(hostClient.sendRequest(request)) + StepVerifier.create(hostClient.sendRequest(request, context)) .consumeNextWith(receivedResponse::set) .expectComplete() .verify(); @@ -198,6 +220,7 @@ public void terminatesConnectionDueToUnsubscribedBody() { .verify(); verify(pool).closeConnection(any(Connection.class)); + verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin")); } @Test @@ -219,6 +242,18 @@ Connection mockConnection(Flux responseObservable) { ConnectionPool mockPool(Connection connection) { ConnectionPool pool = mock(ConnectionPool.class); when(pool.borrowConnection()).thenReturn(Flux.just(connection)); + Origin origin = mockOrigin("mockorigin"); + when(pool.getOrigin()).thenReturn(origin); return pool; } + + Origin mockOrigin(String id) { + Origin origin = mock(Origin.class); + when(origin.id()).thenReturn(Id.id(id)); + return origin; + } + + HttpInterceptor.Context mockContext() { + return mock(HttpInterceptor.Context.class); + } } diff --git a/components/proxy/src/main/java/com/hotels/styx/routing/handlers/HostProxy.java b/components/proxy/src/main/java/com/hotels/styx/routing/handlers/HostProxy.java index e348bb1f7d..31f68d9b53 100644 --- a/components/proxy/src/main/java/com/hotels/styx/routing/handlers/HostProxy.java +++ b/components/proxy/src/main/java/com/hotels/styx/routing/handlers/HostProxy.java @@ -134,7 +134,7 @@ public HostProxy(String host, int port, StyxHostHttpClient client, OriginMetrics public Eventual handle(LiveHttpRequest request, HttpInterceptor.Context context) { if (active) { return new Eventual<>( - ResponseEventListener.from(client.sendRequest(request)) + ResponseEventListener.from(client.sendRequest(request, context)) .whenCancelled(originMetrics::requestCancelled) .apply()); } else { diff --git a/components/proxy/src/test/java/com/hotels/styx/proxy/StyxBackendServiceClientFactoryTest.java b/components/proxy/src/test/java/com/hotels/styx/proxy/StyxBackendServiceClientFactoryTest.java index ed0c600f2f..0cac71d79b 100644 --- a/components/proxy/src/test/java/com/hotels/styx/proxy/StyxBackendServiceClientFactoryTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/proxy/StyxBackendServiceClientFactoryTest.java @@ -17,6 +17,7 @@ import com.hotels.styx.Environment; import com.hotels.styx.StyxConfig; +import com.hotels.styx.api.HttpInterceptor.Context; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.configuration.Configuration.MapBackedConfiguration; @@ -186,7 +187,7 @@ public void usesTheOriginSpecifiedInTheOriginsRestrictionCookie() { private StyxHostHttpClient hostClient(LiveHttpResponse response) { StyxHostHttpClient mockClient = mock(StyxHostHttpClient.class); - when(mockClient.sendRequest(any(LiveHttpRequest.class))).thenReturn(Flux.just(response)); + when(mockClient.sendRequest(any(LiveHttpRequest.class), any(Context.class))).thenReturn(Flux.just(response)); when(mockClient.loadBalancingMetric()).thenReturn(new LoadBalancingMetric(1)); return mockClient; } diff --git a/components/proxy/src/test/kotlin/com/hotels/styx/routing/handlers/HostProxyTest.kt b/components/proxy/src/test/kotlin/com/hotels/styx/routing/handlers/HostProxyTest.kt index c4ce7d1c01..7527ccf5f2 100644 --- a/components/proxy/src/test/kotlin/com/hotels/styx/routing/handlers/HostProxyTest.kt +++ b/components/proxy/src/test/kotlin/com/hotels/styx/routing/handlers/HostProxyTest.kt @@ -26,6 +26,7 @@ import com.hotels.styx.api.LiveHttpResponse import com.hotels.styx.client.StyxHostHttpClient import com.hotels.styx.client.applications.metrics.OriginMetrics import com.hotels.styx.RoutingObjectFactoryContext +import com.hotels.styx.api.HttpInterceptor.Context import com.hotels.styx.handle import com.hotels.styx.requestContext import com.hotels.styx.routingObjectDef @@ -51,7 +52,7 @@ class HostProxyTest : FeatureSpec() { HostProxy("localhost", 80, client, mockk()).handle(request.stream(), mockk()) verify { - client!!.sendRequest(ofType(LiveHttpRequest::class)) + client!!.sendRequest(ofType(LiveHttpRequest::class), ofType(Context::class)) } } @@ -69,7 +70,7 @@ class HostProxyTest : FeatureSpec() { exception.message shouldBe ("HostProxy localhost:80 is stopped but received traffic.") verify(exactly = 0) { - client!!.sendRequest(any()) + client!!.sendRequest(any(), any()) } } @@ -77,7 +78,7 @@ class HostProxyTest : FeatureSpec() { val client = mockk() val originMetrics = mockk() - every { client.sendRequest(any()) } returns Eventual(Mono.never()) + every { client.sendRequest(any(), any()) } returns Eventual(Mono.never()) val hostProxy = HostProxy("abc", 80, client, originMetrics) @@ -93,7 +94,7 @@ class HostProxyTest : FeatureSpec() { val client = mockk() val originMetrics = mockk() - every { client.sendRequest(any()) } returns Eventual.of( + every { client.sendRequest(any(), any()) } returns Eventual.of( LiveHttpResponse .response() .body(ByteStream(Flux.never())) @@ -164,7 +165,7 @@ class HostProxyTest : FeatureSpec() { override fun beforeTest(testCase: TestCase) { client = mockk(relaxed = true) { - every { sendRequest(any()) } returns Eventual.of(HttpResponse.response(OK).build().stream()) + every { sendRequest(any(), any()) } returns Eventual.of(HttpResponse.response(OK).build().stream()) } } diff --git a/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java b/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java index 81a562e5f2..051b7b1c7c 100644 --- a/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java +++ b/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Expedia Inc. + Copyright (C) 2013-2020 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.