diff --git a/library/CM/Elasticsearch/Cluster.php b/library/CM/Elasticsearch/Cluster.php index 9e9ce49f3..a24b9d5f7 100644 --- a/library/CM/Elasticsearch/Cluster.php +++ b/library/CM/Elasticsearch/Cluster.php @@ -51,6 +51,27 @@ public function getEnabled() { return $this->_enabled; } + /** + * @return CM_Elasticsearch_Type_Abstract[] + */ + public function getTypes() { + $types = CM_Util::getClassChildren('CM_Elasticsearch_Type_Abstract'); + return \Functional\map($types, function ($className) { + return new $className($this->getRandomClient()); + }); + } + + /** + * @param string $indexName + * @return CM_Elasticsearch_Type_Abstract + * @throws CM_Exception_Invalid + */ + public function findType($indexName) { + return \Functional\first($this->getTypes(), function (CM_Elasticsearch_Type_Abstract $type) use ($indexName) { + return $type->getIndex()->getName() === $indexName; + }); + } + /** * @param CM_Elasticsearch_Type_Abstract[] $types * @param array|null $data diff --git a/library/CM/Elasticsearch/Index/Cli.php b/library/CM/Elasticsearch/Index/Cli.php index b2d1aac36..b1e3c59c5 100644 --- a/library/CM/Elasticsearch/Index/Cli.php +++ b/library/CM/Elasticsearch/Index/Cli.php @@ -7,54 +7,24 @@ class CM_Elasticsearch_Index_Cli extends CM_Cli_Runnable_Abstract { * @param bool|null $skipIfExist */ public function create($indexName = null, $skipIfExist = null) { - if (null !== $indexName) { - $indexes = array($this->_getIndex($indexName)); - } else { - $indexes = $this->_getIndexes(); - } - foreach ($indexes as $index) { - if (!$index->getIndex()->exists() || !$skipIfExist) { - $this->_getStreamOutput()->writeln('Creating elasticsearch index `' . $index->getIndex()->getName() . '`…'); - $index->createVersioned(); - $index->getIndex()->refresh(); + $types = $this->_getTypes($indexName); + foreach ($types as $type) { + if (!$type->indexExists() || !$skipIfExist) { + $this->_getStreamOutput()->writeln('Creating elasticsearch index `' . $type->getIndex()->getName() . '`…'); + $type->createIndex(); + $type->refreshIndex(); } } } /** - * @param string|CM_Elasticsearch_Type_Abstract|null $indexName - * @param string|null $host Elasticsearch host - * @param int|null $port Elasticsearch port - * @throws CM_Exception_Invalid + * @param string|null $indexName */ - public function update($indexName = null, $host = null, $port = null) { - if ($indexName instanceof CM_Elasticsearch_Type_Abstract) { - $indexes = array($indexName); - } elseif (null !== $indexName) { - $indexes = array($this->_getIndex($indexName, $host, $port)); - } else { - $indexes = $this->_getIndexes($host, $port); - } - foreach ($indexes as $index) { - $this->_getStreamOutput()->writeln('Updating elasticsearch index `' . $index->getIndex()->getName() . '`...'); - $indexName = $index->getIndex()->getName(); - $key = 'Search.Updates_' . $index->getType()->getName(); - try { - $ids = $this->_getRedis()->sFlush($key); - $ids = array_filter(array_unique($ids)); - $index->update($ids); - $index->getIndex()->refresh(); - } catch (Exception $e) { - $message = $indexName . '-updates failed.' . PHP_EOL; - if (isset($ids)) { - $message .= 'Re-adding ' . count($ids) . ' ids to queue.' . PHP_EOL; - foreach ($ids as $id) { - $this->_getRedis()->sAdd($key, $id); - } - } - $message .= 'Reason: ' . $e->getMessage() . PHP_EOL; - throw new CM_Exception_Invalid($message); - } + public function update($indexName = null) { + $types = $this->_getTypes($indexName); + foreach ($types as $type) { + $this->_getStreamOutput()->writeln('Updating elasticsearch index `' . $type->getIndex()->getName() . '`...'); + $type->updateIndex(); } } @@ -62,15 +32,11 @@ public function update($indexName = null, $host = null, $port = null) { * @param string|null $indexName */ public function delete($indexName = null) { - if (null !== $indexName) { - $indexes = array($this->_getIndex($indexName)); - } else { - $indexes = $this->_getIndexes(); - } - foreach ($indexes as $index) { - if ($index->getIndex()->exists()) { - $this->_getStreamOutput()->writeln('Deleting elasticsearch index `' . $index->getIndex()->getName() . '`…'); - $index->getIndex()->delete(); + $types = $this->_getTypes($indexName); + foreach ($types as $type) { + if ($type->indexExists()) { + $this->_getStreamOutput()->writeln('Deleting elasticsearch index `' . $type->getIndex()->getName() . '`…'); + $type->deleteIndex(); } } } @@ -95,39 +61,22 @@ public function startMaintenance() { } /** - * @param string|null $host - * @param int|null $port + * @param string|null $filterIndexName + * @throws CM_Exception_Invalid * @return CM_Elasticsearch_Type_Abstract[] */ - private function _getIndexes($host = null, $port = null) { - $indexTypes = CM_Util::getClassChildren('CM_Elasticsearch_Type_Abstract'); - return array_map(function ($indexType) use ($host, $port) { - return new $indexType($host, $port); - }, $indexTypes); - } + protected function _getTypes($filterIndexName = null) { + $types = CM_Service_Manager::getInstance()->getElasticsearch()->getTypes(); - /** - * @param string $indexName - * @param string|null $host - * @param int|null $port - * @throws CM_Exception_Invalid - * @return CM_Elasticsearch_Type_Abstract - */ - private function _getIndex($indexName, $host = null, $port = null) { - $indexes = array_filter($this->_getIndexes($host, $port), function (CM_Elasticsearch_Type_Abstract $index) use ($indexName) { - return $index->getIndex()->getName() == $indexName; - }); - if (!$indexes) { - throw new CM_Exception_Invalid('No such index: ' . $indexName); + if (null !== $filterIndexName) { + $types = \Functional\filter($types, function (CM_Elasticsearch_Type_Abstract $type) use ($filterIndexName) { + return $type->getIndex()->getName() === $filterIndexName; + }); + if (count($types) === 0) { + throw new CM_Exception_Invalid('No type with such index name: ' . $filterIndexName); + } } - return current($indexes); - } - - /** - * @return CM_Redis_Client - */ - private function _getRedis() { - return CM_Service_Manager::getInstance()->getRedis(); + return $types; } public static function getPackageName() { diff --git a/library/CM/Elasticsearch/Type/Abstract.php b/library/CM/Elasticsearch/Type/Abstract.php index c3c9ab905..e8b1139ec 100644 --- a/library/CM/Elasticsearch/Type/Abstract.php +++ b/library/CM/Elasticsearch/Type/Abstract.php @@ -56,6 +56,13 @@ public function getIndex() { return $this->_index; } + /** + * @return bool + */ + public function indexExists() { + return $this->getIndex()->exists(); + } + /** * @return Elastica\Type */ @@ -71,18 +78,7 @@ public function convertDate($date) { return Elastica\Util::convertDate($date); } - /** - * @param bool|null $recreate - */ - public function create($recreate = null) { - $this->getIndex()->create($this->_indexParams, $recreate); - - $mapping = new Elastica\Type\Mapping($this->getType(), $this->_mapping); - $mapping->setSource(array('enabled' => $this->_source)); - $mapping->send(); - } - - public function createVersioned() { + public function createIndex() { // Remove old unfinished indices foreach ($this->_client->getStatus()->getIndicesWithAlias($this->getIndex()->getName() . '.tmp') as $index) { /** @var Elastica\Index $index */ @@ -98,7 +94,7 @@ public function createVersioned() { $version = time(); /** @var $indexNew CM_Elasticsearch_Type_Abstract */ $indexNew = new static($this->_client, $version); - $indexNew->create(true); + $indexNew->_createIndex(true); $indexNew->getIndex()->addAlias($this->getIndex()->getName() . '.tmp'); $settings = $indexNew->getIndex()->getSettings(); @@ -125,6 +121,39 @@ public function createVersioned() { } } + /** + * @throws CM_Exception_Invalid + */ + public function updateIndex() { + $redis = CM_Service_Manager::getInstance()->getRedis(); + $indexName = $this->getIndex()->getName(); + $key = 'Search.Updates_' . $this->getType()->getName(); + try { + $ids = $redis->sFlush($key); + $ids = array_filter(array_unique($ids)); + $this->update($ids); + $this->refreshIndex(); + } catch (Exception $e) { + $message = $indexName . '-updates failed.' . PHP_EOL; + if (isset($ids)) { + $message .= 'Re-adding ' . count($ids) . ' ids to queue.' . PHP_EOL; + foreach ($ids as $id) { + $redis->sAdd($key, $id); + } + } + $message .= 'Reason: ' . $e->getMessage() . PHP_EOL; + throw new CM_Exception_Invalid($message); + } + } + + public function deleteIndex() { + $this->getIndex()->delete(); + } + + public function refreshIndex() { + $this->getIndex()->refresh(); + } + /** * Update the complete index * @@ -183,6 +212,17 @@ public function update($ids = null, $useMaintenance = null, $limit = null, $maxD } } + /** + * @param bool|null $recreate + */ + protected function _createIndex($recreate = null) { + $this->getIndex()->create($this->_indexParams, $recreate); + + $mapping = new Elastica\Type\Mapping($this->getType(), $this->_mapping); + $mapping->setSource(array('enabled' => $this->_source)); + $mapping->send(); + } + /** * @param array $data * @return Elastica\Document Document with data diff --git a/tests/library/CM/Elasticsearch/Type/AbstractTest.php b/tests/library/CM/Elasticsearch/Type/AbstractTest.php index c56b40dab..f0f0f9ffb 100644 --- a/tests/library/CM/Elasticsearch/Type/AbstractTest.php +++ b/tests/library/CM/Elasticsearch/Type/AbstractTest.php @@ -17,7 +17,7 @@ public static function tearDownAfterClass() { public function setUp() { CMTest_TH::getServiceManager()->getElasticsearch()->setEnabled(true); $this->_type = new CM_Elasticsearch_Type_AbstractMock(); - $this->_type->createVersioned(); + $this->_type->createIndex(); $this->_type->getIndex()->refresh(); } @@ -40,8 +40,7 @@ public function testUpdateItem() { CM_Db_Db::update('index_mock', array('name' => 'bar'), array('id' => $id2)); $this->_type->updateItem(array('id' => $id2)); - $searchCli = new CM_Elasticsearch_Index_Cli(); - $searchCli->update($this->_type); + $this->_type->updateIndex(); $this->assertSame(1, $source->getCount()); $this->assertEquals(array($id1), $source->getItems()); } diff --git a/tests/library/CM/PagingSource/ElasticsearchTest.php b/tests/library/CM/PagingSource/ElasticsearchTest.php index 184b1d7ae..f362f28c9 100644 --- a/tests/library/CM/PagingSource/ElasticsearchTest.php +++ b/tests/library/CM/PagingSource/ElasticsearchTest.php @@ -20,9 +20,9 @@ public function setUp() { $type1 = new CM_Elasticsearch_Type_Mock1(); $type2 = new CM_Elasticsearch_Type_Mock2(); $type3 = new CM_Elasticsearch_Type_Mock3(); - $type1->createVersioned(); - $type2->createVersioned(); - $type3->createVersioned(); + $type1->createIndex(); + $type2->createIndex(); + $type3->createIndex(); $type1->getIndex()->refresh(); $type2->getIndex()->refresh(); $type3->getIndex()->refresh();