Skip to content

Commit

Permalink
extract future into reusable object
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Hamblen committed Feb 6, 2010
1 parent d93a898 commit f8760fc
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions http/src/main/scala/dispatch/Threads.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dispatch

import java.util.concurrent.{Executors,Callable}
import java.util.concurrent.{Executors,Callable,ExecutorService}

/** Http with a thread-safe client and non-blocking interfaces */
trait Threads extends Http with FuturableExecutor {
Expand All @@ -16,40 +16,45 @@ trait FuturableExecutor extends HttpExecutor {
def pack[T](result: => T) = try { FuturableExecutor.this.pack(result) } catch {
case e if error.isDefinedAt(e) => error(e); throw e
}
override def futureExecutor = FuturableExecutor.this.executor
override def http_future = FuturableExecutor.this.http_future
}
/** @return an asynchronous Http interface that packs responses through a Threads#Future */
lazy val future = new HttpExecutor {
val execute = FuturableExecutor.this.execute
type HttpPackage[T] = Futures#Future[FuturableExecutor.this.HttpPackage[T]]
def pack[T](result: => T) = http_future.future(FuturableExecutor.this.pack(result))
}
def http_future: Futures = DefaultJucFuture
}
/** Accessors for futures used by FuturableExecutor */
trait Futures {
/** Structural type coinciding with scala.actors.Future */
type Future[T] = Function0[T] {
def isSet: Boolean
}
/** Override to customize the java.util.concurrent Executor, defaults to Executors.newCachedThreadPool */
def futureExecutor = Executors.newCachedThreadPool
private lazy val executor = futureExecutor
def future[T](result: => T): Future[T]
}
/** A java.util.concurrent Future accessor */
trait JucFuture extends Futures {
def future[T](result: => T) = new JucFuture(result)
/** Implement to customize the java.util.concurrent Executor, defaults to Executors.newCachedThreadPool */
val futureExecutor: ExecutorService
/** Wraps java.util.concurrent.Future */
class ConcFuture[T](f: => T) extends Function0[T] {
val delegate = executor.submit(new Callable[T]{
class JucFuture[T](f: => T) extends Function0[T] {
val delegate = futureExecutor.submit(new Callable[T]{
def call = f
})
def isSet = delegate.isDone
def apply() = delegate.get()
}
/** @return an asynchronous Http interface that packs responses through a Threads#Future */
lazy val future = concFuture
/** @return interface using concurrent future */
def concFuture = new FutureExecutor {
def future[T](result: => T) = new ConcFuture(result)
}
/** @return interface using actors future */
def actorsFuture = new FutureExecutor {
def future[T](result: => T) = scala.actors.Futures.future(result)
}
/** Base trait for excecutors that return a Future */
trait FutureExecutor extends HttpExecutor {
def future[T](result: => T): Future[T]
val execute = FuturableExecutor.this.execute
type HttpPackage[T] = Future[FuturableExecutor.this.HttpPackage[T]]
def pack[T](result: => T) = future(FuturableExecutor.this.pack(result))
}
}
/** Future accessor using a cached therad pool */
object DefaultJucFuture extends JucFuture {
lazy val futureExecutor = Executors.newCachedThreadPool
}
/** Future accessor using a scala.actors future */
object ActorsFuture extends Futures {
def future[T](result: => T) = scala.actors.Futures.future(result)
}

/** Client with a ThreadSafeClientConnManager */
Expand Down

0 comments on commit f8760fc

Please sign in to comment.