From dda1bd94b3952864114e3d26f590c54989b10f74 Mon Sep 17 00:00:00 2001
From: Anastasios Zouzias
Date: Tue, 9 Apr 2019 16:28:03 +0200
Subject: [PATCH] [facets] remove faceted search functionality

---
 .../lucenerdd/LuceneRDDKryoRegistrator.scala  |   5 +-
 .../aggregate/SparkFacetResultMonoid.scala    |  36 ----
 .../lucenerdd/facets/FacetedLuceneRDD.scala   | 191 ------------------
 .../spark/lucenerdd/facets/package.scala      | 165 ---------------
 .../lucenerdd/models/SparkFacetResult.scala   |  54 -----
 .../AbstractLuceneRDDPartition.scala          |  11 +-
 .../partition/LuceneRDDPartition.scala        |  20 +-
 .../lucenerdd/query/LuceneQueryHelpers.scala  |  38 +---
 .../shape/ShapeLuceneRDDKryoRegistrator.scala |   3 +-
 .../partition/ShapeLuceneRDDPartition.scala   |   6 +-
 .../spark/lucenerdd/store/IndexStorable.scala |  13 +-
 ...xonomyWriter.scala => IndexWritable.scala} |   8 +-
 .../facets/FacetedLuceneRDDFacetSpec.scala    | 178 ----------------
 .../FacetedLuceneRDDImplicitsSpec.scala       |  84 --------
 .../query/LuceneQueryHelpersSpec.scala        |  26 +--
 15 files changed, 19 insertions(+), 819 deletions(-)
 delete mode 100644 src/main/scala/org/zouzias/spark/lucenerdd/aggregate/SparkFacetResultMonoid.scala
 delete mode 100644 src/main/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDD.scala
 delete mode 100644 src/main/scala/org/zouzias/spark/lucenerdd/facets/package.scala
 delete mode 100644 src/main/scala/org/zouzias/spark/lucenerdd/models/SparkFacetResult.scala
 rename src/main/scala/org/zouzias/spark/lucenerdd/store/{IndexWithTaxonomyWriter.scala => IndexWritable.scala} (86%)
 delete mode 100644 src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDFacetSpec.scala
 delete mode 100644 src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDImplicitsSpec.scala

diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala b/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala
index 0f7a6e54..06cb51cd 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala
@@ -20,8 +20,7 @@ import com.twitter.algebird.TopK
 import com.twitter.chill.Kryo
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.zouzias.spark.lucenerdd.facets.FacetedLuceneRDD
-import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkFacetResult, SparkScoreDoc}
+import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkScoreDoc}
 import org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition
 import org.zouzias.spark.lucenerdd.response.{LuceneRDDResponse, LuceneRDDResponsePartition}
 import org.zouzias.spark.lucenerdd.testing.{FavoriteCaseClass, Person}
