Skip to content

Commit

Permalink
Merge pull request #30 from commoncrawl/cc-http2
Browse files Browse the repository at this point in the history
WARC writer support HTTP/2
  • Loading branch information
sebastian-nagel authored Jul 27, 2024
2 parents 0a8e162 + 5f43692 commit 860f269
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 51 deletions.
10 changes: 10 additions & 0 deletions conf/nutch-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2885,6 +2885,16 @@ CAUTION: Set the parser.timeout to -1 or a bigger value than 30, when using this
</description>
</property>

<property>
<name>store.protocol.versions</name>
<value>false</value>
<description>
Store protocol versions in response metadata: HTTP and SSL/TLS
versions, SSL/TTL cipher suites and related information depending
on the protocol implementation. Supported by: protocol-okhttp.
</description>
</property>

<!-- index-links plugin -->

<property>
Expand Down
26 changes: 26 additions & 0 deletions src/java/org/apache/nutch/fetcher/FetcherThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.net.protocols.ProtocolLogUtil;
import org.apache.nutch.net.protocols.Response;
import org.apache.nutch.parse.Outlink;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.parse.ParseData;
Expand Down Expand Up @@ -136,6 +137,7 @@ public class FetcherThread extends Thread {
private boolean storingContent;
private boolean storingWarc;
private boolean storing404s;
private boolean storingProtocolVersions;

private boolean signatureWithoutParsing;

Expand Down Expand Up @@ -195,6 +197,8 @@ public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQ
this.storingContent = storingContent;
this.storing404s = conf.getBoolean("fetcher.store.404s", false);
this.storingWarc = Fetcher.isStoringWarc(conf);
this.storingProtocolVersions = conf.getBoolean("store.protocol.versions",
false);
this.pages = pages;
this.bytes = bytes;

Expand Down Expand Up @@ -428,6 +432,10 @@ public void run() {
}
context.getCounter("FetcherStatus", status.getName()).increment(1);

if (storingProtocolVersions && content != null) {
countProtocolVersions(content.getMetadata());
}

switch (status.getCode()) {

case ProtocolStatus.WOULDBLOCK:
Expand Down Expand Up @@ -684,6 +692,24 @@ private void logError(Text url, String message) {
errors.incrementAndGet();
}

private void countProtocolVersions(Metadata contentMetadata) {
if (contentMetadata == null) {
return;
}
String versionStr = contentMetadata.get(Response.PROTOCOL_VERSIONS);
if (versionStr != null) {
String[] versions = versionStr.split(",");
if (versions.length >= 1) {
context.getCounter("HttpProtocolVersion", versions[0]).increment(1);
} else {
context.getCounter("HttpProtocolVersion", "unknown").increment(1);
}
for (int i = 1; i < versions.length; i++) {
context.getCounter("TlsProtocolVersion", versions[i]).increment(1);
}
}
}

private ParseStatus output(Text key, CrawlDatum datum, Content content,
ProtocolStatus pstatus, int status) throws InterruptedException{
return output(key, datum, content, pstatus, status, 0);
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/nutch/net/protocols/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ public interface Response extends HttpHeaders {
*/
public static final String IP_ADDRESS = "_ip_";

/**
* Key to hold the HTTP and SSL/TLS protocol versions if
* <code>store.protocol.versions</code> is true.
*/
public static final String PROTOCOL_VERSIONS = "_protocol_versions_";

/**
* Key to hold the SSL/TLS cipher suites
* <code>store.protocol.versions</code> is true.
*/
public static final String CIPHER_SUITES = "_cipher_suites_";

/**
* Key to hold the time when the page has been fetched
*/
Expand Down
11 changes: 6 additions & 5 deletions src/java/org/commoncrawl/util/WarcCdxWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ public WarcCdxWriter(OutputStream warcOut, OutputStream cdxOut,
public URI writeWarcRevisitRecord(final URI targetUri, final String ip,
final int httpStatusCode, final Date date, final URI warcinfoId,
final URI relatedId, final String warcProfile, final Date refersToDate,
final String payloadDigest, final String blockDigest, byte[] block,
final String payloadDigest, final String blockDigest,
String[] protocolVersions, String[] cipherSuites, byte[] block,
Content content) throws IOException {
long offset = countingOut.getByteCount();
URI recordId = super.writeWarcRevisitRecord(targetUri, ip, httpStatusCode,
date, warcinfoId, relatedId, warcProfile, refersToDate, payloadDigest,
blockDigest, block, content);
blockDigest, protocolVersions, cipherSuites, block, content);
long length = (countingOut.getByteCount() - offset);
writeCdxLine(targetUri, date, offset, length, payloadDigest, content, true,
null, null);
Expand All @@ -114,12 +115,12 @@ public URI writeWarcRevisitRecord(final URI targetUri, final String ip,
public URI writeWarcResponseRecord(final URI targetUri, final String ip,
final int httpStatusCode, final Date date, final URI warcinfoId,
final URI relatedId, final String payloadDigest, final String blockDigest,
final String truncated, final byte[] block, Content content)
throws IOException {
final String truncated, String[] protocolVersions, String[] cipherSuites,
final byte[] block, Content content) throws IOException {
long offset = countingOut.getByteCount();
URI recordId = super.writeWarcResponseRecord(targetUri, ip, httpStatusCode,
date, warcinfoId, relatedId, payloadDigest, blockDigest, truncated,
block, content);
protocolVersions, cipherSuites, block, content);
long length = (countingOut.getByteCount() - offset);
String redirectLocation = null;
if (isRedirect(httpStatusCode)) {
Expand Down
109 changes: 94 additions & 15 deletions src/java/org/commoncrawl/util/WarcRecordWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ class WarcRecordWriter extends RecordWriter<Text, WarcCapture> {
protected static final String X_HIDE_HEADER = "X-Crawler-";
public static final String WARC_WRITER_COUNTER_GROUP = "WARC-Writer";

protected static final Pattern STATUS_LINE_PATTERN = Pattern
.compile("^HTTP/1\\.[01] [0-9]{3}(?: .*)?$");
protected static final Pattern WS_PATTERN = Pattern.compile("\\s+");
protected static final Pattern HTTP_VERSION_PATTERN = Pattern
.compile("^HTTP/1\\.[01]$");
protected static final Pattern HTTP_STATUS_CODE_PATTERN = Pattern
.compile("^[0-9]{3}$");
protected static final String HTTP_VERSION_FALLBACK = "HTTP/1.1";

private TaskAttemptContext context;
private DataOutputStream warcOut;
private WarcWriter warcWriter;
Expand Down Expand Up @@ -291,20 +300,54 @@ public static String formatHttpHeaders(String statusLine, List<String> headers)
}

/**
* Fix the HTTP version in the status line - replace <code>HTTP/2</code>
* by <code>HTTP/1.1</code> ({@link this#HTTP_VERSION_FALLBACK}}.
*
* See also {@link #fixHttpHeaders(String, int)}
*
* @param headers
* HTTP 1.1 or 1.0 request header string, CR-LF-separated lines,
* first line is the status line
* @return safe HTTP request header
*/
public static String fixHttpRequestHeaders(String headers) {
String http2version = " HTTP/2\r\n";
int pos = headers.indexOf(http2version);
if (pos >= 0) {
StringBuilder replacement = new StringBuilder();
String statusLinePrefix = headers.substring(0, pos);
if (statusLinePrefix.indexOf(CRLF) > 0) {
// match in subsequent header lines (should not or rarely happen)
return headers;
}
replacement.append(statusLinePrefix);
replacement.append(' ');
replacement.append(HTTP_VERSION_FALLBACK);
replacement.append(CRLF);
replacement.append(headers.substring(pos + http2version.length()));
return replacement.toString();
}
return headers;
}

/**
* Modify verbatim HTTP response headers: fix, remove or replace headers
* <code>Content-Length</code>, <code>Content-Encoding</code> and
* <code>Transfer-Encoding</code> which may confuse WARC readers. Ensure that
* returned header end with a single empty line (<code>\r\n\r\n</code>).
*
* If the HTTP version in the status line is <code>HTTP/2</code>, replace it
* by <code>HTTP/1.1</code> ({@link this#HTTP_VERSION_FALLBACK}}.
*
* @param headers
* HTTP 1.1 or 1.0 response header string, CR-LF-separated lines,
* first line is status line
* first line is the status line
* @return safe HTTP response header
*/
public static String fixHttpHeaders(String headers, int contentLength) {
int start = 0, lineEnd = 0, last = 0, trailingCrLf= 0;
boolean hasContentLength = false;
StringBuilder replace = new StringBuilder();
StringBuilder replacement = new StringBuilder();
while (start < headers.length()) {
lineEnd = headers.indexOf(CRLF, start);
trailingCrLf = 1;
Expand All @@ -323,7 +366,32 @@ public static String fixHttpHeaders(String headers, int contentLength) {
boolean valid = true;
if (start == 0) {
// status line (without colon)
// TODO: http/2
final String statusLine = headers.substring(0, lineEnd);
if (!STATUS_LINE_PATTERN.matcher(statusLine).matches()) {
final String[] parts = WS_PATTERN
.split(headers.substring(0, lineEnd), 3);
if (parts.length < 2
|| !HTTP_STATUS_CODE_PATTERN.matcher(parts[1]).matches()) {
// nothing we can do here, leave status line as is
LOG.warn(
"WARC parsers may fail on non-standard HTTP 1.0 / 1.1 response status line: {}",
statusLine);
} else {
if (HTTP_VERSION_PATTERN.matcher(parts[0]).matches()) {
replacement.append(parts[0]);
} else {
replacement.append(HTTP_VERSION_FALLBACK);
}
replacement.append(' ');
replacement.append(parts[1]); // status code
replacement.append(' ');
if (parts.length == 3) {
replacement.append(parts[2]); // message
}
replacement.append(CRLF);
last = lineEnd + 2 * trailingCrLf;
}
}
} else if ((lineEnd + 4) == headers.length()
&& headers.endsWith(CRLF + CRLF)) {
// ok, trailing empty line
Expand All @@ -339,7 +407,7 @@ public static String fixHttpHeaders(String headers, int contentLength) {
}
if (!valid) {
if (last < start) {
replace.append(headers.substring(last, start));
replacement.append(headers.substring(last, start));
}
last = lineEnd + 2 * trailingCrLf;
}
Expand Down Expand Up @@ -367,18 +435,18 @@ public static String fixHttpHeaders(String headers, int contentLength) {
}
if (needsFix) {
if (last < start) {
replace.append(headers.substring(last, start));
replacement.append(headers.substring(last, start));
}
last = lineEnd + 2 * trailingCrLf;
replace.append(X_HIDE_HEADER)
replacement.append(X_HIDE_HEADER)
.append(headers.substring(start, lineEnd + 2 * trailingCrLf));
if (trailingCrLf == 0) {
replace.append(CRLF);
replacement.append(CRLF);
trailingCrLf = 1;
}
if (name.equalsIgnoreCase("content-length")) {
// add effective uncompressed and unchunked length of content
replace.append("Content-Length").append(COLONSP)
replacement.append("Content-Length").append(COLONSP)
.append(contentLength).append(CRLF);
}
}
Expand All @@ -388,17 +456,17 @@ public static String fixHttpHeaders(String headers, int contentLength) {
if (last > 0 || trailingCrLf != 2 || !hasContentLength) {
if (last < headers.length()) {
// append trailing headers
replace.append(headers.substring(last));
replacement.append(headers.substring(last));
}
if (!hasContentLength) {
replace.append("Content-Length").append(COLONSP).append(contentLength)
replacement.append("Content-Length").append(COLONSP).append(contentLength)
.append(CRLF);
}
while (trailingCrLf < 2) {
replace.append(CRLF);
replacement.append(CRLF);
trailingCrLf++;
}
return replace.toString();
return replacement.toString();
}
return headers;
}
Expand Down Expand Up @@ -558,6 +626,8 @@ public synchronized void write(Text key, WarcCapture value)
int httpStatusCode = 200;
String fetchDuration = null;
String truncatedReason = null;
String[] protocolVersions = null;
String[] cipherSuites = null;

if (value.datum != null) {
date = new Date(value.datum.getFetchTime());
Expand Down Expand Up @@ -666,6 +736,12 @@ public synchronized void write(Text key, WarcCapture value)
case Response.TRUNCATED_CONTENT_REASON:
truncatedReason = val;
break;
case Response.PROTOCOL_VERSIONS:
protocolVersions = val.split(",");
break;
case Response.CIPHER_SUITES:
cipherSuites = val.split(",");
break;
case Nutch.SEGMENT_NAME_KEY:
case Nutch.FETCH_STATUS_KEY:
case Nutch.SCORE_KEY:
Expand Down Expand Up @@ -739,7 +815,9 @@ public synchronized void write(Text key, WarcCapture value)
URI requestId = null;
if (verbatimRequestHeaders != null) {
requestId = writer.writeWarcRequestRecord(targetUri, ip, date, infoId,
verbatimRequestHeaders.getBytes(StandardCharsets.UTF_8));
protocolVersions, cipherSuites,
fixHttpRequestHeaders(verbatimRequestHeaders)
.getBytes(StandardCharsets.UTF_8));
}

if (generateCdx) {
Expand Down Expand Up @@ -804,7 +882,8 @@ public synchronized void write(Text key, WarcCapture value)
String payloadDigest = null;
writer.writeWarcRevisitRecord(targetUri, ip, httpStatusCode, date, infoId,
requestId, WarcWriter.PROFILE_REVISIT_NOT_MODIFIED, lastModifiedDate,
payloadDigest, blockDigest, responseHeaderBytes, value.content);
payloadDigest, blockDigest, protocolVersions, cipherSuites,
responseHeaderBytes, value.content);
} else {
StringBuilder responsesb = new StringBuilder(4096);
responsesb.append(responseHeaders);
Expand All @@ -822,7 +901,7 @@ public synchronized void write(Text key, WarcCapture value)
String blockDigest = getSha1DigestWithAlg(responseBytes);
URI responseId = writer.writeWarcResponseRecord(targetUri, ip,
httpStatusCode, date, infoId, requestId, payloadDigest, blockDigest,
truncatedReason, responseBytes, value.content);
truncatedReason, protocolVersions, cipherSuites, responseBytes, value.content);

// Write metadata record
StringBuilder metadatasb = new StringBuilder(4096);
Expand Down
Loading

0 comments on commit 860f269

Please sign in to comment.