Add x header reflection #585

Open. Wants to merge 28 commits into base: master.

Commits (28)
de20691 (Mar 23, 2018): added filter to reflect any X- headers in the response. This is initi…
f9d2bfa (Mar 26, 2018): added configuration option and doco for 'enable.reflect.xheaders'
bff088f (Mar 26, 2018): added a test for xreflectheaders
750a242 (Mar 27, 2018): fixing up checkstyle violations
b4a02f7 (Mar 27, 2018): fixing tests for XHeader Reflection
3b23943 (Mar 27, 2018): trying to get assertEquals to work with a multivaluemap which seems t…
69eb367 (Mar 27, 2018): removed obsolete test class
2a03b91 (Mar 27, 2018): so my issue with the tests and comparing strings and collections is a…
b28934b (Mar 27, 2018): seems you have to do the split manually.
a62ff1c (Apr 1, 2018): setting back to default - accidental commit removed.
1c77dd7 (Apr 1, 2018): found the more in keeping way to add config, modified to match.
0e6a101 (Apr 1, 2018): fix checkstyle issue
0675efc (Apr 1, 2018): found a better way to push in a test property
68a5756 (Apr 1, 2018): issue with wrong version of resources being imported now corrected
9da6c9f (Apr 1, 2018): removed markdown syntax from doc string
68ddc9b (Apr 1, 2018): removed 'enable' from 'enable.reflect.xheaders' in doc
9b51e00 (Apr 1, 2018): got the 'reflect_xheaders' setting backwards for this test. corrected…
f65485f (Apr 1, 2018): completed config conversion to proper boolean rather than pseudo-bool…
e325c7d (May 30, 2018): added configuration for setting idle timeout - requires updated rest-…
e7e0c28 (May 30, 2018): broke reflect.xhearders, fixed it now.
8591a3c (Jul 3, 2018): Merge branch '4.1.1-post' of https://github.com/confluentinc/kafka-re…
2682a98 (Jul 3, 2018): Merge branch 'confluentinc-4.1.1-post' into add-idle-timeout-config
f25ccec (joncourt, Aug 8, 2018): Merge branch '5.0.0-post' of https://github.com/confluentinc/kafka-re…
7507cd1 (joncourt, Aug 8, 2018): Merge branch 'feature/merge-v5.0.0' into develop
9d45b19 (joncourt, Aug 15, 2018): added host.port config entry to do a similar job to host.name where t…
299c040 (joncourt, Aug 16, 2018): Tweaked order of logic for setting override port in UriUtils, added h…
3af252e (joncourt, Aug 16, 2018): Merge pull request #2 from joncourt/add-idle-timeout-config
c9fde5f (Sep 6, 2019): Merge branch '5.3.0-post' of https://github.com/confluentinc/kafka-re…
Files changed

KafkaRestConfig.java
@@ -45,12 +45,12 @@ public class KafkaRestConfig extends RestConfig {
public static final String ID_CONFIG = "id";
private static final String ID_CONFIG_DOC =
"Unique ID for this REST server instance. This is used in generating unique IDs for "
+ "consumers that do "
+ "not specify their ID. The ID is empty by default, which makes a single server setup "
+ "easier to "
+ "get up and running, but is not safe for multi-server deployments where automatic "
+ "consumer IDs "
+ "are used.";
public static final String ID_DEFAULT = "";

public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
@@ -62,7 +62,7 @@ public class KafkaRestConfig extends RestConfig {
public static final String HOST_NAME_CONFIG = "host.name";
private static final String HOST_NAME_DOC =
"The host name used to generate absolute URLs in responses. If empty, the default canonical"
+ " hostname is used";
public static final String HOST_NAME_DEFAULT = "";

public static final String CONSUMER_MAX_THREADS_CONFIG = "consumer.threads";
@@ -74,38 +74,40 @@ public class KafkaRestConfig extends RestConfig {
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
private static final String ZOOKEEPER_CONNECT_DOC =
"NOTE: Only required when using v1 Consumer APIs. Specifies the ZooKeeper connection "
+ "string in the form hostname:port where host and port are the host and port of a "
+ "ZooKeeper server. To allow connecting through other ZooKeeper nodes when that "
+ "ZooKeeper machine is down you can also specify multiple hosts in the form "
+ "hostname1:port1,hostname2:port2,hostname3:port3."
+ "\n"
+ "\n"
+ "The server may also have a "
+ "ZooKeeper chroot path as part of its ZooKeeper connection string which puts its "
+ "data under some path in the global ZooKeeper namespace. If so the consumer should "
+ "use the same chroot path in its connection string. For example to give a chroot path "
+ "of /chroot/path you would give the connection string as hostname1:port1,"
+ "hostname2:port2,hostname3:port3/chroot/path. ";
public static final String ZOOKEEPER_CONNECT_DEFAULT = "";

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
private static final String BOOTSTRAP_SERVERS_DOC =
"A list of host/port pairs to use for establishing the initial connection to the Kafka "
+ "cluster. The client will make use of all servers irrespective of which servers are "
+ "specified here for bootstrapping—this list only impacts the initial hosts used to "
+ "discover the full set of servers. This list should be in the form host1:port1,"
+ "host2:port2,.... Since these servers are just used for the initial connection to "
+ "discover the full cluster membership (which may change dynamically), this list need "
+ "not contain the full set of servers (you may want more than one, though, in case a "
+ "server is down).";

public static final String BOOTSTRAP_SERVERS_DEFAULT = "";

public static final String REFLECT_XHEADERS_CONFIG = "reflect.xheaders";
private static final String REFLECT_XHEADERS_DOC =
"Set true to have any headers starting with 'x-' or 'X-' "
+ "reflected unchanged in the response. This can be used for explicitly "
+ "correlating requests with responses for those clients that need it.";
public static final Boolean REFLECT_XHEADERS_DEFAULT = false;

public static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
private static final String SCHEMA_REGISTRY_URL_DOC =
"The base URL for the schema registry that should be used by the Avro serializer.";
@@ -129,25 +131,24 @@ public class KafkaRestConfig extends RestConfig {

public static final String CONSUMER_ITERATOR_TIMEOUT_MS_CONFIG = "consumer.iterator.timeout.ms";
private static final String CONSUMER_ITERATOR_TIMEOUT_MS_DOC =
"Timeout for blocking consumer iterator operations. This should be set to a small enough "
+ "value that it is possible to effectively peek() on the iterator.";
public static final String CONSUMER_ITERATOR_TIMEOUT_MS_DEFAULT = "1";

public static final String CONSUMER_ITERATOR_BACKOFF_MS_CONFIG = "consumer.iterator.backoff.ms";
private static final String CONSUMER_ITERATOR_BACKOFF_MS_DOC =
"Amount of time to backoff when an iterator runs "
+ "out of data. If a consumer has a dedicated worker thread, this is effectively the "
+ "maximum error for the "
+ "entire request timeout. It should be small enough to closely target the timeout, but "
+ "large enough to "
+ "avoid busy waiting.";
+ "out of data. If a consumer has a dedicated worker thread, this is effectively the "
+ "maximum error for the "
+ "entire request timeout. It should be small enough to closely target the timeout, but "
+ "large enough to "
+ "avoid busy waiting.";
public static final String CONSUMER_ITERATOR_BACKOFF_MS_DEFAULT = "50";

public static final String CONSUMER_REQUEST_TIMEOUT_MS_CONFIG = "consumer.request.timeout.ms";
private static final String CONSUMER_REQUEST_TIMEOUT_MS_DOC =
"The maximum total time to wait for messages for a "
+ "request if the maximum number of messages has not yet been reached.";
+ "request if the maximum number of messages has not yet been reached.";
public static final String CONSUMER_REQUEST_TIMEOUT_MS_DEFAULT = "1000";

public static final String CONSUMER_REQUEST_MAX_BYTES_CONFIG = "consumer.request.max.bytes";
@@ -163,20 +164,20 @@ public class KafkaRestConfig extends RestConfig {
public static final String CONSUMER_INSTANCE_TIMEOUT_MS_CONFIG = "consumer.instance.timeout.ms";
private static final String CONSUMER_INSTANCE_TIMEOUT_MS_DOC =
"Amount of idle time before a consumer instance "
+ "is automatically destroyed.";
+ "is automatically destroyed.";
public static final String CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT = "300000";

public static final String SIMPLE_CONSUMER_MAX_POOL_SIZE_CONFIG = "simpleconsumer.pool.size.max";
private static final String SIMPLE_CONSUMER_MAX_POOL_SIZE_DOC =
"Maximum number of SimpleConsumers that can be instantiated per broker."
+ " If 0, then the pool size is not limited.";
+ " If 0, then the pool size is not limited.";
public static final String SIMPLE_CONSUMER_MAX_POOL_SIZE_DEFAULT = "25";

public static final String SIMPLE_CONSUMER_POOL_TIMEOUT_MS_CONFIG =
"simpleconsumer.pool.timeout.ms";
private static final String SIMPLE_CONSUMER_POOL_TIMEOUT_MS_DOC =
"Amount of time to wait for an available SimpleConsumer from the pool before failing."
+ " Use 0 for no timeout";
+ " Use 0 for no timeout";
public static final String SIMPLE_CONSUMER_POOL_TIMEOUT_MS_DEFAULT = "1000";

// TODO: change this to "http://0.0.0.0:8082" when PORT_CONFIG is deleted.
@@ -246,18 +247,18 @@ public class KafkaRestConfig extends RestConfig {
"Zookeeper session timeout";
protected static final String KAFKACLIENT_INIT_TIMEOUT_DOC =
"The timeout for initialization of the Kafka store, including creation of the Kafka topic "
+ "that stores schema data.";
+ "that stores schema data.";
protected static final String KAFKACLIENT_TIMEOUT_DOC =
"The timeout for an operation on the Kafka store";
protected static final String
ZOOKEEPER_SET_ACL_DOC =
"Whether or not to set an ACL in ZooKeeper when znodes are created and ZooKeeper SASL "
+ "authentication is "
+ "configured. IMPORTANT: if set to `true`, the SASL principal must be the same as the "
+ "Kafka brokers.";
+ "authentication is "
+ "configured. IMPORTANT: if set to `true`, the SASL principal must be the same as the "
+ "Kafka brokers.";
protected static final String KAFKACLIENT_SECURITY_PROTOCOL_DOC =
"The security protocol to use when connecting with Kafka, the underlying persistent storage. "
+ "Values can be `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, or `SASL_SSL`.";
+ "Values can be `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, or `SASL_SSL`.";
protected static final String KAFKACLIENT_SSL_TRUSTSTORE_LOCATION_DOC =
"The location of the SSL trust store file.";
protected static final String KAFKACLIENT_SSL_TRUSTSTORE_PASSWORD_DOC =
@@ -287,12 +288,12 @@ public class KafkaRestConfig extends RestConfig {
protected static final String
KAFKACLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC =
"The endpoint identification algorithm to validate the server hostname using the server "
+ "certificate.";
+ "certificate.";
public static final String
KAFKACLIENT_SASL_KERBEROS_SERVICE_NAME_DOC =
"The Kerberos principal name that the Kafka client runs as. This can be defined either in "
+ "the JAAS "
+ "config file or here.";
+ "the JAAS "
+ "config file or here.";
public static final String KAFKACLIENT_SASL_MECHANISM_DOC =
"The SASL mechanism used for Kafka connections. GSSAPI is the default.";
public static final String KAFKACLIENT_SASL_KERBEROS_KINIT_CMD_DOC =
@@ -304,13 +305,13 @@ public class KafkaRestConfig extends RestConfig {
public static final String
KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC =
"Login thread will sleep until the specified window factor of time from last refresh to "
+ "ticket's expiry has "
+ "been reached, at which time it will try to renew the ticket.";
+ "ticket's expiry has "
+ "been reached, at which time it will try to renew the ticket.";
protected static final String KAFKA_REST_RESOURCE_EXTENSION_DOC =
" A list of classes to use as RestResourceExtension. Implementing the interface "
+ " <code>RestResourceExtension</code> allows you to inject user defined resources "
+ " like filters to Rest Proxy. Typically used to add custom capability like logging, "
+ " security, etc.";
+ " <code>RestResourceExtension</code> allows you to inject user defined resources "
+ " like filters to Rest Proxy. Typically used to add custom capability like logging, "
+ " security, etc.";
private static final boolean ZOOKEEPER_SET_ACL_DEFAULT = false;
private static final ConfigDef config;

@@ -611,6 +612,11 @@ protected static ConfigDef baseKafkaRestConfigDef() {
"",
Importance.LOW,
KAFKA_REST_RESOURCE_EXTENSION_DOC
).define(REFLECT_XHEADERS_CONFIG,
Type.BOOLEAN,
REFLECT_XHEADERS_DEFAULT,
Importance.LOW,
REFLECT_XHEADERS_DOC
);
}

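For anyone trying the new flag out, it is read from the REST Proxy properties file like any other KafkaRestConfig entry. A minimal sketch (the file name and comment are illustrative, not part of this PR):

# kafka-rest.properties
# Reflect any request header starting with 'x-' or 'X-' back into the response
reflect.xheaders=true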
KafkaRestApplication.java
@@ -36,6 +36,7 @@
import io.confluent.kafkarest.resources.PartitionsResource;
import io.confluent.kafkarest.resources.RootResource;
import io.confluent.kafkarest.resources.TopicsResource;
import io.confluent.kafkarest.resources.XHeaderReflectingResponseFilter;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfigException;
@@ -111,6 +112,7 @@ protected void setupInjectedResources(
config.register(new io.confluent.kafkarest.resources.v2.ConsumersResource(context));
config.register(new io.confluent.kafkarest.resources.v2.PartitionsResource(context));
config.register(KafkaRestCleanupFilter.class);
config.register(new XHeaderReflectingResponseFilter(context));

for (RestResourceExtension restResourceExtension : restResourceExtensions) {
restResourceExtension.register(config, appConfig);
XHeaderReflectingResponseFilter.java (new file)
@@ -0,0 +1,70 @@
/**
* Copyright 2015 Confluent Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.kafkarest.resources;

import io.confluent.kafkarest.KafkaRestContext;

import java.util.List;
import java.util.Map.Entry;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.ext.Provider;

import static io.confluent.kafkarest.KafkaRestConfig.REFLECT_XHEADERS_CONFIG;

/**
* Simply reflects any request headers starting with x- (or X-) into the response. This is initially
* to allow clients that need to manually correlate requests with responses to inject a header,
* usually something like X-Correlation-Id, to facilitate matching.
*/
@Provider
public class XHeaderReflectingResponseFilter implements ContainerResponseFilter {

private boolean enabled;

/**
* Reads the given context to determine whether the 'reflect.xheaders' property is enabled.
* Defaults to false if the property is not specified or the context is null.
*/
public XHeaderReflectingResponseFilter(KafkaRestContext context) {
if (context == null) {
enabled = false;
} else {
final String property =
context.getConfig().getOriginalProperties().getProperty(REFLECT_XHEADERS_CONFIG);
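// Boolean.valueOf(null) is false, so an unset property leaves the filter disabled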
enabled = Boolean.valueOf(property);
}

}

@Override
public void filter(final ContainerRequestContext requestContext,
final ContainerResponseContext responseContext) {
if (enabled) {
for (Entry<String, List<String>> header : requestContext.getHeaders().entrySet()) {
if (header.getKey() != null
&& header.getKey().toLowerCase().trim().startsWith("x-")) {
List<String> valueList = header.getValue();
for (String value : valueList) {
responseContext.getHeaders().add(header.getKey(), value);
}
}
}
}
}
}
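With the flag enabled, the filter copies each matching request header into the response unchanged. An illustrative exchange (the topic name, media types, and header values below are made up for the example):

POST /topics/jsontest HTTP/1.1
Content-Type: application/vnd.kafka.json.v2+json
X-Correlation-Id: abc-123

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
X-Correlation-Id: abc-123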
XHeadersAreReflectedDisabledTest.java (new file)
@@ -0,0 +1,45 @@
package io.confluent.kafkarest.integration;

import io.confluent.kafkarest.Versions;

import java.util.Properties;
import javax.ws.rs.core.Response;

import org.junit.Test;

import static io.confluent.kafkarest.KafkaRestConfig.REFLECT_XHEADERS_CONFIG;
import static io.confluent.kafkarest.TestUtils.assertOKResponse;
import static org.junit.Assert.assertFalse;

/**
* Verifies that the XHeaderReflectingResponseFilter is not engaged when the 'reflect.xheaders'
* property is set to 'false'.
* <p>
* This is non-exhaustive - testing only a sample set of API calls.
*/
public class XHeadersAreReflectedDisabledTest extends ClusterTestHarness {

public XHeadersAreReflectedDisabledTest() {
super(1, false);
}

@Override
protected void overrideKafkaRestConfigs(Properties restProperties) {
restProperties.put(REFLECT_XHEADERS_CONFIG, "false");
}

@Test
public void requestXHeaderIsNotReflectedInResponseWhenNotEnabled() {

// given x-header reflection is disabled and...
final String xHeaderKey = "x-some-arbitrary-header";
final String xHeaderValue = "some-value";

// when
Response response = request("/topics").header(xHeaderKey, xHeaderValue).get();
assertOKResponse(response, Versions.KAFKA_MOST_SPECIFIC_DEFAULT);

// then
assertFalse(response.getHeaders().containsKey(xHeaderKey));
}
}
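The complementary enabled-path check is not shown on this page; a minimal sketch of what it could look like under the same harness (the class and method names here are illustrative, not taken from the PR):

package io.confluent.kafkarest.integration;

import io.confluent.kafkarest.Versions;

import java.util.Properties;
import javax.ws.rs.core.Response;

import org.junit.Test;

import static io.confluent.kafkarest.KafkaRestConfig.REFLECT_XHEADERS_CONFIG;
import static io.confluent.kafkarest.TestUtils.assertOKResponse;
import static org.junit.Assert.assertEquals;

public class XHeadersAreReflectedEnabledTest extends ClusterTestHarness {

  public XHeadersAreReflectedEnabledTest() {
    super(1, false);
  }

  @Override
  protected void overrideKafkaRestConfigs(Properties restProperties) {
    restProperties.put(REFLECT_XHEADERS_CONFIG, "true");
  }

  @Test
  public void requestXHeaderIsReflectedInResponseWhenEnabled() {

    // given x-header reflection is enabled and...
    final String xHeaderKey = "x-some-arbitrary-header";
    final String xHeaderValue = "some-value";

    // when
    Response response = request("/topics").header(xHeaderKey, xHeaderValue).get();
    assertOKResponse(response, Versions.KAFKA_MOST_SPECIFIC_DEFAULT);

    // then - the filter copies the x- header into the response unchanged
    assertEquals(xHeaderValue, response.getHeaderString(xHeaderKey));
  }
}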