From 92b98eb7792e62112171a632aec2e545a510f447 Mon Sep 17 00:00:00 2001 From: Terry <2358269014@qq.com> Date: Thu, 8 Feb 2018 18:56:26 +0800 Subject: [PATCH] update readme --- README.md | 131 ++++- config/fecshop_elasticsearch.php | 36 +- yii2-elasticsearch/ActiveRecord.php | 845 ++++++++++++++++++++++++++++ yii2-elasticsearch/Command.php | 558 ++++++++++++++++++ yii2-elasticsearch/Connection.php | 639 +++++++++++++++++++++ yii2-elasticsearch/README.md | 6 + 6 files changed, 2213 insertions(+), 2 deletions(-) create mode 100644 yii2-elasticsearch/ActiveRecord.php create mode 100644 yii2-elasticsearch/Command.php create mode 100644 yii2-elasticsearch/Connection.php create mode 100644 yii2-elasticsearch/README.md diff --git a/README.md b/README.md index 091a0b1..30956a0 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,20 @@ Fecshop ElasticSearch > fecshop elasticsearch 功能部分,用于将分类,产品,搜索页面,底层使用elasticSearch支持 + +### 环境安装 + +1.安装elasticSearch 6.1 + +http://www.fecshop.com/topic/672 + +2.安装elasticSearch 可视化工具 kibana + +http://www.fecshop.com/topic/668 + + +### 安装fecshop elasticSearch扩展 + 1.安装 ``` @@ -12,7 +26,7 @@ composer require --prefer-dist fancyecommerce/fecshop_elasticsearch or 在根目录的`composer.json`中添加 ``` -"fancyecommerce/fecshop_elasticsearch": "~1.0.2" +"fancyecommerce/fecshop_elasticsearch": "~1.xx" // 使用最新版本号 ``` @@ -21,3 +35,118 @@ or 在根目录的`composer.json`中添加 ``` composer update ``` + +### yii2-elasticSearch 不支持es6的处理 + +由于扩展不支持es6,因此对其进行了改动 + +1. composer.json`中添加 +`"yiisoft/yii2-elasticsearch": "2.1@dev",` +,然后执行`composer update` + +2.更新后,然后将 vendor/fancyecommerce/fecshop_elasticsearch/yii2-elasticSearch +下的三个php文件覆盖到`/vendor/yiisoft/yii2-elasticsearch` 下即可 + +3.如果yii2-elasticSearch 支持es6,修复了这个文件,此处将不需要执行(这个只能等官方了) + +### 配置 + +1.添加配置 + +将 ./config/fecshop_elasticsearch.php文件复制到 common/config/fecshop_third_extensions/下面 +,然后打开这个文件 + +1.1在 `nodes` 处配置ip和port + +1.2在`searchLang`处,配置支持的语言,也就是把您的网站的语言都填写过来,那么,这些语言就会使用 +elasticSearch搜索。 + + +2.关闭mongodb和xunsearch搜索 + +打开文件 common/config/fecshop_local_services/Search.php + +将 mongodb 和 xunsearch 部分的搜索语言部分注释掉, +如果您想要某些语言继续使用mongodb或xunsearch搜索,那么可以保留某些语言, +各个搜索引擎的`searchLang`中的语言都是唯一的,不要一种语言出现在2个搜索引擎里面 + +``` + // mongodb + /* + 'searchLang' => [ + 'en' => 'english', + 'fr' => 'french', + 'de' => 'german', + 'es' => 'spanish', + 'ru' => 'russian', + 'pt' => 'portuguese', + ], + */ + + // xunsearch + /* + 'searchLang' => [ + 'zh' => 'chinese', + ], + */ + +``` + +3.初始化数据 + +fecshop 根目录下执行 + +3.1新建elasticSearch的mapping + +``` +./yii elasticsearch/updatemapping +``` + +3.2删除es的产品index(当您的mapping中的某个字段需要修改,直接修改是无效的,只能删除index库,然后重建) + +``` +./yii elasticsearch/clean +``` + +3.3同步产品到elasticSearch + +``` +cd vendor/fancyecommerce/fecshop/shell/search/ +sh fullSearchSync.sh +``` + +3.4然后,es部分就可以访问了 + + + +### 备注 + +1.支持的语言 + +https://github.com/fecshop/yii2_fecshop_elasticsearch/blob/master/models/elasticSearch/Product.php + +暂时支持这些语言 + + +``` +'zh' => 'cjk', // 中国 +'kr' => 'cjk', // 韩国 +'jp' => 'cjk', // 日本 +'en' => 'english', // +'fr' => 'french', +'de' => 'german', +'it' => 'italian', +'pt' => 'portuguese', +'es' => 'spanish', +'ru' => 'russian', +'nl' => 'dutch', +'br' => 'brazilian', +``` + + +2.产品搜索index + +一个语言一个index(elasticSearch的index,有一点点类似mysql的数据库,type,一点点类似表 +,不过完全不同。) + + diff --git a/config/fecshop_elasticsearch.php b/config/fecshop_elasticsearch.php index 1515d25..26168b3 100644 --- a/config/fecshop_elasticsearch.php +++ b/config/fecshop_elasticsearch.php @@ -28,12 +28,46 @@ // configure more hosts if you have a cluster ], ], - ] + ], + 'services' => [ + 'search' => [ + 'childService' => [ + 'elasticSearch' => [ + 'class' => 'fecshop\elasticsearch\services\search\ElasticSearch', + 'enableService' => true, + 'searchLang' => [ + 'en' => 'english', + 'fr' => 'french', + 'de' => 'german', + 'es' => 'spanish', + 'ru' => 'russian', + 'pt' => 'portuguese', + 'zh' => 'chinese', + ], + ], + ], + ], + ], + ], + ], + + + 'console' => [ + // 在公用层的开关,设置成false后,公用层的配置将失效 + 'enable' => true, + // 公用层的具体配置下载下面 + 'config' => [ + 'controllerMap'=>[ + 'elasticsearch'=>[ + 'class'=>'fecshop\elasticsearch\controllers\ElasticsearchController' + ], + ], ], ], + ], ]; diff --git a/yii2-elasticsearch/ActiveRecord.php b/yii2-elasticsearch/ActiveRecord.php new file mode 100644 index 0000000..346d08c --- /dev/null +++ b/yii2-elasticsearch/ActiveRecord.php @@ -0,0 +1,845 @@ + + * @since 2.0 + */ +class ActiveRecord extends BaseActiveRecord +{ + private $_id; + private $_score; + private $_version; + private $_highlight; + private $_explanation; + + /** + * Returns the database connection used by this AR class. + * By default, the "elasticsearch" application component is used as the database connection. + * You may override this method if you want to use a different database connection. + * @return Connection the database connection used by this AR class. + */ + public static function getDb() + { + return \Yii::$app->get('elasticsearch'); + } + + /** + * @inheritdoc + * @return ActiveQuery the newly created [[ActiveQuery]] instance. + */ + public static function find() + { + return Yii::createObject(ActiveQuery::className(), [get_called_class()]); + } + + /** + * @inheritdoc + */ + public static function findOne($condition) + { + $query = static::find(); + if (is_array($condition)) { + return $query->andWhere($condition)->one(); + } else { + return static::get($condition); + } + } + + /** + * @inheritdoc + */ + public static function findAll($condition) + { + $query = static::find(); + if (ArrayHelper::isAssociative($condition)) { + return $query->andWhere($condition)->all(); + } else { + return static::mget((array) $condition); + } + } + + /** + * Gets a record by its primary key. + * + * @param mixed $primaryKey the primaryKey value + * @param array $options options given in this parameter are passed to elasticsearch + * as request URI parameters. + * Please refer to the [elasticsearch documentation](http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html) + * for more details on these options. + * @return static|null The record instance or null if it was not found. + */ + public static function get($primaryKey, $options = []) + { + if ($primaryKey === null) { + return null; + } + $command = static::getDb()->createCommand(); + $result = $command->get(static::index(), static::type(), $primaryKey, $options); + if ($result['found']) { + $model = static::instantiate($result); + static::populateRecord($model, $result); + $model->afterFind(); + + return $model; + } + + return null; + } + + /** + * Gets a list of records by its primary keys. + * + * @param array $primaryKeys an array of primaryKey values + * @param array $options options given in this parameter are passed to elasticsearch + * as request URI parameters. + * + * Please refer to the [elasticsearch documentation](http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html) + * for more details on these options. + * @return array The record instances, or empty array if nothing was found + */ + public static function mget(array $primaryKeys, $options = []) + { + if (empty($primaryKeys)) { + return []; + } + if (count($primaryKeys) === 1) { + $model = static::get(reset($primaryKeys)); + return $model === null ? [] : [$model]; + } + + $command = static::getDb()->createCommand(); + $result = $command->mget(static::index(), static::type(), $primaryKeys, $options); + $models = []; + foreach ($result['docs'] as $doc) { + if ($doc['found']) { + $model = static::instantiate($doc); + static::populateRecord($model, $doc); + $model->afterFind(); + $models[] = $model; + } + } + + return $models; + } + + // TODO add more like this feature http://www.elastic.co/guide/en/elasticsearch/reference/current/search-more-like-this.html + + // TODO add percolate functionality http://www.elastic.co/guide/en/elasticsearch/reference/current/search-percolate.html + + // TODO implement copy and move as pk change is not possible + + /** + * @return float returns the score of this record when it was retrieved via a [[find()]] query. + */ + public function getScore() + { + return $this->_score; + } + + /** + * @return array|null A list of arrays with highlighted excerpts indexed by field names. + */ + public function getHighlight() + { + return $this->_highlight; + } + + /** + * @return array|null An explanation for each hit on how its score was computed. + * @since 2.0.5 + */ + public function getExplanation() + { + return $this->_explanation; + } + + /** + * Sets the primary key + * @param mixed $value + * @throws \yii\base\InvalidCallException when record is not new + */ + public function setPrimaryKey($value) + { + $pk = static::primaryKey()[0]; + if ($this->getIsNewRecord() || $pk != '_id') { + $this->$pk = $value; + } else { + throw new InvalidCallException('Changing the primaryKey of an already saved record is not allowed.'); + } + } + + /** + * @inheritdoc + */ + public function getPrimaryKey($asArray = false) + { + $pk = static::primaryKey()[0]; + if ($asArray) { + return [$pk => $this->$pk]; + } else { + return $this->$pk; + } + } + + /** + * @inheritdoc + */ + public function getOldPrimaryKey($asArray = false) + { + $pk = static::primaryKey()[0]; + if ($this->getIsNewRecord()) { + $id = null; + } elseif ($pk == '_id') { + $id = $this->_id; + } else { + $id = $this->getOldAttribute($pk); + } + if ($asArray) { + return [$pk => $id]; + } else { + return $id; + } + } + + /** + * This method defines the attribute that uniquely identifies a record. + * + * The primaryKey for elasticsearch documents is the `_id` field by default. This field is not part of the + * ActiveRecord attributes so you should never add `_id` to the list of [[attributes()|attributes]]. + * + * You may override this method to define the primary key name when you have defined + * [path mapping](http://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html) + * for the `_id` field so that it is part of the `_source` and thus part of the [[attributes()|attributes]]. + * + * Note that elasticsearch only supports _one_ attribute to be the primary key. However to match the signature + * of the [[\yii\db\ActiveRecordInterface|ActiveRecordInterface]] this methods returns an array instead of a + * single string. + * + * @return string[] array of primary key attributes. Only the first element of the array will be used. + */ + public static function primaryKey() + { + return ['_id']; + } + + /** + * Returns the list of all attribute names of the model. + * + * This method must be overridden by child classes to define available attributes. + * + * Attributes are names of fields of the corresponding elasticsearch document. + * The primaryKey for elasticsearch documents is the `_id` field by default which is not part of the attributes. + * You may define [path mapping](http://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html) + * for the `_id` field so that it is part of the `_source` fields and thus becomes part of the attributes. + * + * @return string[] list of attribute names. + * @throws \yii\base\InvalidConfigException if not overridden in a child class. + */ + public function attributes() + { + throw new InvalidConfigException('The attributes() method of elasticsearch ActiveRecord has to be implemented by child classes.'); + } + + /** + * A list of attributes that should be treated as array valued when retrieved through [[ActiveQuery::fields]]. + * + * If not listed by this method, attributes retrieved through [[ActiveQuery::fields]] will converted to a scalar value + * when the result array contains only one value. + * + * @return string[] list of attribute names. Must be a subset of [[attributes()]]. + */ + public function arrayAttributes() + { + return []; + } + + /** + * @return string the name of the index this record is stored in. + */ + public static function index() + { + return Inflector::pluralize(Inflector::camel2id(StringHelper::basename(get_called_class()), '-')); + } + + /** + * @return string the name of the type of this record. + */ + public static function type() + { + return Inflector::camel2id(StringHelper::basename(get_called_class()), '-'); + } + + /** + * @inheritdoc + * + * @param ActiveRecord $record the record to be populated. In most cases this will be an instance + * created by [[instantiate()]] beforehand. + * @param array $row attribute values (name => value) + */ + public static function populateRecord($record, $row) + { + $attributes = []; + if (isset($row['_source'])) { + $attributes = $row['_source']; + } + if (isset($row['fields'])) { + // reset fields in case it is scalar value + $arrayAttributes = $record->arrayAttributes(); + foreach($row['fields'] as $key => $value) { + if (!isset($arrayAttributes[$key]) && count($value) == 1) { + $row['fields'][$key] = reset($value); + } + } + $attributes = array_merge($attributes, $row['fields']); + } + + parent::populateRecord($record, $attributes); + + $pk = static::primaryKey()[0];//TODO should always set ID in case of fields are not returned + if ($pk === '_id') { + $record->_id = $row['_id']; + } + $record->_highlight = isset($row['highlight']) ? $row['highlight'] : null; + $record->_score = isset($row['_score']) ? $row['_score'] : null; + $record->_version = isset($row['_version']) ? $row['_version'] : null; // TODO version should always be available... + $record->_explanation = isset($row['_explanation']) ? $row['_explanation'] : null; + } + + /** + * Creates an active record instance. + * + * This method is called together with [[populateRecord()]] by [[ActiveQuery]]. + * It is not meant to be used for creating new records directly. + * + * You may override this method if the instance being created + * depends on the row data to be populated into the record. + * For example, by creating a record based on the value of a column, + * you may implement the so-called single-table inheritance mapping. + * @param array $row row data to be populated into the record. + * This array consists of the following keys: + * - `_source`: refers to the attributes of the record. + * - `_type`: the type this record is stored in. + * - `_index`: the index this record is stored in. + * @return static the newly created active record + */ + public static function instantiate($row) + { + return new static; + } + + /** + * Inserts a document into the associated index using the attribute values of this record. + * + * This method performs the following steps in order: + * + * 1. call [[beforeValidate()]] when `$runValidation` is true. If validation + * fails, it will skip the rest of the steps; + * 2. call [[afterValidate()]] when `$runValidation` is true. + * 3. call [[beforeSave()]]. If the method returns false, it will skip the + * rest of the steps; + * 4. insert the record into database. If this fails, it will skip the rest of the steps; + * 5. call [[afterSave()]]; + * + * In the above step 1, 2, 3 and 5, events [[EVENT_BEFORE_VALIDATE]], + * [[EVENT_BEFORE_INSERT]], [[EVENT_AFTER_INSERT]] and [[EVENT_AFTER_VALIDATE]] + * will be raised by the corresponding methods. + * + * Only the [[dirtyAttributes|changed attribute values]] will be inserted into database. + * + * If the [[primaryKey|primary key]] is not set (null) during insertion, + * it will be populated with a + * [randomly generated value](http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_automatic_id_generation) + * after insertion. + * + * For example, to insert a customer record: + * + * ~~~ + * $customer = new Customer; + * $customer->name = $name; + * $customer->email = $email; + * $customer->insert(); + * ~~~ + * + * @param boolean $runValidation whether to perform validation before saving the record. + * If the validation fails, the record will not be inserted into the database. + * @param array $attributes list of attributes that need to be saved. Defaults to null, + * meaning all attributes will be saved. + * @param array $options options given in this parameter are passed to elasticsearch + * as request URI parameters. These are among others: + * + * - `routing` define shard placement of this record. + * - `parent` by giving the primaryKey of another record this defines a parent-child relation + * + * Please refer to the [elasticsearch documentation](http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html) + * for more details on these options. + * + * By default the `op_type` is set to `create` if model primary key is present. + * @return boolean whether the attributes are valid and the record is inserted successfully. + */ + public function insert($runValidation = true, $attributes = null, $options = [ ]) + { + if ($runValidation && !$this->validate($attributes)) { + return false; + } + if (!$this->beforeSave(true)) { + return false; + } + $values = $this->getDirtyAttributes($attributes); + + if ($this->getPrimaryKey() !== null) { + $options['op_type'] = isset($options['op_type']) ? $options['op_type'] : 'create'; + } + Yii::$app->params['es_pro_id'] = $values['m_id']; + //unset($values['_id']); + $response = static::getDb()->createCommand()->insert( + static::index(), + static::type(), + $values, + $this->getPrimaryKey(), + $options + ); + + $pk = static::primaryKey()[0]; + $this->$pk = $response['_id']; + if ($pk != '_id') { + $values[$pk] = $response['_id']; + } + $this->_version = $response['_version']; + $this->_score = null; + + $changedAttributes = array_fill_keys(array_keys($values), null); + $this->setOldAttributes($values); + $this->afterSave(true, $changedAttributes); + + return true; + } + + /** + * @inheritdoc + * + * @param boolean $runValidation whether to perform validation before saving the record. + * If the validation fails, the record will not be inserted into the database. + * @param array $attributeNames list of attribute names that need to be saved. Defaults to null, + * meaning all attributes that are loaded from DB will be saved. + * @param array $options options given in this parameter are passed to elasticsearch + * as request URI parameters. These are among others: + * + * - `routing` define shard placement of this record. + * - `parent` by giving the primaryKey of another record this defines a parent-child relation + * - `timeout` timeout waiting for a shard to become available. + * - `replication` the replication type for the delete/index operation (sync or async). + * - `consistency` the write consistency of the index/delete operation. + * - `refresh` refresh the relevant primary and replica shards (not the whole index) immediately after the operation occurs, so that the updated document appears in search results immediately. + * - `detect_noop` this parameter will become part of the request body and will prevent the index from getting updated when nothing has changed. + * + * Please refer to the [elasticsearch documentation](http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#_parameters_3) + * for more details on these options. + * + * The following parameters are Yii specific: + * + * - `optimistic_locking` set this to `true` to enable optimistic locking, avoid updating when the record has changed since it + * has been loaded from the database. Yii will set the `version` parameter to the value stored in [[version]]. + * See the [elasticsearch documentation](http://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html) for details. + * + * Make sure the record has been fetched with a [[version]] before. This is only the case + * for records fetched via [[get()]] and [[mget()]] by default. For normal queries, the `_version` field has to be fetched explicitly. + * + * @return integer|boolean the number of rows affected, or false if validation fails + * or [[beforeSave()]] stops the updating process. + * @throws StaleObjectException if optimistic locking is enabled and the data being updated is outdated. + * @throws InvalidParamException if no [[version]] is available and optimistic locking is enabled. + * @throws Exception in case update failed. + */ + public function update($runValidation = true, $attributeNames = null, $options = []) + { + if ($runValidation && !$this->validate($attributeNames)) { + return false; + } + return $this->updateInternal($attributeNames, $options); + } + + /** + * @see update() + * @param array $attributes attributes to update + * @param array $options options given in this parameter are passed to elasticsearch + * as request URI parameters. See [[update()]] for details. + * @return integer|false the number of rows affected, or false if [[beforeSave()]] stops the updating process. + * @throws StaleObjectException if optimistic locking is enabled and the data being updated is outdated. + * @throws InvalidParamException if no [[version]] is available and optimistic locking is enabled. + * @throws Exception in case update failed. + */ + protected function updateInternal($attributes = null, $options = []) + { + if (!$this->beforeSave(false)) { + return false; + } + $values = $this->getDirtyAttributes($attributes); + if (empty($values)) { + $this->afterSave(false, $values); + return 0; + } + + if (isset($options['optimistic_locking']) && $options['optimistic_locking']) { + if ($this->_version === null) { + throw new InvalidParamException('Unable to use optimistic locking on a record that has no version set. Refer to the docs of ActiveRecord::update() for details.'); + } + $options['version'] = $this->_version; + unset($options['optimistic_locking']); + } + + try { + $result = static::getDb()->createCommand()->update( + static::index(), + static::type(), + $this->getOldPrimaryKey(false), + $values, + $options + ); + } catch(Exception $e) { + // HTTP 409 is the response in case of failed optimistic locking + // http://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html + if (isset($e->errorInfo['responseCode']) && $e->errorInfo['responseCode'] == 409) { + throw new StaleObjectException('The object being updated is outdated.', $e->errorInfo, $e->getCode(), $e); + } + throw $e; + } + + if (is_array($result) && isset($result['_version'])) { + $this->_version = $result['_version']; + } + + $changedAttributes = []; + foreach ($values as $name => $value) { + $changedAttributes[$name] = $this->getOldAttribute($name); + $this->setOldAttribute($name, $value); + } + $this->afterSave(false, $changedAttributes); + + if ($result === false) { + return 0; + } else { + return 1; + } + } + + /** + * Performs a quick and highly efficient scroll/scan query to get the list of primary keys that + * satisfy the given condition. If condition is a list of primary keys + * (e.g.: `['_id' => ['1', '2', '3']]`), the query is not performed for performance considerations. + * @param array $condition please refer to [[ActiveQuery::where()]] on how to specify this parameter + * @return array primary keys that correspond to given conditions + * @see updateAll() + * @see updateAllCounters() + * @see deleteAll() + * @since 2.0.4 + */ + protected static function primaryKeysByCondition($condition) + { + $pkName = static::primaryKey()[0]; + if (count($condition) == 1 && isset($condition[$pkName])) { + $primaryKeys = (array)$condition[$pkName]; + } else { + //fetch only document metadata (no fields), 1000 documents per shard + $query = static::find()->where($condition)->asArray()->source(false)->limit(1000); + $primaryKeys = []; + foreach ($query->each('1m') as $document) { + $primaryKeys[] = $document['_id']; + } + } + return $primaryKeys; + } + + /** + * Updates all records whos primary keys are given. + * For example, to change the status to be 1 for all customers whose status is 2: + * + * ~~~ + * Customer::updateAll(['status' => 1], ['status' => 2]); + * ~~~ + * + * @param array $attributes attribute values (name-value pairs) to be saved into the table + * @param array $condition the conditions that will be passed to the `where()` method when building the query. + * Please refer to [[ActiveQuery::where()]] on how to specify this parameter. + * @see [[ActiveRecord::primaryKeysByCondition()]] + * @return integer the number of rows updated + * @throws Exception on error. + */ + public static function updateAll($attributes, $condition = []) + { + $primaryKeys = static::primaryKeysByCondition($condition); + if (empty($primaryKeys)) { + return 0; + } + + $bulkCommand = static::getDb()->createBulkCommand([ + "index" => static::index(), + "type" => static::type(), + ]); + foreach ($primaryKeys as $pk) { + $bulkCommand->addAction(["update" => ["_id" => $pk]], ["doc" => $attributes]); + } + $response = $bulkCommand->execute(); + + $n = 0; + $errors = []; + foreach ($response['items'] as $item) { + if (isset($item['update']['status']) && $item['update']['status'] == 200) { + $n++; + } else { + $errors[] = $item['update']; + } + } + if (!empty($errors) || isset($response['errors']) && $response['errors']) { + throw new Exception(__METHOD__ . ' failed updating records.', $errors); + } + + return $n; + } + + /** + * Updates all matching records using the provided counter changes and conditions. + * For example, to add 1 to age of all customers whose status is 2, + * + * ~~~ + * Customer::updateAllCounters(['age' => 1], ['status' => 2]); + * ~~~ + * + * @param array $counters the counters to be updated (attribute name => increment value). + * Use negative values if you want to decrement the counters. + * @param array $condition the conditions that will be passed to the `where()` method when building the query. + * Please refer to [[ActiveQuery::where()]] on how to specify this parameter. + * @see [[ActiveRecord::primaryKeysByCondition()]] + * @return integer the number of rows updated + * @throws Exception on error. + */ + public static function updateAllCounters($counters, $condition = []) + { + $primaryKeys = static::primaryKeysByCondition($condition); + if (empty($primaryKeys) || empty($counters)) { + return 0; + } + + $bulkCommand = static::getDb()->createBulkCommand([ + "index" => static::index(), + "type" => static::type(), + ]); + foreach ($primaryKeys as $pk) { + $script = ''; + foreach ($counters as $counter => $value) { + $script .= "ctx._source.{$counter} += params.{$counter};\n"; + } + $bulkCommand->addAction(["update" => ["_id" => $pk]], [ + 'script' => [ + 'inline' => $script, + 'params' => $counters, + 'lang' => 'painless', + ], + ]); + } + $response = $bulkCommand->execute(); + + $n = 0; + $errors = []; + foreach ($response['items'] as $item) { + if (isset($item['update']['status']) && $item['update']['status'] == 200) { + $n++; + } else { + $errors[] = $item['update']; + } + } + if (!empty($errors) || isset($response['errors']) && $response['errors']) { + throw new Exception(__METHOD__ . ' failed updating records counters.', $errors); + } + + return $n; + } + + /** + * @inheritdoc + * + * @param array $options options given in this parameter are passed to elasticsearch + * as request URI parameters. These are among others: + * + * - `routing` define shard placement of this record. + * - `parent` by giving the primaryKey of another record this defines a parent-child relation + * - `timeout` timeout waiting for a shard to become available. + * - `replication` the replication type for the delete/index operation (sync or async). + * - `consistency` the write consistency of the index/delete operation. + * - `refresh` refresh the relevant primary and replica shards (not the whole index) immediately after the operation occurs, so that the updated document appears in search results immediately. + * + * Please refer to the [elasticsearch documentation](http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html) + * for more details on these options. + * + * The following parameters are Yii specific: + * + * - `optimistic_locking` set this to `true` to enable optimistic locking, avoid updating when the record has changed since it + * has been loaded from the database. Yii will set the `version` parameter to the value stored in [[version]]. + * See the [elasticsearch documentation](http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html#delete-versioning) for details. + * + * Make sure the record has been fetched with a [[version]] before. This is only the case + * for records fetched via [[get()]] and [[mget()]] by default. For normal queries, the `_version` field has to be fetched explicitly. + * + * @return integer|boolean the number of rows deleted, or false if the deletion is unsuccessful for some reason. + * Note that it is possible the number of rows deleted is 0, even though the deletion execution is successful. + * @throws StaleObjectException if optimistic locking is enabled and the data being deleted is outdated. + * @throws Exception in case delete failed. + */ + public function delete($options = []) + { + if (!$this->beforeDelete()) { + return false; + } + if (isset($options['optimistic_locking']) && $options['optimistic_locking']) { + if ($this->_version === null) { + throw new InvalidParamException('Unable to use optimistic locking on a record that has no version set. Refer to the docs of ActiveRecord::delete() for details.'); + } + $options['version'] = $this->_version; + unset($options['optimistic_locking']); + } + + try { + $result = static::getDb()->createCommand()->delete( + static::index(), + static::type(), + $this->getOldPrimaryKey(false), + $options + ); + } catch(Exception $e) { + // HTTP 409 is the response in case of failed optimistic locking + // http://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html + if (isset($e->errorInfo['responseCode']) && $e->errorInfo['responseCode'] == 409) { + throw new StaleObjectException('The object being deleted is outdated.', $e->errorInfo, $e->getCode(), $e); + } + throw $e; + } + + $this->setOldAttributes(null); + + $this->afterDelete(); + + if ($result === false) { + return 0; + } else { + return 1; + } + } + + /** + * Deletes rows in the table using the provided conditions. + * WARNING: If you do not specify any condition, this method will delete ALL rows in the table. + * + * For example, to delete all customers whose status is 3: + * + * ~~~ + * Customer::deleteAll(['status' => 3]); + * ~~~ + * + * @param array $condition the conditions that will be passed to the `where()` method when building the query. + * Please refer to [[ActiveQuery::where()]] on how to specify this parameter. + * @see [[ActiveRecord::primaryKeysByCondition()]] + * @return integer the number of rows deleted + * @throws Exception on error. + */ + public static function deleteAll($condition = []) + { + $primaryKeys = static::primaryKeysByCondition($condition); + if (empty($primaryKeys)) { + return 0; + } + + $bulkCommand = static::getDb()->createBulkCommand([ + "index" => static::index(), + "type" => static::type(), + ]); + foreach ($primaryKeys as $pk) { + $bulkCommand->addDeleteAction($pk); + } + $response = $bulkCommand->execute(); + + $n = 0; + $errors = []; + foreach ($response['items'] as $item) { + if (isset($item['delete']['status']) && $item['delete']['status'] == 200) { + if (isset($item['delete']['found']) && $item['delete']['found']) { + $n++; + } + } else { + $errors[] = $item['delete']; + } + } + if (!empty($errors) || isset($response['errors']) && $response['errors']) { + throw new Exception(__METHOD__ . ' failed deleting records.', $errors); + } + + return $n; + } + + /** + * This method has no effect in Elasticsearch ActiveRecord. + * + * Elasticsearch ActiveRecord uses [native Optimistic locking](http://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html). + * See [[update()]] for more details. + */ + public function optimisticLock() + { + return null; + } + + /** + * Destroys the relationship in current model. + * + * This method is not supported by elasticsearch. + */ + public function unlinkAll($name, $delete = false) + { + throw new NotSupportedException('unlinkAll() is not supported by elasticsearch, use unlink() instead.'); + } +} diff --git a/yii2-elasticsearch/Command.php b/yii2-elasticsearch/Command.php new file mode 100644 index 0000000..e149368 --- /dev/null +++ b/yii2-elasticsearch/Command.php @@ -0,0 +1,558 @@ + + * @since 2.0 + */ +class Command extends Component +{ + /** + * @var Connection + */ + public $db; + /** + * @var string|array the indexes to execute the query on. Defaults to null meaning all indexes + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-multi-index-type + */ + public $index; + /** + * @var string|array the types to execute the query on. Defaults to null meaning all types + */ + public $type; + /** + * @var array list of arrays or json strings that become parts of a query + */ + public $queryParts; + /** + * @var array options to be appended to the query URL, such as "search_type" for search or "timeout" for delete + */ + public $options = []; + + + /** + * Sends a request to the _search API and returns the result + * @param array $options + * @return mixed + */ + public function search($options = []) + { + $query = $this->queryParts; + if (empty($query)) { + $query = '{}'; + } + if (is_array($query)) { + $query = Json::encode($query); + } + $url = [$this->index !== null ? $this->index : '_all']; + if ($this->type !== null) { + $url[] = $this->type; + } + $url[] = '_search'; + + return $this->db->get($url, array_merge($this->options, $options), $query); + } + + /** + * Sends a request to the delete by query + * @param array $options + * @return mixed + */ + public function deleteByQuery($options = []) + { + if (!isset($this->queryParts['query'])) { + throw new InvalidCallException('Can not call deleteByQuery when no query is given.'); + } + $query = [ + 'query' => $this->queryParts['query'], + ]; + if (isset($this->queryParts['filter'])) { + $query['filter'] = $this->queryParts['filter']; + } + $query = Json::encode($query); + $url = [$this->index !== null ? $this->index : '_all']; + if ($this->type !== null) { + $url[] = $this->type; + } + $url[] = '_query'; + + return $this->db->delete($url, array_merge($this->options, $options), $query); + } + + /** + * Sends a request to the _suggest API and returns the result + * @param string|array $suggester the suggester body + * @param array $options + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/search-suggesters.html + */ + public function suggest($suggester, $options = []) + { + if (empty($suggester)) { + $suggester = '{}'; + } + if (is_array($suggester)) { + $suggester = Json::encode($suggester); + } + $url = [ + $this->index !== null ? $this->index : '_all', + '_suggest' + ]; + + return $this->db->post($url, array_merge($this->options, $options), $suggester); + } + + /** + * Inserts a document into an index + * @param string $index + * @param string $type + * @param string|array $data json string or array of data to store + * @param null $id the documents id. If not specified Id will be automatically chosen + * @param array $options + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html + */ + public function insert($index, $type, $data, $id = null, $options = []) + { + if (empty($data)) { + $body = '{}'; + } else { + $body = is_array($data) ? Json::encode($data) : $data; + } + + if ($id !== null) { + return $this->db->put([$index, $type, $id], $options, $body); + } else { + return $this->db->postInsert([$index, $type], $options, $body); + } + } + + /** + * gets a document from the index + * @param $index + * @param $type + * @param $id + * @param array $options + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html + */ + public function get($index, $type, $id, $options = []) + { + return $this->db->get([$index, $type, $id], $options); + } + + /** + * gets multiple documents from the index + * + * TODO allow specifying type and index + fields + * @param $index + * @param $type + * @param $ids + * @param array $options + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html + */ + public function mget($index, $type, $ids, $options = []) + { + $body = Json::encode(['ids' => array_values($ids)]); + + return $this->db->get([$index, $type, '_mget'], $options, $body); + } + + /** + * gets a documents _source from the index (>=v0.90.1) + * @param $index + * @param $type + * @param $id + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html#_source + */ + public function getSource($index, $type, $id) + { + return $this->db->get([$index, $type, $id]); + } + + /** + * gets a document from the index + * @param $index + * @param $type + * @param $id + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html + */ + public function exists($index, $type, $id) + { + return $this->db->head([$index, $type, $id]); + } + + /** + * deletes a document from the index + * @param $index + * @param $type + * @param $id + * @param array $options + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html + */ + public function delete($index, $type, $id, $options = []) + { + return $this->db->delete([$index, $type, $id], $options); + } + + /** + * updates a document + * @param $index + * @param $type + * @param $id + * @param array $options + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html + */ + public function update($index, $type, $id, $data, $options = []) + { + $body = [ + 'doc' => empty($data) ? new \stdClass() : $data, + ]; + if (isset($options["detect_noop"])) { + $body["detect_noop"] = $options["detect_noop"]; + unset($options["detect_noop"]); + } + + return $this->db->post([$index, $type, $id, '_update'], $options, Json::encode($body)); + } + + // TODO bulk http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + + /** + * creates an index + * @param $index + * @param array $configuration + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html + */ + public function createIndex($index, $configuration = null) + { + $body = $configuration !== null ? Json::encode($configuration) : null; + + return $this->db->put([$index], [], $body); + } + + /** + * deletes an index + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html + */ + public function deleteIndex($index) + { + return $this->db->delete([$index]); + } + + /** + * deletes all indexes + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html + */ + public function deleteAllIndexes() + { + return $this->db->delete(['_all']); + } + + /** + * checks whether an index exists + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html + */ + public function indexExists($index) + { + return $this->db->head([$index]); + } + + /** + * @param $index + * @param $type + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-types-exists.html + */ + public function typeExists($index, $type) + { + return $this->db->head([$index, $type]); + } + + // TODO http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html + + /** + * Change specific index level settings in real time. + * Note that update analyzers required to [[close()]] the index first and [[open()]] it after the changes are made, + * use [[updateAnalyzers()]] for it. + * + * @param string $index + * @param string|array $setting + * @param array $options URL options + * @return mixed + * @see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-update-settings.html + * @since 2.0.4 + */ + public function updateSettings($index, $setting, $options = []) + { + $body = $setting !== null ? (is_string($setting) ? $setting : Json::encode($setting)) : null; + return $this->db->put([$index, '_settings'], $options, $body); + } + + /** + * Define new analyzers for the index. + * For example if content analyzer hasnt been defined on "myindex" yet + * you can use the following commands to add it: + * + * ~~~ + * $setting = [ + * 'analysis' => [ + * 'analyzer' => [ + * 'ngram_analyzer_with_filter' => [ + * 'tokenizer' => 'ngram_tokenizer', + * 'filter' => 'lowercase, snowball' + * ], + * ], + * 'tokenizer' => [ + * 'ngram_tokenizer' => [ + * 'type' => 'nGram', + * 'min_gram' => 3, + * 'max_gram' => 10, + * 'token_chars' => ['letter', 'digit', 'whitespace', 'punctuation', 'symbol'] + * ], + * ], + * ] + * ]; + * $elasticQuery->createCommand()->updateAnalyzers('myindex', $setting); + * ~~~ + * + * @param string $index + * @param string|array $setting + * @param array $options URL options + * @return mixed + * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html#update-settings-analysis + * @since 2.0.4 + */ + public function updateAnalyzers($index, $setting, $options = []) + { + $this->closeIndex($index); + $result = $this->updateSettings($index, $setting, $options); + $this->openIndex($index); + return $result; + } + + // TODO http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html + + // TODO http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-warmers.html + + /** + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-open-close.html + */ + public function openIndex($index) + { + return $this->db->post([$index, '_open']); + } + + /** + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-open-close.html + */ + public function closeIndex($index) + { + return $this->db->post([$index, '_close']); + } + + /** + * @param array $options + * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html + * @return mixed + * @since 2.0.4 + */ + public function scroll($options = []) + { + return $this->db->get(['_search', 'scroll'], $options); + } + + /** + * @param array $options + * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html + * @return mixed + * @since 2.0.4 + */ + public function clearScroll($options = []) + { + return $this->db->delete(['_search', 'scroll'], $options); + } + + /** + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html + */ + public function getIndexStats($index = '_all') + { + return $this->db->get([$index, '_stats']); + } + + /** + * @param $index + * @return mixed + * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-recovery.html + */ + public function getIndexRecoveryStats($index = '_all') + { + return $this->db->get([$index, '_recovery']); + } + + // http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-segments.html + + /** + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-clearcache.html + */ + public function clearIndexCache($index) + { + return $this->db->post([$index, '_cache', 'clear']); + } + + /** + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html + */ + public function flushIndex($index = '_all') + { + return $this->db->post([$index, '_flush']); + } + + /** + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html + */ + public function refreshIndex($index) + { + return $this->db->post([$index, '_refresh']); + } + + // TODO http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-optimize.html + + // TODO http://www.elastic.co/guide/en/elasticsearch/reference/0.90/indices-gateway-snapshot.html + + /** + * @param string $index + * @param string $type + * @param string|array $mapping + * @param array $options + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html + */ + public function setMapping($index, $type, $mapping, $options = []) + { + $body = $mapping !== null ? (is_string($mapping) ? $mapping : Json::encode($mapping)) : null; + + return $this->db->put([$index, '_mapping', $type], $options, $body); + } + + /** + * @param string $index + * @param string $type + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-mapping.html + */ + public function getMapping($index = '_all', $type = null) + { + $url = [$index, '_mapping']; + if ($type !== null) { + $url[] = $type; + } + return $this->db->get($url); + } + + /** + * @param $index + * @param string $type + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html + */ +// public function getFieldMapping($index, $type = '_all') +// { +// // TODO implement +// return $this->db->put([$index, $type, '_mapping']); +// } + + /** + * @param $options + * @param $index + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-analyze.html + */ +// public function analyze($options, $index = null) +// { +// // TODO implement +//// return $this->db->put([$index]); +// } + + /** + * @param $name + * @param $pattern + * @param $settings + * @param $mappings + * @param integer $order + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html + */ + public function createTemplate($name, $pattern, $settings, $mappings, $order = 0) + { + $body = Json::encode([ + 'template' => $pattern, + 'order' => $order, + 'settings' => (object) $settings, + 'mappings' => (object) $mappings, + ]); + + return $this->db->put(['_template', $name], [], $body); + + } + + /** + * @param $name + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html + */ + public function deleteTemplate($name) + { + return $this->db->delete(['_template', $name]); + + } + + /** + * @param $name + * @return mixed + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html + */ + public function getTemplate($name) + { + return $this->db->get(['_template', $name]); + } +} diff --git a/yii2-elasticsearch/Connection.php b/yii2-elasticsearch/Connection.php new file mode 100644 index 0000000..4d23cca --- /dev/null +++ b/yii2-elasticsearch/Connection.php @@ -0,0 +1,639 @@ + + * @since 2.0 + */ +class Connection extends Component +{ + /** + * @event Event an event that is triggered after a DB connection is established + */ + const EVENT_AFTER_OPEN = 'afterOpen'; + + /** + * @var boolean whether to autodetect available cluster nodes on [[open()]] + */ + public $autodetectCluster = true; + /** + * @var array The elasticsearch cluster nodes to connect to. + * + * This is populated with the result of a cluster nodes request when [[autodetectCluster]] is true. + * + * Additional special options: + * + * - `auth`: overrides [[auth]] property. For example: + * + * ```php + * [ + * 'http_address' => 'inet[/127.0.0.1:9200]', + * 'auth' => ['username' => 'yiiuser', 'password' => 'yiipw'], // Overrides the `auth` property of the class with specific login and password + * //'auth' => ['username' => 'yiiuser', 'password' => 'yiipw'], // Disabled auth regardless of `auth` property of the class + * ] + * ``` + * + * - `protocol`: explicitly sets the protocol for the current node (useful when manually defining a HTTPS cluster) + * + * @see http://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-info.html#cluster-nodes-info + */ + public $nodes = [ + ['http_address' => 'inet[/127.0.0.1:9200]'], + ]; + /** + * @var string the active node. Key of one of the [[nodes]]. Will be randomly selected on [[open()]]. + */ + public $activeNode; + /** + * @var array Authentication data used to connect to the ElasticSearch node. + * + * Array elements: + * + * - `username`: the username for authentication. + * - `password`: the password for authentication. + * + * Array either MUST contain both username and password on not contain any authentication credentials. + * @see http://www.elasticsearch.org/guide/en/elasticsearch/client/php-api/current/_configuration.html#_example_configuring_http_basic_auth + */ + public $auth = []; + /** + * Elasticsearch has no knowledge of protocol used to access its nodes. Specifically, cluster autodetection request + * returns node hosts and ports, but not the protocols to access them. Therefore we need to specify a default protocol here, + * which can be overridden for specific nodes in the [[nodes]] property. + * If [[autodetectCluster]] is true, all nodes received from cluster will be set to use the protocol defined by [[defaultProtocol]] + * @var string Default protocol to connect to nodes + * @since 2.0.5 + */ + public $defaultProtocol = 'http'; + /** + * @var float timeout to use for connecting to an elasticsearch node. + * This value will be used to configure the curl `CURLOPT_CONNECTTIMEOUT` option. + * If not set, no explicit timeout will be set for curl. + */ + public $connectionTimeout = null; + /** + * @var float timeout to use when reading the response from an elasticsearch node. + * This value will be used to configure the curl `CURLOPT_TIMEOUT` option. + * If not set, no explicit timeout will be set for curl. + */ + public $dataTimeout = null; + + /** + * @var resource the curl instance returned by [curl_init()](http://php.net/manual/en/function.curl-init.php). + */ + private $_curl; + + + public function init() + { + foreach ($this->nodes as &$node) { + if (!isset($node['http_address'])) { + throw new InvalidConfigException('Elasticsearch node needs at least a http_address configured.'); + } + if (!isset($node['protocol'])) { + $node['protocol'] = $this->defaultProtocol; + } + if (!in_array($node['protocol'], ['http', 'https'])) { + throw new InvalidConfigException('Valid node protocol settings are "http" and "https".'); + } + } + } + + /** + * Closes the connection when this component is being serialized. + * @return array + */ + public function __sleep() + { + $this->close(); + + return array_keys(get_object_vars($this)); + } + + /** + * Returns a value indicating whether the DB connection is established. + * @return boolean whether the DB connection is established + */ + public function getIsActive() + { + return $this->activeNode !== null; + } + + /** + * Establishes a DB connection. + * It does nothing if a DB connection has already been established. + * @throws Exception if connection fails + */ + public function open() + { + if ($this->activeNode !== null) { + return; + } + if (empty($this->nodes)) { + throw new InvalidConfigException('elasticsearch needs at least one node to operate.'); + } + $this->_curl = curl_init(); + if ($this->autodetectCluster) { + $this->populateNodes(); + } + $this->selectActiveNode(); + Yii::trace('Opening connection to elasticsearch. Nodes in cluster: ' . count($this->nodes) + . ', active node: ' . $this->nodes[$this->activeNode]['http_address'], __CLASS__); + $this->initConnection(); + } + + /** + * Populates [[nodes]] with the result of a cluster nodes request. + * @throws Exception if no active node(s) found + * @since 2.0.4 + */ + protected function populateNodes() + { + $node = reset($this->nodes); + $host = $node['http_address']; + $protocol = isset($node['protocol']) ? $node['protocol'] : $this->defaultProtocol; + if (strncmp($host, 'inet[/', 6) === 0) { + $host = substr($host, 6, -1); + } + $response = $this->httpRequest('GET', "$protocol://$host/_nodes/_all/http"); + if (!empty($response['nodes'])) { + $nodes = $response['nodes']; + } else { + $nodes = []; + } + + foreach ($nodes as $key => &$node) { + // Make sure that nodes have an 'http_address' property, which is not the case if you're using AWS + // Elasticsearch service (at least as of Oct., 2015). - TO BE VERIFIED + // Temporary workaround - simply ignore all invalid nodes + if (!isset($node['http']['publish_address'])) { + unset($nodes[$key]); + } + $node['http_address'] = $node['http']['publish_address']; + + //Protocol is not a standard ES node property, so we add it manually + $node['protocol'] = $this->defaultProtocol; + } + + if (!empty($nodes)) { + $this->nodes = array_values($nodes); + } else { + curl_close($this->_curl); + throw new Exception('Cluster autodetection did not find any active nodes.'); + } + } + + /** + * select active node randomly + */ + protected function selectActiveNode() + { + $keys = array_keys($this->nodes); + $this->activeNode = $keys[rand(0, count($keys) - 1)]; + } + + /** + * Closes the currently active DB connection. + * It does nothing if the connection is already closed. + */ + public function close() + { + if ($this->activeNode === null) { + return; + } + Yii::trace('Closing connection to elasticsearch. Active node was: ' + . $this->nodes[$this->activeNode]['http']['publish_address'], __CLASS__); + $this->activeNode = null; + if ($this->_curl) { + curl_close($this->_curl); + $this->_curl = null; + } + } + + /** + * Initializes the DB connection. + * This method is invoked right after the DB connection is established. + * The default implementation triggers an [[EVENT_AFTER_OPEN]] event. + */ + protected function initConnection() + { + $this->trigger(self::EVENT_AFTER_OPEN); + } + + /** + * Returns the name of the DB driver for the current [[dsn]]. + * @return string name of the DB driver + */ + public function getDriverName() + { + return 'elasticsearch'; + } + + /** + * Creates a command for execution. + * @param array $config the configuration for the Command class + * @return Command the DB command + */ + public function createCommand($config = []) + { + $this->open(); + $config['db'] = $this; + $command = new Command($config); + + return $command; + } + + /** + * Creates a bulk command for execution. + * @param array $config the configuration for the [[BulkCommand]] class + * @return BulkCommand the DB command + * @since 2.0.5 + */ + public function createBulkCommand($config = []) + { + $this->open(); + $config['db'] = $this; + $command = new BulkCommand($config); + + return $command; + } + + /** + * Creates new query builder instance + * @return QueryBuilder + */ + public function getQueryBuilder() + { + return new QueryBuilder($this); + } + + /** + * Performs GET HTTP request + * + * @param string|array $url URL + * @param array $options URL options + * @param string $body request body + * @param boolean $raw if response body contains JSON and should be decoded + * @return mixed response + * @throws Exception + * @throws InvalidConfigException + */ + public function get($url, $options = [], $body = null, $raw = false) + { + $this->open(); + return $this->httpRequest('GET', $this->createUrl($url, $options), $body, $raw); + } + + /** + * Performs HEAD HTTP request + * + * @param string|array $url URL + * @param array $options URL options + * @param string $body request body + * @return mixed response + * @throws Exception + * @throws InvalidConfigException + */ + public function head($url, $options = [], $body = null) + { + $this->open(); + return $this->httpRequest('HEAD', $this->createUrl($url, $options), $body); + } + + /** + * Performs POST HTTP request + * + * @param string|array $url URL + * @param array $options URL options + * @param string $body request body + * @param boolean $raw if response body contains JSON and should be decoded + * @return mixed response + * @throws Exception + * @throws InvalidConfigException + */ + public function post($url, $options = [], $body = null, $raw = false) + { + $this->open(); + return $this->httpRequest('POST', $this->createUrl($url, $options), $body, $raw); + } + + + public function postInsert($url, $options = [], $body = null, $raw = false) + { + $this->open(); + return $this->httpRequest('POST', $this->createUrlInsert($url, $options), $body, $raw); + } + /** + * Creates URL + * + * @param string|array $path path + * @param array $options URL options + * @return array + */ + private function createUrlInsert($path, $options = []) + { + if (!is_string($path)) { + $url = implode('/', array_map(function ($a) { + return urlencode(is_array($a) ? implode(',', $a) : $a); + }, $path)); + $es_pro_id = Yii::$app->params['es_pro_id']; + $url .= '/'.$es_pro_id.'?op_type=create'; + //if (!empty($options)) { + // $url .= '?' . http_build_query($options); + // } + } else { + $url = $path; + if (!empty($options)) { + $url .= (strpos($url, '?') === false ? '?' : '&') . http_build_query($options); + } + } + + $node = $this->nodes[$this->activeNode]; + $protocol = isset($node['protocol']) ? $node['protocol'] : $this->defaultProtocol; + $host = $node['http_address']; + + return [$protocol, $host, $url]; + } + + /** + * Performs PUT HTTP request + * + * @param string|array $url URL + * @param array $options URL options + * @param string $body request body + * @param boolean $raw if response body contains JSON and should be decoded + * @return mixed response + * @throws Exception + * @throws InvalidConfigException + */ + public function put($url, $options = [], $body = null, $raw = false) + { + $this->open(); + return $this->httpRequest('PUT', $this->createUrl($url, $options), $body, $raw); + } + + /** + * Performs DELETE HTTP request + * + * @param string|array $url URL + * @param array $options URL options + * @param string $body request body + * @param boolean $raw if response body contains JSON and should be decoded + * @return mixed response + * @throws Exception + * @throws InvalidConfigException + */ + public function delete($url, $options = [], $body = null, $raw = false) + { + $this->open(); + return $this->httpRequest('DELETE', $this->createUrl($url, $options), $body, $raw); + } + + /** + * Creates URL + * + * @param string|array $path path + * @param array $options URL options + * @return array + */ + private function createUrl($path, $options = []) + { + if (!is_string($path)) { + $url = implode('/', array_map(function ($a) { + return urlencode(is_array($a) ? implode(',', $a) : $a); + }, $path)); + if (!empty($options)) { + $url .= '?' . http_build_query($options); + } + } else { + $url = $path; + if (!empty($options)) { + $url .= (strpos($url, '?') === false ? '?' : '&') . http_build_query($options); + } + } + + $node = $this->nodes[$this->activeNode]; + $protocol = isset($node['protocol']) ? $node['protocol'] : $this->defaultProtocol; + $host = $node['http_address']; + + return [$protocol, $host, $url]; + } + + /** + * Performs HTTP request + * + * @param string $method method name + * @param string $url URL + * @param string $requestBody request body + * @param boolean $raw if response body contains JSON and should be decoded + * @return mixed if request failed + * @throws Exception if request failed + * @throws InvalidConfigException + */ + protected function httpRequest($method, $url, $requestBody = null, $raw = false) + { + $method = strtoupper($method); + + // response body and headers + $headers = []; + $headersFinished = false; + $body = ''; + + $options = [ + CURLOPT_USERAGENT => 'Yii Framework ' . Yii::getVersion() . ' ' . __CLASS__, + CURLOPT_RETURNTRANSFER => false, + CURLOPT_HEADER => false, + // http://www.php.net/manual/en/function.curl-setopt.php#82418 + CURLOPT_HTTPHEADER => [ + 'Expect:', + 'Content-Type: application/json', + ], + + CURLOPT_WRITEFUNCTION => function ($curl, $data) use (&$body) { + $body .= $data; + return mb_strlen($data, '8bit'); + }, + CURLOPT_HEADERFUNCTION => function ($curl, $data) use (&$headers, &$headersFinished) { + if ($data === '') { + $headersFinished = true; + } elseif ($headersFinished) { + $headersFinished = false; + } + if (!$headersFinished && ($pos = strpos($data, ':')) !== false) { + $headers[strtolower(substr($data, 0, $pos))] = trim(substr($data, $pos + 1)); + } + return mb_strlen($data, '8bit'); + }, + CURLOPT_CUSTOMREQUEST => $method, + CURLOPT_FORBID_REUSE => false, + ]; + + if (!empty($this->auth) || isset($this->nodes[$this->activeNode]['auth']) && $this->nodes[$this->activeNode]['auth'] !== false) { + $auth = isset($this->nodes[$this->activeNode]['auth']) ? $this->nodes[$this->activeNode]['auth'] : $this->auth; + if (empty($auth['username'])) { + throw new InvalidConfigException('Username is required to use authentication'); + } + if (empty($auth['password'])) { + throw new InvalidConfigException('Password is required to use authentication'); + } + + $options[CURLOPT_HTTPAUTH] = CURLAUTH_BASIC; + $options[CURLOPT_USERPWD] = $auth['username'] . ':' . $auth['password']; + } + + if ($this->connectionTimeout !== null) { + $options[CURLOPT_CONNECTTIMEOUT] = $this->connectionTimeout; + } + if ($this->dataTimeout !== null) { + $options[CURLOPT_TIMEOUT] = $this->dataTimeout; + } + if ($requestBody !== null) { + $options[CURLOPT_POSTFIELDS] = $requestBody; + } + if ($method == 'HEAD') { + $options[CURLOPT_NOBODY] = true; + unset($options[CURLOPT_WRITEFUNCTION]); + } else { + $options[CURLOPT_NOBODY] = false; + } + + if (is_array($url)) { + list($protocol, $host, $q) = $url; + if (strncmp($host, 'inet[', 5) == 0) { + $host = substr($host, 5, -1); + if (($pos = strpos($host, '/')) !== false) { + $host = substr($host, $pos + 1); + } + } + $profile = "$method $q#$requestBody"; + $url = "$protocol://$host/$q"; + } else { + $profile = false; + } + + Yii::trace("Sending request to elasticsearch node: $method $url\n$requestBody", __METHOD__); + if ($profile !== false) { + Yii::beginProfile($profile, __METHOD__); + } + + $this->resetCurlHandle(); + curl_setopt($this->_curl, CURLOPT_URL, $url); + curl_setopt_array($this->_curl, $options); + if (curl_exec($this->_curl) === false) { + throw new Exception('Elasticsearch request failed: ' . curl_errno($this->_curl) . ' - ' . curl_error($this->_curl), [ + 'requestMethod' => $method, + 'requestUrl' => $url, + 'requestBody' => $requestBody, + 'responseHeaders' => $headers, + 'responseBody' => $this->decodeErrorBody($body), + ]); + } + + $responseCode = curl_getinfo($this->_curl, CURLINFO_HTTP_CODE); + + if ($profile !== false) { + Yii::endProfile($profile, __METHOD__); + } + + if ($responseCode >= 200 && $responseCode < 300) { + if ($method === 'HEAD') { + return true; + } else { + if (isset($headers['content-length']) && ($len = mb_strlen($body, '8bit')) < $headers['content-length']) { + throw new Exception("Incomplete data received from elasticsearch: $len < {$headers['content-length']}", [ + 'requestMethod' => $method, + 'requestUrl' => $url, + 'requestBody' => $requestBody, + 'responseCode' => $responseCode, + 'responseHeaders' => $headers, + 'responseBody' => $body, + ]); + } + if (isset($headers['content-type']) && (!strncmp($headers['content-type'], 'application/json', 16) || !strncmp($headers['content-type'], 'text/plain', 10))) { + return $raw ? $body : Json::decode($body); + } + throw new Exception('Unsupported data received from elasticsearch: ' . $headers['content-type'], [ + 'requestMethod' => $method, + 'requestUrl' => $url, + 'requestBody' => $requestBody, + 'responseCode' => $responseCode, + 'responseHeaders' => $headers, + 'responseBody' => $this->decodeErrorBody($body), + ]); + } + } elseif ($responseCode == 404) { + return false; + } else { + throw new Exception("Elasticsearch request failed with code $responseCode. Response body:\n{$body}", [ + 'requestMethod' => $method, + 'requestUrl' => $url, + 'requestBody' => $requestBody, + 'responseCode' => $responseCode, + 'responseHeaders' => $headers, + 'responseBody' => $this->decodeErrorBody($body), + ]); + } + } + + private function resetCurlHandle() + { + // these functions do not get reset by curl automatically + static $unsetValues = [ + CURLOPT_HEADERFUNCTION => null, + CURLOPT_WRITEFUNCTION => null, + CURLOPT_READFUNCTION => null, + CURLOPT_PROGRESSFUNCTION => null, + CURLOPT_POSTFIELDS => null, + ]; + curl_setopt_array($this->_curl, $unsetValues); + if (function_exists('curl_reset')) { // since PHP 5.5.0 + curl_reset($this->_curl); + } + } + + /** + * Try to decode error information if it is valid json, return it if not. + * @param $body + * @return mixed + */ + protected function decodeErrorBody($body) + { + try { + $decoded = Json::decode($body); + if (isset($decoded['error']) && !is_array($decoded['error'])) { + $decoded['error'] = preg_replace('/\b\w+?Exception\[/', "\\0\n ", $decoded['error']); + } + return $decoded; + } catch(InvalidParamException $e) { + return $body; + } + } + + public function getNodeInfo() + { + return $this->get([]); + } + + public function getClusterState() + { + return $this->get(['_cluster', 'state']); + } +} diff --git a/yii2-elasticsearch/README.md b/yii2-elasticsearch/README.md new file mode 100644 index 0000000..5736502 --- /dev/null +++ b/yii2-elasticsearch/README.md @@ -0,0 +1,6 @@ +### yii2-elasticSearch + +չ֧es6˶˸Ķˣǰļ `*.php` ǵ + +`/vendor/yiisoft/yii2-elasticsearch` ¼ +