Merge pull request #197 from basho/fix-ts-coverage-partitioner
Fix ts coverage partitioner
AleksandrPavlenko authored Sep 7, 2016
2 parents 5103ff4 + 9efe950 commit 654f7cc
Showing 8 changed files with 403 additions and 106 deletions.
21 changes: 15 additions & 6 deletions connector/src/main/scala/com/basho/riak/spark/query/QueryTS.scala
@@ -17,25 +17,34 @@
*/
package com.basho.riak.spark.query

import java.sql.Timestamp
import java.util.concurrent.ExecutionException

import com.basho.riak.client.core.netty.RiakResponseException
import com.basho.riak.client.core.operations.ts.QueryOperation
import com.basho.riak.client.core.query.timeseries.{ Row, ColumnDescription }
import com.basho.riak.client.core.util.BinaryValue
import com.basho.riak.spark.rdd.connector.RiakSession
import com.basho.riak.spark.rdd.{ BucketDef, ReadConf }
import com.basho.riak.client.core.query.timeseries.{ColumnDescription, Row}

import scala.collection.convert.decorateAsScala._
import com.basho.riak.client.core.query.timeseries.CoverageEntry
import com.basho.riak.spark.rdd.connector.RiakConnector
import com.basho.riak.client.core.util.HostAndPort
import com.basho.riak.spark.util.{Dumpable, DumpUtils}

/**
* @author Sergey Galkin <srggal at gmail dot com>
* @since 1.1.0
*/
case class TSQueryData(sql: String, coverageEntry: Option[CoverageEntry] = None) {
case class TSQueryData(sql: String, coverageEntry: Option[CoverageEntry] = None) extends Dumpable {
val primaryHost = coverageEntry.map(e => HostAndPort.fromParts(e.getHost, e.getPort))

override def dump(lineSep: String = "\n"): String = {
val optional = coverageEntry match {
case Some(ce) => lineSep + s"primary-host: ${primaryHost.get.getHost}:${primaryHost.get.getPort}" + lineSep +
"coverage-entry:" + DumpUtils.dump(ce, lineSep + " ")
case None => ""
}

s"sql: {${sql.toLowerCase.replaceAll("\n", "")}}" + optional
}
}

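For orientation, a minimal usage sketch of the new dump method on TSQueryData (not part of this diff; the table name and time bounds below are made up):

// Hypothetical query with no coverage entry: primaryHost is None, so only the SQL is dumped
val qd = TSQueryData("SELECT * FROM ts_demo WHERE time >= 1 AND time < 2")
println(qd.dump())
// prints: sql: {select * from ts_demo where time >= 1 and time < 2}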
@@ -18,9 +18,10 @@
package com.basho.riak.spark.rdd

import com.basho.riak.client.core.util.HostAndPort
import com.basho.riak.spark.util.Dumpable
import org.apache.spark.Partition

trait RiakPartition extends Partition{
trait RiakPartition extends Partition with Dumpable {
def endpoints: Iterable[HostAndPort]
}

@@ -7,7 +7,12 @@ object PartitioningUtils {
def splitListEvenly[A](list: Seq[A], splitCount: Int): Iterator[Seq[A]] = {
val (base, rem) = divide(list.size, splitCount)
val (smaller, bigger) = list.splitAt(list.size - rem * (base + 1))
smaller.grouped(base) ++ bigger.grouped(base + 1)

if (smaller.isEmpty) {
bigger.grouped(base + 1)
} else {
smaller.grouped(base) ++ bigger.grouped(base + 1)
}
}

// e.g. split 64 coverage entries into 10 partitions: (6,6,6,6,6,6,7,7,7,7) coverage entries in partitions respectively
@@ -23,11 +28,11 @@
yield if (i < rem) base + 1 else base
}

def divide(size: Long, splitCount: Int): (Long, Long) = {
private def divide(size: Long, splitCount: Int): (Long, Long) = {
(size / splitCount, size % splitCount)
}

def divide(size: Int, splitCount: Int): (Int, Int) = {
private def divide(size: Int, splitCount: Int): (Int, Int) = {
(size / splitCount, size % splitCount)
}

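As a quick illustration of the guard added above (a sketch, not part of the commit; the inputs are made up), the isEmpty check matters when there are fewer elements than requested splits, where the old code ended up calling grouped(0):

// 64 entries into 10 groups: sizes (6,6,6,6,6,6,7,7,7,7), matching the comment above
PartitioningUtils.splitListEvenly((1 to 64).toList, 10).map(_.size).toList
// List(6, 6, 6, 6, 6, 6, 7, 7, 7, 7)

// 2 entries into 4 splits: base = 0 and rem = 2, so "smaller" is empty;
// without the isEmpty guard this would call smaller.grouped(0) and throw
PartitioningUtils.splitListEvenly(Seq("a", "b"), 4).toList
// List(List(a), List(b))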
@@ -18,27 +18,45 @@
package com.basho.riak.spark.rdd.partitioner

import java.sql.Timestamp
import org.apache.spark.Partition

import org.apache.spark.{Logging, Partition}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{ StructType, TimestampType }
import org.apache.spark.sql.types.StructType
import com.basho.riak.client.core.util.HostAndPort
import com.basho.riak.spark.query.TSQueryData
import com.basho.riak.spark.rdd.{ ReadConf, RiakPartition }
import com.basho.riak.spark.rdd.{ReadConf, RiakPartition}
import com.basho.riak.client.core.netty.RiakResponseException
import com.basho.riak.client.api.commands.timeseries.CoveragePlan
import com.basho.riak.spark.rdd.partitioner.PartitioningUtils._
import com.basho.riak.spark.rdd.connector.RiakConnector

import scala.collection.JavaConversions._
import scala.util.control.Exception._
import com.basho.riak.client.core.query.timeseries.CoverageEntry
import com.basho.riak.spark.util.DumpUtils

/**
* @author Sergey Galkin <srggal at gmail dot com>
*/
case class RiakTSPartition(
index: Int,
endpoints: Iterable[HostAndPort],
queryData: Seq[TSQueryData]) extends RiakPartition
queryData: Seq[TSQueryData]) extends RiakPartition {

override def dump(lineSep: String = "\n"): String =
s"[$index] eps: " + endpoints.foldLeft(new StringBuilder) {
(sb, h) => {
if (!sb.isEmpty) {
sb.append(',').append(' ')
}

sb append h.getHost append (':') append (h.getPort)
}
}.append('\n')
.append(s" queryData (${queryData.size}):\n")
.append(queryData.foldLeft(new StringBuilder) { (sb, qd) => sb.append(" ").append(qd.dump("\n ")).append("\n\n") })
.toString()
}

trait RiakTSPartitioner {

@@ -193,7 +211,6 @@ object RangedRiakTSPartitioner {
columnNames: Option[Seq[String]], filters: Array[Filter], readConf: ReadConf): RangedRiakTSPartitioner = {
new RiakTSCoveragePlanBasedPartitioner(connector, tableName, schema, columnNames, filters, readConf)
}

}

/** Splits initial range query into readConf.splitCount number of sub-ranges, each in a separate partition */
@@ -283,7 +300,8 @@ class AutomaticRangedRiakTSPartitioner(connector: RiakConnector, tableName: Stri
}

class RiakTSCoveragePlanBasedPartitioner(connector: RiakConnector, tableName: String, schema: Option[StructType],
columnNames: Option[Seq[String]], filters: Array[Filter], readConf: ReadConf) extends RangedRiakTSPartitioner(tableName, schema, columnNames, filters, readConf) {
columnNames: Option[Seq[String]], filters: Array[Filter], readConf: ReadConf) extends RangedRiakTSPartitioner(tableName, schema, columnNames, filters, readConf)
with Logging {

val where = whereClause(filters)
val (queryRaw, vals) = toSql(columnNames, tableName, schema, where)
@@ -306,23 +324,51 @@ class RiakTSCoveragePlanBasedPartitioner(connector: RiakConnector, tableName: St
override lazy val tsRangeFieldName = coveragePlan.head.getFieldName

override def partitions(): Array[Partition] = {

val hosts = coveragePlan.hosts

require(splitCount >= hosts.size)
val coverageEntriesCount = coveragePlan.size
val partitionsCount = if (splitCount <= coverageEntriesCount) splitCount else coverageEntriesCount

val evenDistributionBetweenHosts = distributeEvenly(partitionsCount, hosts.size)
if (log.isTraceEnabled()) {
val cp = coveragePlan.foldLeft(new StringBuilder) { (sb, ce) => sb.append( DumpUtils.dump(ce, "\n ")).append("\n\n") }

logTrace("\n----------------------------------------\n" +
s" [Auto TS Partitioner] Requested: split up to $splitCount partitions\n" +
s" Actually: the only $partitionsCount partitions might be created\n" +
"--\n" +
s"Coverage plan ($coverageEntriesCount coverage entries):\n$cp\n" +
"----------------------------------------\n")
}

val evenPartitionDistributionBetweenHosts = distributeEvenly(partitionsCount, hosts.size)

val numberOfEntriesInPartitionPerHost =
(hosts zip evenDistributionBetweenHosts).flatMap { case (h, num) => splitListEvenly(coveragePlan.hostEntries(h), num).map((h, _)) }
(hosts zip evenPartitionDistributionBetweenHosts) flatMap { case (h, num) => splitListEvenly(coveragePlan.hostEntries(h), num) map{(h, _)} }

val partitions = for {
((host, coverageEntries), partitionIdx) <- numberOfEntriesInPartitionPerHost.zipWithIndex
tsQueryData = coverageEntries.map(ce => toTSQueryData(ce.getLowerBound, ce.isLowerBoundInclusive, ce.getUpperBound, ce.isUpperBoundInclusive, Some(ce)))
partition = RiakTSPartition(partitionIdx, hosts.toSet, tsQueryData)
} yield partition

partitions.toArray
val result = partitions.toArray

if (log.isDebugEnabled()) {
val p = result.foldLeft(new StringBuilder) { (sb, r) => sb.append(r.dump()).append("\n") }.toString()

logInfo("\n----------------------------------------\n" +
s" [Auto TS Partitioner] Requested: split up to $splitCount partitions\n" +
s" Actually: the created partitions are:\n" +
"--\n" +
s"$p\n" +
"----------------------------------------\n")
}

// Double check that all coverage entries were used
val numberOfUsedCoverageEntries = partitions.foldLeft(0){ (sum, p) => sum + p.queryData.size}
require( numberOfUsedCoverageEntries == coverageEntriesCount)

result.asInstanceOf[Array[Partition]]
}
}
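To make the sizing logic in partitions() concrete, here is a small sketch with hypothetical numbers (not from the diff): the requested split count is capped by the number of coverage entries in the plan.

// Hypothetical values: 10 requested splits but only 6 coverage entries
val splitCount = 10
val coverageEntriesCount = 6
val partitionsCount =
  if (splitCount <= coverageEntriesCount) splitCount else coverageEntriesCount
// partitionsCount == 6: no more partitions than coverage entries are created;
// those partitions are then spread across the hosts via distributeEvenly and
// each host's entries are grouped with splitListEvenly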
47 changes: 47 additions & 0 deletions connector/src/main/scala/com/basho/riak/spark/util/Dumpable.scala
@@ -0,0 +1,47 @@
/**
* Copyright (c) 2015-2016 Basho Technologies, Inc.
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.basho.riak.spark.util

import com.basho.riak.client.core.query.timeseries.CoverageEntry

trait Dumpable {
def dump(lineSep: String = "\n"): String = toString
}


object DumpUtils {

def dump(ce: CoverageEntry, lineSep: String): String = {
val lb = ce.isLowerBoundInclusive match {
case true => "["
case false => "("
}

val ub = ce.isUpperBoundInclusive match {
case true => "]"
case false => ")"
}

s"$lb${ce.getLowerBound},${ce.getUpperBound}$ub@host: ${ce.getHost}:${ce.getPort}" + lineSep +
s"description: ${ce.getDescription}" + lineSep +
s"context: " + {ce.getCoverageContext match {
case null => "null"
case c => c.map("%02X" format _).mkString
}}
}
}
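A brief sketch of how a class can mix in Dumpable, mirroring what TSQueryData and RiakTSPartition do above (the class below is hypothetical and not part of this commit):

// Falls back to toString unless dump is overridden, same contract as the trait above
case class DemoEntry(host: String, port: Int) extends Dumpable {
  override def dump(lineSep: String = "\n"): String =
    s"host: $host:$port" + lineSep + s"port-hex: ${"%02X" format port}"
}
println(DemoEntry("riak-host", 10).dump())
// host: riak-host:10
// port-hex: 0A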