From 5c139b49dd33d6758c3d6a55a8c8826f6f85b14f Mon Sep 17 00:00:00 2001
From: Cloud Tang
Date: Mon, 2 Nov 2015 17:48:34 +0800
Subject: [PATCH] Prepare for the 0.2 release

Refactor code and move the tests to the standard test path.
---
 pom.xml                                      |  26 +-
 .../pasalab/marlin/matrix/BlockMatrix.scala  |  12 +-
 .../marlin/matrix/DenseVecMatrix.scala       | 309 +++++++++---------
 .../pasalab/marlin/matrix/MatrixSuite.scala  |  19 --
 4 files changed, 177 insertions(+), 189 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5482b8c..5e34cdf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -279,9 +279,22 @@
         org.apache.maven.plugins
         maven-surefire-plugin
-        2.18
+        2.18.1
-        true
+
+          **/Test*.java
+          **/*Test.java
+          **/*TestCase.java
+          **/*Suite.java
+
+        ${project.build.directory}/surefire-reports
+
+
+          ${test_classpath}
+
@@ -291,7 +304,7 @@
         ${project.build.directory}/surefire-reports
         .
-        ${project.build.directory}/ToonaTestSuite.txt
+        MarlinTestSuite.txt
         -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m
         ${session.executionRootDirectory}
@@ -302,6 +315,13 @@
         false
         true
+
+
+          ${test_classpath}
+

diff --git a/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala b/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala
index 7eda6f3..61e69d4 100644
--- a/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala
+++ b/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala
@@ -80,12 +80,10 @@ class BlockMatrix(
     val mat = BDM.zeros[Double](m, n)
     blocks.collect().foreach {
       case (blkID, matrix) =>
-        val rowStart = blkID.row
-        val colStart = blkID.column
-        matrix.activeIterator.foreach {
-          case ((i, j), v) =>
-            mat(rowStart * mostBlkRowLen + i, colStart * mostBlkColLen + j) = v
-        }
+        val rowStart = blkID.row * mostBlkRowLen
+        val colStart = blkID.column * mostBlkColLen
+        mat(rowStart until rowStart + matrix.rows,
+          colStart until colStart + matrix.cols) := matrix
     }
     mat
   }
@@ -302,7 +300,7 @@ class BlockMatrix(
       } else {
        (blkId.column + 1) * colBlkSize
      }
-      (blkId, (blk.asInstanceOf[BDM[Double]] *
+      (BlockID(blkId.row, 0), (blk.asInstanceOf[BDM[Double]] *
        Bb.value(startRow until endRow, ::)).asInstanceOf[BDM[Double]])
    }.reduceByKey(_ + _)
    new BlockMatrix(blocks, numRows(), B.cols, numBlksByRow(), numBlksByCol())
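The reworked `toBreeze` above copies each block into the target with a single bulk slice assignment instead of walking `activeIterator` entry by entry, and the `multiplyBy` hunk re-keys every partial product to `BlockID(blkId.row, 0)` so that `reduceByKey(_ + _)` accumulates all products belonging to the same row band into one result block. A minimal, self-contained sketch of the slice-assignment pattern (plain Breeze; `BlockID` and `assemble` here are illustrative stand-ins, not Marlin API):

import breeze.linalg.{DenseMatrix => BDM}

object BlockAssemblySketch {
  // Illustrative stand-in for Marlin's BlockID.
  case class BlockID(row: Int, column: Int)

  /** Copy each block into its slice of the target matrix in one assignment. */
  def assemble(m: Int, n: Int, blkRowLen: Int, blkColLen: Int,
               blocks: Seq[(BlockID, BDM[Double])]): BDM[Double] = {
    val mat = BDM.zeros[Double](m, n)
    blocks.foreach { case (id, blk) =>
      val rowStart = id.row * blkRowLen
      val colStart = id.column * blkColLen
      // One slice assignment per block; Breeze copies the whole region,
      // avoiding a per-entry pass over every element of every block.
      mat(rowStart until rowStart + blk.rows,
          colStart until colStart + blk.cols) := blk
    }
    mat
  }

  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      BlockID(0, 0) -> BDM((1.0, 2.0), (3.0, 4.0)),
      BlockID(0, 1) -> BDM((5.0, 6.0), (7.0, 8.0)))
    println(assemble(2, 4, 2, 2, blocks))
  }
}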
diff --git a/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala b/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala
index 2001a6e..ab25fc0 100644
--- a/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala
+++ b/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala
@@ -184,7 +184,7 @@ class DenseVecMatrix(
     }
     result
   }
-  
+

   /**
    * Matrix-matrix multiply
@@ -196,8 +196,8 @@
    * @return result in BlockMatrix type
    */
   def multiply(other: DistributedMatrix,
-      cores: Int,
-      broadcastThreshold: Int = 300): DistributedMatrix = {
+               cores: Int,
+               broadcastThreshold: Int = 300): DistributedMatrix = {
     require(numCols == other.numRows(),
       s"Dimension mismatch during matrix-matrix multiplication: ${numCols()} vs ${other.numRows()}")
     other match {
@@ -222,9 +222,9 @@
         val broadSize = broadcastThreshold * 1024 * 1024 / 8
         if (that.numRows() * that.numCols() <= broadSize) {
           this.multiply(that.toBreeze())
-        } else if(this.numRows() * this.numCols() <= broadSize ){
+        } else if (this.numRows() * this.numCols() <= broadSize) {
           that.multiplyBy(this.toBreeze())
-        }else{
+        } else {
           val splitMethod =
             MTUtils.splitMethod(numRows(), numCols(), other.numCols(), cores)
           multiply(that, splitMethod)
@@ -241,9 +241,9 @@
     require(numRows() == other.numRows(),
       s"Dimension mismatch: ${numRows()} vs ${other.numRows()}")
     other match {
       case that: DenseVecMatrix =>
-        val result = rows.join(that.rows).map(t => {
-          (t._1, BDV(t._2._1.toArray ++: t._2._2.toArray))
-        })
+        val result = rows.join(that.rows).mapValues { case (v1, v2) =>
+          BDV(v1.toArray ++: v2.toArray)
+        }
         new DenseVecMatrix(result, numRows(), numCols() + that.numCols())
       case that: BlockMatrix =>
         val thatDenVec = that.toDenseVecMatrix()
@@ -252,7 +252,7 @@
         throw new IllegalArgumentException("Do not support this type " + that.getClass +
           " for cBind operation")
     }
   }
-  
+

   /**
    * multiply an elementary matrix on the left to apply row switching transformations
@@ -266,7 +266,7 @@
       s"Dimension mismatch, row permutation matrix: ${permutation.length} vs $nRows")
     val index = rows.context.parallelize(permutation.zipWithIndex.toSeq, getClusterCores())
       .map(t => (t._1.toLong, t._2.toLong))
-    val result = rows.join(index).map(t => (t._2._2, t._2._1))
+    val result = rows.join(index).map { case (id1, (v, id2)) => (id2, v) }
     new DenseVecMatrix(result, numRows(), numCols())
   }
@@ -282,7 +282,7 @@
    *
    * @param mode in which manner the result should be calculated, locally or distributed
    */
-def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
+  def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
     require(numRows() == numCols(),
       s"LU decomposition only supports square matrices: ${numRows()} vs ${numCols()}")
     object LUmode extends Enumeration {
@@ -299,7 +299,7 @@
       case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
     }
     val (luResult: BlockMatrix, perm: Array[Int]) = computeMode match {
-      case LUmode.LocalBreeze => {
+      case LUmode.LocalBreeze =>
         val brz = toBreeze()
         val lu = brzLU(brz)
         val pArray = (0 until lu._2.length).toArray
         for (i <- 0 until lu._2.length) {
           val tmp = pArray(i)
           pArray(i) = pArray(lu._2(i) - 1)
           pArray(lu._2(i) - 1) = tmp
         }
-        val blk = rows.context.parallelize(Seq((new BlockID(0, 0), lu._1)), 1)
+        val blk = rows.context.parallelize(Seq((BlockID(0, 0), lu._1)), 1)
         (new BlockMatrix(blk, lu._1.rows, lu._1.cols, 1, 1), pArray)
-      }
-      case LUmode.DistSpark =>
+      case LUmode.DistSpark =>
         val subMatrixBaseSize = rows.context.getConf.getInt("marlin.lu.basesize", 1000)
         val numBlksByRow, numBlksByCol = math.ceil(numRows().toDouble / subMatrixBaseSize.toDouble).toInt
         val subMatrixBase = math.ceil(numRows().toDouble / numBlksByRow.toDouble).toInt
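The `LocalBreeze` branch converts the pivot vector returned by `brzLU` into a row-permutation array with an in-place swap loop. A standalone sketch of that bookkeeping, assuming a LAPACK-style 1-based pivot vector (which is what Breeze's LU returns); `ipivToPermutation` and `permutationMatrix` are illustrative helpers, the second mirroring the `permutation.update(j, ..., 1.0)` construction the distributed branch uses later to reorder row blocks:

import breeze.linalg.{DenseMatrix => BDM}

object PivotSketch {
  /** Convert a 1-based LAPACK pivot vector into a 0-based permutation array,
    * mirroring the swap loop in the LocalBreeze branch above. */
  def ipivToPermutation(ipiv: Array[Int]): Array[Int] = {
    val p = Array.range(0, ipiv.length)
    for (i <- ipiv.indices) {
      val j = ipiv(i) - 1 // ipiv entries are 1-based
      val tmp = p(i); p(i) = p(j); p(j) = tmp
    }
    p
  }

  /** Row-permutation matrix P with P(j, p(j)) = 1, so (P * A) reorders A's rows by p. */
  def permutationMatrix(p: Array[Int]): BDM[Double] = {
    val pm = BDM.zeros[Double](p.length, p.length)
    for (j <- p.indices) pm(j, p(j)) = 1.0
    pm
  }
}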
@@ -394,44 +393,43 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
         val matThirdEmit = matThird
           .mapPartitions({
-          iter =>
-            iter.flatMap(t => {
-              val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum)
-              for (j <- 0 until nSplitNum) {
-                //val seq = t._1.row * nSplitNum * kSplitNum + (j+i+1) * kSplitNum + t._1.column
-                val seq = 0
-                array(j) = (new BlockID(t._1.row, (j + i + 1), seq), t._2)
-              }
-              array
-            })
-          }) //.partitionBy(partitioner)
+          iter =>
+            iter.flatMap { case (blkId, blk) =>
+              val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum)
+              for (j <- 0 until nSplitNum) {
+                //val seq = blkId.row * nSplitNum * kSplitNum + (j+i+1) * kSplitNum + blkId.column
+                val seq = 0
+                array(j) = (BlockID(blkId.row, (j + i + 1), seq), blk)
+              }
+              array
+            }
+          }) //.partitionBy(partitioner)
         val matSecondEmit = matSecond
           .mapPartitions({
-          iter =>
-            iter.flatMap(t => {
-              val array = Array.ofDim[(BlockID, BDM[Double])](mSplitNum)
-              for (j <- 0 until mSplitNum) {
-                //val seq = (j + i + 1) * nSplitNum * kSplitNum + t._1.column * kSplitNum + t._1.row
-                val seq = 0
-                array(j) = (new BlockID(j + i + 1, t._1.column, seq), t._2)
-              }
-              array
-            })
-          }) //.partitionBy(partitioner)
+          iter =>
+            iter.flatMap { case (blkId, blk) =>
+              val array = Array.ofDim[(BlockID, BDM[Double])](mSplitNum)
+              for (j <- 0 until mSplitNum) {
+                val seq = 0
+                array(j) = (BlockID(j + i + 1, blkId.column, seq), blk)
+              }
+              array
+            }
+          }) //.partitionBy(partitioner)
         println("matThirdEmit and matSecondEmit same partitioner? " +
           (matSecondEmit.partitioner == matThirdEmit.partitioner).toString)
         val mult = matThirdEmit.join(matSecondEmit, partitioner)
-          .mapValues(t => {
-            val mat = (t._1.asInstanceOf[BDM[Double]] * (bdata.value \ t._2.asInstanceOf[BDM[Double]]))
-              .asInstanceOf[BDM[Double]]
-            (mat)
-          }) //.partitionBy(partitioner)
+          .mapValues { case (blk1, blk2) =>
+            val mat = (blk1.asInstanceOf[BDM[Double]] * (bdata.value \ blk2.asInstanceOf[BDM[Double]]))
+              .asInstanceOf[BDM[Double]]
+            mat
+          } //.partitionBy(partitioner)
         blkMat = matForth
           .join(mult, partitioner).mapValues(t => t._1 - t._2) //.partitionBy(partitioner).cache()
           .cache()
-        //println(blkMat.toDebugString)
       }
     }
@@ -443,32 +441,31 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
         val bpArray = blkMat.context.broadcast(pArray)
         blkMat = blkMat.mapPartitions(iter =>
-          iter.map(block =>
-            if (block._1.row > block._1.column) {
+          iter.map { case (blkId, blk) =>
+            if (blkId.row > blkId.column) {
               val array = bpArray.value
-                .slice(subMatrixBase * block._1.row,
-                  if (block._1.row == numBlksByRow - 1) {
+                .slice(subMatrixBase * blkId.row,
+                  if (blkId.row == numBlksByRow - 1) {
                     numRows().toInt
                   } else {
-                    subMatrixBase * block._1.row + subMatrixBase
+                    subMatrixBase * blkId.row + subMatrixBase
                   })
               val permutation = BDM.zeros[Double](array.length, array.length)
               for (j <- 0 until array.length) {
-                permutation.update(j, array(j) - subMatrixBase * block._1.row, 1.0)
+                permutation.update(j, array(j) - subMatrixBase * blkId.row, 1.0)
               }
-              (block._1, permutation * block._2)
+              (blkId, permutation * blk)
             } else {
-              block
+              (blkId, blk)
             }
-          ), true).cache()
+          }, true).cache()
         val result = new BlockMatrix(blkMat, numRows(), numCols(), numBlksByRow, numBlksByCol)
         // println("cnt:" + result.blocks.count())
         (result, pArray)
     }
-
     (luResult, perm)
   }
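Each pass of the distributed loop above is one right-looking elimination step: `matThirdEmit.join(matSecondEmit)` pairs the pivot-column and pivot-row blocks, the `bdata.value \ ...` solve applies the inverse of the pivot block, and the `matForth` subtraction applies the Schur-complement update to every trailing block. A local sketch of the same step on an array of square Breeze blocks (illustrative only, not the RDD code path):

import breeze.linalg.{DenseMatrix => BDM}

object SchurStepSketch {
  /** One elimination step s over a grid of square blocks:
    * A(i, j) -= A(i, s) * (A(s, s) \ A(s, j)) for all trailing (i, j). */
  def schurStep(blocks: Array[Array[BDM[Double]]], s: Int): Unit = {
    val pivot = blocks(s)(s)
    for (i <- s + 1 until blocks.length; j <- s + 1 until blocks.length) {
      // `pivot \ x` solves pivot * y = x, matching the `bdata.value \ blk2` solve.
      blocks(i)(j) -= blocks(i)(s) * (pivot \ blocks(s)(j))
    }
  }
}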
@@ -500,9 +497,9 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
       case LUmode.LocalBreeze =>
         val brz = toBreeze()
         val l = brzCholesky(brz)
-        val blk = rows.context.parallelize(Seq((new BlockID(0, 0), l)), 1)
-        (new BlockMatrix(blk, l.rows, l.cols, 1, 1))
-      case LUmode.DistSpark => {
+        val blk = rows.context.parallelize(Seq((BlockID(0, 0), l)), 1)
+        new BlockMatrix(blk, l.rows, l.cols, 1, 1)
+      case LUmode.DistSpark =>
         val subMatrixBaseSize = rows.context.getConf.getInt("marlin.cholesky.basesize", 1000)
         val numBlksByRow, numBlksByCol = math.ceil(numRows().toDouble / subMatrixBaseSize.toDouble).toInt
         val subMatrixBase = math.ceil(numRows().toDouble / numBlksByRow.toDouble).toInt
@@ -512,7 +509,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
         blkMat.cache()
         println("numBlkByRow is: " + numBlksByRow)
-        println("original partitioner: " + partitioner.toString())
+        println("original partitioner: " + partitioner.toString)
         val scatterRdds = Array.ofDim[RDD[(BlockID, BDM[Double])]](numBlksByRow - 1, 2)
         for (i <- 0 until numBlksByRow) {
@@ -537,19 +534,19 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
           val nSplitNum = numBlksByRow - (i + 1)
           val mult = scatterRdds(i)(1)
             .mapPartitions({
-            iter =>
-              iter.flatMap(t => {
-                val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum + 1)
-                for (j <- 0 until array.length) {
-                  if (j < t._1.row - i) {
-                    array(j) = (new BlockID(t._1.row, j + i + 1, 0), t._2)
-                  } else {
-                    array(j) = (new BlockID(i + j, t._1.row, 0), t._2.t)
-                  }
-                }
-                array
-              })
-            }).reduceByKey(partitioner, (a, b) => if (a.isTranspose) b * a else a * b)
+            iter =>
+              iter.flatMap { case (blkId, blk) =>
+                val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum + 1)
+                for (j <- 0 until array.length) {
+                  if (j < blkId.row - i) {
+                    array(j) = (BlockID(blkId.row, j + i + 1, 0), blk)
+                  } else {
+                    array(j) = (BlockID(i + j, blkId.row, 0), blk.t)
+                  }
+                }
+                array
+              }
+            }).reduceByKey(partitioner, (a, b) => if (a.isTranspose) b * a else a * b)
           blkMat = blksForth.join(mult, partitioner).mapValues(t => t._1 - t._2).cache()
         }
@@ -560,7 +557,6 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
         }
         //blkMat.partitionBy(partitioner)
         new BlockMatrix(blkMat)
-      }
     }
     println("row:" + luResult.blocks.count())
     luResult
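The Cholesky update combines joined blocks with `reduceByKey(partitioner, (a, b) => if (a.isTranspose) b * a else a * b)`. Matrix multiplication is not commutative and `reduceByKey` gives no ordering guarantee, so the combiner uses Breeze's `isTranspose` flag, which is set on a `.t` view, to put the transposed operand back on the correct side. A small local illustration of the idea:

import breeze.linalg.{DenseMatrix => BDM}

object TransposeOrderSketch {
  // Keep the transposed operand on the right, whichever order the reducer sees.
  def combine(x: BDM[Double], y: BDM[Double]): BDM[Double] =
    if (x.isTranspose) y * x else x * y

  def main(args: Array[String]): Unit = {
    val a = BDM((1.0, 2.0), (3.0, 4.0))
    val at = a.t // a transposed view: at.isTranspose == true
    // Both arrival orders yield a * a.t.
    assert(combine(a, at) == combine(at, a))
  }
}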
" + (matSecondEmit.partitioner == matThirdEmit.partitioner).toString) val mult = matThirdEmit.join(matSecondEmit, partitioner) - .mapValues(t => { - val mat = (t._1.asInstanceOf[BDM[Double]] * binv.value * t._2.asInstanceOf[BDM[Double]]) - .asInstanceOf[BDM[Double]] - (mat) - }) //.partitionBy(partitioner) + .mapValues{case(blk1, blk2) => + val mat = (blk1.asInstanceOf[BDM[Double]] * binv.value * blk2.asInstanceOf[BDM[Double]]) + .asInstanceOf[BDM[Double]] + mat + } //.partitionBy(partitioner) blkMat = matForth .join(mult, partitioner).mapValues(t => t._1 - t._2) //.partitionBy(partitioner).cache() .cache() @@ -696,77 +688,75 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = { val FourthEmit = blkMat.mapPartitions({ iter => - iter.flatMap(t => { + iter.flatMap { case (blkId, blk) => val array = Array.ofDim[(BlockID, BDM[Double])](n) for (j <- 0 until array.length) { - //val seq = t._1.row * k * n + (i+j) * k + t._1.column - val seq = t._1.row * numBlksByRow * numBlksByCol + (i + j) * numBlksByCol + t._1.column - array(j) = (new BlockID(t._1.row, (i + j), seq), t._2) + val seq = blkId.row * numBlksByRow * numBlksByCol + (i + j) * numBlksByCol + blkId.column + array(j) = (BlockID(blkId.row, i + j, seq), blk) } array - }) + } }) val ThirdEmit = thirdMat.mapPartitions({ iter => - iter.flatMap(t => { + iter.flatMap { case (blkId, blk) => val array = Array.ofDim[(BlockID, BDM[Double])](m) for (j <- 0 until array.length) { - //val seq = (i + 1 + j) * k * n + t._1.column * k + t._1.row - val seq = (i + 1 + j) * numBlksByRow * numBlksByCol + t._1.column * numBlksByCol + t._1.row - array(j) = (new BlockID((i + j + 1), t._1.column, seq), t._2) + val seq = (i + 1 + j) * numBlksByRow * numBlksByCol + blkId.column * numBlksByCol + blkId.row + array(j) = (BlockID((i + j + 1), blkId.column, seq), blk) } array - }) + } }) val multThird = FourthEmit.join(ThirdEmit) - .map(t => { - val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]]) - .asInstanceOf[BDM[Double]] - (new BlockID(t._1.row, t._1.column), mat) - }).reduceByKey(_ + _) //.partitionBy(partitioner) + .map { case (blkId, (blk1, blk2)) => + val mat = (blk1.asInstanceOf[BDM[Double]] * blk2.asInstanceOf[BDM[Double]]) + .asInstanceOf[BDM[Double]] + (BlockID(blkId.row, blkId.column), mat) + }.reduceByKey(_ + _) //.partitionBy(partitioner) multThird.count() m = 1 n = numBlksByRow - i - 1 k = n val SecondEmit = secondMat.mapPartitions(iter => - iter.flatMap(t => { + iter.flatMap { case (blkId, blk) => val array = Array.ofDim[(BlockID, BDM[Double])](n) for (j <- 0 until array.length) { - val seq = t._1.row * k * n + (i + j + 1) * k + t._1.column - array(j) = (new BlockID(t._1.row, i + j + 1, seq), t._2) + val seq = blkId.row * k * n + (i + j + 1) * k + blkId.column + array(j) = (BlockID(blkId.row, i + j + 1, seq), blk) } array - })) + }) val FourthEmit2 = blkMat.mapPartitions(iter => - iter.flatMap(t => { + iter.flatMap { case (blkId, blk) => val array = Array.ofDim[(BlockID, BDM[Double])](m) for (j <- 0 until array.length) { - val seq = (i + j) * k * n + t._1.column * k + t._1.row - array(j) = (new BlockID(i + j, t._1.column, seq), t._2) + val seq = (i + j) * k * n + blkId.column * k + blkId.row + array(j) = (BlockID(i + j, blkId.column, seq), blk) } array - })) + }) val multSecond = SecondEmit.join(FourthEmit2) - .map(t => { - val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]]) - .asInstanceOf[BDM[Double]] - (new BlockID(t._1.row, t._1.column), mat) - }).reduceByKey(_ + _) 
@@ -696,77 +688,75 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
         val FourthEmit = blkMat.mapPartitions({
           iter =>
-            iter.flatMap(t => {
+            iter.flatMap { case (blkId, blk) =>
               val array = Array.ofDim[(BlockID, BDM[Double])](n)
               for (j <- 0 until array.length) {
-                //val seq = t._1.row * k * n + (i+j) * k + t._1.column
-                val seq = t._1.row * numBlksByRow * numBlksByCol + (i + j) * numBlksByCol + t._1.column
-                array(j) = (new BlockID(t._1.row, (i + j), seq), t._2)
+                val seq = blkId.row * numBlksByRow * numBlksByCol + (i + j) * numBlksByCol + blkId.column
+                array(j) = (BlockID(blkId.row, i + j, seq), blk)
               }
               array
-            })
+            }
         })
         val ThirdEmit = thirdMat.mapPartitions({
           iter =>
-            iter.flatMap(t => {
+            iter.flatMap { case (blkId, blk) =>
               val array = Array.ofDim[(BlockID, BDM[Double])](m)
               for (j <- 0 until array.length) {
-                //val seq = (i + 1 + j) * k * n + t._1.column * k + t._1.row
-                val seq = (i + 1 + j) * numBlksByRow * numBlksByCol + t._1.column * numBlksByCol + t._1.row
-                array(j) = (new BlockID((i + j + 1), t._1.column, seq), t._2)
+                val seq = (i + 1 + j) * numBlksByRow * numBlksByCol + blkId.column * numBlksByCol + blkId.row
+                array(j) = (BlockID((i + j + 1), blkId.column, seq), blk)
               }
               array
-            })
+            }
         })
         val multThird = FourthEmit.join(ThirdEmit)
-          .map(t => {
-            val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]])
-              .asInstanceOf[BDM[Double]]
-            (new BlockID(t._1.row, t._1.column), mat)
-          }).reduceByKey(_ + _) //.partitionBy(partitioner)
+          .map { case (blkId, (blk1, blk2)) =>
+            val mat = (blk1.asInstanceOf[BDM[Double]] * blk2.asInstanceOf[BDM[Double]])
+              .asInstanceOf[BDM[Double]]
+            (BlockID(blkId.row, blkId.column), mat)
+          }.reduceByKey(_ + _) //.partitionBy(partitioner)
         multThird.count()
         m = 1
         n = numBlksByRow - i - 1
         k = n
         val SecondEmit = secondMat.mapPartitions(iter =>
-          iter.flatMap(t => {
+          iter.flatMap { case (blkId, blk) =>
             val array = Array.ofDim[(BlockID, BDM[Double])](n)
             for (j <- 0 until array.length) {
-              val seq = t._1.row * k * n + (i + j + 1) * k + t._1.column
-              array(j) = (new BlockID(t._1.row, i + j + 1, seq), t._2)
+              val seq = blkId.row * k * n + (i + j + 1) * k + blkId.column
+              array(j) = (BlockID(blkId.row, i + j + 1, seq), blk)
            }
             array
-          }))
+          })
         val FourthEmit2 = blkMat.mapPartitions(iter =>
-          iter.flatMap(t => {
+          iter.flatMap { case (blkId, blk) =>
            val array = Array.ofDim[(BlockID, BDM[Double])](m)
            for (j <- 0 until array.length) {
-              val seq = (i + j) * k * n + t._1.column * k + t._1.row
-              array(j) = (new BlockID(i + j, t._1.column, seq), t._2)
+              val seq = (i + j) * k * n + blkId.column * k + blkId.row
+              array(j) = (BlockID(i + j, blkId.column, seq), blk)
            }
            array
-          }))
+          })
         val multSecond = SecondEmit.join(FourthEmit2)
-          .map(t => {
-            val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]])
-              .asInstanceOf[BDM[Double]]
-            (new BlockID(t._1.row, t._1.column), mat)
-          }).reduceByKey(_ + _) //.partitionBy(partitioner)
+          .map { case (blkId, (blk1, blk2)) =>
+            val mat = (blk1.asInstanceOf[BDM[Double]] * blk2.asInstanceOf[BDM[Double]])
+              .asInstanceOf[BDM[Double]]
+            (BlockID(blkId.row, blkId.column), mat)
+          }.reduceByKey(_ + _) //.partitionBy(partitioner)
         val multFirst = scatterRdds(i)(1)
-          .map(t => (new BlockID(t._1.column, t._1.row), t._2))
+          .map { case (blkId, blk) => (BlockID(blkId.column, blkId.row), blk) }
           .join(multThird)
           .mapPartitions(iter =>
-            iter.map(t => {
-              val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]])
-                .asInstanceOf[BDM[Double]]
-              (new BlockID(i, i), mat)
-            }))
+            iter.map { case (blkId, (blk1, blk2)) =>
+              val mat = (blk1.asInstanceOf[BDM[Double]] * blk2.asInstanceOf[BDM[Double]])
+                .asInstanceOf[BDM[Double]]
+              (BlockID(i, i), mat)
+            })
           .reduceByKey(_ + _)
           .join(scatterRdds(i)(0))
           .mapValues(t => t._1 + t._2)
@@ -776,7 +766,6 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
             .partitionBy(partitioner).cache()
         }
         new BlockMatrix(blkMat, numRows(), numCols(), numBlksByRow, numBlksByCol)
-      }
     }
     luResult
   }
@@ -788,18 +777,18 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
    */
   final def add(other: DistributedMatrix): DenseVecMatrix = {
     other match {
-      case that: DenseVecMatrix =>
+      case that: DenseVecMatrix =>
         require(numRows() == that.numRows(), s"Dimension mismatch: ${numRows()} vs ${that.numRows()}")
         require(numCols() == that.numCols, s"Dimension mismatch: ${numCols()} vs ${that.numCols()}")
         val result = rows.join(that.rows).mapPartitions(iter => {
-          iter.map(t =>
-            (t._1, ((t._2._1 + t._2._2).asInstanceOf[BDV[Double]])))
+          iter.map { case (id, (v1, v2)) =>
+            (id, (v1 + v2).asInstanceOf[BDV[Double]]) }
         }, true)
         new DenseVecMatrix(result, numRows(), numCols())
-      case that: BlockMatrix =>
+      case that: BlockMatrix =>
         add(that.toDenseVecMatrix())
-      case that: DistributedMatrix =>
+      case that: DistributedMatrix =>
         throw new IllegalArgumentException("Do not support this type " + that.getClass +
           " for add operation")
     }
@@ -924,7 +913,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
     require(numRows() == other.numRows(), s"row dimension mismatch ${numRows()} vs ${other.numRows()}")
     require(numCols() == other.numCols(), s"column dimension mismatch ${numCols()} vs ${other.numCols()}")
     other match {
-      case that: DenseVecMatrix =>
+      case that: DenseVecMatrix =>
         val result = rows.join(that.rows).mapPartitions(iter => {
           iter.map(t => {
             val array = t._2._1.toArray.zip(t._2._2.toArray).map(x => x._1 * x._2)
             (t._1, BDV(array))
           })
         }, true)
         new DenseVecMatrix(result, numRows(), numCols())
-      case that: BlockMatrix =>
+      case that: BlockMatrix =>
         dotProduct(that.toDenseVecMatrix())
     }
   }
@@ -1298,7 +1287,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
             val (index, vec) = iterator.next()
             mat(index.toInt - blockRow * mBlockRowSize, ::) := vec.t
           }
-          (new BlockID(blockRow, 0), mat)
+          (BlockID(blockRow, 0), mat)
         }
       })
       new BlockMatrix(result, numRows(), numCols(), blksByRow, blksByCol)
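The hunk above handles the single-block-column case of `toBlockMatrix`: consecutive rows are packed into row bands and each band becomes one dense block. A local sketch of that slicing, assuming dense row indices in `[0, nRows)`:

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

object RowBlockSketch {
  /** Group indexed rows into bands of height h; band b holds rows [b*h, b*h + h). */
  def toRowBlocks(rows: Seq[(Long, BDV[Double])], nRows: Int, nCols: Int,
                  h: Int): Map[Int, BDM[Double]] =
    rows.groupBy { case (i, _) => (i / h).toInt }.map { case (blockRow, rs) =>
      val height = math.min(h, nRows - blockRow * h) // last band may be shorter
      val mat = BDM.zeros[Double](height, nCols)
      // Place each row vector at its offset within the band, as the hunk does.
      rs.foreach { case (i, v) => mat((i - blockRow.toLong * h).toInt, ::) := v.t }
      blockRow -> mat
    }
}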
@@ -1322,25 +1311,25 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
         }
       }).groupByKey()
         .mapPartitions(iter =>
-        iter.map { case (blkId, iterable) =>
-          val colBase = blkId.column * mBlockColSize
-          val rowBase = blkId.row * mBlockRowSize
-          var smRows = mBlockRowSize
-          if ((rowBase + mBlockRowSize - 1) >= mRows) {
-            smRows = mRows - rowBase
-          }
-          var smCols = mBlockColSize
-          if ((colBase + mBlockColSize - 1) >= mColumns) {
-            smCols = mColumns - colBase
-          }
-          val mat = BDM.zeros[Double](smRows, smCols)
-          val iterator = iterable.iterator
-          while (iterator.hasNext) {
-            val (index, vector) = iterator.next()
-            mat((index - rowBase).toInt, ::) := vector.t
-          }
-          (blkId, mat)
-        }, true)
+        iter.map { case (blkId, iterable) =>
+          val colBase = blkId.column * mBlockColSize
+          val rowBase = blkId.row * mBlockRowSize
+          var smRows = mBlockRowSize
+          if ((rowBase + mBlockRowSize - 1) >= mRows) {
+            smRows = mRows - rowBase
+          }
+          var smCols = mBlockColSize
+          if ((colBase + mBlockColSize - 1) >= mColumns) {
+            smCols = mColumns - colBase
+          }
+          val mat = BDM.zeros[Double](smRows, smCols)
+          val iterator = iterable.iterator
+          while (iterator.hasNext) {
+            val (index, vector) = iterator.next()
+            mat((index - rowBase).toInt, ::) := vector.t
+          }
+          (blkId, mat)
+        }, true)
       new BlockMatrix(result, numRows(), numCols(), blksByRow, blksByCol)
     }
   }

diff --git a/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala b/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala
index d8740c4..b77d17b 100644
--- a/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala
+++ b/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala
@@ -147,7 +147,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
   test("generate random sparse matrix") {
     val s = MTUtils.randomSpaVecMatrix(sc, 10, 8, 0.3)
-//    s.rows.collect().foreach{ case(a, b) => println(b.data.mkString(","))}
     assert(s.nRows == 10L)
   }
@@ -196,8 +195,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
     assert(mat.multiply(2).toBreeze() === addSelf)
     assert(mat.divide(2).toBreeze() === divide2)
     val ma = new BlockMatrix(blocks)
-    // val denVecMat = new DenseVecMatrix(rows)
-
     assert(ma.add(1).toBreeze() === eleAdd1)
     assert(ma.add(ma).toBreeze() === addSelf)
     assert(ma.add(mat).toBreeze() === addSelf)
@@ -226,16 +223,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
     assert(mat.getSubMatrix(1, 2, 1, 2).toBreeze() === sub1212)
   }
-//  test("DenseVecMatrix multiply a DenseVecMatrix in block-approach") {
-//    val mat = new DenseVecMatrix(indexRows)
-//    val result = mat.multiplyHama(mat, 2)
-//    val blkSeq = result.blocks.collect().toSeq
-//    assert(blkSeq.contains(new BlockID(0, 0), BDM((11.0, 10.0), (23.0, 24.0))))
-//    assert(blkSeq.contains(new BlockID(0, 1), BDM((9.0, 8.0), (25.0, 26.0))))
-//    assert(blkSeq.contains(new BlockID(1, 0), BDM((7.0, 11.0), (6.0, 7.0))))
-//    assert(blkSeq.contains(new BlockID(1, 1), BDM((15.0, 19.0), (8.0, 9.0))))
-//  }
-
   test("DenseVecMatrix multiply a DenseVecMatrix, and select broadcast-approach") {
     val mat = new DenseVecMatrix(indexRows)
     val result = mat.multiply(mat, 2)
@@ -436,10 +423,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
     val blk1 = mat.toBlockMatrix(2, 2)
     val blk2 = mat.toBlockMatrix(1, 4)
     val m = blk1.toBlockMatrix(2, 1)
-    println(s"this toBreeze: ${m.toBreeze()}")
-    println(s"other toBreeze: ${blk2.toBreeze()}")
-    println(s"block matrix info: numRows: ${m.numRows()}, numCols: ${m.numCols()}, " +
-      s"blksByRow: ${m.numBlksByRow()}, blksByCol: ${m.numBlksByCol()}")
     val result = m.multiply(blk2)
     val expected = BDM(
       (11.0, 10.0, 9.0, 8.0),
@@ -462,9 +445,7 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
       (7.0, 11.0, 15.0, 19.0),
       (6.0, 7.0, 8.0, 9.0))
     val result = blkMat.multiply(local)
-//    result.print()
     assert(result.toBreeze() === expected)
-
   }
 }
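A natural companion to these tests, sketched in the suite's own idiom (it assumes the `indexRows` fixture and the `===` matcher used throughout this file), is to pin the distributed product against the same product computed locally through Breeze:

  test("multiply agrees with the local Breeze product") {
    val mat = new DenseVecMatrix(indexRows)
    val local = mat.toBreeze()
    // Route through the cores-based multiply and compare against breeze's result.
    val result = mat.multiply(mat, 2)
    assert(result.toBreeze() === local * local)
  }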