Skip to content

Commit

Permalink
Add batch document handling to solr
Browse files Browse the repository at this point in the history
  • Loading branch information
anvit committed Aug 29, 2024
1 parent 4aef85a commit f0876e4
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 15 deletions.
2 changes: 1 addition & 1 deletion lib/task/search/arSolrPopulateTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function execute($arguments = [], $options = [])

new sfDatabaseManager($this->configuration);

$solr = new arSolrPlugin($options);
$solr = QubitSearch::getSolrInstance();

// Index by slug, if specified, or all indexable resources except those with an excluded type
//if ($options['slug']) {
Expand Down
147 changes: 133 additions & 14 deletions plugins/arSolrPlugin/lib/arSolrPlugin.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ class arSolrPlugin extends QubitSearchEngine
*/
protected $enabled = true;

/**
 * Queue of documents waiting to be added to the index in a single request.
 *
 * addDocument() appends one [$type => $data] entry per document while batch
 * mode is on; flushBatch() sends the whole queue and then empties it.
 *
 * @var array
 */
private $batchAddDocs = [];

/**
 * Queue of documents waiting to be deleted from the index in a single request.
 *
 * delete() appends one entry (built by the client's createDocumentWithId())
 * per object while batch mode is on; flushBatch() sends the whole queue and
 * then empties it.
 *
 * @var array
 */
private $batchDeleteDocs = [];

/**
* Constructor.
*/
Expand All @@ -53,6 +67,10 @@ public function __construct(array $options = [])

$this->client = new arSolrClient($this->config['solr']);

// Load batch mode configuration
$this->batchMode = true === $this->config['batch_mode'];
$this->batchSize = $this->config['batch_size'];

$this->initialize();
}

Expand Down Expand Up @@ -113,6 +131,55 @@ public function flush()
$this->initialize();
}

/**
 * Flush batch of documents if we're in batch mode.
 *
 * We process additions before deletions to avoid an error due to deleting a
 * document that hasn't been created yet.
 *
 * Each queue is emptied once its request has been attempted, whether the
 * request succeeded, came back with an error (which is logged together with
 * the queued payload), or threw. Anything thrown is propagated to the
 * caller; when the add request throws, the delete queue is left untouched.
 *
 * @throws Exception when the client fails while sending either batch
 */
public function flushBatch()
{
    if (!$this->batchMode) {
        return;
    }

    // Batch add documents, if any
    if (count($this->batchAddDocs) > 0) {
        try {
            $response = $this->client->addDocuments($this->batchAddDocs);

            if ($response->error) {
                $this->log(var_export($response->error, true));
                $this->log(json_encode($this->batchAddDocs));
            }
        } finally {
            // Empty the queue on every exit path (success, Exception or
            // Error) so a failed batch is never re-sent by the next flush
            $this->batchAddDocs = [];
        }
    }

    // Batch delete documents, if any
    if (count($this->batchDeleteDocs) > 0) {
        try {
            $response = $this->client->deleteDocuments($this->batchDeleteDocs);

            if ($response->error) {
                $this->log(var_export($response->error, true));
                $this->log(json_encode($this->batchDeleteDocs));
            }
        } finally {
            // Same guarantee as above for the delete queue
            $this->batchDeleteDocs = [];
        }
    }
}

/**
* Populate index.
*
Expand Down Expand Up @@ -185,6 +252,9 @@ public function populate($options = [])
}
}

// Add the last batch of documents
$this->flushBatch();

$this->addAutoCompleteConfigs();
$this->setAnalyzers();

Expand Down Expand Up @@ -229,11 +299,56 @@ public function addDocument($data, $type)
throw new sfException('Failed to parse id field.');
}

$response = $this->client->addDocument([$type => $data]);
if ($this->batchMode) {
// Add this document to the batch add queue
$document = [
$type => $data,
];
array_push($this->batchAddDocs, $document);

// If we have a full batch, send additions and deletions in bulk
if (count($this->batchAddDocs) >= $this->batchSize) {
$this->flushBatch();
}
} else {
$response = $this->client->addDocument([$type => $data]);

if ($response->error) {
$this->log(var_export($response->error, true));
$this->log(json_encode([$type => $data]));
}
}
}

/**
 * Remove the index document that corresponds to the given object.
 *
 * Nothing happens when the plugin is disabled or when $object is a
 * QubitUser. In batch mode the deletion is queued and sent by flushBatch(),
 * which is triggered here as soon as the delete queue reaches the configured
 * batch size. Outside batch mode the deletion is sent immediately and any
 * exception raised by the client is ignored.
 *
 * @param mixed $object resource whose id property and class name identify
 *                      the document to delete
 */