@@ -30,7 +29,6 @@ class LuceneRDDKryoRegistrator extends KryoRegistrator {
   def registerClasses(kryo: Kryo): Unit = {
     kryo.register(classOf[LuceneRDD[_]])
     kryo.register(classOf[LuceneRDDPartition[_]])
-    kryo.register(classOf[FacetedLuceneRDD[_]])
     kryo.register(classOf[SparkDoc])
     kryo.register(classOf[Number])
     kryo.register(classOf[java.lang.Double])
@@ -57,7 +55,6 @@ class LuceneRDDKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[scala.collection.immutable.Set$EmptySet$])
     kryo.register(classOf[scala.collection.immutable.Map[_, _]])
     kryo.register(classOf[Array[scala.collection.immutable.Map[_, _]]])
-    kryo.register(classOf[SparkFacetResult])
     kryo.register(classOf[SparkScoreDoc])
     kryo.register(classOf[LuceneRDDResponse])
     kryo.register(classOf[LuceneRDDResponsePartition])
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/aggregate/SparkFacetResultMonoid.scala b/src/main/scala/org/zouzias/spark/lucenerdd/aggregate/SparkFacetResultMonoid.scala
deleted file mode 100644
index 3de0cb7e..00000000
--- a/src/main/scala/org/zouzias/spark/lucenerdd/aggregate/SparkFacetResultMonoid.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.aggregate
-
-import com.twitter.algebird.MapMonoid
-import org.zouzias.spark.lucenerdd.models.SparkFacetResult
-
-/**
- * Monoid used to aggregate faceted results [[SparkFacetResult]]
- * from the executors to the driver
- */
-object SparkFacetResultMonoid extends Serializable {
-
-  private lazy val facetMonoid = new MapMonoid[String, Long]()
-
-  def zero(facetName: String): SparkFacetResult = SparkFacetResult(facetName, facetMonoid.zero)
-
-  def plus(l: SparkFacetResult, r: SparkFacetResult): SparkFacetResult = {
-    require(l.facetName == r.facetName) // Check if summing same facets
-    SparkFacetResult(l.facetName, facetMonoid.plus(l.facets, r.facets))
-  }
-}
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDD.scala b/src/main/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDD.scala
deleted file mode 100644
index c1db740a..00000000
--- a/src/main/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDD.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.facets
-
-import org.apache.lucene.document.Document
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.storage.StorageLevel
-import org.zouzias.spark.lucenerdd.LuceneRDD
-import org.zouzias.spark.lucenerdd.aggregate.SparkFacetResultMonoid
-import org.zouzias.spark.lucenerdd.analyzers.AnalyzerConfigurable
-import org.zouzias.spark.lucenerdd.models.SparkFacetResult
-import org.zouzias.spark.lucenerdd.partition.{AbstractLuceneRDDPartition, LuceneRDDPartition}
-import org.zouzias.spark.lucenerdd.query.SimilarityConfigurable
-import org.zouzias.spark.lucenerdd.response.LuceneRDDResponse
-import org.zouzias.spark.lucenerdd.versioning.Versionable
-
-import scala.reflect.ClassTag
-
-/**
- * LuceneRDD with faceted functionality
- */
-class FacetedLuceneRDD[T: ClassTag]
-  (override protected val partitionsRDD: RDD[AbstractLuceneRDDPartition[T]],
-   override val indexAnalyzer: String,
-   override val queryAnalyzer: String,
-   override val similarity: String)
-  extends LuceneRDD[T](partitionsRDD, indexAnalyzer, queryAnalyzer, similarity) {
-
-  setName("FacetedLuceneRDD")
-
-  override def cache(): this.type = {
-    this.persist(StorageLevel.MEMORY_ONLY)
-  }
-
-  override def persist(newLevel: StorageLevel): this.type = {
-    partitionsRDD.persist(newLevel)
-    super.persist(newLevel)
-    this
-  }
-
-  override def unpersist(blocking: Boolean = true): this.type = {
-    partitionsRDD.unpersist(blocking)
-    super.unpersist(blocking)
-    this
-  }
-
-  /**
-   * Aggregates faceted search results using monoidal structure [[SparkFacetResultMonoid]]
-   *
-   * @param f a function that computes faceted search results per partition
-   * @return faceted search results
-   */
-  private def facetResultsAggregator(f: AbstractLuceneRDDPartition[T] => SparkFacetResult)
-  : SparkFacetResult = {
-    partitionsRDD.map(f(_)).reduce(SparkFacetResultMonoid.plus)
-  }
-
-  /**
-   * Faceted query
-   *
-   * @param searchString Lucene query string
-   * @param facetField Field on which to compute facet
-   * @param topK Number of results
-   * @param facetNum Number of faceted results
-   * @return
-   */
-  def facetQuery(searchString: String,
-                 facetField: String,
-                 topK: Int = DefaultTopK,
-                 facetNum: Int = DefaultFacetNum
-                ): (LuceneRDDResponse, SparkFacetResult) = {
-    val aggrTopDocs = partitionMapper(_.query(searchString, topK))
-    val aggrFacets = facetResultsAggregator(_.facetQuery(searchString, facetField, facetNum))
-    (aggrTopDocs, aggrFacets)
-  }
-
-  /**
-   * Faceted query with multiple facets
-   *
-   * @param searchString Lucene query string
-   * @param facetFields Fields on which to compute facets
-   * @param topK Number of results
-   * @param facetNum Number of faceted results
-   * @return
-   */
-  def facetQueries(searchString: String,
-                   facetFields: Seq[String],
-                   topK: Int = DefaultTopK,
-                   facetNum: Int = DefaultFacetNum)
-  : (LuceneRDDResponse, Map[String, SparkFacetResult]) = {
-    logInfo(s"Faceted query on facet fields ${facetFields.mkString(",")}...")
-    val aggrTopDocs = partitionMapper(_.query(searchString, topK))
-    val aggrFacets = facetFields.map { case facetField =>
-      (facetField, facetResultsAggregator(_.facetQuery(searchString, facetField, facetNum)))
-    }.toMap[String, SparkFacetResult]
-    (aggrTopDocs, aggrFacets)
-  }
-}
-
-object FacetedLuceneRDD extends Versionable
-  with AnalyzerConfigurable
-  with SimilarityConfigurable {
-
-  /** All faceted fields are suffixed with _facet */
-  val FacetTextFieldSuffix = "_facet"
-  val FacetNumericFieldSuffix = "_numFacet"
-
-  /**
-   * Instantiate a FacetedLuceneRDD given an RDD[T]
-   *
-   * @param elems RDD of type T
-   * @tparam T Generic type
-   * @return
-   */
-  def apply[T : ClassTag](elems: RDD[T], indexAnalyzer: String, queryAnalyzer: String,
-                          similarity: String)
-    (implicit conv: T => Document): FacetedLuceneRDD[T] = {
-    val partitions = elems.mapPartitionsWithIndex[AbstractLuceneRDDPartition[T]](
-      (partId, iter) => Iterator(LuceneRDDPartition(iter, partId, indexAnalyzer, queryAnalyzer,
-        similarity)),
-      preservesPartitioning = true)
-    new FacetedLuceneRDD[T](partitions, indexAnalyzer, queryAnalyzer, similarity)
-  }
-
-  def apply[T : ClassTag](elems: RDD[T])(implicit conv: T => Document)
-  : FacetedLuceneRDD[T] = {
-    apply[T](elems, getOrElseEn(IndexAnalyzerConfigName), getOrElseEn(QueryAnalyzerConfigName),
-      getOrElseClassic())
-  }
-
-  /**
-   * Instantiate a FacetedLuceneRDD with an iterable
-   *
-   * @param elems Iterable of documents
-   * @param indexAnalyzer Index Analyzer name
-   * @param queryAnalyzer Query Analyzer name
-   * @param similarity Lucene scoring similarity, i.e., BM25 or TF-IDF
-   * @param sc
-   * @tparam T
-   * @return
-   */
-  def apply[T : ClassTag]
-  (elems: Iterable[T], indexAnalyzer: String, queryAnalyzer: String, similarity: String)
-  (implicit sc: SparkContext, conv: T => Document)
-  : FacetedLuceneRDD[T] = {
-    apply(sc.parallelize[T](elems.toSeq), indexAnalyzer, queryAnalyzer, similarity)
-  }
-
-  def apply[T : ClassTag]
-  (elems: Iterable[T])
-  (implicit sc: SparkContext, conv: T => Document)
-  : FacetedLuceneRDD[T] = {
-    apply(sc.parallelize[T](elems.toSeq))
-  }
-
-  /**
-   * Instantiate a FacetedLuceneRDD with DataFrame
-   *
-   * @param dataFrame Spark DataFrame
-   * @param indexAnalyzer Index Analyzer name
-   * @param queryAnalyzer Query Analyzer name
-   * @param similarity Lucene scoring similarity, i.e., BM25 or TF-IDF
-   * @return
-   */
-  def apply(dataFrame: DataFrame, indexAnalyzer: String, queryAnalyzer: String, similarity: String)
-  : FacetedLuceneRDD[Row] = {
-    apply(dataFrame.rdd, indexAnalyzer, queryAnalyzer, similarity: String)
-  }
-
-  def apply(dataFrame: DataFrame)
-  : FacetedLuceneRDD[Row] = {
-    apply(dataFrame.rdd, getOrElseEn(IndexAnalyzerConfigName), getOrElseEn(QueryAnalyzerConfigName),
-      getOrElseClassic())
-  }
-}
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/facets/package.scala b/src/main/scala/org/zouzias/spark/lucenerdd/facets/package.scala
deleted file mode 100644
index e2d6cf2a..00000000
--- a/src/main/scala/org/zouzias/spark/lucenerdd/facets/package.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd
-
-import org.apache.lucene.document._
-import org.apache.lucene.facet.FacetField
-import org.apache.spark.sql.Row
-
-import scala.reflect.ClassTag
-
-/**
- * Contains implicit conversion to [[org.apache.lucene.document.Document]]
- * which prepares the index for faceted search as well.
- */
-package object facets {
-
-  private val Stored = Field.Store.YES
-  private val DefaultFieldName = "_1"
-
-  /**
-   * Adds extra field on index with suffix [[FacetedLuceneRDD.FacetTextFieldSuffix]]
-   * This fiels is used on faceted queries
-   *
-   * @param doc Input document
-   * @param fieldName Field name
-   * @param fieldValue Field value to be indexed
-   */
-  private def addTextFacetField(doc: Document, fieldName: String, fieldValue: String): Unit = {
-    if ( fieldValue.nonEmpty) { // Issues with empty strings on facets
-      doc.add(new FacetField(s"${fieldName}${FacetedLuceneRDD.FacetTextFieldSuffix}",
-        fieldValue))
-    }
-  }
-
-  implicit def intToDocument(v: Int): Document = {
-    val doc = new Document
-    doc.add(new IntPoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def longToDocument(v: Long): Document = {
-    val doc = new Document
-    doc.add(new LongPoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def doubleToDocument(v: Double): Document = {
-    val doc = new Document
-    doc.add(new DoublePoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def floatToDocument(v: Float): Document = {
-    val doc = new Document
-    doc.add(new FloatPoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def stringToDocument(s: String): Document = {
-    val doc = new Document
-    doc.add(new TextField(DefaultFieldName, s, Stored))
-    addTextFacetField(doc, DefaultFieldName, s)
-    doc
-  }
-
-  private def tupleTypeToDocument[T: ClassTag](doc: Document, index: Int, s: T): Document = {
-    typeToDocument(doc, s"_${index}", s)
-  }
-
-  def typeToDocument[T: ClassTag](doc: Document, fName: String, s: T): Document = {
-    s match {
-      case x: String =>
-        doc.add(new TextField(fName, x, Stored))
-        addTextFacetField(doc, fName, x)
-      case x: Long =>
-        doc.add(new LongPoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new NumericDocValuesField(s"${fName} ${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x))
-      case x: Int =>
-        doc.add(new IntPoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new NumericDocValuesField(s"${fName}${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x.toLong))
-      case x: Float =>
-        doc.add(new FloatPoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new FloatDocValuesField(s"${fName}${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x))
-      case x: Double =>
-        doc.add(new DoublePoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new DoubleDocValuesField(s"${fName}${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x))
-    }
-    doc
-  }
-
-  implicit def iterablePrimitiveToDocument[T: ClassTag](iter: Iterable[T]): Document = {
-    val doc = new Document
-    iter.foreach( item => tupleTypeToDocument(doc, 1, item))
-    doc
-  }
-
-  implicit def mapToDocument[T: ClassTag](map: Map[String, T]): Document = {
-    val doc = new Document
-    map.foreach{ case (key, value) =>
-      typeToDocument(doc, key, value)
-    }
-    doc
-  }
-
-  /**
-   * Implicit conversion for all product types, such as case classes and Tuples
-   * @param s
-   * @tparam T
-   * @return
-   */
-  implicit def productTypeToDocument[T <: Product : ClassTag](s: T): Document = {
-    val doc = new Document
-
-    val fieldNames = s.getClass.getDeclaredFields.map(_.getName).toIterator
-    val fieldValues = s.productIterator
-    fieldValues.zip(fieldNames).foreach{ case (elem, fieldName) =>
-      typeToDocument(doc, fieldName, elem)
-    }
-
-    doc
-  }
-
-  /**
-   * Implicit conversion for Spark Row: used for DataFrame
-   * @param row
-   * @return
-   */
-  implicit def sparkRowToDocument(row: Row): Document = {
-    val doc = new Document
-
-    val fieldNames = row.schema.fieldNames
-    fieldNames.foreach{ case fieldName =>
-      val index = row.fieldIndex(fieldName)
-      typeToDocument(doc, fieldName, row.get(index))
-    }
-
-    doc
-  }
-}
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/models/SparkFacetResult.scala b/src/main/scala/org/zouzias/spark/lucenerdd/models/SparkFacetResult.scala
deleted file mode 100644
index 8d953b69..00000000
--- a/src/main/scala/org/zouzias/spark/lucenerdd/models/SparkFacetResult.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.models
-
-import org.apache.lucene.facet.FacetResult
-
-case class SparkFacetResult(facetName: String, facets: Map[String, Long]) {
-
-  /**
-   * Return facet counts sorted descending
-   * @return Sequence of (facet value, facet counts)
-   */
-  def sortedFacets(): Seq[(String, Long)] = {
-    facets.toSeq.sortBy[Long](x => -x._2)
-  }
-}
-
-
-object SparkFacetResult extends Serializable {
-
-  /**
-   * Convert [[org.apache.lucene.facet.FacetResult]]
-   * to [[org.zouzias.spark.lucenerdd.models.SparkFacetResult]]
-   *
-   * @param facetName name of facet
-   * @param facetResult input facet results
-   * @return
-   */
-  def apply(facetName: String, facetResult: FacetResult): SparkFacetResult = {
-    val facetResultOpt = Option(facetResult)
-    facetResultOpt match {
-      case Some(fctResult) =>
-        val map = fctResult.labelValues
-          .map(labelValue => (labelValue.label, labelValue.value.longValue()))
-          .toMap[String, Long]
-        SparkFacetResult(facetName, map)
-      case _ => SparkFacetResult(facetName, Map.empty[String, Long])
-    }
-  }
-}
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala b/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala
index 3a43c91d..fd0652de 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala
@@ -18,7 +18,7 @@ package org.zouzias.spark.lucenerdd.partition
 
 import org.apache.lucene.search.{BooleanClause, Query}
 import org.zouzias.spark.lucenerdd.models.indexstats.IndexStatistics
-import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, TermVectorEntry}
+import org.zouzias.spark.lucenerdd.models.TermVectorEntry
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
 
 import scala.reflect.ClassTag
@@ -81,15 +81,6 @@ private[lucenerdd] abstract class AbstractLuceneRDDPartition[T] extends Serializ
   def queries(searchString: Iterable[String], topK: Int)
   : Iterable[(String, LuceneRDDResponsePartition)]
 
-  /**
-   * Generic Lucene faceted Query using QueryParser
-   * @param searchString Lucene query string, i.e., textField:hello*
-   * @param topK Number of facets to return
-   * @return
-   */
-  def facetQuery(searchString: String, facetField: String, topK: Int)
-  : SparkFacetResult
-
   /**
    * Term Query
    * @param fieldName Name of field
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala b/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
index 3e0b4fe6..d9aea225 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
@@ -18,17 +18,15 @@ package org.zouzias.spark.lucenerdd.partition
 
 import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.document._
-import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader
 import org.apache.lucene.index.{DirectoryReader, IndexReader}
 import org.apache.lucene.search._
 import org.joda.time.DateTime
-import org.zouzias.spark.lucenerdd.facets.FacetedLuceneRDD
 import org.zouzias.spark.lucenerdd.models.indexstats.{FieldStatistics, IndexStatistics}
-import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, TermVectorEntry}
+import org.zouzias.spark.lucenerdd.models.TermVectorEntry
 import org.zouzias.spark.lucenerdd.query.{LuceneQueryHelpers, SimilarityConfigurable}
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
-import org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter
 import org.zouzias.spark.lucenerdd.LuceneRDD
+import org.zouzias.spark.lucenerdd.store.IndexWritable
 
 import scala.reflect.{ClassTag, _}
 import scala.collection.mutable.ArrayBuffer
@@ -53,7 +51,7 @@ private[lucenerdd] class LuceneRDDPartition[T]
  (implicit docConversion: T => Document,
   override implicit val kTag: ClassTag[T])
   extends AbstractLuceneRDDPartition[T]
-  with IndexWithTaxonomyWriter
+  with IndexWritable
   with SimilarityConfigurable {
 
   logInfo(s"[partId=${partitionId}] Partition is created...")
@@ -69,7 +67,7 @@ private[lucenerdd] class LuceneRDDPartition[T]
   iterIndex.foreach { case elem =>
     // (implicitly) convert type T to Lucene document
     val doc = docConversion(elem)
-    indexWriter.addDocument(FacetsConfig.build(taxoWriter, doc))
+    indexWriter.addDocument(doc)
   }
   private val endTime = new DateTime(System.currentTimeMillis())
   logInfo(s"[partId=${partitionId}]Indexing process completed at ${endTime}...")
@@ -83,7 +81,6 @@ private[lucenerdd] class LuceneRDDPartition[T]
   logDebug(s"[partId=${partitionId}]Instantiating index/facet readers")
   private val indexReader = DirectoryReader.open(IndexDir)
   private lazy val indexSearcher = initializeIndexSearcher(indexReader)
-  private val taxoReader = new DirectoryTaxonomyReader(TaxonomyDir)
   logDebug(s"[partId=${partitionId}]Index readers instantiated successfully")
 
   logInfo(s"[partId=${partitionId}]Indexed ${size} documents")
@@ -174,15 +171,6 @@ private[lucenerdd] class LuceneRDDPartition[T]
     LuceneRDDResponsePartition(results.toIterator)
   }
 
-  override def facetQuery(searchString: String,
-                          facetField: String,
-                          topK: Int): SparkFacetResult = {
-    LuceneQueryHelpers.facetedTextSearch(indexSearcher, taxoReader, FacetsConfig,
-      searchString,
-      facetField + FacetedLuceneRDD.FacetTextFieldSuffix,
-      topK, QueryAnalyzer)
-  }
-
   override def moreLikeThis(fieldName: String, query: String,
                             minTermFreq: Int, minDocFreq: Int, topK: Int)
   : LuceneRDDResponsePartition = {
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala b/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala
index 1b1a60ee..60f47011 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala
@@ -21,15 +21,11 @@ import java.io.StringReader
 import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
 import org.apache.lucene.document.Document
-import org.apache.lucene.facet.{FacetsCollector, FacetsConfig}
-import org.apache.lucene.facet.sortedset.SortedSetDocValuesFacetCounts
-import org.apache.lucene.facet.taxonomy.{FastTaxonomyFacetCounts, TaxonomyReader}
 import org.apache.lucene.index.Term
 import org.apache.lucene.queries.mlt.MoreLikeThis
 import org.apache.lucene.queryparser.classic.QueryParser
 import org.apache.lucene.search._
-import org.zouzias.spark.lucenerdd.aggregate.SparkFacetResultMonoid
-import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, SparkScoreDoc}
+import org.zouzias.spark.lucenerdd.models.SparkScoreDoc
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -124,38 +120,6 @@ object LuceneQueryHelpers extends Serializable {
     indexSearcher.search(query, topK).scoreDocs.map(SparkScoreDoc(indexSearcher, _))
   }
 
-  /**
-   * Faceted search using [[SortedSetDocValuesFacetCounts]]
-   *
-   * @param indexSearcher Index searcher
-   * @param taxoReader taxonomy reader used for faceted search
-   * @param searchString Lucene search query string
-   * @param facetField Facet field name
-   * @param topK Number of returned documents
-   * @return
-   */
-  def facetedTextSearch(indexSearcher: IndexSearcher,
-                        taxoReader: TaxonomyReader,
-                        facetsConfig: FacetsConfig,
-                        searchString: String,
-                        facetField: String,
-                        topK: Int, analyzer: Analyzer): SparkFacetResult = {
-    // Prepare the query
-    val queryParser = new QueryParser(QueryParserDefaultField, analyzer)
-    val q: Query = queryParser.parse(searchString)
-
-    // Collect the facets
-    val fc = new FacetsCollector()
-    FacetsCollector.search(indexSearcher, q, topK, fc)
-    val facets = Option(new FastTaxonomyFacetCounts(taxoReader, facetsConfig, fc))
-
-    // Present the facets
-    facets match {
-      case Some(fcts) => SparkFacetResult(facetField, fcts.getTopChildren(topK, facetField))
-      case None => SparkFacetResultMonoid.zero(facetField)
-    }
-  }
-
   /**
    * Returns total number of lucene documents
   *
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala
index eb2b5269..420c91eb 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types._
-import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkFacetResult, SparkScoreDoc}
+import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkScoreDoc}
 import org.zouzias.spark.lucenerdd.spatial.shape.partition.ShapeLuceneRDDPartition
 
 
@@ -73,7 +73,6 @@ class ShapeLuceneRDDKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[scala.collection.immutable.Set$EmptySet$])
     kryo.register(classOf[scala.collection.immutable.Map[_, _]])
     kryo.register(classOf[Array[scala.collection.immutable.Map[_, _]]])
-    kryo.register(classOf[SparkFacetResult])
    kryo.register(classOf[SparkScoreDoc])
     kryo.register(classOf[TopK[_]])
 
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala
index aaeac337..fb50009d 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala
@@ -29,7 +29,7 @@ import org.zouzias.spark.lucenerdd.query.LuceneQueryHelpers
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
 import org.zouzias.spark.lucenerdd.spatial.shape.ShapeLuceneRDD.PointType
 import org.zouzias.spark.lucenerdd.spatial.shape.strategies.SpatialStrategy
-import org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter
+import org.zouzias.spark.lucenerdd.store.IndexWritable
 
 import scala.reflect._
 
@@ -42,7 +42,7 @@ private[shape] class ShapeLuceneRDDPartition[K, V]
  (implicit shapeConversion: K => Shape,
   docConversion: V => Document)
   extends AbstractShapeLuceneRDDPartition[K, V]
-  with IndexWithTaxonomyWriter
+  with IndexWritable
   with SpatialStrategy {
 
   override def indexAnalyzer(): Analyzer = getAnalyzer(Some(indexAnalyzerName))
@@ -73,7 +73,7 @@ private[shape] class ShapeLuceneRDDPartition[K, V]
     val doc = docConversion(value)
     val shape = shapeConversion(key)
     val docWithLocation = decorateWithLocation(doc, Seq(shape))
-    indexWriter.addDocument(FacetsConfig.build(taxoWriter, docWithLocation))
+    indexWriter.addDocument(docWithLocation)
   }
   private val endTime = new DateTime(System.currentTimeMillis())
   logInfo(s"Indexing process completed at ${endTime}...")
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
index c7e4f292..f8062b50 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
@@ -18,7 +18,6 @@ package org.zouzias.spark.lucenerdd.store
 
 import java.nio.file.{Files, Path}
 
-import org.apache.lucene.facet.FacetsConfig
 import org.apache.lucene.store._
 import org.zouzias.spark.lucenerdd.config.Configurable
 import org.zouzias.spark.lucenerdd.logging.Logging
@@ -35,8 +34,6 @@ trait IndexStorable extends Configurable
   with AutoCloseable
   with Logging {
 
-  protected lazy val FacetsConfig = new FacetsConfig()
-
   private val IndexStoreKey = "lucenerdd.index.store.mode"
 
   private val tmpJavaDir = System.getProperty("java.io.tmpdir")
@@ -46,14 +43,7 @@ trait IndexStorable extends Configurable
 
   private val indexDir = Files.createTempDirectory(indexDirName)
 
-  private val taxonomyDirName =
-    s"taxonomyDirectory-${System.currentTimeMillis()}.${Thread.currentThread().getId}"
-
-  private val taxonomyDir = Files.createTempDirectory(taxonomyDirName)
-
-  protected val IndexDir = storageMode(indexDir)
-
-  protected val TaxonomyDir = storageMode(taxonomyDir)
+  protected val IndexDir: Directory = storageMode(indexDir)
 
   /**
    * Select Lucene index storage implementation based on config
@@ -110,6 +100,5 @@ trait IndexStorable extends Configurable
 
   override def close(): Unit = {
     IndexDir.close()
-    TaxonomyDir.close()
   }
 }
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWithTaxonomyWriter.scala b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWritable.scala
similarity index 86%
rename from src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWithTaxonomyWriter.scala
rename to src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWritable.scala
index 8e51f82c..4400f2a9 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWithTaxonomyWriter.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWritable.scala
@@ -23,9 +23,9 @@ import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
 import org.zouzias.spark.lucenerdd.analyzers.AnalyzerConfigurable
 
 /**
- * Index and Taxonomy Writer used for facet queries
+ * Index writer
  */
-trait IndexWithTaxonomyWriter extends IndexStorable
+trait IndexWritable extends IndexStorable
   with AnalyzerConfigurable {
 
   protected def indexAnalyzer(): Analyzer
@@ -34,12 +34,8 @@ trait IndexWithTaxonomyWriter extends IndexStorable
     new IndexWriterConfig(indexAnalyzer())
       .setOpenMode(OpenMode.CREATE))
 
-  protected lazy val taxoWriter = new DirectoryTaxonomyWriter(TaxonomyDir)
-
   protected def closeAllWriters(): Unit = {
     indexWriter.commit()
-    taxoWriter.commit()
-    taxoWriter.close()
     indexWriter.close()
   }
 }
diff --git a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDFacetSpec.scala b/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDFacetSpec.scala
deleted file mode 100644
index b26fe3f5..00000000
--- a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDFacetSpec.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.facets
-
-import com.holdenkarau.spark.testing.SharedSparkContext
-import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.zouzias.spark.lucenerdd.{LuceneRDD, LuceneRDDKryoRegistrator}
-
-class FacetedLuceneRDDFacetSpec extends FlatSpec
-  with Matchers
-  with BeforeAndAfterEach
-  with SharedSparkContext {
-
-  override val conf = LuceneRDDKryoRegistrator.registerKryoClasses(new SparkConf().
-    setMaster("local[*]").
-    setAppName("test").
-    set("spark.ui.enabled", "false").
-    set("spark.app.id", appID))
-
-  // Check if sequence is sorted in descending order
-  def sortedDesc(seq : Seq[Long]) : Boolean = {
-    if (seq.isEmpty) true else seq.zip(seq.tail).forall(x => x._1 >= x._2)
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets correctly" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val facetResults = luceneRDD.facetQuery("*:*", "_1")._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("aaa") should equal (true)
-    facetResults.facets.get("aaa")
-      .foreach(value => value should equal (4))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets correctly with ints" in {
-    val words = Array(10, 10, 10, 10, 22, 22, 22, 33, 33)
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val facetResults = luceneRDD.facetQuery("*:*", "_1")._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("10") should equal (true)
-    facetResults.facets.contains("22") should equal (true)
-    facetResults.facets.contains("33") should equal (true)
-    facetResults.facets.get("10").foreach(value => value should equal (4))
-    facetResults.facets.get("33").foreach(value => value should equal (2))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets correctly with doubles" in {
-    val words = Array(10.5D, 10.5D, 10.5D, 10.5D, 22.2D, 22.2D, 22.2D, 33.2D, 33.2D)
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val facetResults = luceneRDD.facetQuery("*:*", "_1")._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("10.5") should equal (true)
-    facetResults.facets.contains("22.2") should equal (true)
-    facetResults.facets.contains("33.2") should equal (true)
-    facetResults.facets.get("10.5").foreach(value => value should equal (4))
-    facetResults.facets.get("33.2").foreach(value => value should equal (2))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQueries" should "compute facets correctly" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-
-    val facetResults = luceneRDD.facetQueries("*:*", Seq("_1"))._2
-
-    facetResults.contains("_1") should equal(true)
-    facetResults.foreach(_._2.facets.size should equal (3))
-    facetResults.foreach(_._2.facets.contains("aaa") should equal (true))
-    facetResults.foreach(_._2.facets.get("aaa").foreach(value => value should equal (4)))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.sortedFacets" should "return facets sorted by decreasing order" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-
-    val sortedFacetCounts = luceneRDD.facetQuery("*:*", "_1")._2.sortedFacets().map(_._2)
-    sortedDesc(sortedFacetCounts) should equal(true)
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets with prefix search" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val results = luceneRDD.facetQuery("_1:aa*", "_1")
-    val facetResults = results._2
-
-    facetResults.facets.size should equal (1)
-    facetResults.facets.contains("aaa") should equal (true)
-    facetResults.facets.get("aaa")
-      .foreach(value => value should equal (4))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets with term search" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "aaaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val results = luceneRDD.facetQuery("_1:aaa", "_1")
-    val facetResults = results._2
-
-    facetResults.facets.size should equal (1)
-    facetResults.facets.contains("aaa") should equal (true)
-    facetResults.facets.contains("bb") should equal (false)
-    facetResults.facets.contains("cc") should equal (false)
-    facetResults.facets.get("aaa") should equal (Some(4))
-
-    val resultsB = luceneRDD.facetQuery("_1:bb", "_1")
-    val facetResultsB = resultsB._2
-
-    facetResultsB.facets.contains("bb") should equal (true)
-    facetResultsB.facets.get("bb") should equal (Some(3))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets with term search in Tuple2" in {
-    val words = Array(("aaa", "aaa1"), ("aaa", "aaa2"), ("aaa", "aaa3"), ("aaa", "aaa3"),
-      ("aaaa", "aaa3"), ("bb", "cc1"), ("bb", "cc1"), ("bb", "cc1"), ("cc", "cc2"), ("cc", "cc2"))
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val results = luceneRDD.facetQuery("_1:aaa", "_2")
-    val facetResults = results._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("aaa1") should equal (true)
-    facetResults.facets.contains("aaa2") should equal (true)
-    facetResults.facets.contains("aaa3") should equal (true)
-    facetResults.facets.get("aaa1") should equal (Some(1))
-    facetResults.facets.get("aaa2") should equal (Some(1))
-    facetResults.facets.get("aaa3") should equal (Some(2))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.version" should "return project sbt build information" in {
-    val map = LuceneRDD.version()
-    map.contains("name") should equal(true)
-    map.contains("builtAtMillis") should equal(true)
-    map.contains("scalaVersion") should equal(true)
-    map.contains("version") should equal(true)
-    map.contains("sbtVersion") should equal(true)
-    map.contains("builtAtString") should equal(true)
-  }
-
-}
diff --git a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDImplicitsSpec.scala b/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDImplicitsSpec.scala
deleted file mode 100644
index e4348292..00000000
--- a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDImplicitsSpec.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.facets
-
-import com.holdenkarau.spark.testing.SharedSparkContext
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.zouzias.spark.lucenerdd.testing.FavoriteCaseClass
-import org.zouzias.spark.lucenerdd.{LuceneRDD, LuceneRDDKryoRegistrator}
-
-class FacetedLuceneRDDImplicitsSpec extends FlatSpec
-  with Matchers
-  with BeforeAndAfterEach
-  with SharedSparkContext {
-
-  var luceneRDD: LuceneRDD[_] = _
-
-
-  override val conf = LuceneRDDKryoRegistrator.registerKryoClasses(new SparkConf().
-    setMaster("local[*]").
-    setAppName("test").
-    set("spark.ui.enabled", "false").
-    set("spark.app.id", appID))
-
-  override def afterEach() {
-    luceneRDD.close()
-  }
-
-
-  val elem = Array("fear", "death", "water", "fire", "house")
-    .zipWithIndex.map{ case (str, index) =>
-    FavoriteCaseClass(str, index, 10L, 12.3F, s"${str}@gmail.com")}
-
-
-  "FacetedLuceneRDD(case class).count" should "return correct number of elements" in {
-    val rdd = sc.parallelize(elem)
-    val spark = SparkSession.builder().getOrCreate()
-    import spark.implicits._
-    val df = rdd.toDF()
-    luceneRDD = FacetedLuceneRDD(df)
-    luceneRDD.count should equal (elem.size)
-  }
-
-  "FacetedLuceneRDD(case class).fields" should "return all fields" in {
-    val rdd = sc.parallelize(elem)
-    val spark = SparkSession.builder().getOrCreate()
-    import spark.implicits._
-    val df = rdd.toDF()
-    luceneRDD = FacetedLuceneRDD(df)
-
-    luceneRDD.fields().size should equal(5)
-    luceneRDD.fields().contains("name") should equal(true)
-    luceneRDD.fields().contains("age") should equal(true)
-    luceneRDD.fields().contains("myLong") should equal(true)
-    luceneRDD.fields().contains("myFloat") should equal(true)
-    luceneRDD.fields().contains("email") should equal(true)
-  }
-
-  "FacetedLuceneRDD(case class).termQuery" should "correctly search with TermQueries" in {
-    val rdd = sc.parallelize(elem)
-    val spark = SparkSession.builder().getOrCreate()
-    import spark.implicits._
-    val df = rdd.toDF()
-    luceneRDD = FacetedLuceneRDD(df)
-
-    val results = luceneRDD.termQuery("name", "water")
-    results.count() should equal(1)
-  }
-}
diff --git a/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala b/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala
index d9fbf85c..97a4ae14 100644
--- a/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala
+++ b/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala
@@ -19,52 +19,44 @@ package org.zouzias.spark.lucenerdd.query
 
 import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.document.Field.Store
 import org.apache.lucene.document._
-import org.apache.lucene.facet.FacetField
-import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader
 import org.apache.lucene.index.DirectoryReader
 import org.apache.lucene.search.IndexSearcher
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.zouzias.spark.lucenerdd.facets.FacetedLuceneRDD
-import org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter
+import org.zouzias.spark.lucenerdd.store.IndexWritable
 
 import scala.io.Source
 
 class LuceneQueryHelpersSpec extends FlatSpec
-  with IndexWithTaxonomyWriter
+  with IndexWritable
   with Matchers
   with BeforeAndAfterEach {
 
   // Load cities
-  val countries = Source.fromFile("src/test/resources/countries.txt").getLines()
+  val countries: Seq[String] = Source.fromFile("src/test/resources/countries.txt").getLines()
     .map(_.toLowerCase()).toSeq
 
   private val MaxFacetValue: Int = 10
 
-  override def indexAnalyzer: Analyzer = getAnalyzer(Some("en"))
+  override def indexAnalyzer(): Analyzer = getAnalyzer(Some("en"))
 
   countries.zipWithIndex.foreach { case (elem, index) =>
     val doc = convertToDoc(index % MaxFacetValue, elem)
-    indexWriter.addDocument(FacetsConfig.build(taxoWriter, doc))
+    indexWriter.addDocument(doc)
   }
 
   indexWriter.commit()
-  taxoWriter.close()
   indexWriter.close()
 
   private val indexReader = DirectoryReader.open(IndexDir)
   private val indexSearcher = new IndexSearcher(indexReader)
 
-  private lazy val taxoReader = new DirectoryTaxonomyReader(TaxonomyDir)
-  private lazy val TestFacetName = s"_2${FacetedLuceneRDD.FacetTextFieldSuffix}"
-
   def convertToDoc(pos: Int, text: String): Document = {
     val doc = new Document()
     doc.add(new StringField("_1", text, Store.YES))
-    doc.add(new FacetField(s"_1${FacetedLuceneRDD.FacetTextFieldSuffix}", text))
     doc.add(new IntPoint("_2", pos))
     doc.add(new StoredField("_2", pos))
-    doc.add(new FacetField(TestFacetName, pos.toString))
     doc
   }
 
@@ -76,14 +68,6 @@ class LuceneQueryHelpersSpec extends FlatSpec
     LuceneQueryHelpers.totalDocs(indexSearcher) should equal (countries.size)
   }
 
-  "LuceneQueryHelpers.facetedTextSearch" should "return correct facet counts" in {
-    val facets = LuceneQueryHelpers.facetedTextSearch(indexSearcher, taxoReader,
-      FacetsConfig, "*:*", TestFacetName, 100, indexAnalyzer)
-
-    facets.facetName should equal(TestFacetName)
-    facets.facets.size should equal(MaxFacetValue)
-  }
-
   "LuceneQueryHelpers.termQuery" should "return correct documents" in {
     val greece = "greece"
     val topDocs = LuceneQueryHelpers.termQuery(indexSearcher, "_1", greece, 100)