diff --git a/futures/src/main/scala/Futures.scala b/futures/src/main/scala/Futures.scala new file mode 100644 index 00000000..632afe1b --- /dev/null +++ b/futures/src/main/scala/Futures.scala @@ -0,0 +1,39 @@ +package dispatch.futures + +import java.util.concurrent.{Executors,Callable,ExecutorService} + +object Futures extends AvailableFutures { + /** Structural type coinciding with scala.actors.Future */ + type Future[T] = Function0[T] { + def isSet: Boolean + } +} +object DefaultFuture extends JucFuture { + lazy val futureExecutor = Executors.newCachedThreadPool +} +trait AvailableFutures { + /** @return values of futures that have completed their processing */ + def available[T](fs: Iterable[Futures.Future[T]]) = fs filter { _.isSet } map { _() } toList +} +/** Interface to futures functionality */ +trait Futures { + def future[T](result: => T): Futures.Future[T] +} +/** A java.util.concurrent Future accessor, executor undefined */ +trait JucFuture extends Futures { + def future[T](result: => T) = new JucFuture(result) + /** Implement to customize the java.util.concurrent Executor, defaults to Executors.newCachedThreadPool */ + val futureExecutor: ExecutorService + /** Wraps java.util.concurrent.Future */ + class JucFuture[T](f: => T) extends Function0[T] { + val delegate = futureExecutor.submit(new Callable[T]{ + def call = f + }) + def isSet = delegate.isDone + def apply() = delegate.get() + } +} +/** Future accessor using a scala.actors future */ +object ActorsFuture extends Futures { + def future[T](result: => T) = scala.actors.Futures.future(result) +} diff --git a/http/src/main/scala/dispatch/Threads.scala b/http/src/main/scala/dispatch/Threads.scala index 298d84f7..72f609e7 100644 --- a/http/src/main/scala/dispatch/Threads.scala +++ b/http/src/main/scala/dispatch/Threads.scala @@ -1,7 +1,5 @@ package dispatch -import java.util.concurrent.{Executors,Callable,ExecutorService} - /** Http with a thread-safe client and non-blocking interfaces */ trait Threads extends Http with FuturableExecutor { override val client = new ThreadSafeHttpClient @@ -9,6 +7,7 @@ trait Threads extends Http with FuturableExecutor { def shutdown() = client.getConnectionManager.shutdown() } trait FuturableExecutor extends HttpExecutor { + import dispatch.futures.Futures /** @return an executor that will call `error` on any exception */ def on_error (error: PartialFunction[Throwable, Unit]) = new FuturableExecutor { val execute = FuturableExecutor.this.execute @@ -21,40 +20,11 @@ trait FuturableExecutor extends HttpExecutor { /** @return an asynchronous Http interface that packs responses through a Threads#Future */ lazy val future = new HttpExecutor { val execute = FuturableExecutor.this.execute - type HttpPackage[T] = Futures#Future[FuturableExecutor.this.HttpPackage[T]] + type HttpPackage[T] = Futures.Future[FuturableExecutor.this.HttpPackage[T]] def pack[T](result: => T) = http_future.future(FuturableExecutor.this.pack(result)) } - def http_future: Futures = DefaultJucFuture -} -/** Accessors for futures used by FuturableExecutor */ -trait Futures { - /** Structural type coinciding with scala.actors.Future */ - type Future[T] = Function0[T] { - def isSet: Boolean - } - def future[T](result: => T): Future[T] -} -/** A java.util.concurrent Future accessor */ -trait JucFuture extends Futures { - def future[T](result: => T) = new JucFuture(result) - /** Implement to customize the java.util.concurrent Executor, defaults to Executors.newCachedThreadPool */ - val futureExecutor: ExecutorService - /** Wraps java.util.concurrent.Future */ - class JucFuture[T](f: => T) extends Function0[T] { - val delegate = futureExecutor.submit(new Callable[T]{ - def call = f - }) - def isSet = delegate.isDone - def apply() = delegate.get() - } -} -/** Future accessor using a cached therad pool */ -object DefaultJucFuture extends JucFuture { - lazy val futureExecutor = Executors.newCachedThreadPool -} -/** Future accessor using a scala.actors future */ -object ActorsFuture extends Futures { - def future[T](result: => T) = scala.actors.Futures.future(result) + /** Override to use any Futures implementation */ + def http_future: Futures = dispatch.futures.DefaultFuture } /** Client with a ThreadSafeClientConnManager */ diff --git a/project/build/DispatchProject.scala b/project/build/DispatchProject.scala index 90d0ef9b..9af69d11 100644 --- a/project/build/DispatchProject.scala +++ b/project/build/DispatchProject.scala @@ -5,7 +5,8 @@ class DispatchProject(info: ProjectInfo) extends ParentProject(info) { override def parallelExecution = true - lazy val http = project("http", "Dispatch HTTP", new HttpProject(_)) + lazy val futures = project("futures", "Dispatch Futures", new DispatchDefault(_)) + lazy val http = project("http", "Dispatch HTTP", new HttpProject(_), futures) lazy val mime = project("mime", "Dispatch Mime", new DispatchDefault(_) { val mime = "org.apache.httpcomponents" % "httpmime" % "4.0.1" }, http)