public function delete($object)
{
    if (!$this->enabled) {
        return;
    }

    if ($object instanceof QubitUser) {
        return;
    }

    if ($this->batchMode) {
        // The document being deleted may not have been added to the index yet (if it's
        // still queued up in $this->batchAddDocs) so create a document object representing
        // the document to be deleted and add this document object to the batch delete
        // queue.
        $document = $this->client->createDocumentWithId($object->id, get_class($object));

        $this->batchDeleteDocs[] = $document;

        // If we have a full batch, send additions and deletions in bulk
        if (count($this->batchDeleteDocs) >= $this->batchSize) {
            $this->flushBatch();
        }
    } else {
        try {
            $this->client->deleteById($object->id, get_class($object));
        } catch (Exception $e) {
            // Ignore: immediate deletion is best-effort
        }
    }
}

Expand Down Expand Up @@ -292,32 +407,36 @@ private function addAutoCompleteFields()
'QubitAip.type.i18n.%s%.name',
];

$fields = [];
$copyFields = [];

foreach ($this->langs as $lang) {
$addFieldArr = [
$langField = [
'name' => "autocomplete_{$lang}",
'type' => "text_{$lang}",
'stored' => 'true',
'multiValued' => 'true',
];

$copyFieldsArr = [
[
'source' => 'QubitInformationObject.referenceCode',
'dest' => "autocomplete_{$lang}",
],
$refField = [
'source' => 'QubitInformationObject.referenceCode',
'dest' => "autocomplete_{$lang}",
];
$this->client->addFields($addFieldArr);

array_push($fields, $langField);
array_push($copyFields, $refField);

foreach ($autocompleteFields as $field) {
$field = str_replace('%s%', $lang, $field);
array_push($copyFieldsArr, [
array_push($copyFields, [
'source' => $field,
'dest' => "autocomplete_{$lang}",
]);
}

$this->client->addCopyFields($copyFieldsArr);
}

$this->client->addFields($fields);
$this->client->addCopyFields($copyFields);
}

private function addAutoCompleteConfigs()
Expand Down
38 changes: 38 additions & 0 deletions plugins/arSolrPlugin/lib/client/arSolrClient.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,44 @@ public function addDocument($document)
return makeHttpRequest($url, 'POST', json_encode($document));
}

/**
 * POST a set of documents to the collection's /update/json/docs endpoint.
 *
 * @param mixed $documents documents to index; JSON-encoded as the request body
 *
 * @return mixed the result of makeHttpRequest() for the POST
 */
public function addDocuments($documents)
{
    $endpoint = sprintf(
        '%s/solr/%s/update/json/docs',
        $this->config['api_url'],
        $this->config['collection']
    );
    $payload = json_encode($documents);

    return makeHttpRequest($endpoint, 'POST', $payload);
}

/**
 * POST a delete command to the collection's /update endpoint.
 *
 * The argument is placed under the 'delete' key of the JSON body unchanged.
 * NOTE(review): deleteById() and deleteByQuery() pass a single associative
 * array while the plugin's batch flush passes a list of them; confirm that
 * both shapes are accepted by the server.
 *
 * @param mixed $documents value to send as the 'delete' command
 *
 * @return mixed the result of makeHttpRequest() for the POST
 */
public function deleteDocuments($documents)
{
    $endpoint = sprintf(
        '%s/solr/%s/update',
        $this->config['api_url'],
        $this->config['collection']
    );
    $command = ['delete' => $documents];

    return makeHttpRequest($endpoint, 'POST', json_encode($command));
}

/**
 * Delete the single document identified by an id and a type.
 *
 * @param mixed  $id   identifier of the document
 * @param string $type type name used to build the "<type>.id" key
 *
 * @return mixed the result of deleteDocuments()
 */
public function deleteById($id, $type)
{
    return $this->deleteDocuments(
        $this->createDocumentWithId($id, $type)
    );
}

/**
 * Delete every document matched by a query.
 *
 * Only the 'query' entry of the query's parameters is forwarded; offset,
 * size, and additional params are ignored when deleting by query.
 *
 * @param mixed $query object exposing getQueryParams()
 *
 * @return mixed the result of deleteDocuments()
 */
public function deleteByQuery($query)
{
    $params = $query->getQueryParams();
    $criteria = ['query' => $params['query']];

    return $this->deleteDocuments($criteria);
}

/**
 * Build the minimal document that identifies a record by its id.
 *
 * @param mixed  $id   identifier of the record
 * @param string $type type name; the resulting key is "<type>.id"
 *
 * @return array single-entry array mapping "<type>.id" to $id
 */
public function createDocumentWithId($id, $type)
{
    $idField = $type.'.id';

    return [$idField => $id];
}

public function getCollections()
{
$url = "{$this->config['api_url']}/solr/admin/collections?action=LIST";
Expand Down

0 comments on commit f0876e4

Please sign in to comment.