Block linkage: allow a block linker with Row to Query (#154)
* [linkage] block linker with => Query

* [linkage] block linker is Row => Query

* remove Query analyzer on methods
zouzias committed Mar 11, 2019
1 parent 09d1b3a commit d35684a
Showing 6 changed files with 54 additions and 14 deletions.
src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala (12 changes: 6 additions & 6 deletions)
@@ -469,7 +469,7 @@ object LuceneRDD extends Versionable
    *
    * @param queries Queries / entities to be linked with @corpus
    * @param entities DataFrame of entities to be linked with queries parameter
-   * @param rowToQueryString Converts each [[Row]] to a 'Lucene Query Syntax'
+   * @param rowToQuery Function[Row, Query] that converts [[Row]] to a Lucene [[Query]]
    * @param queryPartColumns List of query columns for [[HashPartitioner]]
    * @param entityPartColumns List of entity columns for [[HashPartitioner]]
    * @param topK Number of linked results
@@ -481,7 +481,7 @@ object LuceneRDD extends Versionable
    */
   def blockEntityLinkage(queries: DataFrame,
                          entities: DataFrame,
-                         rowToQueryString: Row => String,
+                         rowToQuery: Row => Query,
                          queryPartColumns: Array[String],
                          entityPartColumns: Array[String],
                          topK : Int = 3,
@@ -521,7 +521,7 @@ object LuceneRDD extends Versionable
         queryAnalyzer, similarity)

         // Multi-query lucene index
-        qs.map(q => (q, lucenePart.query(rowToQueryString(q), topK).results.toArray))
+        qs.map(q => (q, lucenePart.query(rowToQuery(q), topK).results.toArray))
       }
     }
   }
@@ -530,7 +530,7 @@ object LuceneRDD extends Versionable
    * Deduplication via blocking
    *
    * @param entities Entities [[DataFrame]] to deduplicate
-   * @param rowToQueryString Function that maps [[Row]] to Lucene Query String
+   * @param rowToQuery Function that maps [[Row]] to Lucene [[Query]]
    * @param blockingColumns Columns on which exact match is required
    * @param topK Number of top-K query results
    * @param indexAnalyzer Lucene analyzer at index time
@@ -541,7 +541,7 @@ object LuceneRDD extends Versionable
    * @return
    */
   def blockDedup(entities: DataFrame,
-                 rowToQueryString: Row => String,
+                 rowToQuery: Row => Query,
                  blockingColumns: Array[String],
                  topK : Int = 3,
                  indexAnalyzer: String = getOrElseEn(IndexAnalyzerConfigName),
@@ -575,7 +575,7 @@ object LuceneRDD extends Versionable
         queryAnalyzer, similarity)

         // Multi-query lucene index
-        iterQueries.map(q => (q, lucenePart.query(rowToQueryString(q), topK).results.toArray))
+        iterQueries.map(q => (q, lucenePart.query(rowToQuery(q), topK).results.toArray))
       }
     }
   }
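With this commit, block linkers return a typed Lucene Query instead of a query-syntax string. A minimal call-site sketch of the new signature; leftDF and rightDF are assumed DataFrames with a "name" column (mirroring the specs further down), and the analyzer/similarity parameters hidden in this diff are left at their defaults:

    import org.apache.lucene.index.Term
    import org.apache.lucene.search.{Query, TermQuery}
    import org.apache.spark.sql.Row

    // Block both sides on "name" and link each query row via an exact,
    // un-analyzed TermQuery on the "name" field.
    val linker: Row => Query =
      row => new TermQuery(new Term("name", row.getString(row.fieldIndex("name"))))

    val linked = LuceneRDD.blockEntityLinkage(leftDF, rightDF, linker,
      queryPartColumns = Array("name"), entityPartColumns = Array("name"), topK = 3)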
src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala

@@ -16,7 +16,7 @@
  */
 package org.zouzias.spark.lucenerdd.partition

-import org.apache.lucene.search.BooleanClause
+import org.apache.lucene.search.{BooleanClause, Query}
 import org.zouzias.spark.lucenerdd.models.indexstats.IndexStatistics
 import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, TermVectorEntry}
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
@@ -62,6 +62,16 @@ private[lucenerdd] abstract class AbstractLuceneRDDPartition[T] extends Serializable
    */
   def query(searchString: String, topK: Int): LuceneRDDResponsePartition

+
+  /**
+   * Lucene search using a Lucene [[Query]]
+   * @param query Lucene query, e.g., [[org.apache.lucene.search.BooleanQuery]] or
+   *              [[org.apache.lucene.search.PhraseQuery]]
+   * @param topK Number of documents to return
+   * @return
+   */
+  def query(query: Query, topK: Int): LuceneRDDResponsePartition
+
   /**
    * Multiple generic Lucene Queries using QueryParser
    * @param searchString Lucene query string
src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala

@@ -137,6 +137,13 @@ private[lucenerdd] class LuceneRDDPartition[T]
     LuceneRDDResponsePartition(results.toIterator)
   }

+  override def query(query: Query,
+                     topK: Int): LuceneRDDResponsePartition = {
+    val results = LuceneQueryHelpers.searchQuery(indexSearcher, query, topK)
+
+    LuceneRDDResponsePartition(results.toIterator)
+  }
+
   override def queries(searchStrings: Iterable[String],
                        topK: Int): Iterable[(String, LuceneRDDResponsePartition)] = {
     searchStrings.map( searchString =>
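Accepting org.apache.lucene.search.Query here means linkers are no longer limited to what the query-parser string syntax can express. As a hedged illustration (the field names "surname" and "city" are invented for this sketch), a linker could return a composite BooleanQuery:

    import org.apache.lucene.index.Term
    import org.apache.lucene.search.{BooleanClause, BooleanQuery, TermQuery}

    // Require an exact surname match; a matching city only boosts the score.
    val composite: BooleanQuery = new BooleanQuery.Builder()
      .add(new TermQuery(new Term("surname", "smith")), BooleanClause.Occur.MUST)
      .add(new TermQuery(new Term("city", "london")), BooleanClause.Occur.SHOULD)
      .build()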
src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala

@@ -95,18 +95,35 @@ object LuceneQueryHelpers extends Serializable {
    *
    * @param indexSearcher Index searcher
    * @param searchString Lucene search query string
-   * @param topK Number of returned documents
+   * @param topK Number of documents to return
    * @param analyzer Lucene Analyzer
    * @return
    */
   def searchParser(indexSearcher: IndexSearcher,
                    searchString: String,
-                   topK: Int, analyzer: Analyzer)
+                   topK: Int,
+                   analyzer: Analyzer)
   : Seq[SparkScoreDoc] = {
     val q = parseQueryString(searchString, analyzer)
     indexSearcher.search(q, topK).scoreDocs.map(SparkScoreDoc(indexSearcher, _))
   }

+  /**
+   * Lucene search using a Lucene [[Query]]
+   *
+   * Important: no analysis is applied here; any query analysis must be done
+   * when the [[Query]] itself is constructed.
+   * @param indexSearcher Lucene index searcher
+   * @param query Lucene query
+   * @param topK Number of documents to return
+   * @return
+   */
+  def searchQuery(indexSearcher: IndexSearcher,
+                  query: Query,
+                  topK: Int)
+  : Seq[SparkScoreDoc] = {
+    indexSearcher.search(query, topK).scoreDocs.map(SparkScoreDoc(indexSearcher, _))
+  }

   /**
    * Faceted search using [[SortedSetDocValuesFacetCounts]]
    *
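Because searchQuery bypasses the query parser, the supplied Query is matched verbatim against indexed tokens; for example, with a lowercasing index analyzer a TermQuery must carry the lowercased term. A hedged usage sketch, assuming indexSearcher is an open IndexSearcher over the index:

    import org.apache.lucene.index.Term
    import org.apache.lucene.search.TermQuery

    // "Smith" was lowercased at index time, so query the token "smith".
    val hits = LuceneQueryHelpers.searchQuery(indexSearcher, new TermQuery(new Term("name", "smith")), topK = 10)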
src/test/scala/org/zouzias/spark/lucenerdd/BlockingDedupSpec.scala

@@ -17,6 +17,8 @@
 package org.zouzias.spark.lucenerdd

 import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.lucene.index.Term
+import org.apache.lucene.search.{Query, TermQuery}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Row, SparkSession}
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -44,10 +46,11 @@ class BlockingDedupSpec extends FlatSpec
     }
     val df = sc.parallelize(people).repartition(2).toDF()

-    val linker: Row => String = { row =>
+    val linker: Row => Query = { row =>
       val name = row.getString(row.fieldIndex("name"))
+      val term = new Term("name", name)

-      s"name:$name"
+      new TermQuery(term)
     }
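The linker above feeds directly into blockDedup. A minimal sketch of the corresponding call (the spec's actual invocation and assertions are not shown in this diff):

    // Deduplicate df against itself, blocking on exact "name" matches.
    val dedup = LuceneRDD.blockDedup(df, linker, blockingColumns = Array("name"), topK = 3)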
src/test/scala/org/zouzias/spark/lucenerdd/BlockingLinkageSpec.scala

@@ -17,6 +17,8 @@
 package org.zouzias.spark.lucenerdd

 import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.lucene.index.Term
+import org.apache.lucene.search.{Query, TermQuery}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Row, SparkSession}
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -52,11 +54,12 @@ class BlockingLinkageSpec extends FlatSpec
     val leftDF = sc.parallelize(peopleLeft).repartition(2).toDF()
     val rightDF = sc.parallelize(peopleRight).repartition(3).toDF()

-    val linker: Row => String = { row =>
+    // Define a Lucene Term linker
+    val linker: Row => Query = { row =>
       val name = row.getString(row.fieldIndex("name"))
+      val term = new Term("name", name)

-      s"name:$name"
+      new TermQuery(term)
     }
