Skip to content

Commit

Permalink
DRILL-8437: Add Header Index Pagination (#2806)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgivre authored May 21, 2023
1 parent 384fd81 commit 981aa64
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 15 deletions.
2 changes: 1 addition & 1 deletion contrib/storage-http/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
./src/test/resources/logback-test.xml
/src/test/resources/logback-test.xml
12 changes: 12 additions & 0 deletions contrib/storage-http/Pagination.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,15 @@ There are three possible parameters:


** Note: Index / Keyset Pagination is only implemented for APIs that return JSON **

## Header Index Pagination
Header index pagination is used when the API in question returns a link to the next page in the response header. Shopify is one such example of an API that does this.

The only configuration option is the `nextPageParam` which is the parameter that Drill should look for in the response header.

```json
"paginator": {
"nextPageParam": "page",
"method": "HEADER_INDEX"
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import java.io.File;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,6 +119,7 @@ public void addContext(UserException.Builder builder) {
.scanDefn(subScan)
.url(url)
.tempDir(new File(tempDirPath))
.paginator(paginator)
.proxyConfig(proxySettings(negotiator.drillConfig(), url))
.errorContext(errorContext)
.build();
Expand Down Expand Up @@ -225,7 +227,7 @@ protected void populateImplicitFieldMap(SimpleHttp http) {

protected Map<String, Object> generatePaginationFieldMap() {
if (paginator == null || paginator.getMode() != PaginatorMethod.INDEX) {
return null;
return Collections.emptyMap();
}

Map<String, Object> fieldMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void addContext(UserException.Builder builder) {
.scanDefn(subScan)
.url(url)
.tempDir(new File(tempDirPath))
.paginator(paginator)
.proxyConfig(proxySettings(negotiator.drillConfig(), url))
.errorContext(errorContext)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,18 @@ public HttpPaginatorConfig(HttpPaginatorConfigBuilder builder) {
.build(logger);
}
break;
case HEADER_INDEX:
if (StringUtils.isEmpty(this.nextPageParam)) {
throw UserException
.validationError()
.message("Invalid paginator configuration. For HEADER_INDEX pagination, the nextPageParam must be defined.")
.build(logger);
}
break;
default:
throw UserException
.validationError()
.message("Invalid paginator method: %s. Drill supports 'OFFSET', 'INDEX' and 'PAGE'", method)
.message("Invalid paginator method: %s. Drill supports 'OFFSET', 'INDEX', 'HEADER_INDEX' and 'PAGE'", method)
.build(logger);
}
}
Expand Down Expand Up @@ -230,7 +238,8 @@ public String toString() {
public enum PaginatorMethod {
OFFSET,
PAGE,
INDEX
INDEX,
HEADER_INDEX
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
import org.apache.drill.exec.store.http.paginator.IndexPaginator;
import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
import org.apache.drill.exec.store.http.paginator.PagePaginator;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void addContext(UserException.Builder builder) {
private static class HttpReaderFactory implements ReaderFactory {
private final HttpSubScan subScan;
private final HttpPaginatorConfig paginatorConfig;
private Paginator paginator;
private final Paginator paginator;

private int count;

Expand All @@ -105,6 +106,8 @@ public HttpReaderFactory(HttpSubScan subScan) {

// Initialize the paginator and generate the base URLs
this.paginator = getPaginator();
} else {
this.paginator = null;
}
}

Expand All @@ -119,8 +122,6 @@ private Paginator getPaginator() {
rawUrl = HttpUrl.parse(subScan.tableSpec().connectionConfig().url());
}



// If the URL is not parsable or otherwise invalid
if (rawUrl == null) {
throw UserException.validationError()
Expand All @@ -130,28 +131,33 @@ private Paginator getPaginator() {

urlBuilder = rawUrl.newBuilder();

Paginator paginator = null;
if (paginatorConfig.getMethodType() == PaginatorMethod.OFFSET) {
paginator = new OffsetPaginator(urlBuilder,
return new OffsetPaginator(urlBuilder,
subScan.maxRecords(),
paginatorConfig.pageSize(),
paginatorConfig.limitParam(),
paginatorConfig.offsetParam());
} else if (paginatorConfig.getMethodType() == PaginatorMethod.PAGE) {
paginator = new PagePaginator(urlBuilder,
return new PagePaginator(urlBuilder,
subScan.maxRecords(),
paginatorConfig.pageSize(),
paginatorConfig.pageParam(),
paginatorConfig.pageSizeParam());
} else if (paginatorConfig.getMethodType() == PaginatorMethod.INDEX) {
paginator = new IndexPaginator(urlBuilder,
return new IndexPaginator(urlBuilder,
0, // Page size not used for Index/Keyset pagination
subScan.maxRecords(),
paginatorConfig.hasMoreParam(),
paginatorConfig.indexParam(),
paginatorConfig.nextPageParam());
} else if (paginatorConfig.getMethodType() == PaginatorMethod.HEADER_INDEX) {
return new HeaderIndexPaginator(urlBuilder,
subScan.maxRecords(),
paginatorConfig.pageSize(),
paginatorConfig.nextPageParam(),
subScan.tableSpec().connectionConfig().url());
}
return paginator;
return null;
}

@Override
Expand Down Expand Up @@ -181,6 +187,7 @@ public ManagedReader<SchemaNegotiator> next() {
* the group scan such that the calls could be sent to different drillbits.
*/
if (!paginator.hasNext()) {
logger.debug("Ending Batch Generation.");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.http.paginator;

import okhttp3.Headers;
import okhttp3.HttpUrl.Builder;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* The Header Index Paginator is used when the API in question send a link in the HTTP header
* containing the URL for the next page.
*/
public class HeaderIndexPaginator extends Paginator {

private static final Logger logger = LoggerFactory.getLogger(HeaderIndexPaginator.class);
private static final Pattern URL_REGEX = Pattern.compile("(https?:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*))");

private final String nextPageParam;
private final String firstPageURL;
private Headers headers;
private boolean firstPage;
private int pageCount;

public HeaderIndexPaginator(Builder builder, int pageSize, int limit, String nextPageParam, String firstPageURL) {
super(builder, PaginatorMethod.HEADER_INDEX, pageSize, limit);
this.nextPageParam = nextPageParam;
this.firstPageURL = firstPageURL;
this.firstPage = true;
this.pageCount = 0;
}

@Override
public boolean hasNext() {
// If the headers are null and it isn't the first page, end pagination
if ( !firstPage &&
(headers == null || StringUtils.isEmpty(headers.get(nextPageParam)))
) {
notifyPartialPage();
logger.debug("Ending pagination. No additional info in headers.");
return false;
}

return !partialPageReceived;
}

/**
* This method sets the headers for the Header Index Paginator. This must be called with updated headers
* before the {@link #next()} method is called.
* @param headers A {@link Headers} object containing the response headers from the previous API call.
*/
public void setResponseHeaders(Headers headers) {
logger.debug("Setting response headers. ");
this.headers = headers;

// If the next page URL is empty or otherwise undefined, halt pagination.
if (StringUtils.isEmpty(headers.get(nextPageParam))) {
notifyPartialPage();
}
}

@Override
public String next() {
pageCount++;
if (firstPage) {
firstPage = false;
return firstPageURL;
}

if (headers == null) {
throw UserException.dataReadError()
.message("Headers are empty. HeaderIndex Pagination requires parameters that are passed in the HTTP header." + pageCount)
.build(logger);
}
// Now attempt to retrieve the field from the response headers.
String nextPage = headers.get(nextPageParam);

// If the next page value is null or empty, halt pagination
if (StringUtils.isEmpty(nextPage)) {
super.notifyPartialPage();
return null;
}

logger.debug("Found next page URL: {}", nextPage);

// Clean up any extraneous garbage from the header field.
Matcher urlMatcher = URL_REGEX.matcher(nextPage);
if (urlMatcher.find()) {
return urlMatcher.group(1);
}

return nextPage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class PagePaginator extends Paginator {

private static final Logger logger = LoggerFactory.getLogger(OffsetPaginator.class);
private static final Logger logger = LoggerFactory.getLogger(PagePaginator.class);

private final int limit;
private final String pageParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
import org.apache.drill.exec.store.http.HttpApiConfig;
import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.apache.drill.exec.store.http.HttpStoragePlugin;
import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
import org.apache.drill.exec.store.http.HttpSubScan;
import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
import org.apache.drill.exec.store.http.paginator.Paginator;
import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator;
import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
Expand Down Expand Up @@ -110,8 +112,6 @@ public class SimpleHttp implements AutoCloseable {
.writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
.readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
.build();


private final OkHttpClient client;
private final File tempDir;
private final HttpProxyConfig proxyConfig;
Expand Down Expand Up @@ -436,6 +436,11 @@ public InputStream getInputStream() {
logger.debug("HTTP Request for {} successful.", url());
logger.debug("Response Headers: {} ", response.headers());

// In the case of Header Index Pagination, send the header(s) to the paginator
if (paginator != null && paginator.getMode() == PaginatorMethod.HEADER_INDEX) {
((HeaderIndexPaginator)paginator).setResponseHeaders(response.headers());
}

// Return the InputStream of the response. Note that it is necessary and
// and sufficient that the caller invokes close() on the returned stream.
return Objects.requireNonNull(response.body()).byteStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ public static void makeMockConfig(ClusterFixture cluster) {
headers.put("header1", "value1");
headers.put("header2", "value2");

HttpPaginatorConfig headerIndexPaginator = HttpPaginatorConfig.builder()
.nextPageParam("link")
.pageSize(10)
.method("header_index")
.build();

HttpApiConfig mockJsonConfigWithHeaderIndex = HttpApiConfig.builder()
.url("http://localhost:8092/json")
.method("get")
.requireTail(false)
.paginator(headerIndexPaginator)
.inputType("json")
.build();


HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder()
.limitParam("limit")
Expand Down Expand Up @@ -329,6 +343,7 @@ public static void makeMockConfig(ClusterFixture cluster) {
configs.put("json_tail", mockJsonConfigWithPaginatorAndTail);
configs.put("xml_paginator", mockXmlConfigWithPaginator);
configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
configs.put("customers", mockJsonConfigWithHeaderIndex);

HttpStoragePluginConfig mockStorageConfigWithWorkspace =
new HttpStoragePluginConfig(false, configs, 2,1000, null, null, "", 80, "", "", "", null,
Expand All @@ -338,6 +353,28 @@ public static void makeMockConfig(ClusterFixture cluster) {
}


@Test
public void testPagePaginationWithHeaderIndex() throws Exception {
String sql = "SELECT col1, _response_url FROM `local`.`customers`";
try (MockWebServer server = startServer()) {
server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1).setHeader("link", "http://localhost:8092/json?page=2"));
server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2).setHeader("link", "http://localhost:8092/json?page=3"));
server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3));

List<QueryDataBatch> results = client.queryBuilder()
.sql(sql)
.results();

int count = 0;
for(QueryDataBatch b : results){
count += b.getHeader().getRowCount();
b.release();
}

assertEquals(3, results.size());
}
}

@Test
@Ignore("Requires Live Connection to Github")
public void testPagePaginationWithURLParameters() throws Exception {
Expand Down

0 comments on commit 981aa64

Please sign in to comment.