
Commit afd19a3

Merge pull request #328 from marklogic/release/2.4.2
Merge 2.4.2 into master
2 parents 7ff9e0f + 8454c58 commit afd19a3

File tree

22 files changed: +306 / -34 lines changed


build.gradle

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ plugins {
 }
 
 group 'com.marklogic'
-version '2.4.1'
+version '2.4.2'
 
 java {
     // To support reading RDF files, Apache Jena is used - but that requires Java 11.

docs/getting-started/jupyter.md

Lines changed: 2 additions & 2 deletions
@@ -32,15 +32,15 @@ connector and also to initialize Spark:
 
 ```
 import os
-os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/marklogic-spark-connector-2.4.1.jar" pyspark-shell'
+os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/marklogic-spark-connector-2.4.2.jar" pyspark-shell'
 
 from pyspark.sql import SparkSession
 spark = SparkSession.builder.master("local[*]").appName('My Notebook').getOrCreate()
 spark.sparkContext.setLogLevel("WARN")
 spark
 ```
 
-The path of `/path/to/marklogic-spark-connector-2.4.1.jar` should be changed to match the location of the connector
+The path of `/path/to/marklogic-spark-connector-2.4.2.jar` should be changed to match the location of the connector
 jar on your filesystem. You are free to customize the `spark` variable in any manner you see fit as well.
 
 Now that you have an initialized Spark session, you can run any of the examples found in the
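
For context, once the Spark session above is initialized, a minimal documents read might look like the sketch below. The client URI matches the getting-started example application used elsewhere in this release; the `marklogic` format name and the collections option name are assumptions about the connector's read API rather than part of this change.

```python
# A minimal sketch of a documents read using the initialized `spark` session.
# The client URI comes from the getting-started example; the format name and
# the collections option name are assumptions, not part of this diff.
df = (
    spark.read.format("marklogic")
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")
    .option("spark.marklogic.read.documents.collections", "author")
    .load()
)
df.show()
```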

docs/getting-started/pyspark.md

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ shell by pressing `ctrl-D`.
 
 Run PySpark from the directory that you downloaded the connector to per the [setup instructions](setup.md):
 
-    pyspark --jars marklogic-spark-connector-2.4.1.jar
+    pyspark --jars marklogic-spark-connector-2.4.2.jar
 
 The `--jars` command line option is PySpark's method for utilizing Spark connectors. Each Spark environment should have
 a similar mechanism for including third party connectors; please see the documentation for your particular Spark

docs/getting-started/setup.md

Lines changed: 2 additions & 2 deletions
@@ -31,10 +31,10 @@ have an instance of MarkLogic running, you can skip step 4 below, but ensure tha
 extracted directory contains valid connection properties for your instance of MarkLogic.
 
 1. From [this repository's Releases page](https://github.com/marklogic/marklogic-spark-connector/releases), select
-   the latest release and download the `marklogic-spark-getting-started-2.4.1.zip` file.
+   the latest release and download the `marklogic-spark-getting-started-2.4.2.zip` file.
 2. Extract the contents of the downloaded zip file.
 3. Open a terminal window and go to the directory created by extracting the zip file; the directory should have a
-   name of "marklogic-spark-getting-started-2.4.1".
+   name of "marklogic-spark-getting-started-2.4.2".
 4. Run `docker-compose up -d` to start an instance of MarkLogic
 5. Ensure that the `./gradlew` file is executable; depending on your operating system, you may need to run
    `chmod 755 gradlew` to make the file executable.

examples/entity-aggregation/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ repositories {
 
 dependencies {
     implementation 'org.apache.spark:spark-sql_2.12:3.5.3'
-    implementation "com.marklogic:marklogic-spark-connector:2.4.1"
+    implementation "com.marklogic:marklogic-spark-connector:2.4.2"
     implementation "org.postgresql:postgresql:42.6.2"
 }
 

examples/getting-started/marklogic-spark-getting-started.ipynb

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
    "source": [
     "# Make the MarkLogic connector available to the underlying PySpark application.\n",
     "import os\n",
-    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars \"marklogic-spark-connector-2.4.1.jar\" pyspark-shell'\n",
+    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars \"marklogic-spark-connector-2.4.2.jar\" pyspark-shell'\n",
     "\n",
     "# Define the connection details for the getting-started example application.\n",
     "client_uri = \"spark-example-user:password@localhost:8003\"\n",

examples/java-dependency/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ repositories {
 
 dependencies {
     implementation 'org.apache.spark:spark-sql_2.12:3.5.3'
-    implementation 'com.marklogic:marklogic-spark-connector:2.4.1'
+    implementation 'com.marklogic:marklogic-spark-connector:2.4.2'
 }
 
 task runApp(type: JavaExec) {

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 4 additions & 0 deletions
@@ -145,6 +145,10 @@ public final String getStringOption(String option) {
         return hasOption(option) ? properties.get(option).trim() : null;
     }
 
+    public final boolean getBooleanOption(String option, boolean defaultValue) {
+        return hasOption(option) ? Boolean.parseBoolean(getStringOption(option)) : defaultValue;
+    }
+
     public final boolean isStreamingFiles() {
         return "true".equalsIgnoreCase(getStringOption(Options.STREAM_FILES));
     }

src/main/java/com/marklogic/spark/Options.java

Lines changed: 8 additions & 0 deletions
@@ -59,6 +59,14 @@ public abstract class Options {
     public static final String READ_TRIPLES_FILTERED = "spark.marklogic.read.triples.filtered";
     public static final String READ_TRIPLES_BASE_IRI = "spark.marklogic.read.triples.baseIri";
 
+    /**
+     * The connector uses a consistent snapshot by default. Setting this to false results in queries being executed
+     * at multiple points of time, potentially yielding inconsistent results.
+     *
+     * @since 2.4.2
+     */
+    public static final String READ_SNAPSHOT = "spark.marklogic.read.snapshot";
+
     // For logging progress when reading documents, rows, or items via custom code. Defines the interval at which
     // progress should be logged - e.g. a value of 10,000 will result in a message being logged on every 10,000 items
     // being written/processed.
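
To illustrate the new option, the sketch below disables the consistent-snapshot default when reading documents. Only `spark.marklogic.read.snapshot` is introduced by this diff; the format name, connection URI, and collections option are assumptions reused from the getting-started example for illustration.

```python
# Sketch: opting out of the consistent-snapshot default added in 2.4.2.
# Only spark.marklogic.read.snapshot comes from this change; the other option
# names and connection details are assumptions for illustration.
dirty_df = (
    spark.read.format("marklogic")
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")
    .option("spark.marklogic.read.documents.collections", "author")
    .option("spark.marklogic.read.snapshot", "false")
    .load()
)
```

Left at its default of `true`, each partition reads at the server timestamp captured when the partition was planned; setting it to `false` lets each page of URIs be fetched at the current point in time, which can yield inconsistent results if the database changes mid-read, as the option's documentation notes.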

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 5 additions & 0 deletions
@@ -99,6 +99,11 @@ int getPartitionsPerForest() {
         return (int) getNumericOption(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, defaultPartitionsPerForest, 1);
     }
 
+    boolean isConsistentSnapshot() {
+        // Starting in 2.2.0 and through 2.4.2, the default is a consistent snapshot. We may change this later.
+        return getBooleanOption(Options.READ_SNAPSHOT, true);
+    }
+
     void setLimit(Integer limit) {
         this.limit = limit;
     }

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 6 additions & 6 deletions
@@ -61,16 +61,16 @@ class ForestReader implements PartitionReader<InternalRow> {
             context.connectToMarkLogic(forestPartition.getHost()) :
             context.connectToMarkLogic();
 
+        final boolean filtered = context.getBooleanOption(Options.READ_DOCUMENTS_FILTERED, false);
+        final boolean consistentSnapshot = context.isConsistentSnapshot();
+
         if (logger.isDebugEnabled()) {
-            logger.debug("Will read from host {} for partition: {}", client.getHost(), forestPartition);
+            logger.debug("Will read from host {} for partition: {}; filtered: {}; consistent snapshot: {}",
+                client.getHost(), forestPartition, filtered, consistentSnapshot);
         }
 
         SearchQueryDefinition query = context.buildSearchQuery(client);
-        boolean filtered = false;
-        if (context.hasOption(Options.READ_DOCUMENTS_FILTERED)) {
-            filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_DOCUMENTS_FILTERED));
-        }
-        this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), filtered);
+        this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), filtered, consistentSnapshot);
 
         this.documentManager = client.newDocumentManager();
         this.documentManager.setReadTransform(query.getResponseTransform());

src/main/java/com/marklogic/spark/reader/document/OpticTriplesReader.java

Lines changed: 11 additions & 4 deletions
@@ -15,6 +15,8 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -27,6 +29,8 @@
  */
 class OpticTriplesReader implements PartitionReader<InternalRow> {
 
+    private static final Logger logger = LoggerFactory.getLogger(OpticTriplesReader.class);
+
     private static final String DATATYPE_COLUMN = "datatype";
     private static final String GRAPH_COLUMN = "graph";
     private static final String OBJECT_COLUMN = "object";
@@ -54,12 +58,15 @@ public OpticTriplesReader(ForestPartition forestPartition, DocumentContext conte
         this.op = this.rowManager.newPlanBuilder();
 
         final SearchQueryDefinition query = context.buildTriplesSearchQuery(this.databaseClient);
-        boolean filtered = false;
-        if (context.hasOption(Options.READ_TRIPLES_FILTERED)) {
-            filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_TRIPLES_FILTERED));
+        final boolean filtered = context.getBooleanOption(Options.READ_TRIPLES_FILTERED, false);
+        final boolean consistentSnapshot = context.isConsistentSnapshot();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Will read from host {} for partition: {}; filtered: {}; consistent snapshot: {}",
+                databaseClient.getHost(), forestPartition, filtered, consistentSnapshot);
         }
-        this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered);
 
+        this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered, consistentSnapshot);
         this.batchSize = context.getBatchSize();
     }
 

src/main/java/com/marklogic/spark/reader/document/UriBatcher.java

Lines changed: 7 additions & 2 deletions
@@ -25,13 +25,15 @@ class UriBatcher {
     private final ForestPartition partition;
     private final int pageLength;
     private final boolean filtered;
+    private final boolean useConsistentSnapshot;
 
     // These change as batches of URIs are retrieved.
     private String lastUri;
     private long offsetStart = 1;
 
 
-    UriBatcher(DatabaseClient client, SearchQueryDefinition query, ForestPartition partition, int pageLength, boolean filtered) {
+    UriBatcher(DatabaseClient client, SearchQueryDefinition query, ForestPartition partition, int pageLength,
+               boolean filtered, boolean useConsistentSnapshot) {
         this.client = client;
         this.queryManager = (QueryManagerImpl) this.client.newQueryManager();
         this.queryManager.setPageLength(pageLength);
@@ -40,6 +42,7 @@ class UriBatcher {
         this.offsetStart = this.partition.getOffsetStart();
         this.pageLength = pageLength;
         this.filtered = filtered;
+        this.useConsistentSnapshot = useConsistentSnapshot;
     }
 
     /**
@@ -53,7 +56,9 @@ List<String> nextBatchOfUris() {
         }
 
         UrisHandle urisHandle = new UrisHandle();
-        urisHandle.setPointInTimeQueryTimestamp(partition.getServerTimestamp());
+        if (useConsistentSnapshot) {
+            urisHandle.setPointInTimeQueryTimestamp(partition.getServerTimestamp());
+        }
 
         // If we have an offsetEnd, the page length is adjusted to ensure we do not go past offsetEnd.
         if (partition.getOffsetEnd() != null && (this.offsetStart + this.pageLength > partition.getOffsetEnd())) {

src/main/java/com/marklogic/spark/reader/file/FileContext.java

Lines changed: 27 additions & 9 deletions
@@ -12,10 +12,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.util.SerializableConfiguration;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.nio.charset.UnsupportedCharsetException;
@@ -49,22 +46,36 @@ public boolean isGzip() {
     }
 
     public InputStream openFile(String filePath) {
+        return openFile(filePath, false);
+    }
+
+    public InputStream openFile(String filePath, boolean guessIfGzipped) {
         try {
             Path hadoopPath = new Path(filePath);
             FileSystem fileSystem = hadoopPath.getFileSystem(hadoopConfiguration.value());
             FSDataInputStream inputStream = fileSystem.open(hadoopPath);
-            return this.isGzip() ? new GZIPInputStream(inputStream) : inputStream;
+            return isFileGzipped(filePath, guessIfGzipped) ? new GZIPInputStream(inputStream) : inputStream;
         } catch (Exception e) {
             throw new ConnectorException(String.format(
                 "Unable to read file at %s; cause: %s", filePath, e.getMessage()), e);
         }
     }
 
-    public boolean isReadAbortOnFailure() {
-        if (hasOption(Options.READ_FILES_ABORT_ON_FAILURE)) {
-            return Boolean.parseBoolean(getStringOption(Options.READ_FILES_ABORT_ON_FAILURE));
+    BufferedReader openFileReader(String filePath, boolean guessIfGzipped) {
+        try {
+            InputStream inputStream = openFile(filePath, guessIfGzipped);
+            InputStreamReader inputStreamReader = this.encoding != null ?
+                new InputStreamReader(inputStream, encoding) :
+                new InputStreamReader(inputStream);
+            return new BufferedReader(inputStreamReader);
+        } catch (Exception e) {
+            throw new ConnectorException(String.format(
+                "Unable to read file at %s; cause: %s", filePath, e.getMessage()), e);
         }
-        return true;
+    }
+
+    public boolean isReadAbortOnFailure() {
+        return getBooleanOption(Options.READ_FILES_ABORT_ON_FAILURE, true);
     }
 
     byte[] readBytes(InputStream inputStream) throws IOException {
@@ -85,4 +96,11 @@ public String decodeFilePath(String path) {
             return path;
         }
     }
+
+    private boolean isFileGzipped(String filePath, boolean guessIfGzipped) {
+        if (this.isGzip()) {
+            return true;
+        }
+        return guessIfGzipped && filePath != null && (filePath.endsWith(".gz") || filePath.endsWith(".gzip"));
+    }
 }

src/main/java/com/marklogic/spark/reader/file/FilePartitionReaderFactory.java

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,8 @@ public PartitionReader<InternalRow> createReader(InputPartition partition) {
             return new MlcpArchiveFileReader(filePartition, fileContext);
         } else if ("archive".equalsIgnoreCase(fileType)) {
             return new ArchiveFileReader(filePartition, fileContext);
+        } else if ("json_lines".equalsIgnoreCase(fileType)) {
+            return new JsonLinesFileReader(filePartition, fileContext);
         } else if (fileContext.hasOption(Options.READ_AGGREGATES_XML_ELEMENT)) {
             return fileContext.isZip() ?
                 new ZipAggregateXmlFileReader(filePartition, fileContext) :

src/main/java/com/marklogic/spark/reader/file/JsonLinesFileReader.java

Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
+ */
+package com.marklogic.spark.reader.file;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.unsafe.types.ByteArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.BufferedReader;
+import java.util.Iterator;
+
+class JsonLinesFileReader implements PartitionReader<InternalRow> {
+
+    private final FilePartition filePartition;
+    private final FileContext fileContext;
+
+    private BufferedReader bufferedReader;
+    private Iterator<String> bufferedLines;
+
+    private InternalRow nextRowToReturn;
+    private String currentFilePath;
+    private int lineCounter;
+    private int filePathIndex;
+
+    JsonLinesFileReader(FilePartition filePartition, FileContext fileContext) {
+        this.filePartition = filePartition;
+        this.fileContext = fileContext;
+    }
+
+    @Override
+    public boolean next() {
+        if (bufferedLines != null && bufferedLines.hasNext()) {
+            this.nextRowToReturn = createRowFromNextJsonLine();
+            return true;
+        }
+
+        if (bufferedReader != null) {
+            IOUtils.closeQuietly(bufferedReader);
+        }
+
+        if (filePathIndex >= filePartition.getPaths().size()) {
+            return false;
+        }
+
+        openNextFile();
+        return next();
+    }
+
+    @Override
+    public InternalRow get() {
+        return nextRowToReturn;
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(bufferedReader);
+    }
+
+    private void openNextFile() {
+        final String originalFilePath = filePartition.getPaths().get(filePathIndex);
+        this.currentFilePath = fileContext.decodeFilePath(originalFilePath);
+        this.lineCounter = 1;
+        this.filePathIndex++;
+        // To mimic the behavior of the Spark JSON data source, this will guess if the file is gzipped based on its
+        // file extension. This allows for .gz/.gzip files to be supported without the user having to specify the
+        // compression option, which is the same behavior as Spark JSON provides.
+        this.bufferedReader = fileContext.openFileReader(currentFilePath, true);
+        this.bufferedLines = bufferedReader.lines().iterator();
+    }
+
+    private InternalRow createRowFromNextJsonLine() {
+        String line = bufferedLines.next();
+        String uri = String.format("%s-%d.json", UTF8String.fromString(currentFilePath), lineCounter);
+        lineCounter++;
+        return new GenericInternalRow(new Object[]{
+            UTF8String.fromString(uri),
+            ByteArray.concat(line.getBytes()),
+            null, null, null, null, null, null
+        });
+    }
+}
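
For illustration, the new reader would be selected by asking the connector to treat input files as newline-delimited JSON. The `json_lines` value, the URI naming of `<file path>-<line number>.json`, and the automatic gzip detection for `.gz`/`.gzip` extensions come from this change; the file-type option name and the use of `load(path)` are assumptions about the connector's file-reading API.

```python
# Sketch: reading a newline-delimited JSON file with the new json_lines reader.
# The "json_lines" value and gzip guessing by .gz/.gzip extension come from this
# change; the option name and load(path) usage are assumptions for illustration.
rows = (
    spark.read.format("marklogic")
    .option("spark.marklogic.read.files.type", "json_lines")
    .load("/path/to/data.jsonl.gz")
)
# Each line becomes one row whose URI ends in "-<line number>.json" and whose
# content column holds the raw bytes of that line.
rows.show()
```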

src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java

Lines changed: 14 additions & 0 deletions
@@ -40,6 +40,20 @@ void readByCollection() {
         assertEquals("Vivianne", doc.get("ForeName").asText());
     }
 
+    @Test
+    void dirtyRead() {
+        Dataset<Row> rows = startRead()
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
+            .option(Options.READ_SNAPSHOT, false)
+            .load();
+
+        assertEquals(15, rows.count(), "This test only verifies that the snapshot option can be set to false. " +
+            "We don't yet have a way to verify that the query doesn't use a consistent snapshot, which would entail " +
+            "forcing the read to pause while an update and merge are performed in the database. Verifying the " +
+            "difference between a consistent snapshot and a dirty read will need to be done manually, including " +
+            "by inspecting the debug logs generated by this test.");
+    }
+
     @Test
     void logProgress() {
         newWriter().save();
