Skip to content

Commit

Permalink
DRILL-8393: Allow parameters to be passed to headers through SQL in W…
Browse files Browse the repository at this point in the history
…HERE clause (apache#2747)
  • Loading branch information
LYCJeff authored and cgivre committed Nov 2, 2023
1 parent e6bbefa commit e2b9efd
Show file tree
Hide file tree
Showing 13 changed files with 1,684 additions and 99 deletions.
61 changes: 52 additions & 9 deletions contrib/storage-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ To configure the plugin, create a new storage plugin, and add the following conf
{
"type": "http",
"cacheResults": true,
"enhanced": false,
"connections": {},
"timeout": 0,
"proxyHost": null,
Expand All @@ -29,6 +30,7 @@ The required options are:

* `type`: This should be `http`
* `cacheResults`: Enable caching of the HTTP responses. Defaults to `false`
* `enhanced`: Enable enhanced param syntax in Drill 1.22 and beyond. The enhanced mode allows the user to send the request header through the `WHERE` clause. Defaults to `false`
* `timeout`: Sets the response timeout in seconds. Defaults to `0` which is no timeout.
* `connections`: This field contains the details for individual connections. See the section *Configuring API Connections for Details*.

Expand Down Expand Up @@ -112,17 +114,18 @@ if the parameters specify which data sets to return:
```json
url: "https://api.sunrise-sunset.org/json",
requireTail: false,
params: ["lat", "lng", "date"]
enhanced: true,
params: ["tail.lat", "tail.lng", "tail.date"]
```

SQL query:

```sql
SELECT * FROM api.sunrise
WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'
WHERE `tail.lat` = 36.7201600 AND `tail.lng` = -4.4203400 AND `tail.date` = '2019-10-02'
```

In this case, Drill appends the parameters to the URL, adding a question mark
In this case, Drill appends the `tail` prefixed parameters to the URL, adding a question mark
to separate the two.

#### Method
Expand All @@ -138,11 +141,28 @@ key2=value2"
```

`postParameterLocation`: If the API uses the `POST` method, you can send parameters in several different ways:
* `query_string`: Parameters from the query are pushed down to the query string. Static parameters are pushed to the post body.
* `query_string`: Parameters from the query are pushed down to the query string. Static parameters are pushed to the post body. This option is abandoned in enhanced mode.
* `post_body`: Both static and parameters from the query are pushed to the post body as key/value pairs
* `json_body`: Both static and parameters from the query are pushed to the post body as json.
* `xml_body`: Both static and parameters from the query are pushed to the post body as XML.

```json
url: "https://api.sunrise-sunset.org/json",
requireTail: false,
enhanced: true,
postParameterLocation: "json_body",
params: ["body.lat", "body.lng", "body.date"]
```

SQL query:

```sql
SELECT * FROM api.sunrise
WHERE `body.lat` = 36.7201600 AND `body.lng` = -4.4203400 AND `body.date` = '2019-10-02'
```

In this case, Drill appends the `body` prefixed parameters to the post body as json.

#### Headers

`headers`: Often APIs will require custom headers as part of the authentication. This field allows
Expand All @@ -155,20 +175,43 @@ headers: {
}
```

You can also pass the request headers through the where clause.

```json
url: "https://api.sunrise-sunset.org/json",
requireTail: false,
enhanced: true,
postParameterLocation: "json_body",
params: ["body.lat", "body.lng", "body.date", "header.header1"]
```

SQL query:

```sql
SELECT * FROM api.sunrise
WHERE `body.lat` = 36.7201600 AND `body.lng` = -4.4203400 AND `body.date` = '2019-10-02'
AND `header.header1` = 'value1'
```

In this case, Drill appends the `header` prefixed parameters to the headers.

#### Query Parmeters as Filters

* `params`: Allows you to map SQL `WHERE` clause conditions to query parameters.
* `params`: Allows you to map SQL `WHERE` clause conditions to query parameters, request bodies, and
request headers. The `tail` prefixed params maps to the query parameters at the end of the URL, the
`body` prefixed params maps to the request bodies, and the `header` prefixed params maps to the
request headers.

```json
url: "https://api.sunrise-sunset.org/json",
params: ["lat", "lng", "date"]
params: ["tail.lat", "tail.lng", "tail.date"]
```

SQL query:

```sql
SELECT * FROM api.sunrise
WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'
WHERE `tail.lat` = 36.7201600 AND `tail.lng` = -4.4203400 AND `tail.date` = '2019-10-02'
```

HTTP parameters are untyped; Drill converts any value you provide into a string.
Expand Down Expand Up @@ -493,7 +536,7 @@ body. Set the configuration as follows:
"method": "GET",
"dataPath": "results",
"headers": null,
"params": [ "lat", "lng", "date" ],
"params": [ "tail.lat", "tail.lng", "tail.date" ],
"authType": "none",
"userName": null,
"password": null,
Expand All @@ -508,7 +551,7 @@ Then, to execute a query:
```sql
SELECT sunrise, sunset
FROM http.sunrise
WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = 'today'
WHERE `tail.lat` = 36.7201600 AND `tail.lng` = -4.4203400 AND `tail.date` = 'today'
```

Which yields the same results as before.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ public class HttpApiConfig {
protected static final String CSV_INPUT_FORMAT = "csv";
protected static final String XML_INPUT_FORMAT = "xml";

public static final String POST_BODY_POST_LOCATION = "post_body";
public static final String QUERY_STRING_POST_LOCATION = "query_string";
public static final String POST_BODY_POST_LOCATION = "post_body";
public static final String XML_BODY_POST_LOCATION = "xml_body";
public static final String JSON_BODY_POST_LOCATION = "json_body";

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {

Expand Down Expand Up @@ -296,18 +297,30 @@ protected HttpUrl buildUrl() {
protected void addFilters(Builder urlBuilder, List<String> params,
Map<String, String> filters) {

// If the request is a POST query and the user selected to push the filters to either JSON body
// or the post body, do not add to the query string.
if (subScan.tableSpec().connectionConfig().getMethodType() == HttpApiConfig.HttpMethod.POST &&
(subScan.tableSpec().connectionConfig().getPostLocation() == PostLocation.POST_BODY ||
subScan.tableSpec().connectionConfig().getPostLocation() == PostLocation.JSON_BODY)
) {
return;
}
for (String param : params) {
String value = filters.get(param);
if (value != null) {
urlBuilder.addQueryParameter(param, value);
final Pattern tailParamsKeyPattern = Pattern.compile("^tail\\..+$");
final HttpStoragePluginConfig config = subScan.tableSpec().config();
if (!config.enableEnhancedParamSyntax()) {
// If the request is a POST query and the user selected to push the filters to either JSON body
// or the post body, do not add to the query string.
if (subScan.tableSpec().connectionConfig().getMethodType() == HttpApiConfig.HttpMethod.GET ||
(subScan.tableSpec().connectionConfig().getMethodType() == HttpApiConfig.HttpMethod.POST
&& subScan.tableSpec().connectionConfig().getPostLocation() == PostLocation.QUERY_STRING)
) {
for (String param : params) {
String value = filters.get(param);
if (value != null) {
urlBuilder.addQueryParameter(param, value);
}
}
}
} else {
for (String param : params) {
if (tailParamsKeyPattern.matcher(param).find()){
String value = filters.get(param);
if (value != null) {
urlBuilder.addQueryParameter(param.substring(5), value);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfig {
public final String proxyHost;
public final int proxyPort;
public final String proxyType;
public final boolean enableEnhancedParamSyntax;
/**
* Timeout in {@link TimeUnit#SECONDS}.
*/
Expand All @@ -62,6 +63,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfig {

@JsonCreator
public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
@JsonProperty("enhanced") Boolean enableEnhancedParamSyntax,
@JsonProperty("connections") Map<String, HttpApiConfig> connections,
@JsonProperty("timeout") Integer timeout,
@JsonProperty("retryDelay") Integer retryDelay,
Expand Down Expand Up @@ -89,6 +91,7 @@ public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResult
AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER),
oAuthConfig);
this.cacheResults = cacheResults != null && cacheResults;
this.enableEnhancedParamSyntax = enableEnhancedParamSyntax != null && enableEnhancedParamSyntax;
this.retryDelay = (retryDelay == null || retryDelay < 0) ? DEFAULT_RATE_LIMIT : retryDelay;

this.connections = CaseInsensitiveMap.newHashMap();
Expand Down Expand Up @@ -121,6 +124,7 @@ public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResult
private HttpStoragePluginConfig(HttpStoragePluginConfig that, CredentialsProvider credentialsProvider) {
super(credentialsProvider, credentialsProvider == null, that.authMode);
this.cacheResults = that.cacheResults;
this.enableEnhancedParamSyntax = that.enableEnhancedParamSyntax;
this.connections = that.connections;
this.timeout = that.timeout;
this.proxyHost = that.proxyHost;
Expand All @@ -140,6 +144,7 @@ public HttpStoragePluginConfig(HttpStoragePluginConfig that, OAuthConfig oAuthCo
that.credentialsProvider == null);

this.cacheResults = that.cacheResults;
this.enableEnhancedParamSyntax = that.enableEnhancedParamSyntax;
this.connections = that.connections;
this.timeout = that.timeout;
this.proxyHost = that.proxyHost;
Expand All @@ -165,6 +170,7 @@ public HttpStoragePluginConfig copyForPlan(String connectionName) {
Optional<UsernamePasswordWithProxyCredentials> creds = getUsernamePasswordCredentials();
return new HttpStoragePluginConfig(
cacheResults,
enableEnhancedParamSyntax,
configFor(connectionName),
timeout, retryDelay,
username(),
Expand Down Expand Up @@ -196,6 +202,7 @@ public boolean equals(Object that) {
HttpStoragePluginConfig thatConfig = (HttpStoragePluginConfig) that;
return Objects.equals(connections, thatConfig.connections) &&
Objects.equals(cacheResults, thatConfig.cacheResults) &&
Objects.equals(enableEnhancedParamSyntax, thatConfig.enableEnhancedParamSyntax) &&
Objects.equals(proxyHost, thatConfig.proxyHost) &&
Objects.equals(retryDelay, thatConfig.retryDelay) &&
Objects.equals(proxyPort, thatConfig.proxyPort) &&
Expand All @@ -210,6 +217,7 @@ public String toString() {
return new PlanStringBuilder(this)
.field("connections", connections)
.field("cacheResults", cacheResults)
.field("enhanced", enableEnhancedParamSyntax)
.field("timeout", timeout)
.field("retryDelay", retryDelay)
.field("proxyHost", proxyHost)
Expand All @@ -223,13 +231,16 @@ public String toString() {

@Override
public int hashCode() {
return Objects.hash(connections, cacheResults, timeout, retryDelay,
return Objects.hash(connections, cacheResults, enableEnhancedParamSyntax, timeout, retryDelay,
proxyHost, proxyPort, proxyType, oAuthConfig, credentialsProvider, authMode);
}

@JsonProperty("cacheResults")
public boolean cacheResults() { return cacheResults; }

@JsonProperty("enhanced")
public boolean enableEnhancedParamSyntax() { return enableEnhancedParamSyntax; }

@JsonProperty("connections")
public Map<String, HttpApiConfig> connections() { return connections; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ public InputStream getInputStream() {

Request.Builder requestBuilder = new Request.Builder()
.url(url);
final Pattern bodyParamsKeyPattern = Pattern.compile("^body\\..+$");
final Pattern headerParamsKeyPattern = Pattern.compile("^header\\..+$");

// The configuration does not allow for any other request types other than POST and GET.
if (apiConfig.getMethodType() == HttpMethod.POST) {
Expand All @@ -354,11 +356,19 @@ public InputStream getInputStream() {
if (apiConfig.getPostLocation() == PostLocation.POST_BODY) {
formBodyBuilder = buildPostBody(filters, apiConfig.postBody());
requestBuilder.post(formBodyBuilder.build());
} else if (apiConfig.getPostLocation() == PostLocation.JSON_BODY) {
} else if (apiConfig.getPostLocation() == PostLocation.JSON_BODY
|| (apiConfig.getPostLocation() == PostLocation.QUERY_STRING
&& pluginConfig.enableEnhancedParamSyntax())) {
// Add static parameters from postBody
JSONObject json = buildJsonPostBody(apiConfig.postBody());
// Now add filters
if (filters != null) {
if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
json.put(filter.getKey().substring(5), filter.getValue());
}
}
} else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
json.put(filter.getKey(), filter.getValue());
}
Expand All @@ -369,7 +379,15 @@ public InputStream getInputStream() {
} else if (apiConfig.getPostLocation() == PostLocation.XML_BODY) {
StringBuilder xmlRequest = new StringBuilder();
xmlRequest.append("<request>");
if (filters != null) {
if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
xmlRequest.append("<").append(filter.getKey().substring(5)).append(">");
xmlRequest.append(filter.getValue());
xmlRequest.append("</").append(filter.getKey().substring(5)).append(">");
}
}
} else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
xmlRequest.append("<").append(filter.getKey()).append(">");
xmlRequest.append(filter.getValue());
Expand Down Expand Up @@ -398,6 +416,14 @@ public InputStream getInputStream() {
}
}

if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
if (headerParamsKeyPattern.matcher(filter.getKey()).find()){
requestBuilder.addHeader(filter.getKey().substring(7), filter.getValue());
}
}
}

// Build the request object
Request request = requestBuilder.build();
Response response = null;
Expand Down Expand Up @@ -657,10 +683,19 @@ private JSONObject buildJsonPostBody(String postBody) {
public FormBody.Builder buildPostBody(Map<String, String> filters, String postBody) {
// Add static parameters
FormBody.Builder builder = buildPostBody(postBody);
final Pattern bodyParamsKeyPattern = Pattern.compile("^body\\..+$");

// Now add the filters
for (Map.Entry<String, String> filter : filters.entrySet()) {
builder.add(filter.getKey(), filter.getValue());
if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
builder.add(filter.getKey().substring(5), filter.getValue());
}
}
} else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
builder.add(filter.getKey(), filter.getValue());
}
}
return builder;
}
Expand Down
Loading

0 comments on commit e2b9efd

Please sign in to comment.