LC-395: Address error handling when SolrIndexer sends batch of documents to multiple collections #73
base: main
@@ -22,6 +22,7 @@
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
+import org.eclipse.jetty.util.MultiException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.misc.Signal;
@@ -110,6 +111,10 @@ public void closeConnection() {

   @Override
   protected void sendToIndex(List<Document> documents) throws Exception {
+    //when sending updates/deletes to separate collections we do not want an error sending to a particular collection to
+    //prevent us from sending to other collections in the batch. collect any errors that occur in this multiexception and
+    //then use it to throw the error(s) at the end.
+    MultiException errorsSending = new MultiException();

     if (solrClient == null) {
       log.debug("sendToSolr bypassed for documents: " + documents);

Review comment (on the new comment lines): super-minor, but whitespace after the // would be in keeping with the convention.
@@ -146,7 +151,12 @@ protected void sendToIndex(List<Document> documents) throws Exception {
             && doc.has(deleteByFieldField)
             && deleteByFieldValue != null
             && doc.has(deleteByFieldValue))) {
-          sendAddUpdateBatch(collection, solrDocRequests.getAddUpdateDocs());
+          try {
+            sendAddUpdateBatch(collection, solrDocRequests.getAddUpdateDocs());
+          } catch (Exception e) {
+            errorsSending.add(e);
+          }

           solrDocRequests.resetAddUpdates();
         }
@@ -164,22 +174,35 @@ protected void sendToIndex(List<Document> documents) throws Exception {
         SolrInputDocument solrDoc = toSolrDoc(doc, idOverride, collection);

         // if the delete requests contain the ID of this document, send the deletes immediately so
-        // the delete is
-        // processed before this document.
+        // the delete is processed before this document.

         if (solrDocRequests.containsIdForDeletion(solrId)
             || solrDocRequests.containsAnyDeleteByField()) {
-          sendDeletionBatch(collection, solrDocRequests);
+          try {
+            sendDeletionBatch(collection, solrDocRequests);
+          } catch (Exception e) {
+            errorsSending.add(e);
+          }

           solrDocRequests.resetDeletes();
         }
         solrDocRequests.addDocForAddUpdate(solrDoc);
       }
     }
     for (String collection : solrDocRequestsByCollection.keySet()) {
-      sendAddUpdateBatch(
-          collection, solrDocRequestsByCollection.get(collection).getAddUpdateDocs());
-      sendDeletionBatch(collection, solrDocRequestsByCollection.get(collection));
+      try {
+        sendAddUpdateBatch(
+            collection, solrDocRequestsByCollection.get(collection).getAddUpdateDocs());
+      } catch (Exception e) {
+        errorsSending.add(e);
+      }
+      try {
+        sendDeletionBatch(collection, solrDocRequestsByCollection.get(collection));
+      } catch (Exception e) {
+        errorsSending.add(e);
+      }
     }
+    errorsSending.ifExceptionThrow();
   }

   private void sendAddUpdateBatch(String collection, List<SolrInputDocument> solrDocs)

Review comment (on the per-collection loop): so, this PR has a bigger consequence than just making sure that failures against one collection don't prevent us from sending to different collections... it also means that if we have a certain doc ID and there's an update followed by a delete, we might get a failure from the update but still go on to try the delete. Likewise, if there's a delete followed by an update, we might get a failure from the delete but still go on to try the update. Thinking about this, maybe this new behavior is fine. The important thing is that we never swap an update and a delete, and that's not happening here. But I wanted to leave this comment just to bring attention to the change -- is this something you considered and also concluded was OK? A code comment recording our conclusion might be helpful for the future.

Reply: I did not consider this until you mentioned it. However, I do think we are doing the "least worst" thing here. We have to consider this situation in any retry behavior: any indexer retry behavior we implement must not mess with the order.
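The change above boils down to a collect-and-rethrow pattern around each per-collection batch send. The following is a minimal standalone sketch of that pattern, not the actual SolrIndexer code (sendBatch and the collection list are placeholders), and it includes the kind of ordering note the thread above suggests recording in a code comment:

import java.util.List;
import org.eclipse.jetty.util.MultiException;

class BatchSendSketch {

  // Stand-in for sendAddUpdateBatch / sendDeletionBatch in the real indexer.
  void sendBatch(String collection, List<String> docs) throws Exception {
    // ... send the batch to Solr; may throw if the collection does not exist, etc.
  }

  void sendToCollections(List<String> collections, List<String> docs) throws Exception {
    // Collect failures so one bad collection does not block the others.
    MultiException errorsSending = new MultiException();

    for (String collection : collections) {
      try {
        sendBatch(collection, docs);
      } catch (Exception e) {
        errorsSending.add(e);
      }
    }

    // Ordering note (per the review thread): within one collection, a failed
    // add/update no longer prevents a later delete for the same ID (and vice
    // versa). Updates and deletes are never swapped, but any retry layer built
    // on top of this must also preserve that order.

    // No-op when nothing failed; rethrows the single exception when there is
    // exactly one, otherwise throws the MultiException wrapping all of them.
    errorsSending.ifExceptionThrow();
  }
}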
Second changed file:
@@ -24,8 +24,10 @@
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.embedded.JettySolrRunner;
+import org.eclipse.jetty.util.MultiException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -42,6 +44,7 @@ public class SolrIndexerIntegrationTest extends SolrCloudTestCase {
   private static MiniSolrCloudCluster cluster;

   private static final String COL = "test";
+  private static final String COL_OTHER = "test_other";

   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -339,6 +342,86 @@ public void testValidateConnection() throws SolrServerException, IOException {
     verify(mockCloudHttp2Client).request(Mockito.any(CollectionAdminRequest.ClusterStatus.class));
   }

+  @Test
+  public void testAddToTwoCollectionsWhenOneDoesNotExist() throws SolrServerException, IOException {

+    JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);

+    Map<String, Object> map = new HashMap<>();
+    map.put("solr.url", Arrays.asList(jetty.getBaseUrl().toString()));
+    map.put("solr.useCloudClient", true);
+    map.put("indexer.batchTimeout", 5000);
+    map.put("indexer.batchSize", 2);
+    map.put("indexer.type", "solr");
+    map.put("indexer.sendEnabled", true);
+    map.put("indexer.indexOverrideField", "collection");

+    Config config = ConfigFactory.parseMap(map);

+    IndexerMessenger mockIndexerMessenger = mock(IndexerMessenger.class);
+    final SolrIndexer indexer = new SolrIndexer(config, mockIndexerMessenger, false, "solr");

+    try {
+      assertTrue(indexer.validateConnection());

+      SolrQuery qr = new SolrQuery();
+      qr.set("q", "*:*");

+      assertEquals(
+          "The index should be empty.",
+          0,
+          cluster.getSolrClient().query(COL, qr).getResults().size());

+      // Send one document to the test collection.
+      Document doc1 = Document.create("id_1");
+      doc1.setField("foo", "bar");
+      doc1.setField("collection", COL);

+      // Send one document to a collection that doesn't exist.
+      Document doc2 = Document.create("id_2");
+      doc2.setField("foo", "baz");
+      doc2.setField("collection", "this-collection-does-not-exist");

+      // Send a second document to the test collection.
Review comment: this comment should read "// send a second document to a different collection that doesn't exist".
+      Document doc3 = Document.create("id_3");
+      doc3.setField("foo", "bar");
+      doc3.setField("collection", "this-collection-also-does-not-exist");

+      Throwable exception = assertThrows(SolrException.class, () -> indexer.sendToIndex(List.of(doc1, doc2)));
Review comment: how come we are sending doc1 and doc2 but NOT doc3 here?
+      cluster.getSolrClient().commit(COL);
+      QueryResponse response = cluster.getSolrClient().query(COL, qr);
+      assertEquals(
+          "The test document should have been indexed to the new collection",
+          1,
+          response.getResults().size());

+      cluster.getSolrClient().deleteByQuery(COL, "*:*");
+      cluster.getSolrClient().commit(COL);

+      assertEquals(
+          "The index should be empty.",
+          0,
+          cluster.getSolrClient().query(COL, qr).getResults().size());

+      //run the same test again with docs targeting both collections that do not exist with doc1 last.

+      exception = assertThrows(MultiException.class, () -> indexer.sendToIndex(List.of(doc3, doc2, doc1)));
Review comment: would there be any added testing value in putting doc1 in the middle?
+      cluster.getSolrClient().commit(COL);
+      response = cluster.getSolrClient().query(COL, qr);
+      assertEquals(
+          "The test document should have been indexed to the new collection",
+          1,
+          response.getResults().size());

+    } finally {
+      if (indexer != null) {
+        indexer.closeConnection();
+      }
+    }
+  }

   @AfterClass
   public static void tearDownClass() throws Exception {
     if (cluster != null) {
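As a possible follow-up, the second assertThrows above could capture the typed MultiException and assert on what was collected. The lines below are only a sketch of what might replace that call inside the test method; they assume the per-collection failures surface as SolrException (as in the single-failure case earlier in the test) and that Jetty's MultiException exposes getThrowables():

      // Hypothetical additional assertions, not part of this PR:
      MultiException multi =
          assertThrows(MultiException.class, () -> indexer.sendToIndex(List.of(doc3, doc2, doc1)));

      // Two of the three target collections do not exist, so two failures should
      // have been collected, while doc1 should still reach the "test" collection.
      assertEquals(2, multi.getThrowables().size());
      for (Throwable t : multi.getThrowables()) {
        assertTrue(t instanceof SolrException);
      }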
Review comment: org.eclipse.jetty:jetty-util is a transitive dependency of solrj... Should this be referenced explicitly in the lucille-core pom, or just left as a transitive dependency? It seems like we might want to move the SolrIndexer to its own module at some point, in which case we'd have to deal with transitive dependencies like this that we're relying on without declaring. Thoughts on a general policy for this kind of situation?

Reply: I think you are right: we should explicitly declare dependencies that we use directly and not rely on transitive dependencies from a third party. That said, I actually think we can just have our own exception for this (which might be refactored into a batch response / batchlet given other work).
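If the reply's suggestion is taken and the indexer stops relying on jetty-util, a home-grown aggregate exception could stay quite small. The sketch below is purely hypothetical (the class name and its placement are made up, not something that exists in lucille today) and mirrors only the subset of MultiException behavior this PR uses:

import java.util.ArrayList;
import java.util.List;

// Hypothetical replacement for org.eclipse.jetty.util.MultiException: collects
// per-collection failures and rethrows them together at the end of the batch.
public class IndexerBatchException extends Exception {

  private final List<Exception> failures = new ArrayList<>();

  public IndexerBatchException() {
    super("One or more errors occurred while sending batches to Solr");
  }

  public void add(Exception e) {
    failures.add(e);
    addSuppressed(e); // keeps the individual stack traces visible when this is logged
  }

  public List<Exception> getFailures() {
    return failures;
  }

  // Mirrors MultiException.ifExceptionThrow(): no-op when nothing failed,
  // rethrows a lone failure directly, otherwise throws the aggregate.
  public void ifExceptionThrow() throws Exception {
    if (failures.isEmpty()) {
      return;
    }
    if (failures.size() == 1) {
      throw failures.get(0);
    }
    throw this;
  }
}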