Skip to content

Commit

Permalink
extract futures interface into module
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Hamblen committed Feb 6, 2010
1 parent f8760fc commit 5ebe2c0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 35 deletions.
39 changes: 39 additions & 0 deletions futures/src/main/scala/Futures.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package dispatch.futures

import java.util.concurrent.{Executors,Callable,ExecutorService}

object Futures extends AvailableFutures {
/** Structural type coinciding with scala.actors.Future */
type Future[T] = Function0[T] {
def isSet: Boolean
}
}
object DefaultFuture extends JucFuture {
lazy val futureExecutor = Executors.newCachedThreadPool
}
trait AvailableFutures {
/** @return values of futures that have completed their processing */
def available[T](fs: Iterable[Futures.Future[T]]) = fs filter { _.isSet } map { _() } toList
}
/** Interface to futures functionality */
trait Futures {
def future[T](result: => T): Futures.Future[T]
}
/** A java.util.concurrent Future accessor, executor undefined */
trait JucFuture extends Futures {
def future[T](result: => T) = new JucFuture(result)
/** Implement to customize the java.util.concurrent Executor, defaults to Executors.newCachedThreadPool */
val futureExecutor: ExecutorService
/** Wraps java.util.concurrent.Future */
class JucFuture[T](f: => T) extends Function0[T] {
val delegate = futureExecutor.submit(new Callable[T]{
def call = f
})
def isSet = delegate.isDone
def apply() = delegate.get()
}
}
/** Future accessor using a scala.actors future */
object ActorsFuture extends Futures {
def future[T](result: => T) = scala.actors.Futures.future(result)
}
38 changes: 4 additions & 34 deletions http/src/main/scala/dispatch/Threads.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package dispatch

import java.util.concurrent.{Executors,Callable,ExecutorService}

/** Http with a thread-safe client and non-blocking interfaces */
trait Threads extends Http with FuturableExecutor {
override val client = new ThreadSafeHttpClient
/** Shutdown connection manager, threads. (Needed to close console cleanly.) */
def shutdown() = client.getConnectionManager.shutdown()
}
trait FuturableExecutor extends HttpExecutor {
import dispatch.futures.Futures
/** @return an executor that will call `error` on any exception */
def on_error (error: PartialFunction[Throwable, Unit]) = new FuturableExecutor {
val execute = FuturableExecutor.this.execute
Expand All @@ -21,40 +20,11 @@ trait FuturableExecutor extends HttpExecutor {
/** @return an asynchronous Http interface that packs responses through a Threads#Future */
lazy val future = new HttpExecutor {
val execute = FuturableExecutor.this.execute
type HttpPackage[T] = Futures#Future[FuturableExecutor.this.HttpPackage[T]]
type HttpPackage[T] = Futures.Future[FuturableExecutor.this.HttpPackage[T]]
def pack[T](result: => T) = http_future.future(FuturableExecutor.this.pack(result))
}
def http_future: Futures = DefaultJucFuture
}
/** Accessors for futures used by FuturableExecutor */
trait Futures {
/** Structural type coinciding with scala.actors.Future */
type Future[T] = Function0[T] {
def isSet: Boolean
}
def future[T](result: => T): Future[T]
}
/** A java.util.concurrent Future accessor */
trait JucFuture extends Futures {
def future[T](result: => T) = new JucFuture(result)
/** Implement to customize the java.util.concurrent Executor, defaults to Executors.newCachedThreadPool */
val futureExecutor: ExecutorService
/** Wraps java.util.concurrent.Future */
class JucFuture[T](f: => T) extends Function0[T] {
val delegate = futureExecutor.submit(new Callable[T]{
def call = f
})
def isSet = delegate.isDone
def apply() = delegate.get()
}
}
/** Future accessor using a cached therad pool */
object DefaultJucFuture extends JucFuture {
lazy val futureExecutor = Executors.newCachedThreadPool
}
/** Future accessor using a scala.actors future */
object ActorsFuture extends Futures {
def future[T](result: => T) = scala.actors.Futures.future(result)
/** Override to use any Futures implementation */
def http_future: Futures = dispatch.futures.DefaultFuture
}

/** Client with a ThreadSafeClientConnManager */
Expand Down
3 changes: 2 additions & 1 deletion project/build/DispatchProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ class DispatchProject(info: ProjectInfo) extends ParentProject(info)
{
override def parallelExecution = true

lazy val http = project("http", "Dispatch HTTP", new HttpProject(_))
lazy val futures = project("futures", "Dispatch Futures", new DispatchDefault(_))
lazy val http = project("http", "Dispatch HTTP", new HttpProject(_), futures)
lazy val mime = project("mime", "Dispatch Mime", new DispatchDefault(_) {
val mime = "org.apache.httpcomponents" % "httpmime" % "4.0.1"
}, http)
Expand Down

0 comments on commit 5ebe2c0

Please sign in to comment.