LC-395: Address error handling when SolrIndexer sends batch of documents to multiple collections #73

Open · wants to merge 1 commit into base: main
@@ -22,6 +22,7 @@
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.eclipse.jetty.util.MultiException;
rseitz (Contributor) commented on Nov 20, 2023:

org.eclipse.jetty:jetty-util is a transitive dependency of solrj...
Should this be referenced explicitly in the lucille-core pom, or just leave it as a transitive dependency?
Seems like we might want to move the SolrIndexer to its own module at some point, in which case we'd have to deal with transitive dependencies like this that we're relying on without declaring... Thoughts on a general policy for this kind of situation?
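(Concretely, that would mean adding an explicit `<dependency>` entry for `org.eclipse.jetty:jetty-util` to the lucille-core pom, leaving `<version>` to dependency management so it stays aligned with whatever solrj brings in.)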

Reply from the PR author (Contributor):

I think you are right: we should explicitly declare dependencies that we directly use and not rely on transitive dependencies from a 3rd party. That said, I actually think we can just have our own exception for this (which might be refactored into a batch response / batchlet given other work).
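A minimal sketch of what such a project-owned exception might look like; the name `IndexerBatchException` and its API here are illustrative assumptions, not code from this PR:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replacement for jetty's MultiException, owned by lucille itself.
public class IndexerBatchException extends Exception {

  private final List<Exception> causes = new ArrayList<>();

  public void add(Exception e) {
    causes.add(e);
  }

  public List<Exception> getCauses() {
    return causes;
  }

  // Mirrors MultiException.ifExceptionThrow(): no-op when empty, rethrows a
  // lone cause directly, otherwise throws this aggregate exception.
  public void ifExceptionThrow() throws Exception {
    if (causes.isEmpty()) {
      return;
    }
    if (causes.size() == 1) {
      throw causes.get(0);
    }
    throw this;
  }
}
```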

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
@@ -110,6 +111,10 @@ public void closeConnection() {

@Override
protected void sendToIndex(List<Document> documents) throws Exception {
// when sending updates/deletes to separate collections we do not want an error sending to a particular collection to
Comment (Contributor):
super-minor, but whitespace after the // would be in keeping with the convention

// prevent us from sending to other collections in the batch. Collect any errors that occur in this MultiException and
// then use it to throw the error(s) at the end.
MultiException errorsSending = new MultiException();

if (solrClient == null) {
log.debug("sendToSolr bypassed for documents: " + documents);
@@ -146,7 +151,12 @@ protected void sendToIndex(List<Document> documents) throws Exception {
&& doc.has(deleteByFieldField)
&& deleteByFieldValue != null
&& doc.has(deleteByFieldValue))) {
try {
sendAddUpdateBatch(collection, solrDocRequests.getAddUpdateDocs());
} catch (Exception e) {
errorsSending.add(e);
}

solrDocRequests.resetAddUpdates();
}

@@ -164,22 +174,35 @@ protected void sendToIndex(List<Document> documents) throws Exception {
SolrInputDocument solrDoc = toSolrDoc(doc, idOverride, collection);

// if the delete requests contain the ID of this document, send the deletes immediately so
// the delete is processed before this document.

if (solrDocRequests.containsIdForDeletion(solrId)
|| solrDocRequests.containsAnyDeleteByField()) {
try {
sendDeletionBatch(collection, solrDocRequests);
} catch (Exception e) {
errorsSending.add(e);
}

solrDocRequests.resetDeletes();
}
solrDocRequests.addDocForAddUpdate(solrDoc);
}
}
for (String collection : solrDocRequestsByCollection.keySet()) {
try {
Comment (Contributor):
so, this PR has a bigger consequence than just making sure that failures against one collection don't prevent us from sending to different collections...

it also means that if we have a certain doc ID and there's an update followed by a delete, we might get a failure from the update but still go on to try the delete. Likewise, if there's a delete followed by an update, we might get a failure from the delete but still go on to try the update. Thinking about this, maybe this new behavior is fine. The important thing is that we never swap an update and a delete, and that's not happening here. But I wanted to leave this comment just to bring attention to the change -- is this something you considered and also concluded was OK? A code comment recording our conclusion might be helpful for the future.

Reply from the PR author (Contributor):
I did not consider this until you mentioned it. However, I do think we are doing the "least bad" thing here. We will have to consider this same situation in any retry behavior: whatever indexer retry logic we implement must not mess with the order.
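Concretely: with a batch containing an update for id_1 followed by a delete for id_1 in the same collection, a failed update is recorded in the MultiException and the delete is still attempted afterwards; the two requests are never reordered, only allowed to fail independently.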

sendAddUpdateBatch(
collection, solrDocRequestsByCollection.get(collection).getAddUpdateDocs());
} catch (Exception e) {
errorsSending.add(e);
}
try {
sendDeletionBatch(collection, solrDocRequestsByCollection.get(collection));
} catch (Exception e) {
errorsSending.add(e);
}
}
errorsSending.ifExceptionThrow();
}

private void sendAddUpdateBatch(String collection, List<SolrInputDocument> solrDocs)
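Taken out of context, the collect-then-rethrow pattern the diff above adopts looks like the following standalone sketch (not code from this PR; `doWork` and the collection names are placeholders). Jetty's `MultiException.ifExceptionThrow()` is a no-op when nothing was added, rethrows a single accumulated exception as-is, and otherwise throws the `MultiException` itself:

```java
import java.util.List;
import org.eclipse.jetty.util.MultiException;

public class CollectThenThrowExample {

  public static void main(String[] args) throws Exception {
    MultiException errors = new MultiException();

    // Attempt every unit of work; record failures instead of aborting the loop.
    for (String target : List.of("collection-a", "collection-b", "collection-c")) {
      try {
        doWork(target); // placeholder for e.g. sendAddUpdateBatch(...)
      } catch (Exception e) {
        errors.add(e);
      }
    }

    // Throws the lone exception if exactly one was added, a MultiException if
    // several were, and nothing at all if the loop succeeded everywhere.
    errors.ifExceptionThrow();
  }

  private static void doWork(String target) throws Exception {
    if (!"collection-a".equals(target)) {
      throw new Exception("failed sending to " + target);
    }
  }
}
```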
@@ -24,8 +24,10 @@
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.embedded.JettySolrRunner;
import org.eclipse.jetty.util.MultiException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -42,6 +44,7 @@ public class SolrIndexerIntegrationTest extends SolrCloudTestCase {
private static MiniSolrCloudCluster cluster;

private static final String COL = "test";
private static final String COL_OTHER = "test_other";

@BeforeClass
public static void setupCluster() throws Exception {
@@ -339,6 +342,86 @@ public void testValidateConnection() throws SolrServerException, IOException {
verify(mockCloudHttp2Client).request(Mockito.any(CollectionAdminRequest.ClusterStatus.class));
}

@Test
public void testAddToTwoCollectionsWhenOneDoesNotExist() throws SolrServerException, IOException {

JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);

Map<String, Object> map = new HashMap<>();
map.put("solr.url", Arrays.asList(jetty.getBaseUrl().toString()));
map.put("solr.useCloudClient", true);
map.put("indexer.batchTimeout", 5000);
map.put("indexer.batchSize", 2);
map.put("indexer.type", "solr");
map.put("indexer.sendEnabled", true);
map.put("indexer.indexOverrideField", "collection");

Config config = ConfigFactory.parseMap(map);

IndexerMessenger mockIndexerMessenger = mock(IndexerMessenger.class);
final SolrIndexer indexer = new SolrIndexer(config, mockIndexerMessenger, false, "solr");

try {
assertTrue(indexer.validateConnection());

SolrQuery qr = new SolrQuery();
qr.set("q", "*:*");

assertEquals(
"The index should be empty.",
0,
cluster.getSolrClient().query(COL, qr).getResults().size());

// Send one document to the test collection.
Document doc1 = Document.create("id_1");
doc1.setField("foo", "bar");
doc1.setField("collection", COL);

// Send one document to a collection that doesn't exist.
Document doc2 = Document.create("id_2");
doc2.setField("foo", "baz");
doc2.setField("collection", "this-collection-does-not-exist");

// Send a second document to a different collection that doesn't exist.
Comment (Contributor):
// send a second document to a different collection that doesn't exist

Document doc3 = Document.create("id_3");
doc3.setField("foo", "bar");
doc3.setField("collection", "this-collection-also-does-not-exist");

Throwable exception = assertThrows(SolrException.class, () -> indexer.sendToIndex(List.of(doc1, doc2)));
Comment (Contributor):
how come we are sending doc1 and doc2 but NOT doc3 here?
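Worth noting, since it explains the two different expected exception types in this test: `MultiException.ifExceptionThrow()` rethrows a lone accumulated exception as-is, so the single failing collection here surfaces as the underlying SolrException, while the three-document batch below fails against two collections and therefore surfaces as a MultiException.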


cluster.getSolrClient().commit(COL);
QueryResponse response = cluster.getSolrClient().query(COL, qr);
assertEquals(
"The test document should have been indexed to the new collection",
1,
response.getResults().size());

cluster.getSolrClient().deleteByQuery(COL, "*:*");
cluster.getSolrClient().commit(COL);

assertEquals(
"The index should be empty.",
0,
cluster.getSolrClient().query(COL, qr).getResults().size());

// Run the same test again with docs targeting both collections that do not exist, with doc1 last.

exception = assertThrows(MultiException.class, () -> indexer.sendToIndex(List.of(doc3, doc2, doc1)));
Comment (Contributor):
would there be any added testing value in putting doc1 in the middle?


cluster.getSolrClient().commit(COL);
response = cluster.getSolrClient().query(COL, qr);
assertEquals(
"The test document should have been indexed to the new collection",
1,
response.getResults().size());

} finally {
if (indexer != null) {
indexer.closeConnection();
}
}
}

@AfterClass
public static void tearDownClass() throws Exception {
if (cluster != null) {