Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1699 from tomaszdurka/fix-elasticsearch-implemnta…
Browse files Browse the repository at this point in the history
…tion

Rework create, update, delete indexes for Elasticsearch
  • Loading branch information
tomaszdurka committed Mar 19, 2015
2 parents 47f0065 + 32e6bdb commit 23f2cdc
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 99 deletions.
21 changes: 21 additions & 0 deletions library/CM/Elasticsearch/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,27 @@ public function getEnabled() {
return $this->_enabled;
}

/**
* @return CM_Elasticsearch_Type_Abstract[]
*/
public function getTypes() {
$types = CM_Util::getClassChildren('CM_Elasticsearch_Type_Abstract');
return \Functional\map($types, function ($className) {
return new $className($this->getRandomClient());
});
}

/**
* @param string $indexName
* @return CM_Elasticsearch_Type_Abstract
* @throws CM_Exception_Invalid
*/
public function findType($indexName) {
return \Functional\first($this->getTypes(), function (CM_Elasticsearch_Type_Abstract $type) use ($indexName) {
return $type->getIndex()->getName() === $indexName;
});
}

/**
* @param CM_Elasticsearch_Type_Abstract[] $types
* @param array|null $data
Expand Down
109 changes: 29 additions & 80 deletions library/CM/Elasticsearch/Index/Cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,36 @@ class CM_Elasticsearch_Index_Cli extends CM_Cli_Runnable_Abstract {
* @param bool|null $skipIfExist
*/
public function create($indexName = null, $skipIfExist = null) {
if (null !== $indexName) {
$indexes = array($this->_getIndex($indexName));
} else {
$indexes = $this->_getIndexes();
}
foreach ($indexes as $index) {
if (!$index->getIndex()->exists() || !$skipIfExist) {
$this->_getStreamOutput()->writeln('Creating elasticsearch index `' . $index->getIndex()->getName() . '`…');
$index->createVersioned();
$index->getIndex()->refresh();
$types = $this->_getTypes($indexName);
foreach ($types as $type) {
if (!$type->indexExists() || !$skipIfExist) {
$this->_getStreamOutput()->writeln('Creating elasticsearch index `' . $type->getIndex()->getName() . '`…');
$type->createIndex();
$type->refreshIndex();
}
}
}

/**
* @param string|CM_Elasticsearch_Type_Abstract|null $indexName
* @param string|null $host Elasticsearch host
* @param int|null $port Elasticsearch port
* @throws CM_Exception_Invalid
* @param string|null $indexName
*/
public function update($indexName = null, $host = null, $port = null) {
if ($indexName instanceof CM_Elasticsearch_Type_Abstract) {
$indexes = array($indexName);
} elseif (null !== $indexName) {
$indexes = array($this->_getIndex($indexName, $host, $port));
} else {
$indexes = $this->_getIndexes($host, $port);
}
foreach ($indexes as $index) {
$this->_getStreamOutput()->writeln('Updating elasticsearch index `' . $index->getIndex()->getName() . '`...');
$indexName = $index->getIndex()->getName();
$key = 'Search.Updates_' . $index->getType()->getName();
try {
$ids = $this->_getRedis()->sFlush($key);
$ids = array_filter(array_unique($ids));
$index->update($ids);
$index->getIndex()->refresh();
} catch (Exception $e) {
$message = $indexName . '-updates failed.' . PHP_EOL;
if (isset($ids)) {
$message .= 'Re-adding ' . count($ids) . ' ids to queue.' . PHP_EOL;
foreach ($ids as $id) {
$this->_getRedis()->sAdd($key, $id);
}
}
$message .= 'Reason: ' . $e->getMessage() . PHP_EOL;
throw new CM_Exception_Invalid($message);
}
public function update($indexName = null) {
$types = $this->_getTypes($indexName);
foreach ($types as $type) {
$this->_getStreamOutput()->writeln('Updating elasticsearch index `' . $type->getIndex()->getName() . '`...');
$type->updateIndex();
}
}

/**
* @param string|null $indexName
*/
public function delete($indexName = null) {
if (null !== $indexName) {
$indexes = array($this->_getIndex($indexName));
} else {
$indexes = $this->_getIndexes();
}
foreach ($indexes as $index) {
if ($index->getIndex()->exists()) {
$this->_getStreamOutput()->writeln('Deleting elasticsearch index `' . $index->getIndex()->getName() . '`…');
$index->getIndex()->delete();
$types = $this->_getTypes($indexName);
foreach ($types as $type) {
if ($type->indexExists()) {
$this->_getStreamOutput()->writeln('Deleting elasticsearch index `' . $type->getIndex()->getName() . '`…');
$type->deleteIndex();
}
}
}
Expand All @@ -95,39 +61,22 @@ public function startMaintenance() {
}

/**
* @param string|null $host
* @param int|null $port
* @param string|null $filterIndexName
* @throws CM_Exception_Invalid
* @return CM_Elasticsearch_Type_Abstract[]
*/
private function _getIndexes($host = null, $port = null) {
$indexTypes = CM_Util::getClassChildren('CM_Elasticsearch_Type_Abstract');
return array_map(function ($indexType) use ($host, $port) {
return new $indexType($host, $port);
}, $indexTypes);
}
protected function _getTypes($filterIndexName = null) {
$types = CM_Service_Manager::getInstance()->getElasticsearch()->getTypes();

/**
* @param string $indexName
* @param string|null $host
* @param int|null $port
* @throws CM_Exception_Invalid
* @return CM_Elasticsearch_Type_Abstract
*/
private function _getIndex($indexName, $host = null, $port = null) {
$indexes = array_filter($this->_getIndexes($host, $port), function (CM_Elasticsearch_Type_Abstract $index) use ($indexName) {
return $index->getIndex()->getName() == $indexName;
});
if (!$indexes) {
throw new CM_Exception_Invalid('No such index: ' . $indexName);
if (null !== $filterIndexName) {
$types = \Functional\filter($types, function (CM_Elasticsearch_Type_Abstract $type) use ($filterIndexName) {
return $type->getIndex()->getName() === $filterIndexName;
});
if (count($types) === 0) {
throw new CM_Exception_Invalid('No type with such index name: ' . $filterIndexName);
}
}
return current($indexes);
}

/**
* @return CM_Redis_Client
*/
private function _getRedis() {
return CM_Service_Manager::getInstance()->getRedis();
return $types;
}

public static function getPackageName() {
Expand Down
66 changes: 53 additions & 13 deletions library/CM/Elasticsearch/Type/Abstract.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public function getIndex() {
return $this->_index;
}

/**
* @return bool
*/
public function indexExists() {
return $this->getIndex()->exists();
}

/**
* @return Elastica\Type
*/
Expand All @@ -71,18 +78,7 @@ public function convertDate($date) {
return Elastica\Util::convertDate($date);
}

/**
* @param bool|null $recreate
*/
public function create($recreate = null) {
$this->getIndex()->create($this->_indexParams, $recreate);

$mapping = new Elastica\Type\Mapping($this->getType(), $this->_mapping);
$mapping->setSource(array('enabled' => $this->_source));
$mapping->send();
}

public function createVersioned() {
public function createIndex() {
// Remove old unfinished indices
foreach ($this->_client->getStatus()->getIndicesWithAlias($this->getIndex()->getName() . '.tmp') as $index) {
/** @var Elastica\Index $index */
Expand All @@ -98,7 +94,7 @@ public function createVersioned() {
$version = time();
/** @var $indexNew CM_Elasticsearch_Type_Abstract */
$indexNew = new static($this->_client, $version);
$indexNew->create(true);
$indexNew->_createIndex(true);
$indexNew->getIndex()->addAlias($this->getIndex()->getName() . '.tmp');

$settings = $indexNew->getIndex()->getSettings();
Expand All @@ -125,6 +121,39 @@ public function createVersioned() {
}
}

/**
* @throws CM_Exception_Invalid
*/
public function updateIndex() {
$redis = CM_Service_Manager::getInstance()->getRedis();
$indexName = $this->getIndex()->getName();
$key = 'Search.Updates_' . $this->getType()->getName();
try {
$ids = $redis->sFlush($key);
$ids = array_filter(array_unique($ids));
$this->update($ids);
$this->refreshIndex();
} catch (Exception $e) {
$message = $indexName . '-updates failed.' . PHP_EOL;
if (isset($ids)) {
$message .= 'Re-adding ' . count($ids) . ' ids to queue.' . PHP_EOL;
foreach ($ids as $id) {
$redis->sAdd($key, $id);
}
}
$message .= 'Reason: ' . $e->getMessage() . PHP_EOL;
throw new CM_Exception_Invalid($message);
}
}

public function deleteIndex() {
$this->getIndex()->delete();
}

public function refreshIndex() {
$this->getIndex()->refresh();
}

/**
* Update the complete index
*
Expand Down Expand Up @@ -183,6 +212,17 @@ public function update($ids = null, $useMaintenance = null, $limit = null, $maxD
}
}

/**
* @param bool|null $recreate
*/
protected function _createIndex($recreate = null) {
$this->getIndex()->create($this->_indexParams, $recreate);

$mapping = new Elastica\Type\Mapping($this->getType(), $this->_mapping);
$mapping->setSource(array('enabled' => $this->_source));
$mapping->send();
}

/**
* @param array $data
* @return Elastica\Document Document with data
Expand Down
5 changes: 2 additions & 3 deletions tests/library/CM/Elasticsearch/Type/AbstractTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static function tearDownAfterClass() {
public function setUp() {
CMTest_TH::getServiceManager()->getElasticsearch()->setEnabled(true);
$this->_type = new CM_Elasticsearch_Type_AbstractMock();
$this->_type->createVersioned();
$this->_type->createIndex();
$this->_type->getIndex()->refresh();
}

Expand All @@ -40,8 +40,7 @@ public function testUpdateItem() {

CM_Db_Db::update('index_mock', array('name' => 'bar'), array('id' => $id2));
$this->_type->updateItem(array('id' => $id2));
$searchCli = new CM_Elasticsearch_Index_Cli();
$searchCli->update($this->_type);
$this->_type->updateIndex();
$this->assertSame(1, $source->getCount());
$this->assertEquals(array($id1), $source->getItems());
}
Expand Down
6 changes: 3 additions & 3 deletions tests/library/CM/PagingSource/ElasticsearchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public function setUp() {
$type1 = new CM_Elasticsearch_Type_Mock1();
$type2 = new CM_Elasticsearch_Type_Mock2();
$type3 = new CM_Elasticsearch_Type_Mock3();
$type1->createVersioned();
$type2->createVersioned();
$type3->createVersioned();
$type1->createIndex();
$type2->createIndex();
$type3->createIndex();
$type1->getIndex()->refresh();
$type2->getIndex()->refresh();
$type3->getIndex()->refresh();
Expand Down

0 comments on commit 23f2cdc

Please sign in to comment.