diff --git a/CHANGES.md b/CHANGES.md index 1e84e51bd5..a6bdf9a5d0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Scalding # +### Version 0.13.1 ### +* Back out 4 changes to be binary compatible: https://github.com/twitter/scalding/pull/1187 +* Use java.util.Random instead of scala.util.Random: https://github.com/twitter/scalding/pull/1186 +* Add Execution.failed: https://github.com/twitter/scalding/pull/1185 +* Using a ConcurrentHashMap instead of a WeakHashMap to make the Stats behave in a correct manner: https://github.com/twitter/scalding/pull/1184 +* Add applicative for Execution: https://github.com/twitter/scalding/pull/1181 + ### Version 0.13.0 ### * Covert LzoTextDelimited to Cascading scheme.: https://github.com/twitter/scalding/pull/1179 * Make TraceUtil support versions of cascading older than 2.6: https://github.com/twitter/scalding/pull/1180 diff --git a/README.md b/README.md index 25e4308b44..7f86144067 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. ![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png) -Current version: `0.13.0` +Current version: `0.13.1` ## Word Count diff --git a/scalding-core/src/main/scala/com/twitter/package.scala b/scalding-core/src/main/scala/com/twitter/package.scala index 027aa6acc1..f712d2d353 100644 --- a/scalding-core/src/main/scala/com/twitter/package.scala +++ b/scalding-core/src/main/scala/com/twitter/package.scala @@ -30,10 +30,11 @@ package object scalding { type KeyedList[K, +V] = com.twitter.scalding.typed.KeyedList[K, V] type ValuePipe[+T] = com.twitter.scalding.typed.ValuePipe[T] type Grouped[K, +V] = com.twitter.scalding.typed.Grouped[K, V] + /** * Make sure this is in sync with version.sbt */ - val scaldingVersion: String = "0.13.0" + val scaldingVersion: String = "0.13.1" object RichPathFilter { implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index 6b6586f2e3..a4f3aba941 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -12,7 +12,7 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -*/ + */ package com.twitter.scalding import com.twitter.algebird.monad.Reader @@ -193,6 +193,7 @@ object Execution { override def apply[T](t: T): Execution[T] = Execution.from(t) override def map[T, U](e: Execution[T])(fn: T => U): Execution[U] = e.map(fn) override def flatMap[T, U](e: Execution[T])(fn: T => Execution[U]): Execution[U] = e.flatMap(fn) + override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u) } trait EvalCache { self => @@ -399,6 +400,13 @@ object Execution { case Success(s) => Future.successful(s) case Failure(err) => Future.failed(err) } + + /** + * This creates a definitely failed Execution. + */ + def failed(t: Throwable): Execution[Nothing] = + fromFuture(_ => Future.failed(t)) + /** * This makes a constant execution that runs no job. * Note this is a lazy parameter that is evaluated every diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala b/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala index c2ec84c64d..19d60874f6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala @@ -27,7 +27,7 @@ import cascading.operation.filter._ import cascading.tuple._ import cascading.cascade._ -import scala.util.Random +import java.util.Random // this one is serializable, scala.util.Random is not import scala.collection.JavaConverters._ object JoinAlgorithms extends java.io.Serializable { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 7a753b92e9..69917de209 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -466,7 +466,7 @@ package com.twitter.scalding { } } - class SampleWithReplacement(frac: Double, val seed: Int = new scala.util.Random().nextInt) extends BaseOperation[Poisson]() + class SampleWithReplacement(frac: Double, val seed: Int = new java.util.Random().nextInt) extends BaseOperation[Poisson]() with Function[Poisson] with ScaldingPrepare[Poisson] { override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[Poisson]) { super.prepare(flowProcess, operationCall) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala index 65187ce09f..ce55eb5ff1 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala @@ -2,7 +2,7 @@ package com.twitter.scalding import cascading.flow.{ FlowDef, FlowProcess } import cascading.stats.CascadingStats -import java.util.{ Collections, WeakHashMap } +import java.util.concurrent.ConcurrentHashMap import org.slf4j.{ Logger, LoggerFactory } import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -109,7 +109,7 @@ object RuntimeStats extends java.io.Serializable { @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass) private val flowMappingStore: mutable.Map[String, WeakReference[FlowProcess[_]]] = - Collections.synchronizedMap(new WeakHashMap[String, WeakReference[FlowProcess[_]]]) + new ConcurrentHashMap[String, WeakReference[FlowProcess[_]]] def getFlowProcessForUniqueId(uniqueId: UniqueID): FlowProcess[_] = { (for { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 48b6b0eb19..0b2f918dc0 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -28,7 +28,7 @@ import cascading.flow.FlowDef import cascading.pipe.{ Each, Pipe } import cascading.tap.Tap import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry } -import util.Random +import java.util.Random // prefer to scala.util.Random as this is serializable import scala.concurrent.Future @@ -400,7 +400,7 @@ trait TypedPipe[+T] extends Serializable { */ def sample(percent: Double, seed: Long): TypedPipe[T] = { // Make sure to fix the seed, otherwise restarts cause subtle errors - val rand = new Random(seed) + lazy val rand = new Random(seed) filter(_ => rand.nextDouble < percent) } diff --git a/version.sbt b/version.sbt index 0d39db4919..beb9953b43 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.13.0" +version in ThisBuild := "0.13.1"